### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [1]:
# Import Python packages 
import pandas as pd
import re
import os
import glob
import numpy as np
import json
import csv

In [2]:
from sql_queries.create_tables import create_table_queries
from sql_queries.insert_queries import insert_table_queries

In [3]:
airport_df = pd.read_csv("data_second_cleaning/airports.csv")
states_df = pd.read_csv("data_second_cleaning/states.csv")
temp_df = pd.read_csv("data_second_cleaning/temperature.csv")

In [4]:
#Takes 10-15 minutes to load
imm_df_raw =  pd.read_sas("../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat", format='sas7bdat', index=None, encoding=None, chunksize=None, iterator=False)

### Transform data into load-ready dataframes and insert

In [5]:
#transform destination_table
states_df['dest_id'] = states_df['State']
dest_cols = ['dest_id','State','Median Age','Total Population','White','Black or African-American','Asian','Hispanic or Latino','American Indian and Alaska Native','Foreign-born']
destination_df = states_df[dest_cols]

In [6]:
#transform geography_table (join airport_df and temp_df)
#airport_df transform to join into geography_df

#### Basis for joining temperature and airport data
Problems:
- Cannot join on city because there are multiple cities with the same name
- Cannot easily take a city/country combination because countries are not in the same format
- Cannot take the coordinates because the airport coordinates or for the airport vs. the temp which is the city center  

Solution:
- Only use cities with large airports to avoid repeated names

In [7]:
large_airports = airport_df[airport_df.airport_size=='large_airport']
double_airports = large_airports[large_airports.duplicated(subset=['municipality'],keep=False)]
print(double_airports[double_airports.municipality=='London'])

     airport_identifier   airport_size             airport_name  elevation_ft  \
1205               EGGW  large_airport     London Luton Airport         526.0   
1219               EGKK  large_airport   London Gatwick Airport         202.0   
1224               EGLL  large_airport  London Heathrow Airport          83.0   
1261               EGSS  large_airport  London Stansted Airport         348.0   

     continent country iso_region municipality gps_code iata_code local_code  \
1205        EU      GB     GB-ENG       London     EGGW       LTN        NaN   
1219        EU      GB     GB-ENG       London     EGKK       LGW        NaN   
1224        EU      GB     GB-ENG       London     EGLL       LHR        NaN   
1261        EU      GB     GB-ENG       London     EGSS       STN        NaN   

                                   coordinates  
1205  -0.36833301186561584, 51.874698638916016  
1219                      -0.190278, 51.148102  
1224                        -0.461941, 51.4706

The large airports dataset does have rows with the same city in multiple rows, but that is because the city has multiple airports and not because they are for different cities.

In [8]:
#join airport_df and temp_df into geography_df
geography_df = large_airports.copy()
geography_df = geography_df.rename(columns={'municipality':'City'})

geography_df = geography_df.set_index('City').join(temp_df.set_index('City'), how = 'left', lsuffix ='_airports', rsuffix='_temp')
geography_df = geography_df.reset_index()

#clean geography_df into insert_ready format
geography_cols = ['airport_id', 'state', 'country', 'city', 'airport_name', 'elevation', 'continent', 'avg_yrly_temp', \
                  'mnthly_high_temp', 'mnthly_low_temp', 'temp_delta_10_yrs', 'temp_delta_20_yrs']
geography_df = geography_df.rename(columns={'State':'state', 'City':'city', 'elevation_ft':'elevation', 'iata_code':'airport_id'})
geography_df = geography_df[geography_cols]

In [9]:
#date_table transform
import datetime
dates_df = pd.to_timedelta(imm_df_raw.arrdate.unique(), unit='D') + pd.Timestamp('1960-1-1')
dates_df = pd.DataFrame(np.sort(dates_df))
dates_df.columns = ['date_id']
dates_df.date_id = pd.to_datetime(dates_df.date_id)
dates_df['day'] = pd.DatetimeIndex(dates_df.date_id).day
dates_df['week'] = pd.DatetimeIndex(dates_df.date_id).week
dates_df['month'] = pd.DatetimeIndex(dates_df.date_id).month
dates_df['year'] = pd.DatetimeIndex(dates_df.date_id).year
dates_df['date_id'] = dates_df['date_id'].astype(str)

In [10]:
#uncomment this cell to work with a subset of the immigration data only
#imm_df_copy = imm_df_raw.copy() #to avoid needing to read the sas data directly as that is very slow
#imm_mini_df = imm_df_copy.head(250000)

In [13]:
def clean_imm(df):
    df = df.drop(['i94yr', 'i94mon', 'insnum', 'admnum', 'fltno', 'dtaddto', 'count', 'delete_days', 
       'delete_mexl', 'delete_dup', 'delete_visa', 'delete_recdup', 'validres', 'i94cit', 'depdate', 'dtadfile'], axis=1)
    df = df.rename(columns={'cicid':'imm_id', 'i94res':'country_of_origin', 'visatype':'visatype', 'dtaddto':'i94_date',
                            'biryear':'birth_year', 'i94bir':'age', 'i94port':'airport_id', 'gender':'gender', 'i94mode':'mode_transport', 
                            'visapost':'dos_issuing_office', 'entdepa':'arrival_flag', 'entdepu':'update_flag', 'entdepd':'departure_flag', 
                            'entdepm':'matching_flag', 'airline':'airline', 'i94visa':'visa_reason', 'occup':'occupation',
                            'arrdate':'date_id', 'i94addr': 'dest_id'})

    df.age = df.age.fillna(0)
    df.country_of_origin = df.country_of_origin.fillna(0)
    df.imm_id = df.imm_id.fillna(0)
    df.birth_year = df.birth_year.fillna(1900)
    
    astype_dict = {'imm_id': int, 'country_of_origin': int, 'age': int, 'birth_year': int, 
                   'airport_id': str, 'mode_transport': str,'dest_id': str,'visa_reason': str,'dos_issuing_office': str,'occupation': str,
                   'arrival_flag': str,'departure_flag': str, 'update_flag': str, 'matflag': str, 'gender': str,'airline': str,'visatype': str,
               } 
    df = df.astype(astype_dict) 
       
    country_df = df[['country_of_origin', 'birth_year']]
    country_matrix = pd.read_csv('Misc/i94res matrix.csv', delimiter='\t')
    country_matrix.country_of_origin = country_matrix.country_of_origin.astype(int)
    country_df = country_df.join(country_matrix, on ='country_of_origin', how ='left', lsuffix='delete')
    df.country_of_origin = country_df.Country
    
    df.loc[df.mode_transport=='1.0', 'mode_transport'] = 'Air'
    df.loc[df.mode_transport=='2.0', 'mode_transport'] = 'Sea'
    df.loc[df.mode_transport=='3.0', 'mode_transport'] = 'Land'
    
    df.loc[df.visa_reason=='1.0', 'visa_reason'] = 'Business'
    df.loc[df.visa_reason=='2.0', 'visa_reason'] = 'Pleasure'
    df.loc[df.visa_reason=='3.0', 'visa_reason'] = 'Student'

    df.date_id = pd.to_timedelta(df.date_id, unit='D') + pd.Timestamp('1960-1-1')
    df.date_id = df.date_id.dt.strftime('%Y-%m-%d')
    
    b_first = ['dest_id', 'airport_id', 'dos_issuing_office', 'arrival_flag', 'departure_flag', 'update_flag', 'matflag', 'gender', 'airline', 'visatype']
    for col in b_first:
        df[col] = df[col].apply(lambda x : x[1:] if x.startswith("b") else x)
        
    return df

In [14]:
#immigration_table
#imm_df = clean_imm(imm_mini_df)
#60 minutes
imm_df = clean_imm(imm_df_raw)

In [15]:
print(imm_df.head(1))

   imm_id           country_of_origin airport_id     date_id mode_transport  \
0       4  MAYOTTE (AFRICA - FRENCH)'      'XXX'  2016-06-07            nan   

  dest_id  age visa_reason dos_issuing_office occupation arrival_flag  \
0     nan   59    Pleasure                nan        nan          'Z'   

  departure_flag update_flag matflag  birth_year gender airline visatype  
0            nan         'U'     nan        1957    nan     nan     'WT'  


As a reminder from step 1, the raw immigration dataset has the following fields:
- cicid: unique identifier
- i94yr: 4 digit year
- i94mon: numeric month
- i94cit: origin code for processing (3 numbers)
- i94res: origin code for processing (3 numbers)
- i94port: airport code of arrival to US (3 letters)
- arrdate: Arrival date to US
- i94mode: Mode of transportation on arrival (air, sea, land as 1, 2, and 3 respectively)
- i94addr: 2 letter state code of destination
- depdate: Departure date from US
- i94bir: Age of individual in years
- i94visa: visa reason codes (1, 2, 3 for business, pleasure or student respectively)
- count: contains value of 1 for summarizing
- dtadfile: date field yyyymmdd format
- visapost: dept of state branch issuing visa
- occup: occupation to perform in US
- entdepa: arrival flag  (admitted or paroled)
- entdepd: departure flag (departed, lost or deceased)
- entdepu: update flag (apprehended, overstayed, adjusted to permanent residence)
- matflag: flag if arrival/departure records matching
- biryear: 4 digit birth year
- dtaddto: date allowed to stay in US until
- gender: gender code (1 letter abbreviation)
- insnum: INS number
- airline: airline flown to arrive in US
- admnum: admission number
- fltno: flight number of airline flown to arrive in US
- visatype: admission class of visa for non-immigrant family

In [16]:
#10 minutes
immigration_cols = list(imm_df.columns)
immigration_cols.remove('dest_id')
immigration_cols.remove('date_id')
immigration_cols.remove('airport_id')
immigration_df = imm_df[immigration_cols]

In [17]:
#facts_table
facts_cols = ['imm_id', 'date_id', 'dest_id', 'country_of_origin', 'airport_id']
facts_df = imm_df[facts_cols]
facts_cols[3] = 'origin_id'
facts_df.columns = facts_cols

In [18]:
import re

def format_for_load(df):
    df1 = df.replace("'", "", regex=True)
    df2 = df1.replace('"', '', regex=True)   
    df3 = df2.replace("nan", "NULL")
    df4 = df3.replace(np.nan, 'NULL')
    return df4

In [19]:
#55 minutes
raw_load_dfs = [facts_df, destination_df, geography_df, dates_df, immigration_df]
load_dfs = []
for df in raw_load_dfs:
    df2 = format_for_load(df)
    load_dfs.append(df2)

In [21]:
df_nulls = load_dfs[2].replace('NULL', -100)
load_dfs[2] = df_nulls.copy()

## Create Redshift DB

In [2]:
import configparser
import psycopg2
from sql_queries.quality_checks import quality_check_queries

In [3]:
config = configparser.ConfigParser()
config.read('Misc/dwh.cfg')

['Misc/dwh.cfg']

In [4]:
conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
cur = conn.cursor()

In [25]:
#create tables
for create_query in create_table_queries:
    cur.execute(create_query)
    conn.commit()

In [None]:
#run insert table queries
#30 minutes
for df, insert_query in zip(load_dfs, insert_table_queries):
    query = insert_query
    for i,row in df.iterrows():
        if i <2: print(str(tuple(row.values)))
        query = query + str(tuple(row.values)) + ', ' 
        if (i+1)%10000==0:
            cur.execute(query[:-2])
            conn.commit()
            query = insert_query
    if query[-7:] != 'values ':
        cur.execute(query[:-2])
        conn.commit()

(4, '2016-06-07', 'NULL', 'MAYOTTE (AFRICA - FRENCH)', 'XXX')
(5, '2016-06-07', 'NULL', 'MAYOTTE (AFRICA - FRENCH)', 'XXX')
('AK', 'AK', 32.2, 0.3, 71.2, 7.7, 12.3, 9.1, 12.2, 11.1)
('AL', 'AL', 36.23, 0.8999999999999999, 47.5, 49.6, 2.7, 3.7, 0.8, 5.0)
('AAL', -100, 'DK', 'Aalborg', 'Aalborg Airport', 10.0, 'EU', -100.0, -100.0, -100.0, -100.0, -100.0)
('ABZ', -100, 'GB', 'Aberdeen', 'Aberdeen Dyce Airport', 215.0, 'EU', 14.628, 14.77, 2.79, -1.4800000000000004, 0.009999999999999787)
('2016-06-01', 1, 22, 6, 2016)
('2016-06-02', 2, 22, 6, 2016)
(4, 'MAYOTTE (AFRICA - FRENCH)', 'NULL', 59, 'Pleasure', 'NULL', 'NULL', 'Z', 'NULL', 'U', 'NULL', 1957, 'NULL', 'NULL', 'WT')
(5, 'MAYOTTE (AFRICA - FRENCH)', 'NULL', 50, 'Pleasure', 'NULL', 'NULL', 'Z', 'NULL', 'U', 'NULL', 1966, 'NULL', 'NULL', 'WT')


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [21]:
for quality_query in quality_check_queries[:-1]:
    cur.execute(quality_query)
    try:
        print(f'{cur.fetchone()[0]} rows returned by {quality_query}.')
    except:
        print(f'{quality_query} returned an empty table!')

3574989 rows returned by SELECT COUNT(*) FROM facts_table.
48 rows returned by SELECT COUNT(*) FROM destination_table.
610 rows returned by SELECT COUNT(*) FROM geography_table.
30 rows returned by SELECT COUNT(*) FROM date_table.


In [22]:
imm_dest_report = '''SELECT COUNT(imm_id) as Num_People, dest_id as State
FROM facts_table
GROUP BY State
ORDER BY Num_People DESC
LIMIT 10;'''

In [7]:
cur.execute(imm_dest_report)
results = cur.fetchall()
print('Most popular destination of immigrants:')
print(f'\n{imm_dest_report}\n\nreturns the following:')
for x in results:
    print([y for y in x])
conn.commit()

Most popular destination of immigrants:

SELECT COUNT(imm_id) as Num_People, dest_id as State
FROM facts_table
GROUP BY State
ORDER BY Num_People DESC
LIMIT 10;

returns the following:
[603181, 'CA']
[589603, 'NY']
[584520, 'FL']
[186031, 'NULL']
[184452, 'HI']
[144576, 'TX']
[121027, 'IL']
[113264, 'NV']
[105693, 'GU']
[105193, 'MA']


In [19]:
imm_latino_report = '''SELECT COUNT(FT.imm_id) as Num_People, FT.dest_id as State, DT.latino
FROM facts_table FT JOIN destination_table DT ON FT.dest_id = DT.dest_id
WHERE DT.latino>20
GROUP BY FT.dest_id, DT.latino
ORDER BY Num_People DESC
LIMIT 10;'''

In [20]:
cur.execute(imm_latino_report)
results = cur.fetchall()
print('Most popular destination of immigrants to states with >20% latino population:')
print(f'\n{imm_latino_report}\n\nreturns the following:')
for x in results:
    print([y for y in x])
conn.commit()

Most popular destination of immigrants to states with >20% latino population:

SELECT COUNT(FT.imm_id) as Num_People, FT.dest_id as State, DT.latino
FROM facts_table FT JOIN destination_table DT ON FT.dest_id = DT.dest_id
WHERE DT.latino>20
GROUP BY FT.dest_id, DT.latino
ORDER BY Num_People DESC
LIMIT 10;

returns the following:
[603181, 'CA', 39.7]
[589603, 'NY', 27.8]
[584520, 'FL', 28.9]
[144576, 'TX', 44.1]
[121027, 'IL', 26.6]
[113264, 'NV', 30.8]
[105193, 'MA', 21.5]
[105030, 'NJ', 42.0]
[28878, 'CO', 24.0]
[21620, 'AZ', 33.5]


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### See Data_Dictionary.txt