<a href="https://colab.research.google.com/github/gimenopea/missionwired/blob/main/PaulGimenoETLTask.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Submission by Paul Gimeno

gimenopea@gmail.com

---


In [None]:
import pandas as pd

#grab source files
cons_src = 'https://als-hiring.s3.amazonaws.com/fake_data/2020-07-01_17%3A11%3A00/cons.csv'
emails_src = 'https://als-hiring.s3.amazonaws.com/fake_data/2020-07-01_17%3A11%3A00/cons_email.csv'
cons_sub_src = 'https://als-hiring.s3.amazonaws.com/fake_data/2020-07-01_17%3A11%3A00/cons_email_chapter_subscription.csv'

**1. Setting up helper functions for profiling and reading source files**

The helpers read each source in chunks to accommodate large blob source files.

In [None]:
def sample_src_df(src_url = None, chunksize=100):
    ''' returns the first chunk of a csv file as a dataframe of chunksize = chunksize'''
    if src_url is None:
        src_url = cons_src  
    reader = pd.read_csv(src_url, chunksize=chunksize)
    cons_df = next(reader)
    return cons_df

In [None]:
#this helper function can be modified to process each chunk for map/reduce tasks as needed
def read_src_df(src_url, chunksize=100, columns=None):
    ''' src_url: source url of csv file
    chunksize: defaults to 100 records per chunk
    columns: list of columns to be returned; None returns all columns
    returns a single dataframe built by concatenating every chunk'''

    reader = pd.read_csv(src_url, chunksize=chunksize)
    #for each chunk, keep only the requested columns and concat the chunks into one dataframe
    df = pd.concat([chunk if columns is None else chunk[columns] for chunk in reader])
    print(f'{len(df)} records read')
    return df
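
The comment on `read_src_df` mentions adapting the helper for map/reduce work on each chunk. A minimal sketch of that idea follows, assuming a small in-memory CSV in place of the remote files; `csv_text` and `count_by_source` are hypothetical names used only for illustration.

```python
import io
from collections import Counter

import pandas as pd

# Hypothetical in-memory CSV standing in for a large remote source file
csv_text = "cons_id,source\n1,google\n2,facebook\n3,\n4,google\n5,organic\n"

def count_by_source(src, chunksize=2):
    """Count rows per source one chunk at a time, keeping only running totals in memory."""
    totals = Counter()
    for chunk in pd.read_csv(src, chunksize=chunksize, usecols=["source"]):
        # map step: per-chunk counts, with missing sources counted as 'unknown'
        counts = chunk["source"].fillna("unknown").value_counts()
        # reduce step: fold the chunk counts into the running totals
        totals.update(counts.to_dict())
    return dict(totals)

source_counts = count_by_source(io.StringIO(csv_text))
print(source_counts)
```

Because only the per-chunk counts are kept, memory use stays roughly proportional to `chunksize` rather than to the size of the file.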

In [None]:
#profile initial rows
sample_cons = sample_src_df(cons_src)
sample_emails = sample_src_df(emails_src)
sample_cons_sub = sample_src_df(cons_sub_src)

**2. Read and process cons.csv**

In [None]:
#profile initial rows
sample_cons.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 29 columns):
 #   Column                      Non-Null Count  Dtype  
---  ------                      --------------  -----  
 0   cons_id                     100 non-null    int64  
 1   prefix                      47 non-null     object 
 2   firstname                   54 non-null     object 
 3   middlename                  76 non-null     object 
 4   lastname                    43 non-null     object 
 5   suffix                      53 non-null     object 
 6   salutation                  48 non-null     object 
 7   gender                      61 non-null     object 
 8   birth_dt                    42 non-null     object 
 9   title                       53 non-null     object 
 10  employer                    52 non-null     object 
 11  occupation                  46 non-null     object 
 12  income                      52 non-null     float64
 13  source                      60 non-n

In [None]:
#read cons.csv and return relevant columns for processing using helper function in step 1
cons_df = read_src_df(cons_src, columns=['cons_id', 'source','create_dt','modified_dt'])

700000 records read


In [None]:
#check if cons_id is unique
print(cons_df['cons_id'].is_unique)

#read unique values of source
print(cons_df['source'].unique())

#replace null values with 'unknown'
cons_df['source'] = cons_df['source'].fillna('unknown')

#convert create_dt and modified_dt to datetime for date grouping in later steps
cons_df['create_dt'] = pd.to_datetime(cons_df['create_dt'], format='%a, %Y-%m-%d %H:%M:%S')
cons_df['modified_dt'] = pd.to_datetime(cons_df['modified_dt'], format ='%a, %Y-%m-%d %H:%M:%S')

#column renaming to match desired output 
cons_df.rename(columns={'modified_dt':'updated_dt'}, inplace=True)

True
['google' 'facebook' nan 'twitter' 'organic']
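
The format string passed to `pd.to_datetime` above implies timestamps laid out as an abbreviated weekday, a comma, then date and time. The sketch below parses two hypothetical sample strings in that layout (they are not taken from the source files) and shows how `errors="coerce"` behaves when a value does not match.

```python
import pandas as pd

FMT = "%a, %Y-%m-%d %H:%M:%S"

# Hypothetical sample values in the layout the format string implies
raw = pd.Series(["Wed, 2020-07-01 07:57:02", "Thu, 1970-01-01 00:00:00"])
parsed = pd.to_datetime(raw, format=FMT)
print(parsed)

# A value without the weekday prefix does not match FMT;
# errors="coerce" turns it into NaT instead of raising
bad = pd.to_datetime(pd.Series(["2020-07-01 07:57:02"]), format=FMT, errors="coerce")
print(bad.isna().tolist())
```

Counting `NaT` values after a coerced parse is one way to confirm that every row followed the expected layout.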


In [None]:
#final cons pre-processed file
cons_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 700000 entries, 0 to 699999
Data columns (total 4 columns):
 #   Column      Non-Null Count   Dtype         
---  ------      --------------   -----         
 0   cons_id     700000 non-null  int64         
 1   source      700000 non-null  object        
 2   create_dt   700000 non-null  datetime64[ns]
 3   updated_dt  700000 non-null  datetime64[ns]
dtypes: datetime64[ns](2), int64(1), object(1)
memory usage: 21.4+ MB


**3. Process emails file**

In [None]:
sample_emails.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 16 columns):
 #   Column                Non-Null Count  Dtype 
---  ------                --------------  ----- 
 0   cons_email_id         100 non-null    int64 
 1   cons_id               100 non-null    int64 
 2   cons_email_type_id    100 non-null    int64 
 3   is_primary            100 non-null    int64 
 4   email                 100 non-null    object
 5   canonical_local_part  58 non-null     object
 6   domain                100 non-null    object
 7   double_validation     55 non-null     object
 8   create_dt             100 non-null    object
 9   create_app            100 non-null    int64 
 10  create_user           100 non-null    int64 
 11  modified_dt           100 non-null    object
 12  modified_app          100 non-null    int64 
 13  modified_user         100 non-null    int64 
 14  status                100 non-null    int64 
 15  note                  6 non-null      obj

In [None]:
emails_df = read_src_df(emails_src, columns=['cons_email_id', 'cons_id', 'email','is_primary'])

1400000 records read


In [None]:
emails_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1400000 entries, 0 to 1399999
Data columns (total 4 columns):
 #   Column         Non-Null Count    Dtype 
---  ------         --------------    ----- 
 0   cons_email_id  1400000 non-null  int64 
 1   cons_id        1400000 non-null  int64 
 2   email          1400000 non-null  object
 3   is_primary     1400000 non-null  int64 
dtypes: int64(3), object(1)
memory usage: 42.7+ MB


**4. Inner join cons and email on cons_id**

In [None]:
#inner join cons_df and emails_df on cons_id
person_email_df = pd.merge(cons_df, emails_df, on='cons_id', how='inner')
person_email_df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 1400000 entries, 0 to 1399999
Data columns (total 7 columns):
 #   Column         Non-Null Count    Dtype         
---  ------         --------------    -----         
 0   cons_id        1400000 non-null  int64         
 1   source         1400000 non-null  object        
 2   create_dt      1400000 non-null  datetime64[ns]
 3   updated_dt     1400000 non-null  datetime64[ns]
 4   cons_email_id  1400000 non-null  int64         
 5   email          1400000 non-null  object        
 6   is_primary     1400000 non-null  int64         
dtypes: datetime64[ns](2), int64(3), object(2)
memory usage: 85.4+ MB
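
The joined frame has the same 1,400,000 rows as the emails frame, which suggests every email matched a constituent. One way to check that kind of assumption explicitly is sketched below on toy data; the frames `cons` and `emails` and their values are hypothetical.

```python
import pandas as pd

# Hypothetical toy frames: cons_id 3 has no email, and cons_id 4 has no constituent row
cons = pd.DataFrame({"cons_id": [1, 2, 3], "source": ["google", "unknown", "organic"]})
emails = pd.DataFrame({
    "cons_email_id": [10, 11, 12, 13],
    "cons_id": [1, 1, 2, 4],
    "email": ["a@example.com", "b@example.com", "c@example.com", "d@example.com"],
})

# validate raises MergeError if cons_id is duplicated on the left side
merged = pd.merge(cons, emails, on="cons_id", how="inner", validate="one_to_many")
print(len(merged))

# An outer join with indicator=True exposes the rows an inner join would drop
outer = pd.merge(cons, emails, on="cons_id", how="outer", indicator=True)
unmatched = outer[outer["_merge"] != "both"]
print(unmatched[["cons_id", "_merge"]])
```

If `unmatched` is empty on the real data, the inner join loses nothing.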


**5. Process subscription file**

In [None]:
sample_cons_sub.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 6 columns):
 #   Column                              Non-Null Count  Dtype 
---  ------                              --------------  ----- 
 0   cons_email_chapter_subscription_id  100 non-null    int64 
 1   cons_email_id                       100 non-null    int64 
 2   chapter_id                          100 non-null    int64 
 3   isunsub                             100 non-null    int64 
 4   unsub_dt                            100 non-null    object
 5   modified_dt                         100 non-null    object
dtypes: int64(4), object(2)
memory usage: 4.8+ KB


In [None]:
cons_sub_df = read_src_df(cons_sub_src, columns=['cons_email_id', 'chapter_id','isunsub','unsub_dt'])
cons_sub_df.info()

350000 records read
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 350000 entries, 0 to 349999
Data columns (total 4 columns):
 #   Column         Non-Null Count   Dtype 
---  ------         --------------   ----- 
 0   cons_email_id  350000 non-null  int64 
 1   chapter_id     350000 non-null  int64 
 2   isunsub        350000 non-null  int64 
 3   unsub_dt       350000 non-null  object
dtypes: int64(3), object(1)
memory usage: 10.7+ MB


**6. Create final merge output of people, email and subscription**

In [None]:
people_raw_df = pd.merge(person_email_df, cons_sub_df, on='cons_email_id', how='inner')
people_raw_df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 350000 entries, 0 to 349999
Data columns (total 10 columns):
 #   Column         Non-Null Count   Dtype         
---  ------         --------------   -----         
 0   cons_id        350000 non-null  int64         
 1   source         350000 non-null  object        
 2   create_dt      350000 non-null  datetime64[ns]
 3   updated_dt     350000 non-null  datetime64[ns]
 4   cons_email_id  350000 non-null  int64         
 5   email          350000 non-null  object        
 6   is_primary     350000 non-null  int64         
 7   chapter_id     350000 non-null  int64         
 8   isunsub        350000 non-null  int64         
 9   unsub_dt       350000 non-null  object        
dtypes: datetime64[ns](2), int64(5), object(3)
memory usage: 29.4+ MB


In [None]:
#only return chapter_id = 1
people_df = people_raw_df[people_raw_df['chapter_id'] == 1].copy()

In [None]:
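#add an is_unsub flag per row: within each cons_id, take the max of isunsub and the max of is_primary and AND them, so the flag is 1 only when the group has at least one unsubscribed row and at least one primary row. transform keeps the row granularity.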
people_df['is_unsub'] = people_df.groupby('cons_id')['isunsub'].transform('max') & people_df.groupby('cons_id')['is_primary'].transform('max')
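
To make the behaviour of the group-level flag concrete, here is a small sketch on hypothetical data. It also computes a row-level AND for comparison, which is not part of the notebook's method and is shown only to highlight the difference.

```python
import pandas as pd

# Hypothetical rows: cons_id 1 has its unsubscribe and its primary flag on different rows
df = pd.DataFrame({
    "cons_id": [1, 1, 2, 3],
    "isunsub": [1, 0, 1, 0],
    "is_primary": [0, 1, 0, 1],
})

grouped = df.groupby("cons_id")
# Group-level flag: 1 when the group contains any isunsub = 1 and any is_primary = 1
df["is_unsub"] = grouped["isunsub"].transform("max") & grouped["is_primary"].transform("max")

# Row-level comparison (an alternative, for illustration only)
df["row_level"] = df["isunsub"] & df["is_primary"]
print(df)
```

Both rows of `cons_id` 1 receive `is_unsub` = 1 under the group-level rule even though no single row has both flags set, so the choice between the two rules depends on how the unsubscribe status is meant to be interpreted.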

In [None]:
people_df = people_df[['email', 'source', 'is_unsub','create_dt', 'updated_dt']]

#export to csv file
people_df.to_csv('people.csv',index=False)

**7. Create an acquisition fact file based on people.csv**

Assumption: people.csv is keyed by email and no longer carries cons_id, so acquisitions are counted per email row rather than per constituent.

In [None]:
#create a new dataframe called acquisition_df. create_dt is already a datetime column, so pd.Grouper can roll the rows up to counts by day
acquisition_df = people_df.groupby(pd.Grouper(key='create_dt', freq='D'))['email'].agg(['count']).rename(columns={'count':'acquisitions'}).reset_index()

In [None]:
#rename create_dt to acquisition_date
acquisition_df.rename(columns={'create_dt':'acquisition_date'}, inplace=True)
acquisition_df

Unnamed: 0,acquisition_date,acquisitions
0,1970-01-01,13
1,1970-01-02,14
2,1970-01-03,11
3,1970-01-04,18
4,1970-01-05,17
...,...,...
18440,2020-06-27,12
18441,2020-06-28,18
18442,2020-06-29,18
18443,2020-06-30,13
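
The daily roll-up can be reproduced on a few rows to see what `pd.Grouper` does with gaps. This is a sketch on hypothetical data; the emails and dates below are made up for illustration.

```python
import pandas as pd

# Hypothetical rows: two sign-ups on 2020-06-28, none on 2020-06-29, one on 2020-06-30
people = pd.DataFrame({
    "email": ["a@example.com", "b@example.com", "c@example.com"],
    "create_dt": pd.to_datetime([
        "2020-06-28 01:00:00",
        "2020-06-28 23:30:00",
        "2020-06-30 12:00:00",
    ]),
})

daily = (
    people.groupby(pd.Grouper(key="create_dt", freq="D"))["email"]
    .count()
    .rename("acquisitions")
    .reset_index()
    .rename(columns={"create_dt": "acquisition_date"})
)
print(daily)
```

Days between the first and last date that have no rows still appear in the result, with a count of 0, so the output is a gap-free daily series.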


In [None]:
#sanity check: the rows created on 2020-07-01 should match that day's acquisitions count (13 records expected)
people_df[(people_df['create_dt'] >= '2020-07-01') & (people_df['create_dt'] < '2020-07-02')]

Unnamed: 0,email,source,is_unsub,create_dt,updated_dt
15835,kimberlyhopkins@harris-lewis.com,unknown,1,2020-07-01 07:57:02,2001-06-23 20:56:52
60819,cynthia45@vance.com,organic,1,2020-07-01 01:48:37,1995-08-18 12:17:02
61712,jenniferstone@serrano.net,facebook,1,2020-07-01 16:43:24,1973-06-05 13:50:05
61713,morgandaniels@yahoo.com,facebook,1,2020-07-01 16:43:24,1973-06-05 13:50:05
113834,nortonsandra@hartman-miller.com,google,0,2020-07-01 10:01:00,1981-01-09 02:00:18
194772,rjohnson@williams.org,organic,1,2020-07-01 10:33:22,2011-11-14 05:35:19
198442,paulcantu@randolph.com,unknown,1,2020-07-01 01:29:37,1978-12-09 10:21:29
228368,jwoodward@mayer-martin.com,google,0,2020-07-01 01:09:40,1982-05-08 08:05:21
229425,vhatfield@mcpherson.info,unknown,0,2020-07-01 06:44:15,1996-07-16 01:00:38
234066,matthewwhite@king.com,organic,1,2020-07-01 13:52:48,1993-04-09 10:28:04


In [None]:
#save to csv
acquisition_df.to_csv('acquisitions.csv',index=False)