In [29]:
import pandas as pd
import numpy as np
from typing import Dict, List, Optional
import re

In [None]:
class Cleansing:
    def __init__(self, df: pd.DataFrame):
        self.df = df.copy()
        
        self.duplicate_groups = {
            'account_name': ['Account Name', 'account_name', 'AccountName'],
            'contact_email': ['Contact Email', 'contact_email'],
            'created_date': ['Created Date', 'created_date'],
            'lead_source': ['Lead Source', 'lead_source'],
            'opportunity_amount': ['Opportunity Amount', 'opportunity_amount'],
            'is_active': ['Is Active', 'is_active'],
            'sfdc_id': ['SFDC ID', 'sfdc_id'],
            'annual_revenue': ['Annual Revenue', 'annual_revenue']
        }
    
    
    def is_valid_email(self, email: str) -> bool:
        """Return True only when ``email`` is one of the two accepted addresses.

        Missing values (None / NaN) are never valid.
        """
        accepted = ['help@globex.com', 'contact@acme.com']
        missing = pd.isna(email) or email is None
        return False if missing else email in accepted
    
    def is_placeholder_email(self, email: str) -> bool:
        """Tell whether ``email`` is missing or contains a known placeholder marker.

        None, NaN and the empty string count as placeholders. Otherwise the
        value is lower-cased and searched for each marker substring.
        """
        if pd.isna(email) or email is None or email == '':
            return True

        lowered = str(email).lower()
        for marker in ('noemail', 'invalid@', 'user@', 'missing.com', 'placeholder'):
            if marker in lowered:
                return True
        return False
    
    def get_corresponding_account(self, email: str) -> Optional[str]:
        """Map an accepted email address to its account name.

        Returns:
            'Globex' or 'Acme Corp' for the two known addresses, else None.
        """
        known_pairs = (
            ('help@globex.com', 'Globex'),
            ('contact@acme.com', 'Acme Corp'),
        )
        for address, account in known_pairs:
            if email == address:
                return account
        return None
    
    def generate_email_for_account(self, account_name: str) -> Optional[str]:
        """Map a known account name to its contact address.

        Returns:
            The address for 'Globex' or 'Acme Corp' (exact match), else None.
        """
        known_pairs = (
            ('Globex', 'help@globex.com'),
            ('Acme Corp', 'contact@acme.com'),
        )
        for account, address in known_pairs:
            if account_name == account:
                return address
        return None
    
    def consolidate_account_and_email(self) -> pd.DataFrame:

        result_df = self.df.copy()
        
        result_df['consolidated_account_name'] = None
        result_df['consolidated_contact_email'] = None
        
        account_columns = ['account_name', 'AccountName', 'Account Name']
        
        for idx, row in result_df.iterrows():

            contact_email_lower_val = row.get('contact_email')
            contact_email_val = row.get('Contact Email')
            
            contact_email_lower_valid = self.is_valid_email(contact_email_lower_val)
            contact_email_valid = self.is_valid_email(contact_email_val)
            
            account_values = {}
            for col in account_columns:
                if col in result_df.columns:
                    account_values[col] = row.get(col)
            
            final_account = None
            final_email = None
            
            for col in account_columns:
                if (col in account_values and 
                    pd.notna(account_values[col]) and 
                    account_values[col] != ''):
                    final_account = account_values[col]
                    break
            
            # If no account found, derive from valid email (all accounts empty case)
            if final_account is None:
                # Check contact_email first (priority)
                if contact_email_lower_valid:
                    final_account = self.get_corresponding_account(contact_email_lower_val)
                    final_email = contact_email_lower_val
                # If contact_email not valid, check Contact Email
                elif contact_email_valid:
                    final_account = self.get_corresponding_account(contact_email_val)
                    final_email = contact_email_val
            
            if final_account is not None and final_email is None:
                email_matched = False
                
                if contact_email_lower_valid:
                    expected_account = self.get_corresponding_account(contact_email_lower_val)
                    if final_account == expected_account:
                        final_email = contact_email_lower_val
                        email_matched = True
                
                if not email_matched and contact_email_valid:
                    expected_account = self.get_corresponding_account(contact_email_val)
                    if final_account == expected_account:
                        final_email = contact_email_val
                        email_matched = True
                
                # If no email match but account is Globex or Acme Corp, generate email
                if not email_matched:
                    generated_email = self.generate_email_for_account(final_account)
                    if generated_email:
                        final_email = generated_email
            
            result_df.at[idx, 'consolidated_account_name'] = final_account
            result_df.at[idx, 'consolidated_contact_email'] = final_email
        
        return result_df

    def consolidate_created_date(self) -> pd.DataFrame:
    
        result_df = self.df.copy()
        
        result_df['consolidated_created_date'] = None
        
        created_date_columns = ['created_date', 'Created Date']
        
        for idx, row in result_df.iterrows():
            final_created_date = None
            
            for col in created_date_columns:
                if col in result_df.columns:
                    raw_date = row.get(col)
                    if (pd.isna(raw_date) or raw_date == '' or 
                        str(raw_date).lower() in ['nat', 'not_a_date', 'none']):
                        continue
                    
                    formatted_date = self.format_date_to_standard(raw_date)
                    if formatted_date:
                        final_created_date = formatted_date
                        break
            
            result_df.at[idx, 'consolidated_created_date'] = final_created_date
        
        return result_df

    def format_date_to_standard(self, date_value) -> Optional[str]:
        """Format a date-like value as a 'YYYY-MM-DD' string.

        Parsing is delegated to ``pd.to_datetime`` with ``errors='coerce'``.

        Returns:
            The formatted date, or None when the input is missing or empty,
            cannot be parsed, or parsing raises ValueError / TypeError.
        """
        if pd.isna(date_value) or date_value is None or date_value == '':
            return None

        try:
            timestamp = pd.to_datetime(date_value, errors='coerce')
            return None if pd.isna(timestamp) else timestamp.strftime('%Y-%m-%d')
        except (ValueError, TypeError):
            return None
    
    def consolidate_lead_source(self) -> pd.DataFrame:
        result_df = self.df.copy()
        
        result_df['consolidated_lead_source'] = None
        
        lead_source_columns = ['lead_source', 'Lead Source']
        
        for idx, row in result_df.iterrows():
            final_lead_source = None
            
            for col in lead_source_columns:
                if col in result_df.columns:
                    lead_source_val = row.get(col)
                    if (pd.notna(lead_source_val) and 
                        lead_source_val != '' and 
                        str(lead_source_val).lower() not in ['nat', 'none', 'null']):
                        final_lead_source = lead_source_val
                        break
            
            result_df.at[idx, 'consolidated_lead_source'] = final_lead_source
        
        return result_df
    
    def consolidate_is_active(self) -> pd.DataFrame:
        result_df = self.df.copy()
        
        result_df['consolidated_is_active'] = None
        
        is_active_columns = ['is_active', 'Is Active']
        
        for idx, row in result_df.iterrows():
            final_is_active = None
            
            for col in is_active_columns:
                if col in result_df.columns:
                    is_active_val = row.get(col)
                    if (pd.notna(is_active_val) and 
                        is_active_val != '' and 
                        str(is_active_val).lower() not in ['nat', 'none', 'null']):
                        standardized_is_active = self.standardize_is_active(is_active_val)
                        if standardized_is_active is not None:
                            final_is_active = standardized_is_active
                            break
            
            result_df.at[idx, 'consolidated_is_active'] = final_is_active
        
        return result_df
    
    def standardize_is_active(self, is_active_value) -> Optional[bool]:
        """Interpret a raw activity flag as True, False or None.

        The value is stringified, stripped and lower-cased, then matched
        against the accepted true / false tokens. Values that match no token
        but parse as a number are accepted when they equal 1 or 0, so that
        float-encoded flags such as ``1.0`` / ``0.0`` (and ``'1.0'``) are not
        silently discarded.

        Args:
            is_active_value: Raw cell value (string, number, bool, None, NaN).

        Returns:
            True or False for a recognised flag, None for missing or
            unrecognised input.
        """
        if pd.isna(is_active_value) or is_active_value is None or is_active_value == '':
            return None

        value_str = str(is_active_value).strip().lower()

        if value_str in ['nat', 'none', 'null', '']:
            return None

        if value_str in ['true', 't', 'yes', 'y', '1', 'active', 'on']:
            return True
        if value_str in ['false', 'f', 'no', 'n', '0', 'inactive', 'off']:
            return False

        # Numeric fallback: str(1.0) is '1.0', which matches no token above.
        try:
            numeric = float(value_str)
        except ValueError:
            return None
        if numeric == 1:
            return True
        if numeric == 0:
            return False
        return None
    
    def consolidate_sfdc_id(self) -> pd.DataFrame:
        result_df = self.df.copy()
        
        result_df['consolidated_sfdc_id'] = None
        
        sfdc_id_columns = ['SFDC ID', 'sfdc_id']
        
        for idx, row in result_df.iterrows():
            final_sfdc_id = None
            
            for col in sfdc_id_columns:
                if col in result_df.columns:
                    sfdc_id_val = row.get(col)
                    if (pd.notna(sfdc_id_val) and 
                        sfdc_id_val != '' and 
                        str(sfdc_id_val).lower() not in ['nat', 'none', 'null']):
                        standardized_sfdc_id = self.standardize_sfdc_id(sfdc_id_val)
                        if standardized_sfdc_id is not None:
                            final_sfdc_id = standardized_sfdc_id
                            break
            
            result_df.at[idx, 'consolidated_sfdc_id'] = final_sfdc_id
        
        return result_df

    def standardize_sfdc_id(self, sfdc_id_value) -> Optional[str]:
        """Return the stripped id text, or None for missing / placeholder ids.

        The comparison against null tokens and the known placeholder ids is
        case-insensitive; the returned id keeps its original casing.
        """
        if pd.isna(sfdc_id_value) or sfdc_id_value is None or sfdc_id_value == '':
            return None

        cleaned = str(sfdc_id_value).strip()

        # Null-like tokens followed by the known placeholder ids.
        rejected = (
            'nat', 'none', 'null', '',
            'abc123', 'xyz-00001', '12345', 'bad_id',
        )
        return None if cleaned.lower() in rejected else cleaned
    
    def consolidate_monetary(self, field_name: str) -> pd.DataFrame:
        result_df = self.df.copy()
        
        consolidated_column = f'consolidated_{field_name}'
        result_df[consolidated_column] = None
        
        field_columns = self.duplicate_groups.get(field_name, [field_name])
        
        for idx, row in result_df.iterrows():
            final_value = None
            
            for col in field_columns:
                if col in result_df.columns:
                    raw_value = row.get(col)
                    if (pd.notna(raw_value) and 
                        raw_value != '' and 
                        str(raw_value).lower() not in ['nat', 'none', 'null']):
                        standardized_value = self.standardize_monetary(raw_value)
                        if standardized_value is not None:
                            final_value = standardized_value
                            break
            
            result_df.at[idx, consolidated_column] = final_value
        
        return result_df
    
    def standardize_monetary(self, monetary_value) -> Optional[float]:
        if pd.isna(monetary_value) or monetary_value is None or monetary_value == '':
            return None
        
        try:
            value_str = str(monetary_value).strip()
            
            if value_str.lower() in ['nat', 'none', 'null', '', 'not available', 'n/a']:
                return None
            
            # Check if it's already a valid number
            if isinstance(monetary_value, (int, float)) and not pd.isna(monetary_value):
                if monetary_value < 0:
                    return None
                return round(float(monetary_value), 2)
            
            # Word version
            if re.match(r'^[a-zA-Z\s]+$', value_str):
                converted_float = self.convert_text_to_number(value_str)
                if converted_float is None:
                    return None
                if converted_float < 0:
                    return None
                return round(converted_float, 2)
            
            # Remove currency symbols and common formatting for numeric values
            value_str = re.sub(r'[$£€¥₹]', '', value_str)
            value_str = value_str.replace(',', '').replace(' ', '') 
            
            if value_str == '':
                return None
            
            multiplier = 1
            value_str_lower = value_str.lower()
            if value_str_lower.endswith('k'):
                multiplier = 1000
                value_str = value_str[:-1]
            elif value_str_lower.endswith('m'):
                multiplier = 1000000
                value_str = value_str[:-1]
            elif value_str_lower.endswith('b'):
                multiplier = 1000000000
                value_str = value_str[:-1]
            
            try:
                final_float = float(value_str) * multiplier
            except ValueError:
                return None
            
            if final_float < 0:
                return None
            
            return round(final_float, 2)
        
        except (ValueError, TypeError):
            return None

    def convert_text_to_number(self, text_value: str) -> Optional[float]:
        """Convert an English number phrase such as 'five thousand' to a float.

        The text is lower-cased and split on whitespace. Words that are not
        number words (e.g. 'and', 'a') are ignored. 'hundred' multiplies the
        pending group by 100, and 'thousand' / 'million' / 'billion' close
        the pending group by multiplying it by the scale.

        A scale word with no pending group counts as one unit, so 'thousand'
        and 'a million' give 1000 and 1000000. This matches the existing
        treatment of a bare 'hundred' (100); previously a bare scale word
        multiplied by zero and the phrase was rejected.

        Args:
            text_value: Phrase to convert.

        Returns:
            The value as a float, or None when the text is empty, contains no
            number word, or sums to 0 without containing 'zero'.
        """
        if not text_value:
            return None

        text_value = text_value.lower().strip()

        small_numbers = {
            'zero': 0, 'one': 1, 'two': 2, 'three': 3, 'four': 4, 'five': 5,
            'six': 6, 'seven': 7, 'eight': 8, 'nine': 9, 'ten': 10,
            'eleven': 11, 'twelve': 12, 'thirteen': 13, 'fourteen': 14, 'fifteen': 15,
            'sixteen': 16, 'seventeen': 17, 'eighteen': 18, 'nineteen': 19, 'twenty': 20,
            'thirty': 30, 'forty': 40, 'fifty': 50, 'sixty': 60, 'seventy': 70,
            'eighty': 80, 'ninety': 90,
        }
        scale_words = {'thousand': 1000, 'million': 1000000, 'billion': 1000000000}

        total = 0
        current = 0
        # True while unit / 'hundred' words are waiting for a scale word.
        group_pending = False
        found_valid_word = False

        for word in text_value.split():
            if word == 'hundred':
                current = current * 100 if current > 0 else 100
                group_pending = True
            elif word in scale_words:
                # An explicit pending group (even 'zero') is used as-is; a
                # bare scale word stands for one unit of that scale.
                total += (current if group_pending else 1) * scale_words[word]
                current = 0
                group_pending = False
            elif word in small_numbers:
                current += small_numbers[word]
                group_pending = True
            else:
                continue  # filler word, not part of the number
            found_valid_word = True

        total += current

        if not found_valid_word or (total == 0 and 'zero' not in text_value):
            return None

        return float(total)

    def consolidate_last_activity(self) -> pd.DataFrame:
        
        result_df = self.df.copy()
        
        result_df['consolidated_last_activity'] = None
        
        last_activity_columns = ['Last Activity']
        
        # Define placeholder values to filter out
        placeholder_values = [
            '42', 42, 'Called Client', 'called client', 'CALLED CLIENT', ''
        ]
        
        for idx, row in result_df.iterrows():
            final_last_activity = None
            
            for col in last_activity_columns:
                if col in result_df.columns:
                    raw_activity = row.get(col)
                    
                    # Skip if NaN, empty, or placeholder value
                    if (pd.isna(raw_activity) or 
                        raw_activity == '' or 
                        raw_activity in placeholder_values or
                        str(raw_activity).strip().lower() in [str(p).lower() for p in placeholder_values]):
                        continue
                    
                    # Try to format as date using existing method
                    formatted_date = self.format_date_to_standard(raw_activity)
                    if formatted_date:
                        final_last_activity = formatted_date
                        break
                    
            
            result_df.at[idx, 'consolidated_last_activity'] = final_last_activity
        
        return result_df
    
    def consolidate_custom_field(self) -> pd.DataFrame:
        result_df = self.df.copy()
        
        result_df['consolidated_custom_field'] = None
        
        custom_field_columns = ['Custom Field']
        
        placeholder_values = ['N/A', '{"type": null}', None, '', 'null', 'nat', 'none']
        
        for idx, row in result_df.iterrows():
            final_custom_field = None
            
            for col in custom_field_columns:
                if col in result_df.columns:
                    custom_field_val = row.get(col)
                    
                    if (pd.notna(custom_field_val) and 
                        custom_field_val != '' and 
                        custom_field_val not in placeholder_values and
                        str(custom_field_val).lower() not in [str(p).lower() for p in placeholder_values if p is not None]):
                        final_custom_field = custom_field_val
                        break
            
            result_df.at[idx, 'consolidated_custom_field'] = final_custom_field
        
        return result_df

    def consolidate_region(self) -> pd.DataFrame:
        result_df = self.df.copy()
        
        result_df['consolidated_region'] = None
        
        region_columns = ['Region']
        
        north_america_values = ['North America', 'NA', 'N.A.', 'United States', 'US']
        
        for idx, row in result_df.iterrows():
            final_region = None
            
            for col in region_columns:
                if col in result_df.columns:
                    region_val = row.get(col)
                    
                    if (pd.notna(region_val) and 
                        region_val != '' and 
                        str(region_val).strip() in north_america_values):
                        final_region = 'North America'
                        break
            
            result_df.at[idx, 'consolidated_region'] = final_region
        
        return result_df
    
    def consolidate_random_notes(self) -> pd.DataFrame:
        result_df = self.df.copy()
        
        result_df['consolidated_random_notes'] = None
        
        random_notes_columns = ['Random Notes']
        
        for idx, row in result_df.iterrows():
            final_notes_flag = None
            
            for col in random_notes_columns:
                if col in result_df.columns:
                    notes_val = row.get(col)
                    
                    # Check if value is "See notes" or "Valid"
                    if (pd.notna(notes_val) and 
                        notes_val != '' and 
                        str(notes_val).strip() in ['See notes', 'Valid']):
                        final_notes_flag = 'Notes_Flag'
                        break
            
            result_df.at[idx, 'consolidated_random_notes'] = final_notes_flag
        
        return result_df
    

    def consolidate_deal_score(self) -> pd.DataFrame:

        result_df = self.df.copy()
        result_df['consolidated_deal_score'] = None
        
        for idx, row in result_df.iterrows():
            deal_score_val = row.get('Deal Score')
            
            if pd.notna(deal_score_val):
                result_df.at[idx, 'consolidated_deal_score'] = deal_score_val / 100.0
            else:
                result_df.at[idx, 'consolidated_deal_score'] = None
        
        return result_df
    
    def consolidate_engagement_level(self) -> pd.DataFrame:
        result_df = self.df.copy()
        result_df['consolidated_engagement_level'] = None
        
        for idx, row in result_df.iterrows():
            engagement_val = row.get('Engagement Level')
            
            if pd.notna(engagement_val):
                result_df.at[idx, 'consolidated_engagement_level'] = round(engagement_val, 4)
            else:
                result_df.at[idx, 'consolidated_engagement_level'] = None
        
        return result_df
    
    def create_clean_dataset(self) -> pd.DataFrame:
        consolidated_df = self.consolidate_account_and_email()
        
        self.df = consolidated_df
        
        consolidated_df = self.consolidate_created_date()
        
        self.df = consolidated_df
        
        consolidated_df = self.consolidate_lead_source()

        self.df = consolidated_df
        
        consolidated_df = self.consolidate_monetary('opportunity_amount')

        self.df = consolidated_df
        
        consolidated_df = self.consolidate_is_active()

        self.df = consolidated_df
        
        consolidated_df = self.consolidate_sfdc_id()
        
        self.df = consolidated_df
        
        consolidated_df = self.consolidate_monetary('annual_revenue')

        self.df = consolidated_df
        
        consolidated_df = self.consolidate_last_activity()
        
        self.df = consolidated_df
        
        consolidated_df = self.consolidate_custom_field()
        
        self.df = consolidated_df
        
        consolidated_df = self.consolidate_region()

        self.df = consolidated_df
    
        consolidated_df = self.consolidate_random_notes()

        self.df = consolidated_df

        consolidated_df = self.consolidate_deal_score()

        self.df = consolidated_df

        consolidated_df = self.consolidate_engagement_level()
        
        clean_df = consolidated_df.copy()
        
        clean_df['account_name'] = consolidated_df['consolidated_account_name']
        clean_df['contact_email'] = consolidated_df['consolidated_contact_email']
        clean_df['created_date'] = consolidated_df['consolidated_created_date']
        clean_df['lead_source'] = consolidated_df['consolidated_lead_source']
        clean_df['opportunity_amount'] = consolidated_df['consolidated_opportunity_amount']
        clean_df['is_active'] = consolidated_df['consolidated_is_active']
        clean_df['sfdc_id'] = consolidated_df['consolidated_sfdc_id']
        clean_df['annual_revenue'] = consolidated_df['consolidated_annual_revenue']
        clean_df['last_activity'] = consolidated_df['consolidated_last_activity']
        clean_df['custom_field'] = consolidated_df['consolidated_custom_field']
        clean_df['region'] = consolidated_df['consolidated_region']
        clean_df['notes_flag'] = consolidated_df['consolidated_random_notes']
        clean_df['deal_score'] = consolidated_df['consolidated_deal_score']
        clean_df['engagement_level'] = consolidated_df['consolidated_engagement_level']
        
        columns_to_drop = [
            'consolidated_account_name', 'consolidated_contact_email', 'consolidated_created_date',
            'consolidated_lead_source', 'consolidated_opportunity_amount', 'consolidated_is_active', 'consolidated_engagement_level',
            'consolidated_sfdc_id', 'consolidated_last_activity', 'consolidated_annual_revenue', 'consolidated_deal_score',
            'consolidated_custom_field', 'consolidated_region', 'consolidated_random_notes',
            'Account Name', 'AccountName', 'Contact Email', 'Created Date', 'Lead Source', 
            'Opportunity Amount', 'Is Active', 'SFDC ID', 'Last Activity', 'Annual Revenue', 'Deal Score', 'Engagement Level',
            'Custom Field', 'Region', 'Random Notes', 'Unnamed: 0', 'Unnamed: 21'  
        ]
        
        columns_to_drop = [col for col in columns_to_drop if col in clean_df.columns]
        clean_df = clean_df.drop(columns=columns_to_drop)
        
        return clean_df
            
            
            

In [92]:
# Load the raw export and report its shape before cleaning.
df = pd.read_csv('data/DirtySalesforceData.csv')

print("Original Dataset Info:")
print(f"Shape: {df.shape}")
print(f"Columns: {list(df.columns)}")

# Run the pipeline once and keep its result. The pipeline is row-wise, so a
# second, discarded call only doubles the runtime.
cleaner = Cleansing(df)
clean_df = cleaner.create_clean_dataset()

clean_df.to_csv('CleanSalesforceData.csv', index=False)
print("\nClean dataset saved as 'CleanSalesforceData.csv'")
print(f"Clean dataset shape: {clean_df.shape}")

print("\nSample of consolidated data:")
print(clean_df.head(30))


Original Dataset Info:
Shape: (500, 30)
Columns: ['Account Name', 'account_name', 'AccountName', 'Contact Email', 'contact_email', 'Created Date', 'created_date', 'Lead Source', 'lead_source', 'Opportunity Amount', 'opportunity_amount', 'Is Active', 'is_active', 'SFDC ID', 'sfdc_id', 'Annual Revenue', 'annual_revenue', 'Last Activity', 'Custom Field', 'Region', 'Unnamed: 0', 'Unnamed: 21', 'Random Notes', 'Deal Score', 'Engagement Level', 'Num Calls', 'Time on Page (sec)', 'City', 'State', 'Country']

Clean dataset saved as 'CleanSalesforceData.csv'
Clean dataset shape: (500, 19)

Sample of consolidated data:
   account_name     contact_email created_date     lead_source  \
0     Acme Corp  contact@acme.com   2020-01-01      Trade Show   
1        Globex   help@globex.com   2022-12-01    Social Media   
2        Globex   help@globex.com         None  Email Campaign   
3     Acme Corp  contact@acme.com         None    Social Media   
4      Umbrella              None   2024-03-26       