# Project Title
### Data Engineering Capstone Project

#### Project Summary

This project is aim to create a data warehouse with ETL pipeline. The analytical data warehouse will enable U.S. officials to find untrivial insights of travellers/immigrants' patterns to the U.S. For example, is there any collrelation between temperature and the number of travellers? If so, is the correlation postisive or negative? And how strong is the correlation? 

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
%load_ext autoreload
%autoreload 2

import numpy as np
import re
import psycopg2
from sql_queries import drop_table_queries, create_table_queries, insert_table_queries
from sqlalchemy import create_engine

import pandas as pd
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)

import warnings
warnings.filterwarnings('ignore')

### Step 1: Scope the Project and Gather Data

#### Scope 

The final deliverable of this project will be a data warehouse. I94 immigration, global temperature and U.S. demograthpics and immigration label description datasets will be use to crate the analytical database. The data warehouse will provide an enriched view over the immigration data.

The project will use Python, Pandas and Postgres to build ETL pipeline and then the data warehouse.

#### Describe and Gather Data 

#### I94 Immigration Data

This data comes from the [US National Tourism and Trade Office](https://travel.trade.gov/research/reports/i94/historical/2016.html). The I94 dataset used in this project is provided by Udacity.com. The dataset (SAS7BDAT format) contains 12 reports in 2016, each of which contains monthly international visitor arrival statistics by world regions and select countries (including top 20), type of visa, mode of transportation, age groups, states visited (first intended address only), and the top ports of entry (for select countries). Due to the limited computation power, the project will use the April 2016 dataset here.

In [2]:
# Read in the I94 immigration data
fname_immigration = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_immigration = pd.read_sas(fname_immigration, 'sas7bdat', encoding="ISO-8859-1")

In [3]:
df_immigration.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,37.0,2.0,1.0,,,,T,,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,20130811.0,SEO,,G,,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,20160401.0,,,T,O,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,28.0,2.0,1.0,20160401.0,,,O,O,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,20160401.0,,,O,O,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


#### I94 Immigration Data Dictionary

This data dictionary (SAS format) is provided along with the I94 Immgration dataset by Udacity.com. It contains labels descriptions for the I94 dataset.

In [4]:
# Read in the I94 immigration data dictionary
fname_immigration_dict = './I94_SAS_Labels_Descriptions.SAS'
with open(fname_immigration_dict) as file:
    immigration_data_dict = file.readlines()
    
# Strip the leading or trailing space in each line
immigration_data_dict_stripped = [line.strip() for line in immigration_data_dict]

# Print the first 20 entries
immigration_data_dict_stripped[:20]

["libname library 'Your file location' ;",
 'proc format library=library ;',
 '',
 '/* I94YR - 4 digit year */',
 '',
 '/* I94MON - Numeric month */',
 '',
 '/* I94CIT & I94RES - This format shows all the valid and invalid codes for processing */',
 'value i94cntyl',
 "582 =  'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'",
 "236 =  'AFGHANISTAN'",
 "101 =  'ALBANIA'",
 "316 =  'ALGERIA'",
 "102 =  'ANDORRA'",
 "324 =  'ANGOLA'",
 "529 =  'ANGUILLA'",
 "518 =  'ANTIGUA-BARBUDA'",
 "687 =  'ARGENTINA '",
 "151 =  'ARMENIA'",
 "532 =  'ARUBA'"]

#### World Temperature Data

This dataset (CSV format) came from Kaggle. You can read more about it [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).

In [5]:
# Read in the world temperature data
fname_temperature = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temperature = pd.read_csv(fname_temperature)

In [6]:
df_temperature.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


#### U.S. City Demographic Data

This data (CSV format) comes from OpenSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).

In [7]:
# Read in the US city demographic data
fname_demographics = './us-cities-demographics.csv'
df_demographics = pd.read_csv(fname_demographics, sep=';')

In [8]:
df_demographics.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


#### Airport Code Table

This is a simple table (CSV format) of airport codes and corresponding cities. It comes from [here](https://datahub.io/core/airport-codes#data).

In [9]:
# Read in the airport code data
fname_airport = './airport-codes_csv.csv'
df_airport = pd.read_csv(fname_airport)

In [10]:
df_airport.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


### Step 2: Explore and Assess the Data

#### Explore I94 Immigration Data Dictionary

In [11]:
immigration_data_dict_stripped[:30]

["libname library 'Your file location' ;",
 'proc format library=library ;',
 '',
 '/* I94YR - 4 digit year */',
 '',
 '/* I94MON - Numeric month */',
 '',
 '/* I94CIT & I94RES - This format shows all the valid and invalid codes for processing */',
 'value i94cntyl',
 "582 =  'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'",
 "236 =  'AFGHANISTAN'",
 "101 =  'ALBANIA'",
 "316 =  'ALGERIA'",
 "102 =  'ANDORRA'",
 "324 =  'ANGOLA'",
 "529 =  'ANGUILLA'",
 "518 =  'ANTIGUA-BARBUDA'",
 "687 =  'ARGENTINA '",
 "151 =  'ARMENIA'",
 "532 =  'ARUBA'",
 "438 =  'AUSTRALIA'",
 "103 =  'AUSTRIA'",
 "152 =  'AZERBAIJAN'",
 "512 =  'BAHAMAS'",
 "298 =  'BAHRAIN'",
 "274 =  'BANGLADESH'",
 "513 =  'BARBADOS'",
 "104 =  'BELGIUM'",
 "581 =  'BELIZE'",
 "386 =  'BENIN'"]

#### Clean I94 Immigration Data Dictionary

In [12]:
# Select the port entries
ports = immigration_data_dict_stripped[302:893]

# Create a dictionary that maps a port to its corresponding city
dict_ports = {}
for port in ports:
    match = re.search(r"\'(\w+)\'.*\'(.+),", port)
    if match:
        dict_ports[match[1]] = match[2].title()

In [13]:
list(dict_ports.items())

[('ALC', 'Alcan'),
 ('ANC', 'Anchorage'),
 ('BAR', 'Baker Aaf - Baker Island'),
 ('DAC', 'Daltons Cache'),
 ('PIZ', 'Dew Station Pt Lay Dew'),
 ('DTH', 'Dutch Harbor'),
 ('EGL', 'Eagle'),
 ('FRB', 'Fairbanks'),
 ('HOM', 'Homer'),
 ('HYD', 'Hyder'),
 ('JUN', 'Juneau'),
 ('5KE', 'Ketchikan'),
 ('KET', 'Ketchikan'),
 ('MOS', 'Moses Point Intermediate'),
 ('NIK', 'Nikiski'),
 ('NOM', 'Nom'),
 ('PKC', 'Poker Creek'),
 ('ORI', 'Port Lions Spb'),
 ('SKA', 'Skagway'),
 ('SNP', 'St. Paul Island'),
 ('TKI', 'Tokeen'),
 ('WRA', 'Wrangell'),
 ('HSV', 'Madison County - Huntsville'),
 ('MOB', 'Mobile'),
 ('LIA', 'Little Rock'),
 ('ROG', 'Rogers Arpt'),
 ('DOU', 'Douglas'),
 ('LUK', 'Lukeville'),
 ('NAC', 'Naco'),
 ('NOG', 'Nogales'),
 ('PHO', 'Phoenix'),
 ('POR', 'Portal'),
 ('SLU', 'San Luis'),
 ('SAS', 'Sasabe'),
 ('TUC', 'Tucson'),
 ('YUI', 'Yuma'),
 ('AND', 'Andrade'),
 ('BUR', 'Burbank'),
 ('CAL', 'Calexico'),
 ('CAO', 'Campo'),
 ('FRE', 'Fresno'),
 ('ICP', 'Imperial County'),
 ('LNB', 'Long Be

#### Explore I94 Immigration Data

In [14]:
# Utility functions that are used to explore a dataset in a dataframe format
def number_of_rows_columns(df):
    """Display number of rows and columns of a dataframe"""
    print('Number of rows: {:,d}\nNumber of columns: {}'.format(*df.shape))
    
def number_of_duplicated_rows(df):
    """Display number of duplicated rows of a dataframe"""
    print('Nubmer of duplicated rows: {}'.format(df.duplicated().sum()))
    
def number_of_missing_values(df):
    """Display missing values information of a dataframe"""
    # Number of missing values
    n_missing = df.isnull().sum()

    # Percentage of missing values in each column
    n_missing_pct = np.round(n_missing / df.shape[0] * 100, 2)

    # Print the missing values info
    df_returned = pd.DataFrame({
        'Column': n_missing.index,
        'Number of Missing Values': n_missing.values,
        'Percentage of Missing Values': n_missing_pct.values
        }).sort_values('Percentage of Missing Values', ascending=False)
    
    return df_returned

In [15]:
df_immigration.sample(5)

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
359420,709261.0,2016.0,4.0,438.0,438.0,DAL,20548.0,1.0,TN,20567.0,48.0,2.0,1.0,20160404,,,O,O,,M,1968.0,7022016,,,QF,55599140000.0,7,WT
2428406,4914052.0,2016.0,4.0,254.0,276.0,LOS,20570.0,1.0,CA,20574.0,64.0,2.0,1.0,20160426,,,O,O,,M,1952.0,7242016,,,KE,59282280000.0,17,WT
2558106,5205888.0,2016.0,4.0,111.0,111.0,HOU,20572.0,1.0,NY,20580.0,35.0,2.0,1.0,20160428,,,G,O,,M,1981.0,7262016,F,,FI,59423020000.0,623,WT
350680,698364.0,2016.0,4.0,251.0,251.0,LOS,20548.0,1.0,CA,20561.0,28.0,2.0,1.0,20160404,TLV,,G,O,,M,1988.0,10032016,M,,LY,92715590000.0,5,B2
1928729,3883372.0,2016.0,4.0,209.0,209.0,TOR,20565.0,1.0,NY,20574.0,17.0,2.0,1.0,20160421,,,G,O,,M,1999.0,7192016,F,,AC,56522580000.0,718,WT


In [16]:
number_of_rows_columns(df_immigration)

Number of rows: 3,096,313
Number of columns: 28


In [17]:
number_of_duplicated_rows(df_immigration)

Nubmer of duplicated rows: 0


In [18]:
number_of_missing_values(df_immigration)

Unnamed: 0,Column,Number of Missing Values,Percentage of Missing Values
18,entdepu,3095921,99.99
15,occup,3088187,99.74
23,insnum,2982605,96.33
14,visapost,1881250,60.76
22,gender,414269,13.38
8,i94addr,152372,4.92
9,depdate,142457,4.6
17,entdepd,138429,4.47
19,matflag,138429,4.47
24,airline,83627,2.7


#### Clean I94 Immigration Data

In [19]:
# Filter immigratin entries with valid i94 port
df_immigration_filtered = df_immigration[df_immigration.i94port.isin(dict_ports)]

In [20]:
# Create a city column whose values are U.S. city names
df_immigration_filtered['city'] = df_immigration_filtered['i94port'].apply(lambda x: dict_ports[x])

In [21]:
# Select columns for fact table data modeling
df_immigration_filtered = df_immigration_filtered[
    [
        'cicid', 'i94cit', 'i94res', 'biryear', 
        'gender', 'i94port', 'arrdate', 'i94mode', 
        'city', 'i94addr', 'depdate', 'i94visa', 
        'visatype', 'admnum', 'airline', 'fltno', 'count'
    ]
]

In [22]:
# Drop rows with null values
df_immigration_filtered.dropna(inplace=True)

# Drop rows where the i94addr file contains null, i.e. '\x00\x00'
df_immigration_filtered = df_immigration_filtered[~(df_immigration_filtered.i94addr == '\x00\x00')]

In [23]:
number_of_rows_columns(df_immigration_filtered)

Number of rows: 2,306,895
Number of columns: 17


In [24]:
number_of_missing_values(df_immigration_filtered)

Unnamed: 0,Column,Number of Missing Values,Percentage of Missing Values
0,cicid,0,0.0
9,i94addr,0,0.0
15,fltno,0,0.0
14,airline,0,0.0
13,admnum,0,0.0
12,visatype,0,0.0
11,i94visa,0,0.0
10,depdate,0,0.0
8,city,0,0.0
1,i94cit,0,0.0


In [25]:
# Covert to correct data types that will match the fact table schema
df_immigration_filtered['cicid'] = df_immigration_filtered['cicid'].astype('int32').astype('str')
df_immigration_filtered['i94cit'] = df_immigration_filtered['i94cit'].astype('int32').astype('str')
df_immigration_filtered['i94res'] = df_immigration_filtered['i94res'].astype('int32').astype('str')
df_immigration_filtered['biryear'] = df_immigration_filtered['i94res'].astype('int32')
df_immigration_filtered['arrdate'] = df_immigration_filtered['arrdate'].astype('int64')
df_immigration_filtered['i94mode'] = df_immigration_filtered['i94mode'].astype('int32').astype('str')
df_immigration_filtered['depdate'] = df_immigration_filtered['depdate'].astype('int64')
df_immigration_filtered['i94visa'] = df_immigration_filtered['i94visa'].astype('int32').astype('str')
df_immigration_filtered['count'] = df_immigration_filtered['count'].astype('int32')

In [26]:
df_immigration_filtered.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 2306895 entries, 12 to 3029508
Data columns (total 17 columns):
cicid       object
i94cit      object
i94res      object
biryear     int32
gender      object
i94port     object
arrdate     int64
i94mode     object
city        object
i94addr     object
depdate     int64
i94visa     object
visatype    object
admnum      float64
airline     object
fltno       object
count       int32
dtypes: float64(1), int32(2), int64(2), object(12)
memory usage: 299.2+ MB


#### Explore World Temperature Data

In [27]:
df_temperature.sample(5)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
2077028,1942-09-01,20.453,0.378,Downey,United States,34.56N,118.70W
7573164,1820-12-01,24.473,3.091,Thanjavur,India,10.45N,79.36E
1043062,1783-12-01,-1.571,3.253,Bonn,Germany,50.63N,6.34E
1711881,1840-08-01,16.947,1.168,Colchester,United Kingdom,52.24N,0.00W
4832431,1843-11-01,6.041,1.23,Mingshui,China,36.17N,117.35E


In [28]:
# Select US city temperature only
df_temperature_us = df_temperature[df_temperature.Country == 'United States']

In [29]:
number_of_rows_columns(df_temperature_us)

Number of rows: 687,289
Number of columns: 7


In [30]:
number_of_duplicated_rows(df_temperature_us)

Nubmer of duplicated rows: 0


In [31]:
number_of_missing_values(df_temperature_us)

Unnamed: 0,Column,Number of Missing Values,Percentage of Missing Values
1,AverageTemperature,25765,3.75
2,AverageTemperatureUncertainty,25765,3.75
0,dt,0,0.0
3,City,0,0.0
4,Country,0,0.0
5,Latitude,0,0.0
6,Longitude,0,0.0


In [32]:
df_temperature_us.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 687289 entries, 47555 to 8439246
Data columns (total 7 columns):
dt                               687289 non-null object
AverageTemperature               661524 non-null float64
AverageTemperatureUncertainty    661524 non-null float64
City                             687289 non-null object
Country                          687289 non-null object
Latitude                         687289 non-null object
Longitude                        687289 non-null object
dtypes: float64(2), object(5)
memory usage: 41.9+ MB


#### Clean US Temperature Data

In [33]:
# Drop rows with missing values
df_temperature_us.dropna(inplace=True)

In [34]:
number_of_missing_values(df_temperature_us)

Unnamed: 0,Column,Number of Missing Values,Percentage of Missing Values
0,dt,0,0.0
1,AverageTemperature,0,0.0
2,AverageTemperatureUncertainty,0,0.0
3,City,0,0.0
4,Country,0,0.0
5,Latitude,0,0.0
6,Longitude,0,0.0


In [35]:
number_of_rows_columns(df_temperature_us)

Number of rows: 661,524
Number of columns: 7


In [36]:
# Aggregate the average temperature by city and select columns for dim table modeling
df_temperature_us = df_temperature_us[['City', 'AverageTemperature', 'Latitude', 'Longitude']]
df_temperature_us = (
    df_temperature_us
        .groupby(['City'])
        .agg({
            'AverageTemperature': 'mean', 
            'Latitude': 'first', 
            'Longitude': 'first'}))
df_temperature_us = df_temperature_us.reset_index()

In [37]:
df_temperature_us.sample(5)

Unnamed: 0,City,AverageTemperature,Latitude,Longitude
65,Eugene,9.777756,44.20N,122.98W
207,Sioux Falls,6.346163,44.20N,96.15W
89,Hayward,14.447987,37.78N,122.03W
25,Buffalo,7.726906,42.59N,78.55W
238,West Covina,15.878038,34.56N,118.70W


In [38]:
# Covert the average temperature column to float32
df_temperature_us['AverageTemperature'] = df_temperature_us['AverageTemperature'].astype('float32')

In [39]:
df_temperature_us.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 248 entries, 0 to 247
Data columns (total 4 columns):
City                  248 non-null object
AverageTemperature    248 non-null float32
Latitude              248 non-null object
Longitude             248 non-null object
dtypes: float32(1), object(3)
memory usage: 6.9+ KB


#### Explore U.S. City Demographic Data

In [40]:
df_demographics.sample(5)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
280,Milwaukee,Wisconsin,31.6,286315.0,313839.0,600154,20615.0,58321.0,2.51,WI,American Indian and Alaska Native,10813
236,San Diego,California,34.5,693826.0,701081.0,1394907,92489.0,373842.0,2.73,CA,White,949388
1922,Atascocita,Texas,32.8,37424.0,40816.0,78240,4416.0,8657.0,3.03,TX,Hispanic or Latino,20441
421,Deerfield Beach,Florida,41.4,37155.0,42614.0,79769,3882.0,23642.0,2.46,FL,Black or African-American,25344
1619,Yuma,Arizona,33.4,48298.0,45847.0,94145,7182.0,19326.0,2.64,AZ,Asian,1180


In [41]:
number_of_rows_columns(df_demographics)

Number of rows: 2,891
Number of columns: 12


In [42]:
number_of_duplicated_rows(df_demographics)

Nubmer of duplicated rows: 0


In [43]:
number_of_missing_values(df_demographics)

Unnamed: 0,Column,Number of Missing Values,Percentage of Missing Values
8,Average Household Size,16,0.55
6,Number of Veterans,13,0.45
7,Foreign-born,13,0.45
3,Male Population,3,0.1
4,Female Population,3,0.1
0,City,0,0.0
1,State,0,0.0
2,Median Age,0,0.0
5,Total Population,0,0.0
9,State Code,0,0.0


In [44]:
df_demographics.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int64
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 271.1+ KB


#### Clean U.S. City Demographic Data

In [45]:
# Drop rows with missing values
df_demographics.dropna(inplace=True)

In [46]:
number_of_missing_values(df_demographics)

Unnamed: 0,Column,Number of Missing Values,Percentage of Missing Values
0,City,0,0.0
1,State,0,0.0
2,Median Age,0,0.0
3,Male Population,0,0.0
4,Female Population,0,0.0
5,Total Population,0,0.0
6,Number of Veterans,0,0.0
7,Foreign-born,0,0.0
8,Average Household Size,0,0.0
9,State Code,0,0.0


In [47]:
number_of_rows_columns(df_demographics)

Number of rows: 2,875
Number of columns: 12


In [48]:
# Covert to correct data types for dim table modeling
df_demographics['Median Age'] = df_demographics['Median Age'].astype('float32')
df_demographics['Male Population'] = df_demographics['Male Population'].astype('int32')
df_demographics['Female Population'] = df_demographics['Female Population'].astype('int32')
df_demographics['Total Population'] = df_demographics['Total Population'].astype('int32')
df_demographics['Number of Veterans'] = df_demographics['Number of Veterans'].astype('int32')
df_demographics['Foreign-born'] = df_demographics['Foreign-born'].astype('int32')
df_demographics['Average Household Size'] = df_demographics['Average Household Size'].astype('float32')
df_demographics['Count'] = df_demographics['Count'].astype('int32')

In [49]:
df_demographics.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 2875 entries, 0 to 2890
Data columns (total 12 columns):
City                      2875 non-null object
State                     2875 non-null object
Median Age                2875 non-null float32
Male Population           2875 non-null int32
Female Population         2875 non-null int32
Total Population          2875 non-null int32
Number of Veterans        2875 non-null int32
Foreign-born              2875 non-null int32
Average Household Size    2875 non-null float32
State Code                2875 non-null object
Race                      2875 non-null object
Count                     2875 non-null int32
dtypes: float32(2), int32(6), object(4)
memory usage: 202.1+ KB


#### Explore Airport Code Table

In [50]:
df_airport.sample(5)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
46782,SVSC,small_airport,San Carlos RÃ­o Negro Airport,367.0,SA,VE,VE-Z,,SVSC,,,"-67.05000305175781, 1.9166669845581055"
43761,SJUC,small_airport,Jardim ParaÃ­so Airport,2395.0,SA,BR,BR-BA,LuÃ­s Eduardo MagalhÃ£es,SJUC,,,"-45.77750015258789, -12.108332633972168"
2109,1MU8,small_airport,Church's Landing Airport,826.0,,US,US-MO,Rushville,1MU8,,1MU8,"-94.99859619140625, 39.55419921875"
14215,CDL8,small_airport,Centredale Airport,595.0,,CA,CA-NS,Centredale,CDL8,,CDL8,"-62.618401, 45.4094"
13507,CA-0565,seaplane_base,Gowganda/Gowganda Lake Seaplane Base,930.0,,CA,CA-ON,,,,,"-80.785301, 47.6506"


In [51]:
number_of_rows_columns(df_airport)

Number of rows: 55,075
Number of columns: 12


In [52]:
# Select US airport entries only
df_airport_us = df_airport[df_airport.iso_country == 'US']

In [53]:
number_of_rows_columns(df_airport_us)

Number of rows: 22,757
Number of columns: 12


In [54]:
number_of_duplicated_rows(df_airport_us)

Nubmer of duplicated rows: 0


In [55]:
number_of_missing_values(df_airport_us)

Unnamed: 0,Column,Number of Missing Values,Percentage of Missing Values
4,continent,22756,100.0
9,iata_code,20738,91.13
8,gps_code,1773,7.79
10,local_code,1521,6.68
3,elevation_ft,239,1.05
7,municipality,102,0.45
0,ident,0,0.0
1,type,0,0.0
2,name,0,0.0
5,iso_country,0,0.0


In [56]:
df_airport_us.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 22757 entries, 0 to 54896
Data columns (total 12 columns):
ident           22757 non-null object
type            22757 non-null object
name            22757 non-null object
elevation_ft    22518 non-null float64
continent       1 non-null object
iso_country     22757 non-null object
iso_region      22757 non-null object
municipality    22655 non-null object
gps_code        20984 non-null object
iata_code       2019 non-null object
local_code      21236 non-null object
coordinates     22757 non-null object
dtypes: float64(1), object(11)
memory usage: 2.3+ MB


#### Clean US Airport Code Table

In [57]:
# Select columns for dim table modeling
df_airport_us = df_airport_us[
    [
        'ident', 'type', 'name', 'elevation_ft', 'iso_country',
        'iso_region', 'municipality', 'gps_code', 'iata_code'
    ]
]

In [58]:
# Drop rows with missing in iata_code column
df_airport_us.dropna(inplace=True)

In [59]:
number_of_missing_values(df_airport_us)

Unnamed: 0,Column,Number of Missing Values,Percentage of Missing Values
0,ident,0,0.0
1,type,0,0.0
2,name,0,0.0
3,elevation_ft,0,0.0
4,iso_country,0,0.0
5,iso_region,0,0.0
6,municipality,0,0.0
7,gps_code,0,0.0
8,iata_code,0,0.0


In [60]:
number_of_rows_columns(df_airport_us)

Number of rows: 1,914
Number of columns: 9


In [61]:
# Covert to correct data type
df_airport_us['elevation_ft'] = df_airport_us['elevation_ft'].astype('float32')

In [62]:
df_airport_us.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 1914 entries, 440 to 54896
Data columns (total 9 columns):
ident           1914 non-null object
type            1914 non-null object
name            1914 non-null object
elevation_ft    1914 non-null float32
iso_country     1914 non-null object
iso_region      1914 non-null object
municipality    1914 non-null object
gps_code        1914 non-null object
iata_code       1914 non-null object
dtypes: float32(1), object(8)
memory usage: 142.1+ KB


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

Fact Table: immigrations

| Column Name | Data Type |
| :--- | :--- |
| cicid | VARCHAR PRIMARY KEY |
| cit | VARCHAR |
| res | VARCHAR |
| biryear | SMALLINT |
| gender | VARCHAR |
| iata | VARCHAR |
| arrdate | BIGINT|
| mode | VARCHAR |
| city | VARCHAR |
| state | VARCHAR |
| depdate | BIGINT |
| visa | VARCHAR |
| visatype | VARCHAR |
| admnum | DOUBLE PRECISION |
| airline | VARCHAR |
| fltno | VARCHAR|
| count | SMALLINT |

The immigrations table will serve as a fact table and will store the I94 immigration data as events. The `iata` column will be used to join the `iata` column of the airports table. Value of the `city` column is derived from the immigration label description file and is used to join the `city` column of temperature and demographics, repsectively.


Dim Table: temperature

| Column Name | Data Type |
| :--- | :--- |
| city | VARCHAR  PRIMARY KEY |
| average_temperature | REAL |
| latitude | VARCHAR |
| longitude | VARCHAR |

The temperature table will serve as a dimension table and will the average historical temperauture of each U.S. city along with its latitude and longitude.

Dim Table: demographics

| Column Name | Data Type |
| :--- | :--- |
| city | VARCHAR PRIMARY KEY |
| state | VARCHAR PRIMARY KEY |
| median_age | REAL |
| male_population | INTEGER |
| female_population | INTEGER |
| number_of_veterans | INTEGER|
| number_of_foreign_born | INTEGER |
| average_household_size | REAL |
| race | VARCHAR |
| count | INTEGER|

The demographics table will serve as a dimension table and will store demographic information of U.S. cities.

Dim Table: airports

| Column Name | Data Type |
| :--- | :--- |
| ident | VARCHAR PRIMARY KEY |
| type | VARCHAR |
| name | VARCHAR |
| elevation_ft | REAL |
| iso_country | VARCHAR |
| iso_region | VARCHAR |
| municipality | VARCHAR |
| gps_code | VARCHAR |
| iata | VARCHAR |

The airports table will serve as a dimentsion table and will store information of U.S. airports.

#### 3.2 Mapping Out Data Pipelines

1. Create fact and dim tables
2. Extract i94 immigration, i94 immigration data dictionary, temperature, demogratphic and airport data
3. Transform the above extracted data
4. Load the transoformed data into fact and dim tables

### Step 4: Run Pipelines to Model the Data 

In [63]:
# Create a connection to the database, get a cursor, and set autocommit to true
try: 
    conn = psycopg2.connect("host=127.0.0.1 dbname=studentdb user=student password=student")
    conn.set_client_encoding('utf-8')
except psycopg2.Error as e: 
    print("Error: Could not make connection to the Postgres database")
    print(e)
    
try: 
    cur = conn.cursor()
except psycopg2.Error as e: 
    print("Error: Could not get curser to the Database")
    print(e)
    
conn.set_session(autocommit=True)

#### 4.1 Create the data model

In [64]:
# Drop table if exists
for query in drop_table_queries:
    cur.execute(query)

# Create table if not exists
for query in create_table_queries:
    cur.execute(query)

In [65]:
# Check create tables
cur.execute("SELECT table_name FROM information_schema.tables WHERE table_schema='public'")
for table in cur.fetchall():
    print('created table:', table[0])

created table: immigrations
created table: temperature
created table: demographics
created table: airports


#### #1: immigrations Table

In [66]:
print('Total number of records in filtered immigration dataset: {:,d}'.format(df_immigration_filtered.shape[0]))

Total number of records in filtered immigration dataset: 2,306,895


In [67]:
# Select the number of recrods you would like to insert into immigrations table, 
# reccommend minumum number of records is 10,000 and miximum 2306895
df_immigration_filtered_subset = df_immigration_filtered.sample(2306895)
for idx, row in df_immigration_filtered_subset.iterrows():
    cur.execute(insert_table_queries[0], list(row))

#### #2: temperature Table

In [68]:
for idx, row in df_temperature_us.iterrows():
    cur.execute(insert_table_queries[1], list(row))

#### #3: demographics Table

In [69]:
for idx, row in df_demographics.iterrows():
    cur.execute(insert_table_queries[2], list(row))

#### #4: airports Table

In [70]:
for idx, row in df_airport_us.iterrows():
    cur.execute(insert_table_queries[3], list(row))

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

#### #1: Check unique key

In [71]:
def check_unique_key(query_actual, query_expected, table):
    """Check the number of distinct records of a table and the actual number of records of the table."""
    cur.execute(query_actual)
    actual = cur.fetchone()[0]
    cur.execute(query_expected)
    expected = cur.fetchone()[0]
    print('Number of distinct rows on table {}:\nactual {:,d}\nexpected {:,d}'.format(table, actual, expected))
    if actual == expected:
        print('Unique key check on table {} passed.\n'.format(table))
    else:
        print('Unique key check on table {} failed.\n'.format(table))

In [72]:
# Check immigrations table
query_actual = 'SELECT COUNT(DISTINCT cicid) FROM immigrations'
query_expected = 'SELECT COUNT(*) FROM immigrations'
check_unique_key(query_actual, query_expected, 'immigrations')

# Check temperature table
query_actual = 'SELECT COUNT(DISTINCT city) FROM temperature'
query_expected = 'SELECT COUNT(*) FROM temperature'
check_unique_key(query_actual, query_expected, 'temperature')

# Check demographics table
query_actual = 'SELECT COUNT(DISTINCT CONCAT(city, state, race)) FROM demographics'
query_expected = 'SELECT COUNT(*) FROM demographics'
check_unique_key(query_actual, query_expected, 'demographics')

# Check airports table
query_actual = 'SELECT COUNT(DISTINCT ident) FROM airports'
query_expected = 'SELECT COUNT(*) FROM airports'
check_unique_key(query_actual, query_expected, 'airports')

Number of distinct rows on table immigrations:
actual 2,306,895
expected 2,306,895
Unique key check on table immigrations passed.

Number of distinct rows on table temperature:
actual 248
expected 248
Unique key check on table temperature passed.

Number of distinct rows on table demographics:
actual 2,875
expected 2,875
Unique key check on table demographics passed.

Number of distinct rows on table airports:
actual 1,914
expected 1,914
Unique key check on table airports passed.



#### #2: Check table join

In [73]:
def check_table_join(query, table1, table2):
    """Check ther number of records after performing a join query on table1 and table 2."""
    cur.execute(query)
    result = cur.fetchone()[0]
    if result > 0:
        print('Table join check passed. {} join {} returned {:,d} rows.\n'.format(table1, table2, result))
    else:
        print('Table join check faled. {} join {} return {:,d} rows.\n'.format(table1, table2, result))

In [74]:
# immigrations join temperature
query = 'SELECT COUNT(*) FROM immigrations i JOIN temperature t ON i.city = t.city'
check_table_join(query, 'immigrations', 'temperature')

# immigrations join demographics
query = 'SELECT COUNT(*) FROM immigrations i JOIN demographics d ON i.city = d.city'
check_table_join(query, 'immigrations', 'demographics')

# immigrations join airports
query = 'SELECT COUNT(*) FROM immigrations i JOIN airports a ON i.iata = a.iata'
check_table_join(query, 'immigrations', 'airports')

Table join check passed. immigrations join temperature returned 1,832,188 rows.

Table join check passed. immigrations join demographics returned 9,274,518 rows.

Table join check passed. immigrations join airports returned 988,785 rows.



#### #3: Check source/count completeness

In [75]:
def check_source_count(query, table, source):
    """Check the number of records inserted into table from source and the number of records of the source."""
    cur.execute(query)
    actual = cur.fetchone()[0]
    expected = len(source)
    print('Number of rows on table {}:\nactual {:,d}\nexpected {:,d}'.format(table, actual, expected))
    if actual == expected:
        print('Source/Count completeness check on table {} passed.\n'.format(table))
    else:
        print('Source/Count completeness check on table {} failed.\n'.format(table))

In [76]:
# Check immigrations table
query = 'SELECT COUNT(*) FROM immigrations'
check_source_count(query, 'immigrations', df_immigration_filtered_subset)

# Check temperature table
query = 'SELECT COUNT(*) FROM temperature'
check_source_count(query, 'temperature', df_temperature_us)

# Check demographics table
query = 'SELECT COUNT(*) FROM demographics'
check_source_count(query, 'demographics', df_demographics)

# Check airports table
query = 'SELECT COUNT(*) FROM airports'
check_source_count(query, 'airports', df_airport_us)

Number of rows on table immigrations:
actual 2,306,895
expected 2,306,895
Source/Count completeness check on table immigrations passed.

Number of rows on table temperature:
actual 248
expected 248
Source/Count completeness check on table temperature passed.

Number of rows on table demographics:
actual 2,875
expected 2,875
Source/Count completeness check on table demographics passed.

Number of rows on table airports:
actual 1,914
expected 1,914
Source/Count completeness check on table airports passed.



In [77]:
# Close db connection
cur.close()
conn.close()

#### #4: Check table contents

In [78]:
# Crate engine
engine = create_engine("postgres://student:student@127.0.0.1/studentdb")

In [79]:
# Check immigrations table
with engine.connect() as conn:
    rs = conn.execute('SELECT * from immigrations limit 5;')
    df = pd.DataFrame(rs.fetchall())
    df.columns = rs.keys()
df

Unnamed: 0,cicid,cit,res,biryear,gender,iata,arrdate,mode,city,state,depdate,visa,visatype,admnum,airline,fltno,count
0,4682523,117,687,687,F,NYC,20569,1,New York,NY,20582,2,WT,59220930000.0,JJ,8080,1
1,1342530,692,692,692,M,FTL,20551,1,Fort Lauderdale,NE,20552,2,B2,92995760000.0,AM,404,1
2,2028122,209,209,209,M,CHI,20555,1,Chicago,IN,20606,1,E2,93259440000.0,JL,10,1
3,6044051,254,276,276,M,AGA,20573,1,Agana,GU,20575,2,GMT,45136820000.0,7C,3154,1
4,2139405,689,689,689,M,SEA,20555,1,Seattle,NY,20560,2,B2,93252350000.0,JJ,8080,1


In [80]:
# Check temperature table
with engine.connect() as conn:
    rs = conn.execute('SELECT * from temperature limit 5;')
    df = pd.DataFrame(rs.fetchall())
    df.columns = rs.keys()
df

Unnamed: 0,city,average_temperature,latitude,longitude
0,Abilene,16.8925,32.95N,100.53W
1,Akron,9.60508,40.99N,80.95W
2,Albuquerque,11.1353,34.56N,107.03W
3,Alexandria,11.9185,39.38N,76.99W
4,Allentown,9.5233,40.99N,74.56W


In [81]:
# Check demographics table
with engine.connect() as conn:
    rs = conn.execute('SELECT * from demographics limit 5;')
    df = pd.DataFrame(rs.fetchall())
    df.columns = rs.keys()
df

Unnamed: 0,city,state,median_age,male_population,female_population,total_population,number_of_veterans,number_of_foreign_born,average_household_size,state_code,race,count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


In [82]:
# Check airports table
with engine.connect() as conn:
    rs = conn.execute('SELECT * from airports limit 5;')
    df = pd.DataFrame(rs.fetchall())
    df.columns = rs.keys()
df

Unnamed: 0,ident,type,name,elevation_ft,iso_country,iso_region,municipality,gps_code,iata
0,07FA,small_airport,Ocean Reef Club Airport,8.0,US,US-FL,Key Largo,07FA,OCA
1,0CO2,small_airport,Crested Butte Airpark,8980.0,US,US-CO,Crested Butte,0CO2,CSE
2,0TE7,small_airport,LBJ Ranch Airport,1515.0,US,US-TX,Johnson City,0TE7,JCY
3,13MA,small_airport,Metropolitan Airport,418.0,US,US-MA,Palmer,13MA,PMX
4,13Z,seaplane_base,Loring Seaplane Base,0.0,US,US-AK,Loring,13Z,WLR


#### 4.3 Data dictionary 

Fact Table: immigrations

| Column Name | Description |
| :--- | :--- |
| cicid | Unique record identifier |
| cit | Born country |
| res | Residence country |
| biryear | 4 digit year of birth |
| gender | Non-immigrant sex |
| iata | Arrival port |
| arrdate | Arrival date in the USA |
| mode | Arrival mode of transportation 1 = 'Air' 2 = 'Sea' 3 = 'Land' 9 = 'Not reported' |
| city | Arrival city |
| state | Arrival state |
| depdate | Departure Date from the USA |
| visa | Visa categories: 1 = 'Business' 2 = 'Pleasure' 3 = 'Student' |
| visatype | Class of admission legally admitting the non-immigrant to temporarily stay in USA |
| admnum | Admission Number |
| airline | Airline used to arrive in the USA |
| fltno | Flight number of Airline used to arrive in USA |
| count | Used for summary statistics |

The immigrations table is populated from the I94 immigration dataset.

Dim Table: temperature

| Column Name | Description |
| :--- | :--- |
| city | Name of U.S. city |
| average_temperature | Averate temperature of the city |
| latitude | Latitude of the city |
| longitude | Longitude of the city |

The temperature table is populated from the subset (U.S.) of world temperature dataset.

Dim Table: demographics

| Column Name | Description |
| :--- | :--- |
| city | Name of U.S. city |
| state | Name of U.S. state of the city |
| median_age | Median age of pupulation |
| male_population | Number of male population |
| female_population | Number of female population |
| number_of_veterans | Number of veterans |
| number_of_foreign_born | Number of residents who were not born in the city |
| average_household_size | Average household size |
| race | Race class |
| count | Number of individual of each race |

The demographics table is populated from the U.S. demographic dataset.

Dim Table: airports

| Column Name | Description |
| :--- | :--- |
| ident | Unique record identifier |
| type | Type of the airport |
| name | Name of U.S. airport |
| elevation_ft | Altitude of the airport |
| iso_country | ISO code of U.S. |
| iso_region | ISO code of U.S. regions |
| municipality | Name of the city where the airport resides |
| gps_code | GPS code |
| iata | IATA code of the airport |

The airports table is poupulated from the U.S. airports dataset.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

    The largest dataset is the I94 immigration data. I only choose the Aprial 2016 subset, which contains around 3 million records. Pandas is still capable of handling this amount of data. Plus Pandas has a `read_sas` API to extract the dataset in `.sas7bdat` format. The size of each of the rest of datasets is way too smaller than the immigration data. Thus, Pandas is good enough for building a ETL pipeline. The final version of the datasets are loaded into a fact and dimension tables in a Postgres database.
    
* Propose how often the data should be updated and why.

    The US National Tourism and Trade Office release the dataset once a month, so the data should be updated monthly.
    
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 
     If the data was increased by 100x, it could not fit into the momery of a typical PC or laptop, so the Amazon cloud platform, specifically, the configuration of EMR should be scaled horizontally to meet the increase in the size of the dataset. Futhermore, it would be too slow for Pandas to handle such amount of data, so Spark would be a ideal choice.
     
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 
     The Apache Airflow would be the candidate that could automate the ETL pipeline and perform quality check.
     
 * The database needed to be accessed by 100+ people.
 
     Distibuted dabases come into rescue in a way that it could improve the query efficiency. For example, Amazon Reshift would be a ideal choice that could store and share data with other users. 
     