# Data Wrangling

Target time: 60 mins

_**Reminder:** Before you get started on this task, verify that you can log into
the online platform as described in [Setup](#setup)._

## Overview

We are working with new partners, Vendor X and Vendor Y, and need to ingest
their data into our data warehouse. Vendor X sends us a csv file each day.
Vendor Y makes their data available via their API. We need to download the
files, merge them and then load them into our database.

The goal is to set up an automated process to ingest this data. We need our Data
Engineer to write a script that will:

### 0. Notebook Preparation

In [44]:
import psycopg2
import csv
import requests
import pandas as pd
import numpy as np

# credentials
user = "u_173f722a36d72a4e353ee93dacf4872c"
password = "p_173f722a36d72a4e353ee93dacf4872c"
database = "sqlpad"
host = "68.183.51.176"
schema = "s_173f722a36d72a4e353ee93dacf4872c"
port = 5432

# connection database
def get_connection():
    conn = psycopg2.connect(
        user=user,
        password=password,
        host=host,
        port=port,
        database=database,
        options=f"-c search_path={schema}")
    return conn


### 1. Download Vendor Y's data from the API
   API URL: https://k4clzaf58d.execute-api.us-east-1.amazonaws.com/default/handle_users

In [45]:
r_vendors_y = requests.get("https://k4clzaf58d.execute-api.us-east-1.amazonaws.com/default/handle_users")
vendors_y = r_vendors_y.json()['data']
df_vendors_y = pd.DataFrame.from_records(vendors_y)
print(df_vendors_y.shape)
df_vendors_y.head()

(147, 55)


Unnamed: 0,registration_id,status,tracking_source,tracking_id,dob,email,citizenship_confirmed,salutation,first_name,middle_name,...,has_mailing_address_standardized,has_state_license_standardized,has_ssn_standardized,predicted_gender,org,evc_id,program_state,partner_id,field_start,field_end
0,d95bafc8-f2a4-427b-9cf4-bb99f4bea973,Step 1,ffxkVZQtq,MnMcLRkBO,60-05-29,amy78yahoo.com,True,Dr.,Steven,Lucas,...,True,False,True,female,org_5,PdrLoRgWH,DC,4355,2022-02-09 00:00:00,2022-02-09 02:00:00
1,e16dce72-f18e-4598-b5e1-f291d322a735,Step 1,BTWcdxxlp,RbfhebcUb,57-08-17,david22hotmail.com,True,Dr.,Bradley,Veronica,...,False,True,True,other,org_1,pggblWogn,AK,8531,2022-07-24 00:00:00,2022-07-24 02:00:00
2,8918b682-4f4a-453e-b430-051376e31f5a,Step 4,nRWnUZBBG,bLLdAHLlg,97-02-14,hdiaz5gmail.com,False,Dr.,Kyle,Nancy,...,True,False,True,male,org_1,ZfRZPcisy,IN,5397,2022-04-25 00:00:00,2022-04-25 02:00:00
3,1cb0a312-4978-4137-8624-857a2c2af60d,Step 2,IWBgvHpTG,qkkDTpzwY,84-11-22,stevenhiggins3hotmail.com,False,Misc.,Brian,Joshua,...,False,False,True,female,org_1,sJDaxcIyK,NV,3349,2022-10-10 00:00:00,2022-10-10 02:00:00
4,a62923cd-7f8f-4441-8ef5-4c3dad29b40a,Step 2,ESITtequt,vPYtPPzHf,76-01-19,petermorenogmail.com,True,Mr.,Jesse,Stephen,...,True,False,True,female,org_5,yIsEOjxum,NM,1557,2021-12-31 00:00:00,2021-12-31 02:00:00


### 2. Merge Vendor Y's data with Vendor X's data
    Vendor X's data: `data/vendor_x_data.csv`

In [46]:
df_vendors_x = pd.read_csv('data/vendor_x_data.csv')
# df_vendors_x = pd.read_csv('data/vendor_x_data.csv', index_col=["VENDOR_ID"])
print(df_vendors_x.shape)
df_vendors_x.head()

(147, 55)


Unnamed: 0,VENDOR_ID,STATUS,TRACKING_SOURCE,TRACKING_ID,DATE_OF_BIRTH,EMAIL_ADDRESS,CITIZENSHIP_CONFIRMED,SALUTATION,FIRST_NAME,MIDDLE_NAME,...,HAS_MAILING_ADDRESS_STANDARDIZED,HAS_STATE_LICENSE_STANDARDIZED,HAS_SSN_STANDARDIZED,PREDICTED_GENDER,ORG,EVC_ID,PROGRAM_STATE,PARTNER_ID,FIELD_START,FIELD_END
0,d95bafc8-f2a4-427b-9cf4-bb99f4bea973,Step 1,ffxkVZQtq,MnMcLRkBO,1960-05-29,amy78yahoo.com,True,Dr.,Steven,Lucas,...,True,False,True,female,org_5,PdrLoRgWH,DC,4355,2022-02-09 00:00:00,2022-02-09 02:00:00
1,e16dce72-f18e-4598-b5e1-f291d322a735,Step 1,BTWcdxxlp,RbfhebcUb,1957-08-17,david22hotmail.com,True,Dr.,Bradley,Veronica,...,False,True,True,other,org_1,pggblWogn,AK,8531,2022-07-24 00:00:00,2022-07-24 02:00:00
2,8918b682-4f4a-453e-b430-051376e31f5a,Step 4,nRWnUZBBG,bLLdAHLlg,1997-02-14,hdiaz5gmail.com,False,Dr.,Kyle,Nancy,...,True,False,True,male,org_1,ZfRZPcisy,IN,5397,2022-04-25 00:00:00,2022-04-25 02:00:00
3,1cb0a312-4978-4137-8624-857a2c2af60d,Step 2,IWBgvHpTG,qkkDTpzwY,1984-11-22,stevenhiggins3hotmail.com,False,Misc.,Brian,Joshua,...,False,False,True,female,org_1,sJDaxcIyK,NV,3349,2022-10-10 00:00:00,2022-10-10 02:00:00
4,a62923cd-7f8f-4441-8ef5-4c3dad29b40a,Step 2,ESITtequt,vPYtPPzHf,1976-01-19,petermorenogmail.com,True,Mr.,Jesse,Stephen,...,True,False,True,female,org_5,yIsEOjxum,NM,1557,2021-12-31 00:00:00,2021-12-31 02:00:00


In [47]:
# compare that the columns are in order, and they are the same
for c in range(len(df_vendors_x.columns)):
    print(df_vendors_x.columns.values.tolist()[c], ' < -- > ', df_vendors_y.columns.values.tolist()[c])

VENDOR_ID  < -- >  registration_id
STATUS  < -- >  status
TRACKING_SOURCE  < -- >  tracking_source
TRACKING_ID  < -- >  tracking_id
DATE_OF_BIRTH  < -- >  dob
EMAIL_ADDRESS  < -- >  email
CITIZENSHIP_CONFIRMED  < -- >  citizenship_confirmed
SALUTATION  < -- >  salutation
FIRST_NAME  < -- >  first_name
MIDDLE_NAME  < -- >  middle_name
LAST_NAME  < -- >  last_name
NAME_SUFFIX  < -- >  name_suffix
HOME_ADDRESS  < -- >  home_address
HOME_UNIT  < -- >  home_unit
HOME_CITY  < -- >  home_city
HOME_COUNTY  < -- >  home_county
HOME_STATE  < -- >  home_state
HOME_ZIP_CODE  < -- >  home_zip_code
MAILING_ADDRESS  < -- >  mailing_address
MAILING_UNIT  < -- >  mailing_unit
MAILING_CITY  < -- >  mailing_city
MAILING_COUNTY  < -- >  mailing_county
MAILING_STATE  < -- >  mailing_state
MAILING_ZIP_CODE  < -- >  mailing_zip_code
PARTY  < -- >  party
RACE  < -- >  race
PHONE  < -- >  phone
PHONE_TYPE  < -- >  phone_type
OPT_IN_TO_VENDOR_EMAIL  < -- >  opt_in_to_vendor_email
OPT_IN_TO_VENDOR_SMS  < -- >  o

In [48]:
# merge the dataframes
df_vendors = pd.DataFrame( np.concatenate( (df_vendors_x.values, df_vendors_y.values), axis=0 ) )
df_vendors.head()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,45,46,47,48,49,50,51,52,53,54
0,d95bafc8-f2a4-427b-9cf4-bb99f4bea973,Step 1,ffxkVZQtq,MnMcLRkBO,1960-05-29,amy78yahoo.com,True,Dr.,Steven,Lucas,...,True,False,True,female,org_5,PdrLoRgWH,DC,4355,2022-02-09 00:00:00,2022-02-09 02:00:00
1,e16dce72-f18e-4598-b5e1-f291d322a735,Step 1,BTWcdxxlp,RbfhebcUb,1957-08-17,david22hotmail.com,True,Dr.,Bradley,Veronica,...,False,True,True,other,org_1,pggblWogn,AK,8531,2022-07-24 00:00:00,2022-07-24 02:00:00
2,8918b682-4f4a-453e-b430-051376e31f5a,Step 4,nRWnUZBBG,bLLdAHLlg,1997-02-14,hdiaz5gmail.com,False,Dr.,Kyle,Nancy,...,True,False,True,male,org_1,ZfRZPcisy,IN,5397,2022-04-25 00:00:00,2022-04-25 02:00:00
3,1cb0a312-4978-4137-8624-857a2c2af60d,Step 2,IWBgvHpTG,qkkDTpzwY,1984-11-22,stevenhiggins3hotmail.com,False,Misc.,Brian,Joshua,...,False,False,True,female,org_1,sJDaxcIyK,NV,3349,2022-10-10 00:00:00,2022-10-10 02:00:00
4,a62923cd-7f8f-4441-8ef5-4c3dad29b40a,Step 2,ESITtequt,vPYtPPzHf,1976-01-19,petermorenogmail.com,True,Mr.,Jesse,Stephen,...,True,False,True,female,org_5,yIsEOjxum,NM,1557,2021-12-31 00:00:00,2021-12-31 02:00:00


### 3. Standardize the column names and the data
   See Data section below for morecons_email = pd.read_csv('cons_email.csv') info

In [49]:
# put columns names according to schema
df_vendors.columns = [str(col).strip().lower() for col in df_vendors_x.columns.to_list()]
df_vendors.head()

Unnamed: 0,vendor_id,status,tracking_source,tracking_id,date_of_birth,email_address,citizenship_confirmed,salutation,first_name,middle_name,...,has_mailing_address_standardized,has_state_license_standardized,has_ssn_standardized,predicted_gender,org,evc_id,program_state,partner_id,field_start,field_end
0,d95bafc8-f2a4-427b-9cf4-bb99f4bea973,Step 1,ffxkVZQtq,MnMcLRkBO,1960-05-29,amy78yahoo.com,True,Dr.,Steven,Lucas,...,True,False,True,female,org_5,PdrLoRgWH,DC,4355,2022-02-09 00:00:00,2022-02-09 02:00:00
1,e16dce72-f18e-4598-b5e1-f291d322a735,Step 1,BTWcdxxlp,RbfhebcUb,1957-08-17,david22hotmail.com,True,Dr.,Bradley,Veronica,...,False,True,True,other,org_1,pggblWogn,AK,8531,2022-07-24 00:00:00,2022-07-24 02:00:00
2,8918b682-4f4a-453e-b430-051376e31f5a,Step 4,nRWnUZBBG,bLLdAHLlg,1997-02-14,hdiaz5gmail.com,False,Dr.,Kyle,Nancy,...,True,False,True,male,org_1,ZfRZPcisy,IN,5397,2022-04-25 00:00:00,2022-04-25 02:00:00
3,1cb0a312-4978-4137-8624-857a2c2af60d,Step 2,IWBgvHpTG,qkkDTpzwY,1984-11-22,stevenhiggins3hotmail.com,False,Misc.,Brian,Joshua,...,False,False,True,female,org_1,sJDaxcIyK,NV,3349,2022-10-10 00:00:00,2022-10-10 02:00:00
4,a62923cd-7f8f-4441-8ef5-4c3dad29b40a,Step 2,ESITtequt,vPYtPPzHf,1976-01-19,petermorenogmail.com,True,Mr.,Jesse,Stephen,...,True,False,True,female,org_5,yIsEOjxum,NM,1557,2021-12-31 00:00:00,2021-12-31 02:00:00


In [50]:
# verify data types
df_vendors.dtypes

vendor_id                           object
status                              object
tracking_source                     object
tracking_id                         object
date_of_birth                       object
email_address                       object
citizenship_confirmed               object
salutation                          object
first_name                          object
middle_name                         object
last_name                           object
name_suffix                         object
home_address                        object
home_unit                           object
home_city                           object
home_county                         object
home_state                          object
home_zip_code                       object
mailing_address                     object
mailing_unit                        object
mailing_city                        object
mailing_county                      object
mailing_state                       object
mailing_zip

In [51]:
# convert and verify integer data types
df_vendors = df_vendors.astype({"home_unit":"int","mailing_unit":"int","shift_id":"int","shift_type":"int",
                                "vendor_a_shift_id":"int","partner_id":"int"})
df_vendors.dtypes

vendor_id                           object
status                              object
tracking_source                     object
tracking_id                         object
date_of_birth                       object
email_address                       object
citizenship_confirmed               object
salutation                          object
first_name                          object
middle_name                         object
last_name                           object
name_suffix                         object
home_address                        object
home_unit                            int32
home_city                           object
home_county                         object
home_state                          object
home_zip_code                       object
mailing_address                     object
mailing_unit                         int32
mailing_city                        object
mailing_county                      object
mailing_state                       object
mailing_zip

In [52]:
# convert and verify date data types

# I add '19' in years without four digits. After reviewed I think all dates are 19th century. Something to double-check for future tasks!
df_vendors['date_of_birth'] = df_vendors['date_of_birth'].map(lambda x: x if len(x)==10 else '19'+x)

df_vendors = df_vendors.astype({"date_of_birth":"datetime64", "registration_date":"datetime64",
                                "field_start":"datetime64", "field_end":"datetime64",})
df_vendors.dtypes

vendor_id                                   object
status                                      object
tracking_source                             object
tracking_id                                 object
date_of_birth                       datetime64[ns]
email_address                               object
citizenship_confirmed                       object
salutation                                  object
first_name                                  object
middle_name                                 object
last_name                                   object
name_suffix                                 object
home_address                                object
home_unit                                    int32
home_city                                   object
home_county                                 object
home_state                                  object
home_zip_code                               object
mailing_address                             object
mailing_unit                   

In [53]:
# convert and verify bool data types
df_vendors = df_vendors.astype({"citizenship_confirmed":"bool", "opt_in_to_vendor_email":"bool",
                                "opt_in_to_vendor_sms":"bool", "opt_in_to_partner_email":"bool",
                                "opt_in_to_partner_smsrobocall":"bool", "volunteer_for_vendor":"bool",
                                "volunteer_for_partner":"bool", "pre_registered":"bool",
                                "finish_with_state":"bool", "built_via_api":"bool",
                                "submitted_via_state_api":"bool", "has_mailing_address_standardized":"bool", "has_state_license_standardized":"bool", "has_ssn_standardized":"bool"})
df_vendors.dtypes

vendor_id                                   object
status                                      object
tracking_source                             object
tracking_id                                 object
date_of_birth                       datetime64[ns]
email_address                               object
citizenship_confirmed                         bool
salutation                                  object
first_name                                  object
middle_name                                 object
last_name                                   object
name_suffix                                 object
home_address                                object
home_unit                                    int32
home_city                                   object
home_county                                 object
home_state                                  object
home_zip_code                               object
mailing_address                             object
mailing_unit                   

In [54]:
# final dataframe
df_vendors.head()

Unnamed: 0,vendor_id,status,tracking_source,tracking_id,date_of_birth,email_address,citizenship_confirmed,salutation,first_name,middle_name,...,has_mailing_address_standardized,has_state_license_standardized,has_ssn_standardized,predicted_gender,org,evc_id,program_state,partner_id,field_start,field_end
0,d95bafc8-f2a4-427b-9cf4-bb99f4bea973,Step 1,ffxkVZQtq,MnMcLRkBO,1960-05-29,amy78yahoo.com,True,Dr.,Steven,Lucas,...,True,False,True,female,org_5,PdrLoRgWH,DC,4355,2022-02-09,2022-02-09 02:00:00
1,e16dce72-f18e-4598-b5e1-f291d322a735,Step 1,BTWcdxxlp,RbfhebcUb,1957-08-17,david22hotmail.com,True,Dr.,Bradley,Veronica,...,False,True,True,other,org_1,pggblWogn,AK,8531,2022-07-24,2022-07-24 02:00:00
2,8918b682-4f4a-453e-b430-051376e31f5a,Step 4,nRWnUZBBG,bLLdAHLlg,1997-02-14,hdiaz5gmail.com,False,Dr.,Kyle,Nancy,...,True,False,True,male,org_1,ZfRZPcisy,IN,5397,2022-04-25,2022-04-25 02:00:00
3,1cb0a312-4978-4137-8624-857a2c2af60d,Step 2,IWBgvHpTG,qkkDTpzwY,1984-11-22,stevenhiggins3hotmail.com,False,Misc.,Brian,Joshua,...,False,False,True,female,org_1,sJDaxcIyK,NV,3349,2022-10-10,2022-10-10 02:00:00
4,a62923cd-7f8f-4441-8ef5-4c3dad29b40a,Step 2,ESITtequt,vPYtPPzHf,1976-01-19,petermorenogmail.com,True,Mr.,Jesse,Stephen,...,True,False,True,female,org_5,yIsEOjxum,NM,1557,2021-12-31,2021-12-31 02:00:00


#### 4. Save data to a file
   `all_vendors.csv`

In [55]:
df_vendors = df_vendors.set_index("vendor_id")
df_vendors.to_csv('data/all_vendors.csv')

### 5. Import data in to the database
   1. Schema name (the result of this query using your username):
      `select 's_<hash>;` (use the schema name shared with you)
   2. Table name: `all_vendors`

In [56]:
# 1. Create table all_vendors according to schema
create_table = f'''
    DROP TABLE IF EXISTS {schema}.all_vendors;
    CREATE TABLE "all_vendors"(
    "vendor_id"	varchar(1024) NULL,
    "status"	varchar(1024) NULL,
    "tracking_source"	varchar(1024) NULL,
    "tracking_id"	varchar(1024) NULL,
    "date_of_birth"	date NULL,
    "email_address"	varchar(1024) NULL,
    "citizenship_confirmed"	boolean NULL,
    "salutation"	varchar(1024) NULL,
    "first_name"	varchar(1024) NULL,
    "middle_name"	varchar(1024) NULL,
    "last_name"	varchar(1024) NULL,
    "name_suffix"	varchar(1024) NULL,
    "home_address"	varchar(1024) NULL,
    "home_unit"	integer NULL,
    "home_city"	varchar(1024) NULL,
    "home_county" varchar(1024) NULL,
    "home_state" varchar(1024) NULL,
    "home_zip_code"	varchar(1024) NULL,
    "mailing_address" varchar(1024) NULL,
    "mailing_unit" integer NULL,
    "mailing_city" varchar(1024) NULL,
    "mailing_county" varchar(1024) NULL,
    "mailing_state"	varchar(1024) NULL,
    "mailing_zip_code"	varchar(1024) NULL,
    "party"	varchar(1024) NULL,
    "race"	varchar(1024) NULL,
    "phone"	varchar(1024) NULL,
    "phone_type" varchar(1024) NULL,
    "opt_in_to_vendor_email" boolean NULL,
    "opt_in_to_vendor_sms" boolean NULL,
    "opt_in_to_partner_email" boolean NULL,
    "opt_in_to_partner_smsrobocall"	boolean NULL,
    "volunteer_for_vendor" boolean NULL,
    "volunteer_for_partner"	boolean NULL,
    "pre_registered" boolean NULL,
    "registration_date"	timestamp NULL,
    "finish_with_state"	boolean NULL,
    "built_via_api"	boolean NULL,
    "submitted_via_state_api" boolean NULL,
    "registration_source" varchar(1024) NULL,
    "shift_id" integer NULL,
    "shift_type" integer NULL,
    "office" varchar(1024) NULL,
    "vendor_a_shift_id"	integer NULL,
    "salutation_standardized" varchar(1024) NULL,
    "has_mailing_address_standardized" boolean NULL,
    "has_state_license_standardized" boolean NULL,
    "has_ssn_standardized" bool NULL,
    "predicted_gender" varchar(1024) NULL,
    "org" varchar(1024) NULL,
    "evc_id" varchar(1024) NULL,
    "program_state"	varchar(1024) NULL,
    "partner_id" integer NULL,
    "field_start" timestamp NULL,
    "field_end"	timestamp NULL
    );
'''
conn = get_connection()
cursor = conn.cursor()
cursor.execute(create_table)
conn.commit()

In [57]:
# populate table all_vendors
conn = get_connection()
cursor = conn.cursor()
# load file
file = open('data/all_vendors.csv')
content_file = csv.reader(file)
# skip headers
next(content_file)
# create 55 %s parameters for the query
param = ""
for n in range(55):
    param += "%s, "
# define the query
insert_sql = f"INSERT INTO {schema}.all_vendors (vendor_id,status,tracking_source,tracking_id,date_of_birth,email_address,citizenship_confirmed,salutation,first_name,middle_name,last_name,name_suffix,home_address,home_unit,home_city,home_county,home_state,home_zip_code,mailing_address,mailing_unit,mailing_city,mailing_county,mailing_state,mailing_zip_code,party,race,phone,phone_type,opt_in_to_vendor_email,opt_in_to_vendor_sms,opt_in_to_partner_email,opt_in_to_partner_smsrobocall,volunteer_for_vendor,volunteer_for_partner,pre_registered,registration_date,finish_with_state,built_via_api,submitted_via_state_api,registration_source,shift_id,shift_type,office,vendor_a_shift_id,salutation_standardized,has_mailing_address_standardized,has_state_license_standardized,has_ssn_standardized,predicted_gender,org,evc_id,program_state,partner_id,field_start,field_end) VALUES ({param[:-2]})"
# run query
cursor.executemany(insert_sql, content_file)
conn.commit()
file.close()

In [58]:
# verify data in the table
query = f"""SELECT * FROM {schema}.all_vendors """
cursor.execute(query)
records = cursor.fetchall()
print(len(records), records[0])

294 ('d95bafc8-f2a4-427b-9cf4-bb99f4bea973', 'Step 1', 'ffxkVZQtq', 'MnMcLRkBO', datetime.date(1960, 5, 29), 'amy78yahoo.com', True, 'Dr.', 'Steven', 'Lucas', 'James', 'IV', '233 Gregory Extensions', 2, 'Yolandaside', 'county_3', 'NV', '54852', '557 Stephanie Locks Suite 837', 4, 'Masonstad', 'county_8', 'NH', '46477', 'independent', 'unknown', '+1-773-529-4744', 'other', False, False, False, False, False, False, False, datetime.datetime(2022, 4, 12, 0, 0), True, False, True, 'Web', 8386, 6006, 'office_2', 5594, 'Mr.', True, False, True, 'female', 'org_5', 'PdrLoRgWH', 'DC', 4355, datetime.datetime(2022, 2, 9, 0, 0), datetime.datetime(2022, 2, 9, 2, 0))
