In [1]:
import pandas as pd
import glob
from natsort import natsorted
from tabulate import tabulate
from typing import Dict

In [2]:
def describe_df(df: pd.DataFrame):
    print(f'Number of rows: {len(df)}')
    print(f'Number of columns: {len(df.columns)}')
    print(f'Column names: {str(df.columns.tolist())}')
    print(f'Column names: {str(df.dtypes.tolist())}')

    print('DataFrame information:')
    print(df.info())

    print('Summary statistics:')
    print(df.describe())

In [3]:
# Read the Leads Parquet file
leads_df = pd.read_parquet('leads.parquet')
leads_df = leads_df.rename({'lead_UUID': 'lead_uuid'}, axis='columns')
leads_df = leads_df.convert_dtypes()
describe_df(leads_df)

Number of rows: 4364
Number of columns: 3
Column names: ['lead_uuid', 'phone_hash', 'email_hash']
Column names: [string[python], string[python], string[python]]
DataFrame information:
<class 'pandas.core.frame.DataFrame'>
Index: 4364 entries, 0 to 4433
Data columns (total 3 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   lead_uuid   4364 non-null   string
 1   phone_hash  4364 non-null   string
 2   email_hash  4364 non-null   string
dtypes: string(3)
memory usage: 136.4 KB
None
Summary statistics:
                                       lead_uuid  \
count                                       4364   
unique                                      4364   
top     c437ece50c0163479b180de9c217179bfed94e7d   
freq                                           1   

                                      phone_hash  \
count                                       4364   
unique                                      4364   
top     6e5345441aaeb5816ee08728

In [4]:
# Read the SFTP reports data
paths = glob.glob('SFTP/*.csv')
reports = {}

for path in natsorted(paths):
    reports[path] = pd.read_csv(path)

print(f'Total CSV reports: {len(reports)}')

Total CSV reports: 22


In [5]:
# check the number and names of columns across reports
def check_columns(dict: Dict[str, pd.DataFrame]):
    rows = [(path, len(df.columns), str(natsorted(df.columns))) for path, df in dict.items()]
    print(tabulate(rows, headers=['Path', 'Columns Count', 'Columns']))

check_columns(reports)

Path                Columns Count  Columns
----------------  ---------------  --------------------------------------------------------------------------------------------------------------------------------------------------
SFTP\data_1.csv                12  ['Appt Date', 'CITY', 'Demo', 'Dispo', 'ENTRYDATE', 'Job Status', 'LEADNUMBER', 'STATE', 'Set', 'ZIP', 'email_hash', 'phone_hash']
SFTP\data_2.csv                12  ['Appt Date', 'CITY', 'Demo', 'Dispo', 'ENTRYDATE', 'Job Status', 'LEADNUMBER', 'STATE', 'Set', 'ZIP', 'email_hash', 'phone_hash']
SFTP\data_3.csv                12  ['Appt Date', 'CITY', 'Demo', 'Dispo', 'ENTRYDATE', 'Job Status', 'LEADNUMBER', 'STATE', 'Set', 'ZIP', 'email_hash', 'phone_hash']
SFTP\data_4.csv                12  ['Appt Date', 'CITY', 'Demo', 'Dispo', 'ENTRYDATE', 'Job Status', 'LEADNUMBER', 'STATE', 'Set', 'ZIP', 'email_hash', 'phone_hash']
SFTP\data_5.csv                12  ['Appt Date', 'CITY', 'Demo', 'Dispo', 'ENTRYDATE', 'Job Status', 'LEADNUMBE

In [6]:
# Check the top/tail records on empty or broken rows
def check_sample(df):
    rows = []
    for path, df in df.items():
        sample = pd.concat([df.head(2), df.tail(2)])
        for _, row in sample.iterrows():
            rows.append((path, row.values.tolist()))
    print(tabulate(rows, headers=['Path', 'Sample']))
    
check_sample(reports)

Path              Sample
----------------  ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
SFTP\data_1.csv   ['02/05/2023', 289764, '3b7c33e84ea3ae8b249dc1e1578fc84cb4047ac4', '6e5345441aaeb5816ee08728681bbb5e18cbf365', 'Bremerton                   ', 'WA', 98311.0, nan, 0, 0, 'Data', nan]
SFTP\data_1.csv   ['12/17/2022', 217893, '137ac7f6012cc1165ccff9348abe55bb5fe86713', '8f2055fe6a78a8136b5e62e82d6b54b00d5c6ef9', 'Surprise', 'AZ', 85379.0, '01/02/2023  5:30PM', 1, 0, 'CXL-BT', nan]
SFTP\data_1.csv   ['03/06/2023', 298848, 'f5e7dcb53b1a1efd0e59a20eee5737a88f6fb636', '6ca904d6be88d4d97bcca6e2572ca0cc358f91d5', 'Yelm', 'WA', 98597.0, nan, 0, 0, 'DQ-Build a Home', nan]
SFTP\data_1.csv   ['01/30/2023', 227931, '2f0fa54d71d7d2fbcef3a0850d85a1d5f83bab67', '9ef11a6a337abdf924dbec0e8874f88c77c7f701', 'Las Vegas', 

In [7]:
# Fix the DataFrames
for path, df in reports.items():
    # remove rows with invalid ENTRYDATEs
    fixed_df = df[pd.to_datetime(df['ENTRYDATE'], format='%m/%d/%Y', errors='coerce').notna()]

    if 'location' in fixed_df:
        # remove broken columns
        fixed_df = fixed_df.drop(['CityName', 'STATE'], errors='coerce', axis='columns')

        # populate broken columns from 'location'
        fixed_df[['CITY', 'STATE']] = fixed_df['location'].str.split('|', expand=True)

        # remove 'location'
        fixed_df = fixed_df.drop(['location'], errors='coerce', axis='columns')

    fixed_df = fixed_df.astype({'LEADNUMBER': int})
    fixed_df['ENTRYDATE'] = pd.to_datetime(fixed_df['ENTRYDATE'], format='%m/%d/%Y').dt.date
    fixed_df['Appt Date'] = pd.to_datetime(fixed_df['Appt Date'], format='%m/%d/%Y  %I:%M%p') # 01/02/2023  5:30PM
    fixed_df['STATE'] = fixed_df['STATE'].str.upper() # upper 'nu' states
    fixed_df = fixed_df.applymap(lambda x: x.strip() if isinstance(x, str) else x) # trim whitespaces

    # Normalize booleans
    map = {'True': 1, 'False': 0 }
    fixed_df = fixed_df.applymap(lambda x: bool(map[x]) if (isinstance(x, str) and x in map) else x)
    fixed_df = fixed_df.astype({'Set': int, 'Demo': int})
    fixed_df = fixed_df.astype({'Set': bool, 'Demo': bool})

    fixed_df = fixed_df.rename({
        'CityName': 'city',
        'CITY': 'city',
        'ENTRYDATE': 'entry_date',
        'LEADNUMBER': 'lead_number',
        'STATE': 'state',
        'ZIP': 'zip',
        'Appt Date': 'appt_datetime',
        'Set': 'set',
        'Demo': 'demo',
        'Dispo': 'dispo',
        'Job Status': 'job_status'
    }, axis='columns') # correct naming

    reports[path] = fixed_df

# Check the columns again
assert(all([len(df.columns) == 12 for df in reports.values()]))
check_columns(reports)

# Check the sample again
check_sample(reports)

Path                Columns Count  Columns
----------------  ---------------  ----------------------------------------------------------------------------------------------------------------------------------------
SFTP\data_1.csv                12  ['appt_datetime', 'city', 'demo', 'dispo', 'email_hash', 'entry_date', 'job_status', 'lead_number', 'phone_hash', 'set', 'state', 'zip']
SFTP\data_2.csv                12  ['appt_datetime', 'city', 'demo', 'dispo', 'email_hash', 'entry_date', 'job_status', 'lead_number', 'phone_hash', 'set', 'state', 'zip']
SFTP\data_3.csv                12  ['appt_datetime', 'city', 'demo', 'dispo', 'email_hash', 'entry_date', 'job_status', 'lead_number', 'phone_hash', 'set', 'state', 'zip']
SFTP\data_4.csv                12  ['appt_datetime', 'city', 'demo', 'dispo', 'email_hash', 'entry_date', 'job_status', 'lead_number', 'phone_hash', 'set', 'state', 'zip']
SFTP\data_5.csv                12  ['appt_datetime', 'city', 'demo', 'dispo', 'email_hash', 'entr

In [8]:
reports_df = pd.concat(reports, ignore_index=True)
reports_df = reports_df.convert_dtypes()
describe_df(reports_df)

Number of rows: 82287
Number of columns: 12
Column names: ['entry_date', 'lead_number', 'email_hash', 'phone_hash', 'city', 'state', 'zip', 'appt_datetime', 'set', 'demo', 'dispo', 'job_status']
Column names: [dtype('O'), Int64Dtype(), string[python], string[python], string[python], string[python], dtype('O'), dtype('<M8[ns]'), BooleanDtype, BooleanDtype, string[python], string[python]]
DataFrame information:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 82287 entries, 0 to 82286
Data columns (total 12 columns):
 #   Column         Non-Null Count  Dtype         
---  ------         --------------  -----         
 0   entry_date     82287 non-null  object        
 1   lead_number    82287 non-null  Int64         
 2   email_hash     82287 non-null  string        
 3   phone_hash     82287 non-null  string        
 4   city           82043 non-null  string        
 5   state          82287 non-null  string        
 6   zip            82022 non-null  object        
 7   appt_datetime 

In [9]:
# Join reports with leads

# option 1
# from pandasql import sqldf
# test = sqldf('''SELECT *
#                 FROM reports_df r
#                 LEFT JOIN leads_df l ON l.phone_hash = r.phone_hash OR l.email_hash = r.email_hash''')

# option 2
emails_dict = dict(zip(leads_df.email_hash, leads_df.lead_uuid))
phones_dict = dict(zip(leads_df.phone_hash, leads_df.lead_uuid))
def find_lead(row):  
    if row['email_hash'] in emails_dict:
        return emails_dict[row['email_hash']]
    elif row['phone_hash'] in phones_dict:
        return phones_dict[row['phone_hash']]
    else:
        raise LookupError('No match')

reports_df['lead_uuid'] = reports_df.apply(find_lead, axis='columns')
describe_df(reports_df)

Number of rows: 82287
Number of columns: 13
Column names: ['entry_date', 'lead_number', 'email_hash', 'phone_hash', 'city', 'state', 'zip', 'appt_datetime', 'set', 'demo', 'dispo', 'job_status', 'lead_uuid']
Column names: [dtype('O'), Int64Dtype(), string[python], string[python], string[python], string[python], dtype('O'), dtype('<M8[ns]'), BooleanDtype, BooleanDtype, string[python], string[python], dtype('O')]
DataFrame information:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 82287 entries, 0 to 82286
Data columns (total 13 columns):
 #   Column         Non-Null Count  Dtype         
---  ------         --------------  -----         
 0   entry_date     82287 non-null  object        
 1   lead_number    82287 non-null  Int64         
 2   email_hash     82287 non-null  string        
 3   phone_hash     82287 non-null  string        
 4   city           82043 non-null  string        
 5   state          82287 non-null  string        
 6   zip            82022 non-null  object   

In [10]:
reports_df.to_csv('raw_leads.csv', index=False)