# Data Engineer Exercise 1

The following code outlines the creation of the `src/data-engineer_exercise-1.py` script that creates
the `results/people.csv` data set.

In [1]:
import pandas as pd
import datetime as dt

# Turn off pandas' chained-assignment (SettingWithCopy) check for this session.
pd.set_option('mode.chained_assignment', None)

### Read in the data sources and view columns from each.

In [3]:
# Load the three source tables; all of them live under the same S3 prefix.
_S3_ROOT = ('https://als-hiring.s3.amazonaws.com'
            '/fake_data/2020-07-01_17%3A11%3A00/')

# Constituent data
cons_data = pd.read_csv(_S3_ROOT + 'cons.csv')
# Constituent e-mail data
email_data = pd.read_csv(_S3_ROOT + 'cons_email.csv')
# Constituent subscription data
sub_data = pd.read_csv(_S3_ROOT + 'cons_email_chapter_subscription.csv')

In [123]:
# Inspect the column labels available in cons_data (the constituent table):
cons_data.columns

Index(['cons_id', 'prefix', 'firstname', 'middlename', 'lastname', 'suffix',
       'salutation', 'gender', 'birth_dt', 'title', 'employer', 'occupation',
       'income', 'source', 'subsource', 'userid', 'password', 'is_validated',
       'is_banned', 'change_password_next_login', 'consent_type_id',
       'create_dt', 'create_app', 'create_user', 'modified_dt', 'modified_app',
       'modified_user', 'status', 'note'],
      dtype='object')

In [124]:
# Inspect the column labels available in email_data (the constituent e-mail table):
email_data.columns

Index(['cons_email_id', 'cons_id', 'cons_email_type_id', 'is_primary', 'email',
       'canonical_local_part', 'domain', 'double_validation', 'create_dt',
       'create_app', 'create_user', 'modified_dt', 'modified_app',
       'modified_user', 'status', 'note'],
      dtype='object')

In [125]:
# Inspect the column labels available in sub_data (the subscription table):
sub_data.columns

Index(['cons_email_chapter_subscription_id', 'cons_email_id', 'chapter_id',
       'isunsub', 'unsub_dt', 'modified_dt'],
      dtype='object')

### Assumptions and Plan

This script assumes that this instruction...

>We only care about subscription statuses where chapter_id is 1.

...indicates that constituents in chapters other than chapter one should not be included in the final table.

**To create the final data product, necessary columns include:**

From the constituent data:

- `source`
- `cons_id` -- for merging
- `create_dt`
- `modified_dt` 

From the e-mail data:

- `email`
    - `is_primary == 1`
- `cons_email_id` -- for merging
- `create_dt`
- `modified_dt` 
    
From the subscription data:

- `isunsub`
    - `chapter_id == 1`
- `cons_email_id` -- for merging
- _(this table has no `create_dt` column)_
- `modified_dt` 
    
**Join keys:**

`cons_data` can be merged with `email_data` on `cons_id`, assuming that this id is unique to a constituent.

`email_data` can be merged with `sub_data` on `cons_email_id`, assuming that this id is unique to an e-mail address.

Checks on these assumptions are shown below:

In [126]:
# cons_id is the join key between cons_data and email_data: every value must
# occur exactly once in cons_data.
assert (cons_data['cons_id'].value_counts() == 1).all(), (
    "cons_id is not unique in the cons_data table")

# cons_email_id is the join key between email_data and sub_data: every value
# must occur exactly once in email_data.
assert (email_data['cons_email_id'].value_counts() == 1).all(), (
    "cons_email_id is not unique in the email_data table")


## Filter data to specified restrictions:

- Relevant columns listed above for each dataframe
- Only primary e-mail addresses
- Only chapter 1 subscriptions

In [127]:
# Columns to keep from each source table (join keys, output fields, timestamps)
cons_cols = ['cons_id', 'source', 'create_dt', 'modified_dt']
email_cols = ['cons_id', 'cons_email_id', 'email', 'create_dt', 'modified_dt']
sub_cols = ['cons_email_id', 'isunsub', 'chapter_id', 'modified_dt']

# Constituent data: keep only the relevant columns. A boolean column mask is
# used (rather than cons_data[cons_cols]) so each table keeps its original
# column order.
cons_data_filtered = cons_data.loc[:, cons_data.columns.isin(cons_cols)]

# E-mail data: keep primary addresses only, restricted to the relevant columns.
# (This statement used to appear twice; computing it once is sufficient.)
email_data_filtered = email_data.loc[email_data['is_primary'] == 1,
                                     email_data.columns.isin(email_cols)]

# Subscription data: keep chapter 1 rows only, restricted to the relevant columns
sub_data_filtered = sub_data.loc[sub_data['chapter_id'] == 1,
                                 sub_data.columns.isin(sub_cols)]

In [128]:
# The number of rows removed by the filter must equal the number of rows
# flagged as non-primary (is_primary == 0) in the unfiltered e-mail table.
assert (len(email_data) - len(email_data_filtered)
        == (email_data['is_primary'] == 0).sum()), (
    "All rows with non-primary emails should be removed")

# After filtering, no cons_id may appear on more than one e-mail row.
assert (email_data_filtered['cons_id'].value_counts() == 1).all(), (
    "Some constituents have more than one primary email")

## Merge dataframes

Join `cons_data` and `email_data` on the column `cons_id`. Perform a right join (the e-mail table is the right-hand side) so that every primary email address is kept, while constituents without a recorded primary email address are dropped: they will not have any information in the subscription data, so there is no use in keeping them in the final table.

In [129]:
# Right join: keep every primary e-mail row and attach the matching
# constituent columns via cons_id.
cons_email = cons_data_filtered.merge(email_data_filtered,
                                      how='right', on='cons_id')

Next, join the subscription data to the merged cons/email dataframe. This will be a left join since we're assuming that email addresses that do not appear in the subscription data are still subscribed.

In [130]:
# Left join: keep every row of cons_email; rows with no matching
# cons_email_id in the subscription data get NaN in the subscription columns.
crm_df = cons_email.merge(sub_data_filtered,
                          how='left', on='cons_email_id')

## Determine `created_dt` and `updated_dt`

Next, we'll find the create date and modified date for each constituent. 

- `created_dt` will be the minimum value (earliest timestamp) of `create_dt` from the constituent and email data sets.
- `updated_dt` will be the maximum value (latest timestamp) of `modified_dt` from the constituent, email, and subscription data sets.

In [133]:
# Timestamp columns produced by the merges: the _x / _y suffixes mark the
# left / right side of the cons-email merge; the unsuffixed modified_dt comes
# from the subscription data.
_timestamp_cols = ['create_dt_x', 'create_dt_y',
                   'modified_dt_x', 'modified_dt_y', 'modified_dt']

# Parse each column as a whole (apply's default axis=0) instead of row by row:
# pd.to_datetime is vectorized, so this needs one call per column rather than
# one call per row, and yields the same datetime values.
crm_df[_timestamp_cols] = crm_df[_timestamp_cols].apply(
    lambda col: pd.to_datetime(col, format='%a, %Y-%m-%d %H:%M:%S'))

In [134]:
# created_dt: earliest creation timestamp across the constituent (_x) and
# e-mail (_y) records.
crm_df['created_dt'] = crm_df[['create_dt_x', 'create_dt_y']].min(axis=1)

# updated_dt: latest modification timestamp across the constituent (_x),
# e-mail (_y) and subscription (unsuffixed) records. Previously this repeated
# the created_dt expression, so updated_dt was always equal to created_dt.
# Missing values (e.g. no subscription row) are skipped by max().
crm_df['updated_dt'] = (crm_df[['modified_dt_x', 'modified_dt_y',
                                'modified_dt']]
                        .max(axis=1))

## Final Adjustments

In [135]:
# An e-mail with no subscription row has a missing isunsub after the left
# join; treat it as still subscribed (0.0).
crm_df = crm_df.assign(isunsub=crm_df['isunsub'].fillna(0.0))

In [136]:
# Remove the intermediate timestamp columns, the join keys and the chapter
# filter column from the table.
crm_df = crm_df.drop(
    labels=[
        'create_dt_x', 'create_dt_y',
        'modified_dt_x', 'modified_dt_y', 'modified_dt',
        'cons_id', 'cons_email_id', 'chapter_id',
    ],
    axis='columns',
)

## Export to csv

In [138]:
# Write the final table with its column names as the header row and without
# the dataframe index.
crm_df.to_csv('../results/people.csv',
              index=False,
              header=list(crm_df.columns))