# PAWS Data Pipeline
The objective of this script is to create a master data table that links all the PAWS datasources together.
## Pipeline sections
0. Import libraries
1. Create & populate database 
2. Create ***metadata master table*** schema to link all source tables together & populate with one of the dataset (e.g. SalesForce)
3. For each dataset, merge each record with the ***metadata master table***. If a match is found, link the two sources. If not, create a new record. <br/>
    a. Petpoint<br/>
    b. Volgistics<br/>
    c. Other - TBD<br/>
4. Write the new table to the database

### 0. Import libraries

In [1]:
import sqlite3
import pandas as pd
import numpy as np
import re
from fuzzywuzzy import fuzz

### 1. Create & populate database 

In [2]:
# connect to or create database

conn = sqlite3.connect("./sample_data/paws.db")

In [3]:
# function for loading a csv into a database table or "updating" the table by dropping it and recreating it with the csv

def load_to_sqlite(csv_name, table_name, connection, drop_first_col=False, manual_index_name=None):
    
    # load csv into a dataframe
    df = pd.read_csv(csv_name, encoding='cp1252')
    
    # drop the first column - so far all csvs have had a first column that's an index and doesn't have a name
    if drop_first_col:
        df = df.drop(df.columns[0], axis=1)
    
    # strip whitespace and periods from headers, convert to lowercase
    df.columns = df.columns.str.lower().str.strip()
    df.columns = df.columns.str.replace(' ', '_')
    df.columns = df.columns.map(lambda x: re.sub(r'\.+', '_', x))
    
    # create a cursor object, and use it to drop the table if it exists
    cursor = connection.cursor()
    cursor.execute(f'DROP TABLE {table_name}')
    connection.commit()
    cursor.close()
    
    # optionally add an ID number for linking against master data.
    # Ideally we would implement this feature by using sqlite's PRIMARY KEY constraint, but table schemas or
    # constraints cannot be modified in sqlite3 after they're defined.  The norm is altering the table name,
    # rebuilding the schema with the primary key constraint, then inserting all of the original data.
    if manual_index_name is not None:
        df[manual_index_name] = range(df.shape[0])
    
    # load dataframe into database table
    df.to_sql(table_name, connection, index=False,)

In [4]:
# load petpoint

load_to_sqlite('./sample_data/CfP_PDP_petpoint_deidentified.csv', 'petpoint', conn, True)

In [5]:
# load volgistics

load_to_sqlite('./sample_data/CfP_PDP_volgistics_deidentified.csv', 'volgistics', conn, True, 'volgistics_id')

In [6]:
# load salesforce contacts

load_to_sqlite('./sample_data/CfP_PDP_salesforceContacts_deidentified.csv', 'salesforcecontacts', conn, True)

In [7]:
# load salesforce donations

load_to_sqlite('./sample_data/CfP_PDP_salesforceDonations_deidentified.csv', 'salesforcedonations', conn, True)

  if self.run_code(code, result):


### 2. Create ***metadata master table*** schema to link all source tables together & populate with one of the datasets (e.g. SalesForce)

In [8]:
pd.read_sql('select * from salesforcecontacts', conn).tail()

Unnamed: 0,account_name,contact_id,first_name,last_name,title,mailing_street,mailing_city,mailing_state_province,mailing_zip_postal_code,mailing_country,phone,fax,mobile,email,account_owner,account_id
60182,Angelica el-Ashraf Bistro,0033p00002UO8dB,Angelica,el-Ashraf,,1417 Estate,Fontana,Pennsvania,19119-3111,US,,,,pxm@bnygeuzhvo.ewu,PAWS Development,0013p00001pVtVy
60183,Cassondra el-Kamal Household,0033p00002UO8ed,Cassondra,el-Kamal,,2210 S. 14st Street,West Portsmouth,NH,19125-3329,US,,,,ske@ciqgr.ndf,PAWS Development,0013p00001pVtWX
60184,Justin Campbell Bistro,0033p00002UO8oS,Justin,Campbell,,4074 S. 41rd St.,Ocean,Texas,19474-0204,,,,,faxh@lcume.enj,Jared Hupp,0013p00001pVtaP
60185,Aslam Wilson Household,0033p00002UO8q2,Aslam,Wilson,,222 n Columbus blvd,New Haven,BC,17009,US,4146143364.0,,,tpik@wotkn.qwi,PAWS Development,0013p00001pVtaj
60186,Dashawn Patterson Household,0033p00002UO8tB,Dashawn,Patterson,,311,High Bridge,WA,19064-3130,,,,,nobcuvj@blyh.zva,Jared Hupp,0013p00001pVtbS


In [9]:
pd.read_sql('select * from volgistics', conn).tail()

Unnamed: 0,last_name_first_name,first_name_last_name,title_first_name_last_name,last_name,first_name,middle_name,title,nickname,status,type,...,spare_checkbox_6,volunteer_distribution_list,general_volunteer_emails,schedule_reminders,my_availability_is,from,to,i_would_like_to_serve_up_to,hours,volgistics_id
1237,"Lo, Max",Max Lo,Ms. Max Lo,Lo,Max,,Ms.,,Active,,...,,Yes,,Yes,,,,0,,1237
1238,"Johnson, Jessica",Jessica Johnson,Jessica Johnson,Johnson,Jessica,,,,Active,,...,,Yes,,Yes,,,,0,,1238
1239,"Williams, Bryce",Bryce Williams,Bryce Williams,Williams,Bryce,,,They/them pronouns please,Active,,...,,Yes,,Yes,,,,0,,1239
1240,"Turner, Kaelyn",Kaelyn Turner,Ms. Kaelyn Turner,Turner,Kaelyn,,Ms.,,Active,,...,,Yes,,Yes,,,,0,,1240
1241,"el-Majeed, Carolina",Carolina el-Majeed,Ms. Carolina el-Majeed,el-Majeed,Carolina,,Ms.,,Active,,...,,Yes,,Yes,,,,0,,1241


In [10]:
def clean_entry(entry):
    """
    Function to clean up all values returned from the SQL statement, so this 
    should be performed on every entry in the dataframe with an applymap
    
    1 Change 'None' or 'NaN' value to an empty string
    2 Cast value as string
    3 Lowercase value
    3 Strip leading and trailing white space
    4 Remove punctuation by only keeping letters, numbers and white space
    5 Replace internal multiple consecutive white spaces with a single white space
    """
    
    # convert None and NaN to an empty string
    if entry ==  None or entry == np.nan:
        entry = ''
    
    # convert to string, lowercase, and strip leading and trailing whitespace
    entry = str(entry).lower().strip()
    
#    # remove all non alphanumeric characters except white space
#    alphanumeric_and_space = ' 1234567890abcdefghijklmnopqrstuvwxyz'
#    entry = ''.join([c for c in entry if c in alphanumeric_and_space])
    
    # cut down (internal) consecutive whitespaces to one white space
    entry = re.sub(r'\s+', ' ', entry)
    
    return entry

In [11]:
def create_user_master_df(connection, query, *addl_columns):
    """
    Creates a pandas dataframe placeholder with key meta-data to fuzzy-match
    the users from different datasets.
    
    Pseudo-code:
        Create a blank pandas dataframe (e.g. pd.DataFrame) with columns for
        Name (last, first), address, zip code, phone number, email, etc.
        
        Include "ID" fields for each of the datasets that will be merged.
        
        Populate/Initialize the dataframe with data from one of the datasets
        (e.g. Salesforce)
    """
    
    # pull the dataframe from SQL database, call cleaning function, 
    # and add empty columns for the datasets that will be merged
    df = pd.read_sql(query, connection)
    df = df.applymap(clean_entry)
    
    for col_name in addl_columns:
        df[col_name] = np.nan
    
    return df

In [12]:
def standardize_states(state,  min_ratio=80):
    """
    Taking a state or territory's name as its argument, this function returns 
    the 2 letter postal abbreviation. Since the data is human input and 
    often misspelled, it relies on a fuzzy match based on the Levenshtein 
    Distance. 
    
    If the fuzzy match ratio is above a minimum (defaulting to 80%) it 
    selects the top match, otherwise it returns a blank.
    """
    
    state_abbr_dict = {'alabama': 'al',
                     'alaska': 'ak',
                     'arizona': 'az',
                     'arkansas': 'ar',
                     'california': 'ca',
                     'colorado': 'co',
                     'connecticut': 'ct',
                     'delaware': 'de',
                     'florida': 'fl',
                     'georgia': 'ga',
                     'hawaii': 'hi',
                     'idaho': 'id',
                     'illinois': 'il',
                     'indiana': 'in',
                     'iowa': 'ia',
                     'kansas': 'ks',
                     'kentucky': 'ky',
                     'louisiana': 'la',
                     'maine': 'me',
                     'maryland': 'md',
                     'massachusetts': 'ma',
                     'michigan': 'mi',
                     'minnesota': 'mn',
                     'mississippi': 'ms',
                     'missouri': 'mo',
                     'montana': 'mt',
                     'nebraska': 'ne',
                     'nevada': 'nv',
                     'new hampshire': 'nh',
                     'new jersey': 'nj',
                     'new mexico': 'nm',
                     'new york': 'ny',
                     'north carolina': 'nc',
                     'north dakota': 'nd',
                     'ohio': 'oh',
                     'oklahoma': 'ok',
                     'oregon': 'or',
                     'pennsylvania': 'pa',
                     'rhode island': 'ri',
                     'south carolina': 'sc',
                     'south dakota': 'sd',
                     'tennessee': 'tn',
                     'texas': 'tx',
                     'utah': 'ut',
                     'vermont': 'vt',
                     'virginia': 'va',
                     'washington': 'wa',
                     'west virginia': 'wv',
                     'wisconsin': 'wi',
                     'wyoming': 'wy',
                     'american samoa': 'as',
                     'district of columbia': 'dc',
                     'washington dc': 'dc',
                     'washington district of columbia': 'dc',
                     'federated states of micronesia': 'fm',
                     'guam': 'gu',
                     'marshall islands': 'mh',
                     'northern mariana islands': 'mp',
                     'palau': 'pw',
                     'puerto rico': 'pr',
                     'virgin islands': 'vi'}
    
    # check if it's 2 letters long and if it is, return it
    if len(state) <= 2:
        return state
    
    # get levenshtein ratio score for every state or territory in the dictionary
    # and keep only those above the minimum ratio
    fuzz_ratios = pd.Series(list(state_abbr_dict.keys()), index = state_abbr_dict.keys())
    fuzz_ratios = fuzz_ratios.apply(lambda x: fuzz.ratio(x, state))
    fuzz_ratios = fuzz_ratios[fuzz_ratios >= min_ratio]
    
    # either return the abbreviation for the top scoring state, or if nothing is 
    # over the minimum score return an empty string
    if len(fuzz_ratios > 0):
        best_abbr = fuzz_ratios.idxmax()
        return state_abbr_dict[best_abbr]
    else:
        return ''

In [13]:
# create master dataframe using the 'salesforcecontacts' table

sf_cont_query = """SELECT    last_name
                             , first_name 
                             , mailing_street as street
                             , mailing_city as city
                             , mailing_state_province as state_etc 
                             , mailing_zip_postal_code as zipcode
                             , mailing_country as country
                             , phone
                             , mobile
                             , email
                    FROM     salesforcecontacts"""

### cleanup still to do in pandas ###
# street needs to have formatting standardized (eg 19th st vs 19 st, n vs north)

master_df = create_user_master_df(conn, sf_cont_query, 'volgistics_id', 'petpoint_id', 'sf_donations_id')

# combine last and first names to make a single name column
master_df['name'] = master_df['last_name'] + ', ' + master_df['first_name']

# standardize state and territory names to their 2 letter postal abbreviation
master_df['state_etc'] = master_df['state_etc'].apply(standardize_states)

# keep only 5 digit zip code
master_df['zipcode'] = master_df['zipcode'].str[:5]

# combine address columns to make a single address column
master_df['address'] = (master_df['street'] + ' ' + master_df['city'] + ' ' + master_df['state_etc'] + ' ' + master_df['zipcode'] + ' ' + master_df['country']).str.strip()
master_df['address'] = master_df['address'].apply(lambda addy: re.sub(r'\s+', ' ', addy.strip()))

# drop extraneous address columns
master_df = master_df[['name', 'address', 'phone', 'mobile', 'email', 'volgistics_id', 'petpoint_id', 'sf_donations_id']]

master_df.tail(10)

Unnamed: 0,name,address,phone,mobile,email,volgistics_id,petpoint_id,sf_donations_id
60177,"tahlo, angelo",142 south 17th street olyphant ne 94602 us,3076661667,,b@hdsgy.iqf,,,
60178,"al-dar, alesha",3250 federal st colpey yt 19148 us,,,o@cir.bfh,,,
60179,"price, fiona",55 monument rd barnwell 19142 us,1024144111,,xogqmfwesz@lkhiq.puv,,,
60180,"duran, lars",2026 luff lane cave creek va 19152 us,227-207-3223,,vzqgekdrc@tfh.wyi,,,
60181,"musso, edward",2222 rancocas road bethesda ct 6026 us,1222104212,,gwua@tkx.jch,,,
60182,"el-ashraf, angelica",1417 estate fontana pa 19119 us,,,pxm@bnygeuzhvo.ewu,,,
60183,"el-kamal, cassondra",2210 s. 14st street west portsmouth nh 19125 us,,,ske@ciqgr.ndf,,,
60184,"campbell, justin",4074 s. 41rd st. ocean tx 19474,,,faxh@lcume.enj,,,
60185,"wilson, aslam",222 n columbus blvd new haven bc 17009 us,4146143364,,tpik@wotkn.qwi,,,
60186,"patterson, dashawn",311 high bridge wa 19064,,,nobcuvj@blyh.zva,,,


### 3. For each dataset, merge each record with the ***metadata master table***
If a match is found, link the two sources. If not, create a new record. <br/>

In [14]:
def fuzzy_merge(new_df, master_df):
    """
    This function merges each new dataset with the metadata master table by
    going line-by-line on the new dataset and looking for a match in the 
    existing metadata master dataset. If a match is found
    
    Pseudo-code:
        LOOP: For each line in the new_df, compare that line against all lines in 
        the master_df. 
        
        LOGIC: For each comparison, generate (a) a fuzzy-match score on name,
        (b) T/F on whether zip-code matches, (c) T/F on whether email matches,
        (d) T/F on whether phone number matches.
        
        OUTPUT: For each comparison if the fuzzy-match score is above a threshold (e.g. >=90%)
        and (b), (c) or (d) matches, consider it a match and add the new dataset 
        id to the existing record. If it doesn't match, create a new record in the
        master dataset.
        
    Note: there's probably a more efficient way to do this (vs. going line-by-line)
    """

#### 3.A Petpoint merge 
Apply function above the Petpoint dataset

In [15]:
# TODO: "Compare Master to PetPoint: If [fuzzy match on name above threshold] and [match on email] → combine records in Master"

#### 3.B Volgistics merge
Apply function above the Volgistics dataset

In [17]:
# Compare Volgistics to Master: If [fuzzy match on name above threshold] and [match on email] → combine records in Master
master_df.head()
volgistics = pd.read_sql('select * from volgistics', conn)
# Note: we could add foreign key support to the master data table referencing the volgistics primary key
pd.options.display.max_columns = 999
#volgistics.head()

In [19]:
# Verify that we obtain records that match on email
(
    volgistics
    [['last_name_first_name', 'email', 'volgistics_id']].rename(columns={'last_name_first_name': 'volgistics_name'})
    .merge(master_df[['name', 'email']].rename(columns={'name': 'master_name'}), how='inner')
)

Unnamed: 0,volgistics_name,email,volgistics_id,master_name
0,"Ipsum, Lorem",lorem@gmail.com,0,"ipsum, lorem"
1,"Bean, Jim",jimb3@gmail.com,10,"bean, jim"


In [59]:
# Instead of merging master_df to volgistics, merge the relevant volgistics ID's/email/name into master_df.
# Assign a fuzzy name score, apply a threshold to separate the data for review, etc.
# Then drop the extra volgistics_name field at the end.
master_with_volgistics = (
    master_df
    .drop(['volgistics_id'], axis=1)  # in newer pandas, this can be written more concisely: .drop(columns='volgistics_id')
    .merge(
        volgistics[['last_name_first_name', 'email', 'volgistics_id']].rename(columns={'last_name_first_name': 'volgistics_name'}),
        how='left'
    )
    .assign(volgistics_name=lambda df: df['volgistics_name'].map(clean_entry))
)
master_with_volgistics.head()

Unnamed: 0,name,address,phone,mobile,email,petpoint_id,sf_donations_id,volgistics_name,volgistics_id
0,"ipsum, lorem",704 wynnemoor way orinda co 7701 us,222-444-5555,,lorem@gmail.com,,,"ipsum, lorem",0.0
1,"doe, jane",moore rd,333-555-6666,,jane@gmail.com,,,,
2,"thomas, jade",220 annin st malvern pa 20009 us,1276261767,714 - 711-1110,mvkbtwogp@rgvqkued.egp,,,,
3,"rascon, hannah",150 chestnut st scotch plain in 18640 us,544 - 555-4550,141 - 343-1454,xebqfclvop@qfrhgzkuo.xzi,,,,
4,"flores, robert",5818 bristol 19123 us,235-235-5555,,rapwxnko@ltkp.ect,,,,


In [60]:
# Calculate fuzzy matching with fuzzywuzzy or another package
master_with_volgistics['fuzzy_score'] = master_with_volgistics.apply(
    lambda row: fuzz.ratio(row['name'], row['volgistics_name']),
    axis=1  # rowwise
)
master_with_volgistics.head()

# Forward the results for review, only keeping results with suitable emails/names
fuzzy_name_cutoff = 70  # FIXME: arbitrarily picked a value for now.  Will need to tune this variable
#fuzzy_name_cutoff = 170  # impossible: uncomment to test the csv outputs here
(
    master_with_volgistics
    [(~master_with_volgistics['volgistics_id'].isnull()) & (master_with_volgistics['fuzzy_score'] < fuzzy_name_cutoff)]
    .to_csv("temp/logging_volgistics_names_for_review.csv", index=False)
)
(
    master_with_volgistics
    [(~master_with_volgistics['volgistics_id'].isnull()) & (master_with_volgistics['fuzzy_score'] >= fuzzy_name_cutoff)]
    .to_csv("temp/logging_volgistics_names_matched.csv", index=False)
)

# Apply the fuzzy name requirements to the join
master_with_volgistics['volgistics_id'] = master_with_volgistics.apply(
    lambda row: row['volgistics_id'] if (row['fuzzy_score'] >= fuzzy_name_cutoff) else np.nan,
    axis=1
)

In [61]:
master_with_volgistics.head()

Unnamed: 0,name,address,phone,mobile,email,petpoint_id,sf_donations_id,volgistics_name,volgistics_id,fuzzy_score
0,"ipsum, lorem",704 wynnemoor way orinda co 7701 us,222-444-5555,,lorem@gmail.com,,,"ipsum, lorem",0.0,100
1,"doe, jane",moore rd,333-555-6666,,jane@gmail.com,,,,,33
2,"thomas, jade",220 annin st malvern pa 20009 us,1276261767,714 - 711-1110,mvkbtwogp@rgvqkued.egp,,,,,13
3,"rascon, hannah",150 chestnut st scotch plain in 18640 us,544 - 555-4550,141 - 343-1454,xebqfclvop@qfrhgzkuo.xzi,,,,,35
4,"flores, robert",5818 bristol 19123 us,235-235-5555,,rapwxnko@ltkp.ect,,,,,0


In [62]:
# Save the volgistics ID's back to the master data
master_df = master_with_volgistics.drop(['volgistics_name', 'fuzzy_score'], axis=1)

In [64]:
# Taking a quick look at the two volgistics ID's successfully linked in master_df
master_with_volgistics[~master_with_volgistics['volgistics_id'].isnull()]

Unnamed: 0,name,address,phone,mobile,email,petpoint_id,sf_donations_id,volgistics_name,volgistics_id,fuzzy_score
0,"ipsum, lorem",704 wynnemoor way orinda co 7701 us,222-444-5555,,lorem@gmail.com,,,"ipsum, lorem",0.0,100
5,"bean, jim",6555 north hartland 60612 us,,3355333533.0,jimb3@gmail.com,,,"bean, jim",10.0,100


#### 3.C Other - TBD - Merge

### 4. Write the new table to the database

In [None]:
raise ValueError("Joins above not yet finished")
# load_to_sqlite(master_df, master_table, conn)

## Other - placeholder - graveyard
Graveyard/placeholder code from previous sections

In [None]:
# simple join to check that it worked and the tables can be queried

df = pd.read_sql('''select * from petpoint as pp 
                    join volgistics as vol 
                    on pp."unnamed:_0" = vol."unnamed:_0"

                    join (SELECT * FROM salesforcecontacts AS sf_contacts
                            JOIN salesforcedonations AS sf_donations
                            ON sf_contacts."Account_ID" = sf_donations."Account_ID") as sf
                    on pp."unnamed:_0" = sf."unnamed:_0"
                    
                    ''', conn)

df.head()

In [None]:
# get all data matching on (first name + last name)

df2 = pd.read_sql('''SELECT * FROM petpoint AS pp
                     INNER JOIN volgistics AS vol ON pp."Intake_Record_Owner" = vol."First_name_Last_name"
                     INNER JOIN (SELECT * FROM salesforcecontacts AS sf_contacts
                            JOIN salesforcedonations AS sf_donations
                            ON sf_contacts."Account_ID" = sf_donations."Account_ID") AS sf
                     ON pp."Intake_Record_Owner" = (sf."First_Name" + " " + sf."Last_Name")
                  ''', conn)
df2.head()

In [None]:
# close database connection

conn.close()