# Data Engineering Capstone Project

#### Project Summary

This project aggregates several data sources (immigration, temperature, demographic, and airport-code data) to help the immigration office better understand migration patterns.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Project Scope 

#### Steps:
1. Load the data into dataframes for further processing.
2. Explore the data, identify odd or missing fields, and clean the data.
3. Create the dimension tables and the fact table.

Tech Stack:
AWS, Spark, Pandas.

#### Describe and Gather Data 
**I94 Immigration Data**:
This data comes from the US National Travel and Tourism Office (https://www.trade.gov/national-travel-and-tourism-office). When an individual enters the US as a nonimmigrant visitor, their information is collected by a Customs and Border Protection (CBP) officer. The dataset contains the data for the year 2016.

**World Temperature Data**:
This data is from Kaggle (https://www.kaggle.com/datasets/berkeleyearth/climate-change-earth-surface-temperature-data).

**US Demographic Data**:
This data is from OpenDataSoft (https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).

**Airport Codes**:
This data contains airport codes and their corresponding cities (https://datahub.io/core/airport-codes#data).

### Packages

In [1]:
import pandas as pd
import os
import configparser
import datetime as dt

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
from pyspark.sql import SQLContext
from pyspark.sql.functions import isnan, when, count, col, udf, dayofmonth, dayofweek, month, year, weekofyear
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import *

import requests
requests.packages.urllib3.disable_warnings()

### AWS Configs

In [2]:
config = configparser.ConfigParser()
config.read('config.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
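
The cell above expects a `config.cfg` file with an `[AWS]` section holding the two keys it reads. As a minimal sketch of that layout, the snippet below parses an equivalent in-memory string. The values are placeholders and the helper name `load_aws_credentials` is hypothetical, not part of the project.

```python
import configparser

# Hypothetical contents of config.cfg; the values are placeholders, not real credentials.
SAMPLE_CONFIG = """
[AWS]
AWS_ACCESS_KEY_ID = your-access-key-id
AWS_SECRET_ACCESS_KEY = your-secret-access-key
"""


def load_aws_credentials(text):
    """Parse the [AWS] section and return the two keys as a dict."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    section = parser["AWS"]
    return {
        "AWS_ACCESS_KEY_ID": section["AWS_ACCESS_KEY_ID"],
        "AWS_SECRET_ACCESS_KEY": section["AWS_SECRET_ACCESS_KEY"],
    }


credentials = load_aws_credentials(SAMPLE_CONFIG)
print(sorted(credentials))
```

Keeping the credentials in a separate file that is excluded from version control avoids hard-coding them in the notebook.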

### Spark

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

### Gather I94 Immigration Dataset

In [17]:
# Load data to a dataframe
im_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_may16_sub.sas7bdat')

In [18]:
def print_formatted_float(number):
    print('{:,}'.format(number))
    
# Total number
print_formatted_float(im_df.count())
# display samples
im_df.limit(10).toPandas()

3,444,249


Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2.0,2016.0,5.0,207.0,207.0,XXX,20605.0,,,,...,U,,1989.0,D/S,,,,1141634000.0,,F1
1,3.0,2016.0,5.0,209.0,209.0,XXX,20598.0,,,,...,U,,1989.0,05232018,,,,1863211000.0,,E2
2,4.0,2016.0,5.0,213.0,213.0,XXX,20578.0,,,,...,U,,1938.0,11032016,,,,4696371000.0,,B2
3,5.0,2016.0,5.0,213.0,213.0,XXX,20601.0,,,,...,U,,1987.0,D/S,,,,1141260000.0,,F1
4,13.0,2016.0,5.0,213.0,213.0,CHI,20577.0,1.0,IL,20270.0,...,,M,1987.0,D/S,F,,EK,64792870000.0,235.0,F1
5,20.0,2016.0,5.0,582.0,582.0,XXX,20605.0,0.0,,,...,,,1968.0,07152016,,,,77523310000.0,,M1
6,22.0,2016.0,5.0,101.0,101.0,BOS,20575.0,1.0,NY,20587.0,...,,M,1963.0,10312016,F,,BA,911023900.0,215.0,B2
7,25.0,2016.0,5.0,101.0,101.0,WAS,20575.0,1.0,NY,20584.0,...,,M,1950.0,10312016,,,OS,95077270000.0,93.0,B1
8,26.0,2016.0,5.0,101.0,101.0,NYC,20575.0,1.0,NY,20604.0,...,,M,1948.0,10312016,,,AZ,95084120000.0,608.0,B2
9,28.0,2016.0,5.0,101.0,101.0,MIA,20575.0,1.0,FL,20596.0,...,,M,1984.0,10312016,F,,AZ,909366500.0,630.0,B2


In [19]:
im_df.select("visatype").dropDuplicates().show(5)

+--------+
|visatype|
+--------+
|      F2|
|     GMB|
|      B2|
|      F1|
|     CPL|
+--------+
only showing top 5 rows



<b><i>Description for each column</i></b>

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Column</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">cicid</td><td class="tg-0pky">Unique ID</td>
 <tr><td class="tg-0pky">i94cit</td><td class="tg-0pky">3 digit code for immigrant country of citizenship</td>
 <tr><td class="tg-0pky">i94res</td><td class="tg-0pky">3 digit code for immigrant country of residence </td>
 <tr><td class="tg-0pky">i94port</td><td class="tg-0pky">Port of admission</td>
 <tr><td class="tg-0pky">admnum</td><td class="tg-0pky">Admission Number</td>
 <tr><td class="tg-0pky">fltno</td><td class="tg-0pky">Flight number of Airline used to arrive in U.S.</td>
 <tr><td class="tg-0pky">visatype</td><td class="tg-0pky">Class of admission legally admitting the non-immigrant to temporarily stay in U.S.</td>
 <tr><td class="tg-0pky">arrdate</td><td class="tg-0pky">Arrival Date in the USA</td>
 <tr><td class="tg-0pky">i94mode</td><td class="tg-0pky">Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)</td>
 <tr><td class="tg-0pky">i94addr</td><td class="tg-0pky">USA State of arrival</td>
 <tr><td class="tg-0pky">depdate</td><td class="tg-0pky">Departure Date from the USA</td>
 <tr><td class="tg-0pky">entdepa</td><td class="tg-0pky">Arrival Flag - admitted or paroled into the U.S.</td>
 <tr><td class="tg-0pky">entdepd</td><td class="tg-0pky">Departure Flag - Departed, lost I-94 or is deceased</td>
 <tr><td class="tg-0pky">entdepu</td><td class="tg-0pky">Update Flag - Either apprehended, overstayed, adjusted to perm residence</td>
 <tr><td class="tg-0pky">matflag</td><td class="tg-0pky">Match flag - Match of arrival and departure records</td>
 <tr><td class="tg-0pky">biryear</td><td class="tg-0pky">4 digit year of birth</td>
 <tr><td class="tg-0pky">dtaddto</td><td class="tg-0pky">Character Date Field - Date to which admitted to U.S. (allowed to stay until)</td>
 <tr><td class="tg-0pky">gender</td><td class="tg-0pky">Non-immigrant sex</td>
 <tr><td class="tg-0pky">insnum</td><td class="tg-0pky">INS number</td>
 <tr><td class="tg-0pky">i94bir</td><td class="tg-0pky">Age of Respondent in Years</td>
 <tr><td class="tg-0pky">i94visa</td><td class="tg-0pky">Visa codes collapsed into three categories</td>
 <tr><td class="tg-0pky">count</td><td class="tg-0pky">Field used for summary statistics</td>
 <tr><td class="tg-0pky">dtadfile</td><td class="tg-0pky">Character Date Field - Date added to I-94 Files</td>
 <tr><td class="tg-0pky">visapost</td><td class="tg-0pky">Department of State post where the visa was issued</td>
 <tr><td class="tg-0pky">occup</td><td class="tg-0pky">Occupation that will be performed in U.S</td>
 <tr><td class="tg-0pky">airline</td><td class="tg-0pky">Airline used to arrive in U.S.</td>
 <tr><td class="tg-0pky">i94yr</td><td class="tg-0pky">year</td>
 <tr><td class="tg-0pky">i94mon</td><td class="tg-0pky">month</td>
</table>
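
The `i94mode` codes listed in the table can be turned into readable labels with a small lookup. This is only a sketch: the labels `"Missing"` and `"Unknown"` for null or unlisted codes (such as the `0.0` visible in the sample rows) are assumptions, not part of the official data dictionary.

```python
import math

# Code-to-label mapping taken from the column description table
I94_MODE_LABELS = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}


def describe_mode(code):
    """Return a readable label for an i94mode value stored as a float."""
    # Nulls arrive as None from Spark and as NaN from pandas
    if code is None or (isinstance(code, float) and math.isnan(code)):
        return "Missing"  # assumed label
    return I94_MODE_LABELS.get(int(code), "Unknown")  # assumed fallback label


print(describe_mode(1.0))
print(describe_mode(0.0))
```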

### Gather World Temperature Data

In [20]:
temp_df = spark.read.csv('GlobalLandTemperaturesByCity.csv', header=True, inferSchema=True)

In [21]:
# Total number
print_formatted_float(temp_df.count())
# display samples
temp_df.limit(10).toPandas()

1,981,321


Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E
5,1744-04-01,5.788,3.624,Århus,Denmark,57.05N,10.33E
6,1744-05-01,10.644,1.283,Århus,Denmark,57.05N,10.33E
7,1744-06-01,14.051,1.347,Århus,Denmark,57.05N,10.33E
8,1744-07-01,16.082,1.396,Århus,Denmark,57.05N,10.33E
9,1744-08-01,,,Århus,Denmark,57.05N,10.33E


<b><i>Description for each column</i></b>

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Column</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">dt</td><td class="tg-0pky">Date</td>
 <tr><td class="tg-0pky">AverageTemperature</td><td class="tg-0pky">Global average land temperature in celsius</td>
 <tr><td class="tg-0pky">AverageTemperatureUncertainty</td><td class="tg-0pky">95% confidence interval around the average</td>
 <tr><td class="tg-0pky">Country</td><td class="tg-0pky">Country</td>
 <tr><td class="tg-0pky">City</td><td class="tg-0pky">City</td>
 <tr><td class="tg-0pky">Latitude</td><td class="tg-0pky">Latitude</td>
 <tr><td class="tg-0pky">Longitude</td><td class="tg-0pky">Longitude</td>
</table>
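
`Latitude` and `Longitude` are stored as strings with a hemisphere suffix (for example `57.05N`). If numeric coordinates are needed later, a conversion along these lines could be used. This is a sketch and `parse_coordinate` is a hypothetical helper, not part of the pipeline.

```python
def parse_coordinate(value):
    """Convert a string such as '57.05N' or '10.33E' to a signed float.

    South and west become negative; north and east stay positive.
    """
    value = value.strip()
    magnitude, hemisphere = float(value[:-1]), value[-1].upper()
    if hemisphere not in ("N", "S", "E", "W"):
        raise ValueError(f"Unexpected hemisphere suffix in {value!r}")
    return -magnitude if hemisphere in ("S", "W") else magnitude


print(parse_coordinate("57.05N"), parse_coordinate("10.33E"))
```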

### Gather US Demographic Data

In [22]:
dm_df = spark.read.csv("us-cities-demographics.csv", inferSchema=True, header=True, sep=';')

In [23]:
# Total number
print_formatted_float(dm_df.count())
# display samples
dm_df.limit(10).toPandas()

2,891


Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402
5,Peoria,Illinois,33.1,56229,62432,118661,6634,7517,2.4,IL,American Indian and Alaska Native,1343
6,Avondale,Arizona,29.1,38712,41971,80683,4815,8355,3.18,AZ,Black or African-American,11592
7,West Covina,California,39.8,51629,56860,108489,3800,37038,3.56,CA,Asian,32716
8,O'Fallon,Missouri,36.0,41762,43270,85032,5783,3269,2.77,MO,Hispanic or Latino,2583
9,High Point,North Carolina,35.5,51751,58077,109828,5204,16315,2.65,NC,Asian,11060


<b><i>Description for each column</i></b>

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">City</td><td class="tg-0pky">Name of the city</td>
 <tr><td class="tg-0pky">State</td><td class="tg-0pky">State of the city</td>
 <tr><td class="tg-0pky">Foreign-born</td><td class="tg-0pky">Number of residents born outside the United States</td>
 <tr><td class="tg-0pky">Average Household Size</td><td class="tg-0pky">Average size of a household</td>
 <tr><td class="tg-0pky">Median Age</td><td class="tg-0pky">Population's median age</td>
 <tr><td class="tg-0pky">Male Population</td><td class="tg-0pky">Population of male</td>
 <tr><td class="tg-0pky">State Code</td><td class="tg-0pky">US state code</td>
 <tr><td class="tg-0pky">Race</td><td class="tg-0pky">Race</td>
 <tr><td class="tg-0pky">Count</td><td class="tg-0pky">Number of people for each race</td>
 <tr><td class="tg-0pky">Female Population</td><td class="tg-0pky">Population of female</td>
 <tr><td class="tg-0pky">Total Population</td><td class="tg-0pky">Total population</td>
 <tr><td class="tg-0pky">Number of Veterans</td><td class="tg-0pky">Total Veterans</td>

 
</table>

### Gather Airport codes

In [24]:
ac_df = spark.read.csv("airport-codes_csv.csv", inferSchema=True, header=True, sep=',')

In [25]:
# Total number
print_formatted_float(ac_df.count())
# display samples
ac_df.limit(10).toPandas()

28,650


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"
5,00AS,small_airport,Fulton Airport,1100,,US,US-OK,Alex,00AS,,00AS,"-97.8180194, 34.9428028"
6,00AZ,small_airport,Cordes Airport,3810,,US,US-AZ,Cordes,00AZ,,00AZ,"-112.16500091552734, 34.305599212646484"
7,00CA,small_airport,Goldstone /Gts/ Airport,3038,,US,US-CA,Barstow,00CA,,00CA,"-116.888000488, 35.350498199499995"
8,00CL,small_airport,Williams Ag Airport,87,,US,US-CA,Biggs,00CL,,00CL,"-121.763427, 39.427188"
9,00CN,heliport,Kitchen Creek Helibase Heliport,3350,,US,US-CA,Pine Valley,00CN,,00CN,"-116.4597417, 32.7273736"


<b><i>Description for each column</i></b>

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">ident</td><td class="tg-0pky">Identity code</td>
 <tr><td class="tg-0pky">type</td><td class="tg-0pky">Type of the airport</td>
 <tr><td class="tg-0pky">name</td><td class="tg-0pky">Name of the airport</td>
 <tr><td class="tg-0pky">elevation_ft</td><td class="tg-0pky">Elevation of the airport</td>
 <tr><td class="tg-0pky">continent</td><td class="tg-0pky">Continent where the airport is located</td>
 <tr><td class="tg-0pky">iso_country</td><td class="tg-0pky">ISO country code (e.g. US)</td>
 <tr><td class="tg-0pky">iso_region</td><td class="tg-0pky">ISO region code (e.g. US-PA)</td>
 <tr><td class="tg-0pky">municipality</td><td class="tg-0pky">Administrative division</td>
 <tr><td class="tg-0pky">gps_code</td><td class="tg-0pky">GPS code</td>
 <tr><td class="tg-0pky">coordinates</td><td class="tg-0pky">GPS coordinates</td>

 
</table>
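
The `coordinates` column packs two numbers into one string. Judging from the sample rows (Bensalem, PA appears as `-74.93..., 40.07...`), the order looks like longitude first and latitude second. That ordering is an inference from the samples, not something the source states. A sketch of splitting the field under that assumption:

```python
def parse_airport_coordinates(text):
    """Split a 'first, second' coordinate string into two floats.

    Assumes the order is longitude, latitude (inferred from the sample rows).
    """
    first, second = (float(part) for part in text.split(","))
    return {"longitude": first, "latitude": second}


point = parse_airport_coordinates("-74.93360137939453, 40.07080078125")
print(point)
```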

### Step 2: Explore and Clean the Data

### Explore Immigration Data

In [26]:
im_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [27]:
def percentage_missing(df):
    """Calculate the percentage of missing data in each column
    """
    # Dataframe that store missing data stats
    missing_stats_df = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).toPandas()
    
    # Change dataframe format
    missing_stats_df = pd.melt(missing_stats_df, var_name='columns', value_name='counts')

    # Calculate the percentage of missing values
    missing_stats_df['%Percentage of missing data'] = 100*missing_stats_df['counts']/df.count()
    
    return missing_stats_df

In [28]:
percentage_missing(im_df)

Unnamed: 0,columns,counts,%Percentage of missing data
0,cicid,0,0.0
1,i94yr,0,0.0
2,i94mon,0,0.0
3,i94cit,0,0.0
4,i94res,0,0.0
5,i94port,0,0.0
6,arrdate,0,0.0
7,i94mode,142,0.004123
8,i94addr,172701,5.014185
9,depdate,184494,5.356581
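
The same calculation can be followed on a handful of rows without Spark. The sketch below mirrors the logic of `percentage_missing` in plain Python, counting `None` and `NaN` as missing. The sample rows and the helper names are made up for illustration.

```python
import math


def is_missing(value):
    """Treat None and float NaN as missing, like isNull() and isnan() in the Spark version."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def percentage_missing_rows(rows, columns):
    """Return {column: percentage of rows in which the column is missing}."""
    total = len(rows)
    stats = {}
    for column in columns:
        missing = sum(1 for row in rows if is_missing(row.get(column)))
        stats[column] = 100 * missing / total if total else 0.0
    return stats


# Made-up rows for illustration only
rows = [
    {"i94mode": 1.0, "i94addr": "IL"},
    {"i94mode": None, "i94addr": None},
    {"i94mode": 1.0, "i94addr": "NY"},
    {"i94mode": float("nan"), "i94addr": "FL"},
]
stats = percentage_missing_rows(rows, ["i94mode", "i94addr"])
print(stats)
```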


### Clean Immigration Data

In [29]:
# Drop columns that contain a significant share of missing data
columns = ['occup', 'entdepu','insnum']
im_df = im_df.drop(*columns)

# Drop duplicated records based on the ID
im_df = im_df.dropDuplicates(['cicid'])

# Drop rows where the ID is missing
im_df = im_df.dropna(how='all', subset=['cicid'])

### Explore World Temperature Data

In [30]:
temp_df.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [31]:
temp_df = temp_df.withColumn("dt",col("dt").cast(StringType()))
percentage_missing(temp_df)

Unnamed: 0,columns,counts,%Percentage of missing data
0,dt,0,0.0
1,AverageTemperature,91720,4.629235
2,AverageTemperatureUncertainty,91720,4.629235
3,City,0,0.0
4,Country,0,0.0
5,Latitude,0,0.0
6,Longitude,0,0.0


### Clean World Temperature Data

In [32]:
# Drop rows in which every column is null (how='all'); rows with only some missing values are kept
temp_df = temp_df.dropna(how='all')
print(temp_df.count())

1981321
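
The row count is unchanged because `dropna(how='all')` only removes rows in which every column is null, while `how='any'` would remove rows with at least one null. The plain-Python sketch below illustrates the difference on made-up rows. `drop_rows` is a hypothetical stand-in, not the Spark implementation.

```python
def drop_rows(rows, how="any"):
    """Mimic the 'any' / 'all' semantics of dropna on a list of dicts."""
    kept = []
    for row in rows:
        flags = [value is None for value in row.values()]
        should_drop = all(flags) if how == "all" else any(flags)
        if not should_drop:
            kept.append(row)
    return kept


# Made-up rows: one complete, one partially missing, one entirely missing
sample = [
    {"dt": "1743-11-01", "AverageTemperature": 6.068},
    {"dt": "1743-12-01", "AverageTemperature": None},
    {"dt": None, "AverageTemperature": None},
]
print(len(drop_rows(sample, how="all")), len(drop_rows(sample, how="any")))
```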


### Explore US demographic data

In [33]:
dm_df.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



In [34]:
percentage_missing(dm_df)

Unnamed: 0,columns,counts,%Percentage of missing data
0,City,0,0.0
1,State,0,0.0
2,Median Age,0,0.0
3,Male Population,3,0.10377
4,Female Population,3,0.10377
5,Total Population,0,0.0
6,Number of Veterans,13,0.449671
7,Foreign-born,13,0.449671
8,Average Household Size,16,0.553442
9,State Code,0,0.0


### Clean US demographic data

In [35]:
# Drop rows in which every column is null (how='all'); rows with only some missing values are kept
dm_df = dm_df.dropna(how='all')
print(dm_df.count())

2891


### Explore airport codes

In [36]:
ac_df.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [37]:
percentage_missing(ac_df)

Unnamed: 0,columns,counts,%Percentage of missing data
0,ident,0,0.0
1,type,0,0.0
2,name,0,0.0
3,elevation_ft,3024,10.554974
4,continent,0,0.0
5,iso_country,0,0.0
6,iso_region,0,0.0
7,municipality,2473,8.631763
8,gps_code,7072,24.684119
9,iata_code,24716,86.268761


### Clean airport codes data

In [38]:
# Drop columns that contain missing data
columns = ['iata_code', 'local_code', 'elevation_ft', 'municipality', 'gps_code']
ac_df = ac_df.drop(*columns)
print(ac_df.count())

28650
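
The columns above were picked by hand. One possible way to make the choice explicit is to drop every column whose missing percentage exceeds a threshold. The sketch below uses the percentages printed earlier. The 50% default is an arbitrary assumption and `columns_to_drop` is a hypothetical helper.

```python
def columns_to_drop(missing_percentages, threshold=50.0):
    """Return the columns whose missing percentage is above the threshold."""
    return [
        column
        for column, percentage in missing_percentages.items()
        if percentage > threshold
    ]


# Percentages copied from the percentage_missing(ac_df) output above
airport_missing = {
    "elevation_ft": 10.554974,
    "municipality": 8.631763,
    "gps_code": 24.684119,
    "iata_code": 86.268761,
}
print(columns_to_drop(airport_missing))
```

With a 50% threshold only `iata_code` would be dropped, so the stricter manual selection used in the notebook corresponds to a much lower cut-off.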


### Step 3: Define the Data Model

#### 3.1 Conceptual Data Model
![title](Fact_Dim_Tables_Diagram.png)
As shown above, 7 dimension tables are designed together with a fact table.<br>

<b><i>Description of each table</i></b>

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Table</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">Country Dimension Table</td><td class="tg-0pky">contains generic information about the country, such as temperature, latitude and longitude etc.</td>
 <tr><td class="tg-0pky">Arrival Dimension Table</td><td class="tg-0pky">contains the immigrant's arrival information such as the airline, type of transport, arrival location etc.</td>
 <tr><td class="tg-0pky">Visa Dimension Table</td><td class="tg-0pky">contains the immigrant's visa related information such as type, issuing place etc.</td>
 <tr><td class="tg-0pky">Departure Dimension Table</td><td class="tg-0pky">contains the immigrant's departure information.</td>
 <tr><td class="tg-0pky">Demographic Dimension Table</td><td class="tg-0pky">contains the state's demographic information such as population, age and race.</td>
 <tr><td class="tg-0pky">Personal Dimension Table</td><td class="tg-0pky">contains the immigrant's information such as age and gender.</td>
 <tr><td class="tg-0pky">Time Dimension Table</td><td class="tg-0pky">contains the date and time when the immigrant arrives.</td>
 <tr><td class="tg-0pky">Immigrant Fact Table</td><td class="tg-0pky">contains all the foreign keys and other essential data.</td>
</table>

#### 3.2 Mapping Out Data Pipelines
- Load data to Spark dataframe
- Clean the data
- Create dimension tables
- Create fact table

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

### Create Country Dimension Table

In [39]:
# Aggregate temp data
def aggregate_temp(df):
    
    df = df.select(['Country', 'AverageTemperature']).groupby('Country').avg()
    
    df = df.withColumnRenamed('avg(AverageTemperature)', 'average_temperature')
    
    return df

In [40]:
output = "tables/"

In [41]:
# Create country dimension table
def country_dim(df, temp_df, output):
    # Load aggregated temp data
    agg_temp = aggregate_temp(temp_df).toPandas()
    # Load mapping codes
    mapping_codes = pd.read_csv('mapping_codes.csv')
    
    @udf('string')
    def get_country_average_temperature(name):
        print("Processing: ", name)
        avg_temp = agg_temp[agg_temp['Country']==name]['average_temperature']
        
        if not avg_temp.empty:
            return str(avg_temp.iloc[0])
        
        return None
    
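    @udf('string')
    def get_country_name(code):
        # Look up the country name for a code; return None when the code is not in the mapping
        names = mapping_codes[mapping_codes['code']==code]['Name']
        
        if not names.empty:
            return names.iloc[0].title()
        return None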
        
    # select and rename i94res column
    dim_df = df.select(['i94res']).distinct() \
                .withColumnRenamed('i94res', 'country_code')
    
    # create country_name column
    dim_df = dim_df.withColumn('country_name', get_country_name(dim_df.country_code))
    
    # create average_temperature column
    dim_df = dim_df.withColumn('average_temperature', get_country_average_temperature(dim_df.country_name))
    
    # write the dimension to a parquet file
    dim_df.write.parquet(output + "country", mode="overwrite")
    
    return dim_df

In [43]:
#country_dim_df = country_dim(im_df, temp_df, output)

In [None]:
country_dim_df.show(5)
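
The lookup logic in `country_dim` (average the temperature per country, map each code to a name, then attach the average) can be sketched in plain Python. The code-to-name mapping below is made up, since the contents of `mapping_codes.csv` are not shown, and both helper names are hypothetical.

```python
from collections import defaultdict


def average_temperature_by_country(records):
    """Average the non-missing temperatures per country from (country, temperature) pairs."""
    totals = defaultdict(lambda: [0.0, 0])
    for country, temperature in records:
        if temperature is None:
            continue
        totals[country][0] += temperature
        totals[country][1] += 1
    return {country: total / n for country, (total, n) in totals.items()}


def build_country_rows(codes, code_to_name, average_temperature):
    """Attach a title-cased name and an average temperature to each country code."""
    rows = []
    for code in codes:
        name = code_to_name.get(code)
        name = name.title() if name else None
        rows.append({
            "country_code": code,
            "country_name": name,
            "average_temperature": average_temperature.get(name),
        })
    return rows


# Temperatures taken from the sample output; the codes 1 and 2 are made up
records = [("Denmark", 6.068), ("Denmark", None), ("Denmark", 5.788)]
averages = average_temperature_by_country(records)
country_rows = build_country_rows([1, 2], {1: "DENMARK"}, averages)
print(country_rows)
```

Codes without a mapping, or countries without temperature data, end up with `None` instead of raising an error.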

### Create Time Dimension Table

In [44]:
def time_dim(df, output):
    
    # Convert to datetime format
    get_datetime = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(x)).isoformat() if x else None)
    
    # Create time dimension table and add columns
    time_df = df.select(['arrdate']).withColumn("arrdate", get_datetime(df.arrdate)).distinct()
    time_df = time_df.withColumn('day', dayofmonth('arrdate'))
    time_df = time_df.withColumn('week', weekofyear('arrdate'))
    time_df = time_df.withColumn('month', month('arrdate'))
    time_df = time_df.withColumn('year', year('arrdate'))
    time_df = time_df.withColumn('weekday', dayofweek('arrdate'))

    time_df = time_df.withColumn('id', monotonically_increasing_id())
    
    # Write to parquet file
    partition_columns = ['year', 'month', 'week']
    time_df.write.parquet(output + "time_dim", partitionBy=partition_columns, mode="overwrite")
    
    return time_df

In [45]:
time_dim_df = time_dim(im_df, output)

In [46]:
time_dim_df.show(5)

+----------+---+----+-----+----+-------+------------+
|   arrdate|day|week|month|year|weekday|          id|
+----------+---+----+-----+----+-------+------------+
|2016-05-21| 21|  20|    5|2016|      7| 17179869184|
|2016-05-16| 16|  20|    5|2016|      2| 77309411328|
|2016-05-27| 27|  21|    5|2016|      6|137438953472|
|2016-05-08|  8|  18|    5|2016|      1|249108103168|
|2016-05-19| 19|  20|    5|2016|      5|309237645312|
+----------+---+----+-----+----+-------+------------+
only showing top 5 rows
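
`arrdate` is a SAS date, meaning the number of days since 1960-01-01, which is why the UDF adds a `timedelta` to that epoch. The sketch below reproduces the conversion and the derived calendar fields with the standard library only. It assumes Spark's conventions of ISO week numbers for `weekofyear` and 1 = Sunday for `dayofweek`. The helper names are hypothetical.

```python
import datetime as dt

SAS_EPOCH = dt.date(1960, 1, 1)


def sas_to_date(days):
    """Convert a SAS numeric date (days since 1960-01-01) to a date, or None if missing."""
    if days is None:
        return None
    return SAS_EPOCH + dt.timedelta(days=int(days))


def calendar_fields(day):
    """Derive the time-dimension columns from a date."""
    _, iso_week, iso_weekday = day.isocalendar()
    return {
        "day": day.day,
        "week": iso_week,
        "month": day.month,
        "year": day.year,
        # ISO weekday is 1 = Monday .. 7 = Sunday; shift to 1 = Sunday .. 7 = Saturday
        "weekday": iso_weekday % 7 + 1,
    }


first_arrival = sas_to_date(20575.0)
print(first_arrival.isoformat(), calendar_fields(dt.date(2016, 5, 21)))
```

The value `20575.0` from the sample rows maps to 2016-05-01, and the fields for 2016-05-21 match the first row of the table above.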



### Create Visa Dimension Table

In [47]:
def visa_dim(df, output_data):

    visa_dim_df = df.select(['cicid', 'visatype', 'visapost', 'i94visa', 'i94port'])
    
    # write dimension to parquet file
    visa_dim_df.write.parquet(output_data + "visa", mode="overwrite")
    
    return visa_dim_df

In [48]:
visa_dim_df = visa_dim(im_df, output)
visa_dim_df.show(n=5)

+-----+--------+--------+-------+-------+
|cicid|visatype|visapost|i94visa|i94port|
+-----+--------+--------+-------+-------+
|299.0|      WT|    null|    2.0|    LOS|
|305.0|      WB|    null|    1.0|    LOS|
|558.0|      WT|    null|    2.0|    SFR|
|692.0|      WB|    null|    1.0|    CLT|
|769.0|      WT|    null|    2.0|    ATL|
+-----+--------+--------+-------+-------+
only showing top 5 rows



### Create Demographics Dimension Table

In [None]:
def dm_dim(df, output_data):
    dim_df = df.withColumnRenamed('Median Age','median_age') \
            .withColumnRenamed('Male Population', 'male_population') \
            .withColumnRenamed('Female Population', 'female_population') \
            .withColumnRenamed('Total Population', 'total_population') \
            .withColumnRenamed('Number of Veterans', 'number_of_veterans') \
            .withColumnRenamed('Foreign-born', 'foreign_born') \
            .withColumnRenamed('Average Household Size', 'average_household_size') \
            .withColumnRenamed('State Code', 'state_code')
    # lets add an id column
    dim_df = dim_df.withColumn('id', monotonically_increasing_id())
    
    # write dimension to parquet file
    dim_df.write.parquet(output_data + "demographics", mode="overwrite")
    
    return dim_df

In [None]:
dm_dim_df = dm_dim(dm_df, output)
dm_dim_df.limit(5).toPandas()

### Create Personal Dimension Table

In [None]:
def personal_dim(df, output_data):

    personal_dim_df = df.select(['cicid', 'gender', 'biryear', 'i94bir'])
    
    # write dimension to parquet file
    personal_dim_df.write.parquet(output_data + "personal", mode="overwrite")
    
    return personal_dim_df

In [None]:
personal_dim_df = personal_dim(im_df, output)
personal_dim_df.limit(5).toPandas()

### Create Arrival Dimension Table

In [None]:
def arr_dim(df, output_data):

    arr_dim_df = df.select(['cicid', 'airline', 'i94mode', 'fltno', 'entdepa', 'i94addr'])
    
    # Write to parquet file
    arr_dim_df.write.parquet(output_data + "arrival", mode="overwrite")
    
    return arr_dim_df

In [None]:
arr_dim_df = arr_dim(im_df, output)
arr_dim_df.limit(5).toPandas()

### Create Departure Dimension Table

In [None]:
def dep_dim(df, output_data):

    dep_dim_df = df.select(['cicid', 'depdate', 'entdepd'])
    
    # Write to parquet file
    dep_dim_df.write.parquet(output_data + "departure", mode="overwrite")
    
    return dep_dim_df

In [None]:
dep_dim_df = dep_dim(im_df, output)
dep_dim_df.limit(5).toPandas()

### Create Immigration Fact Table

In [None]:
def im_fact(df, output):
    
    # convert to datetime format
    get_datetime = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(x)).isoformat() if x else None)
    df = df.withColumn("arrdate", get_datetime(df.arrdate))
    
    # rename columns
    df = df.withColumnRenamed('cicid','id') \
            .withColumnRenamed('i94res', 'country_code') \
            .withColumnRenamed('i94addr', 'state_code') \
            .withColumnRenamed('matflag', 'match_true') \
            .withColumnRenamed('admnum', 'admission_number')
    
    # write the fact table to a parquet file
    df.write.parquet(output + "immigration", mode="overwrite")
    
    return df

In [None]:
im_fact_df = im_fact(im_df, output)
im_fact_df.limit(5).toPandas()

#### 4.2 Data Quality Checks
Two data quality checks are performed:
- Count check: ensures that the ETL has created non-empty fact and dimension tables.
- Unique key check: ensures that the key column does not contain duplicated values.

In [1]:
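def count_check(df, table):
    """Report whether the given table contains at least one record."""
    total = df.count()

    if total == 0:
        print(f"Check failed for {table}, it contains no records.")
    else:
        print(f"Check passed for {table}, it contains {total} records.")
    return 0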

In [2]:
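def unique_key_check(df, column, table):
    """Report whether the given column of a Spark dataframe contains duplicated values."""
    # A Spark dataframe has no duplicated() method, so compare the total and distinct counts
    if df.count() > df.select(column).distinct().count():
        print(f"Failed, column {column} in table {table} contains duplicated values.")
    else:
        print(f"Check passed for column {column} in table {table}.")
    return 0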

In [76]:
table_dataframes = {
    'immigration_fact': im_fact_df,
    'visa_dim': visa_dim_df,
    'time_dim': time_dim_df,
    'demographics_dim': dm_dim_df,
    'country_dim': country_dim_df,
    'arrival_dim': arr_dim_df,
    'depart_dim': dep_dim_df,
    'personal_dim': personal_dim_df
}
for table_name, table_df in table_dataframes.items():
    count_check(table_df, table_name)

Check passed for immigration_fact, it contains 3444249 records.
Check passed for visa_dim, it contains 3444249 records.
Check passed for time_dim, it contains 31 records.
Check passed for demographics_dim, it contains 2891 records.
Check passed for country_dim, it contains 230 records.
Check passed for arrival_dim, it contains 3444249 records.
Check passed for depart_dim, it contains 3444249 records.
Check passed for personal_dim, it contains 3444249 records.
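
Both checks print a message rather than returning a result, so a failure does not stop the pipeline. One possible variant returns values that a caller can assert on. The sketch below shows that idea on plain lists of dicts rather than Spark dataframes. The function names and sample rows are hypothetical.

```python
from collections import Counter


def count_check_rows(rows):
    """Return True when the table contains at least one record."""
    return len(rows) > 0


def duplicated_keys(rows, key):
    """Return the key values that occur more than once (empty list means unique)."""
    counts = Counter(row[key] for row in rows)
    return [value for value, n in counts.items() if n > 1]


# Made-up records for illustration only
visa_rows = [
    {"cicid": 299.0, "visatype": "WT"},
    {"cicid": 305.0, "visatype": "WB"},
    {"cicid": 299.0, "visatype": "WT"},
]
print(count_check_rows(visa_rows), duplicated_keys(visa_rows, "cicid"))
```

Returning the offending keys makes it easier to inspect the duplicated records than a pass/fail message alone.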


#### 4.3 Data dictionary 

##### Immigration Fact Table

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">id</td><td class="tg-0pky">Unique record ID</td></tr>
 <tr><td class="tg-0pky">i94res</td><td class="tg-0pky">3 digit code for immigrant country of residence</td></tr>
 <tr><td class="tg-0pky">arrdate</td><td class="tg-0pky">Arrival Date in the USA</td></tr>
 <tr><td class="tg-0pky">i94addr</td><td class="tg-0pky">USA State of arrival</td></tr>
 <tr><td class="tg-0pky">count</td><td class="tg-0pky">Field used for summary statistics</td></tr>
 <tr><td class="tg-0pky">matflag</td><td class="tg-0pky">Match flag - Match of arrival and departure records</td></tr>
 <tr><td class="tg-0pky">dtaddto</td><td class="tg-0pky">Character Date Field - Date to which admitted to U.S. (allowed to stay until)</td></tr>
 <tr><td class="tg-0pky">admnum</td><td class="tg-0pky">Admission Number</td></tr>
</table>

##### Country Dimension Table
<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">i94res</td><td class="tg-0pky">3 digit code for immigrant country of residence</td></tr>
 <tr><td class="tg-0pky">AverageTemperature</td><td class="tg-0pky">Global average land temperature in celsius</td></tr>
 <tr><td class="tg-0pky">AverageTemperatureUncertainty</td><td class="tg-0pky">95% confidence interval around the average</td></tr>
 <tr><td class="tg-0pky">City</td><td class="tg-0pky">Name of City</td></tr>
 <tr><td class="tg-0pky">Country</td><td class="tg-0pky">Name of Country</td></tr>
 <tr><td class="tg-0pky">Latitude</td><td class="tg-0pky">City Latitude</td></tr>
 <tr><td class="tg-0pky">Longitude</td><td class="tg-0pky">City Longitude</td></tr>
</table>

##### Visa Dimension Table
<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">cicid</td><td class="tg-0pky">Unique record ID</td></tr>
 <tr><td class="tg-0pky">visatype</td><td class="tg-0pky">Class of admission legally admitting the non-immigrant to temporarily stay in U.S.</td></tr>
 <tr><td class="tg-0pky">visapost</td><td class="tg-0pky">Department of State post where the visa was issued</td></tr>
 <tr><td class="tg-0pky">i94visa</td><td class="tg-0pky">Visa codes collapsed into three categories</td></tr>
 <tr><td class="tg-0pky">i94port</td><td class="tg-0pky">Port of admission</td></tr>
</table>

##### Time Dimension Table
<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">arrdate</td><td class="tg-0pky">Arrival Date in the USA</td></tr>
 <tr><td class="tg-0pky">i94yr</td><td class="tg-0pky">4 digit year</td></tr>
 <tr><td class="tg-0pky">i94mon</td><td class="tg-0pky">Numeric month</td></tr>
 <tr><td class="tg-0pky">dtadfile</td><td class="tg-0pky">Character Date Field - Date added to I-94 Files</td></tr>
</table>
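
The `i94yr` and `i94mon` columns can be derived from `arrdate` once it is a calendar date. The sketch below assumes that `arrdate` is stored as a SAS numeric date (a count of days since 1960-01-01), which is how the I94 files are commonly distributed but is not stated in this notebook. The helper names `sas_to_date` and `time_dim_fields` are hypothetical and are not part of the project code.

```python
from datetime import date, timedelta

# Assumption: arrdate is a SAS numeric date, i.e. days since 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days):
    """Convert a SAS day count to a datetime.date; missing values give None."""
    # NaN is the only value that is not equal to itself.
    if sas_days is None or sas_days != sas_days:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


def time_dim_fields(sas_days):
    """Build the derived time-dimension fields for one arrdate value."""
    d = sas_to_date(sas_days)
    if d is None:
        return None
    return {"arrdate": d.isoformat(), "i94yr": d.year, "i94mon": d.month}


print(time_dim_fields(20575.0))
```

In Spark, the same conversion could be wrapped in a UDF or expressed with `date_add` on a literal start date; the pure-Python form is shown only to keep the logic easy to check.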

##### Demographics Dimension Table
<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">State Code</td><td class="tg-0pky">Code of the US state</td></tr>
 <tr><td class="tg-0pky">i94addr</td><td class="tg-0pky">USA State of arrival</td></tr>
 <tr><td class="tg-0pky">City</td><td class="tg-0pky">City Name</td></tr>
 <tr><td class="tg-0pky">Median Age</td><td class="tg-0pky">Median age of the population</td></tr>
 <tr><td class="tg-0pky">Male Population</td><td class="tg-0pky">Count of male population</td></tr>
 <tr><td class="tg-0pky">Female Population</td><td class="tg-0pky">Count of female population</td></tr>
 <tr><td class="tg-0pky">Total Population</td><td class="tg-0pky">Count of total population</td></tr>
 <tr><td class="tg-0pky">Number of Veterans</td><td class="tg-0pky">Count of veterans</td></tr>
 <tr><td class="tg-0pky">Foreign born</td><td class="tg-0pky">Count of residents of the city who were not born in the U.S.</td></tr>
 <tr><td class="tg-0pky">Race</td><td class="tg-0pky">Respondent race</td></tr>
 <tr><td class="tg-0pky">Count</td><td class="tg-0pky">Count of the city's residents per race</td></tr>
</table>

##### Personal Dimension Table
<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">cicid</td><td class="tg-0pky">Unique record ID</td></tr>
 <tr><td class="tg-0pky">gender</td><td class="tg-0pky">Non-immigrant sex</td></tr>
 <tr><td class="tg-0pky">biryear</td><td class="tg-0pky">4 digit year of birth</td></tr>
 <tr><td class="tg-0pky">i94bir</td><td class="tg-0pky">Age of Respondent in Years</td></tr>
</table>

##### Arrival Dimension Table
<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">cicid</td><td class="tg-0pky">Unique record ID</td></tr>
 <tr><td class="tg-0pky">airline</td><td class="tg-0pky">Airline used to arrive in U.S.</td></tr>
 <tr><td class="tg-0pky">fltno</td><td class="tg-0pky">Flight number of Airline used to arrive in U.S.</td></tr>
 <tr><td class="tg-0pky">i94mode</td><td class="tg-0pky">Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)</td></tr>
 <tr><td class="tg-0pky">entdepa</td><td class="tg-0pky">Arrival Flag - admitted or paroled into the U.S.</td></tr>
 <tr><td class="tg-0pky">i94addr</td><td class="tg-0pky">USA State of arrival</td></tr>
</table>
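
The `i94mode` codes listed in the table can be turned into readable labels with a small lookup. This is a minimal sketch: the function name `decode_i94mode` is hypothetical, and treating unknown or missing codes as "Not reported" is an assumption made here, not a rule taken from the data dictionary.

```python
# Codes taken from the i94mode row of the Arrival Dimension table.
I94_MODE_LABELS = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}


def decode_i94mode(code):
    """Return the label for an i94mode code.

    Accepts ints, floats such as 1.0, or numeric strings.
    Assumption: anything unknown or missing is reported as "Not reported".
    """
    try:
        return I94_MODE_LABELS.get(int(float(code)), "Not reported")
    except (TypeError, ValueError, OverflowError):
        return "Not reported"


print(decode_i94mode(1.0))
```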

##### Departure Dimension Table
<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">cicid</td><td class="tg-0pky">Unique record ID</td></tr>
 <tr><td class="tg-0pky">depdate</td><td class="tg-0pky">Departure Date from the USA</td></tr>
 <tr><td class="tg-0pky">entdepd</td><td class="tg-0pky">Departure Flag - Departed, lost I-94 or is deceased</td></tr>
</table>

### Step 5: Complete Project Write Up
* **Clearly state the rationale for the choice of tools and technologies for the project:**
Spark was chosen for data processing because it has well-developed APIs for reading and transforming data in different formats.
Pandas was used in some cases because it provides flexible data structures for data manipulation.
* **Propose how often the data should be updated and why:**
This depends on the I94 dataset itself. According to the website, it is updated monthly, so the data should be refreshed monthly as well.

* **Write a description of how you would approach the problem differently under the following scenarios:**
 * **The data was increased by 100x:**
 Spark would still be used, but the workload would need to run in parallel on a cluster, which any modern cloud platform can provide.
 * **The data populates a dashboard that must be updated on a daily basis by 7am every day:**
 A pipeline automation tool such as Airflow can be used to schedule the work.
 * **The database needed to be accessed by 100+ people:**
 A data warehouse such as Redshift or Azure SQL Data Warehouse can be used to address this scenario.

### Sample queries

In [80]:
# How many people entered the US through port "LOS" in May 2016?
visa_dim_df.createOrReplaceTempView("visa_table")
# show() prints the result and returns None, so there is nothing to assign.
spark.sql("""
SELECT COUNT(*)
FROM visa_table
WHERE i94port="LOS"
""").show()

+--------+
|count(1)|
+--------+
|  362779|
+--------+



In [77]:
# How many people entered the US under visa type "WT" in May 2016?
visa_dim_df.createOrReplaceTempView("visa_table")
# show() prints the result and returns None, so there is nothing to assign.
spark.sql("""
SELECT COUNT(*)
FROM visa_table
WHERE visatype="WT"
""").show()

+--------+
|count(1)|
+--------+
| 1379841|
+--------+
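
The two queries above count every row in `visa_table`, so they answer the "May 2016" question only if the loaded data covers that month alone. If several months were loaded, the month would have to come from a table that carries it. The sketch below shows one way to do that by joining on `cicid`. It is illustrative only: SQLite stands in for Spark so that it runs without a cluster, `fact_table` and its columns are assumed rather than taken from the project, and the rows are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
-- Hypothetical layout: a fact table that carries cicid, i94yr and i94mon.
CREATE TABLE fact_table (cicid INTEGER, i94yr INTEGER, i94mon INTEGER);
CREATE TABLE visa_table (cicid INTEGER, visatype TEXT, i94port TEXT);
-- Made-up rows, for illustration only.
INSERT INTO fact_table VALUES (1, 2016, 5), (2, 2016, 5), (3, 2016, 4);
INSERT INTO visa_table VALUES (1, 'WT', 'LOS'), (2, 'B2', 'LOS'), (3, 'WT', 'LOS');
""")

MONTH_FILTERED_COUNT = """
SELECT COUNT(*)
FROM visa_table AS v
JOIN fact_table AS f ON f.cicid = v.cicid
WHERE v.i94port = ? AND f.i94yr = ? AND f.i94mon = ?
"""


def count_arrivals(port, year, month):
    """Count arrivals through one port in one calendar month."""
    return conn.execute(MONTH_FILTERED_COUNT, (port, year, month)).fetchone()[0]


print(count_arrivals("LOS", 2016, 5))
```

The same `JOIN ... WHERE` text should carry over to `spark.sql` once both DataFrames are registered as temporary views, with the `?` placeholders replaced by literal values.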

