# US I94 Immigration Data Lake
### Data Engineering Capstone Project

#### Project Summary

When you enter the United States as a foreign visitor, a U.S. Customs and Border Protection (CBP) officer examines your passport and visa and then issues a small card called Form I-94. This form, also called the “Arrival/Departure Record”, is evidence of a nonimmigrant’s terms of admission and is used to document legal status in the United States, including length of stay and departure.

The purpose of this project is to build an ETL pipeline that gathers all I94 record information into a data lake, so that statisticians and data scientists can use the data for ad hoc analysis and machine learning.


References:

https://www.uscis.gov/I-94information<br>
https://studyinthestates.dhs.gov/student-forms?form=Form_I-94

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [2]:
# Do all imports and installs here
import pandas as pd
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from datetime import datetime, timedelta
import configparser
import numpy as np
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
import re

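# Use full option names; the short form 'max_columns' is ambiguous in newer pandas versions
pd.set_option("display.max_colwidth", 1000000)
pd.set_option("display.max_columns", 15000)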

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.

The main dataset used in this project is the I94 immigration data provided by the US National Tourism and Trade Office, which collects about 3 million I94 records each month. To start, I only use the April 2016 dataset; once the ETL runs smoothly, I plan to ingest more months. All datasets are available in the workspace provided by Udacity, so the first step is to upload them to Amazon S3. To process this volume of data, I use Spark to load the data from S3, clean and transform it, and write the output back to S3 in Parquet format. The whole ETL will be deployed on Amazon EMR.

The following datasets will be collected and explored to determine whether they should be included in the data lake:
1. I94 Immigration Data in 2016
2. IATA Airlines codes
3. World Temperature Data
4. U.S. City Demographic Data
5. Airport Code Table
6. Code List of I94 Immigration Data




#### Describe and Gather Data 
Describe the data sets you're using. Where did they come from? What type of information is included?

I94 Immigration Data in 2016: This data comes from the US National Tourism and Trade Office and is available [here](https://travel.trade.gov/research/reports/i94/historical/2016.html). It is stored in 12 SAS tables, one for each month in which the records were collected, and each table has about 3 million records. To test the ETL pipeline, only the April 2016 table is used.

World Temperature Data: This dataset comes from Kaggle. You can read more about it [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data). This data is in CSV format.

U.S. City Demographic Data: This data comes from OpenDataSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/). This data is in CSV format.

Airport Code Table: This is a simple table of airport codes and corresponding cities. It comes from [here](https://datahub.io/core/airport-codes#data).

FMTOUT.csv: This file is generated from output_format.sas. It contains the code list of i94visa, i94mode, i94cit, i94res, i94addr, i94port

IATA Airlines Code: A dataset listing the IATA airline members, including the airline codes and details. [Here](https://www.kaggle.com/guillaumekofi/iata-current-airline-members-2019#iata_airlines.csv) is where the data comes from.

#### I94 Immigration Data

In [3]:
# List all 12 I94 tables in 2016
for i in os.listdir('../../data/18-83510-I94-Data-2016/'):
    print(i)

i94_apr16_sub.sas7bdat
i94_sep16_sub.sas7bdat
i94_nov16_sub.sas7bdat
i94_mar16_sub.sas7bdat
i94_jun16_sub.sas7bdat
i94_aug16_sub.sas7bdat
i94_may16_sub.sas7bdat
i94_jan16_sub.sas7bdat
i94_oct16_sub.sas7bdat
i94_jul16_sub.sas7bdat
i94_feb16_sub.sas7bdat
i94_dec16_sub.sas7bdat


In [4]:
# Read in the I94 data here
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
i94 = pd.read_sas(fname, format='sas7bdat', encoding="ISO-8859-1")

In [5]:
i94.shape

(3096313, 28)

In [6]:
i94.head(5)

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,37.0,2.0,1.0,,,,T,,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,20130811.0,SEO,,G,,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,20160401.0,,,T,O,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,28.0,2.0,1.0,20160401.0,,,O,O,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,20160401.0,,,O,O,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [7]:
i94.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3096313 entries, 0 to 3096312
Data columns (total 28 columns):
cicid       float64
i94yr       float64
i94mon      float64
i94cit      float64
i94res      float64
i94port     object
arrdate     float64
i94mode     float64
i94addr     object
depdate     float64
i94bir      float64
i94visa     float64
count       float64
dtadfile    object
visapost    object
occup       object
entdepa     object
entdepd     object
entdepu     object
matflag     object
biryear     float64
dtaddto     object
gender      object
insnum      object
airline     object
admnum      float64
fltno       object
visatype    object
dtypes: float64(13), object(15)
memory usage: 661.4+ MB


In [8]:
i94['i94yr'].value_counts(dropna=False)

2016.0    3096313
Name: i94yr, dtype: int64

#### FMTOUT.csv

In [9]:
fname = 'FMTOUT.csv'
fmtout = pd.read_csv(fname)

In [10]:
fmtout.head(2)

Unnamed: 0,FMTNAME,START,LABEL
0,I94CNTYL,0,INVALID: STATELESS
1,I94CNTYL,54,No Country Code (54)


#### airport-codes_csv

In [11]:
fname = 'airport-codes_csv.csv'
airport_code = pd.read_csv(fname)

In [12]:
airport_code.sample(5)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
22443,GB-0549,small_airport,Anniesfield Microlight Site,,EU,GB,GB-ENG,,,,,"0.699028, 51.751637"
54766,ZBUC,medium_airport,Ulanqab Jining Airport,,AS,CN,CN-15,Ulanqab,ZBUC,UCB,,"113.108056, 41.129722"
21508,FZAS,small_airport,Inkisi Airport,1968.0,AF,CD,CD-BC,Inkisi,FZAS,,,"15, -5.166999816894531"
19167,EPZR,small_airport,Å»ar Airport,1260.0,EU,PL,PL-SL,MiÄdzybrodzie Å»ywieckie,EPZR,,,"19.21809959411621, 49.77109909057617"
8415,7OH2,small_airport,Canal Fulton Airport,1150.0,,US,US-OH,Canal Fulton,7OH2,,7OH2,"-81.53369903564453, 40.900299072265625"


In [13]:
airport_code[airport_code.gps_code.str.contains("SFO",na = False)]

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
29940,KSFO,large_airport,San Francisco International Airport,13.0,,US,US-CA,San Francisco,KSFO,SFO,SFO,"-122.375, 37.61899948120117"
45938,SSFO,small_airport,Fazenda Novo Horizonte Airport,354.0,SA,BR,BR-MS,Miranda,SSFO,,,"-56.32027816772461, -20.039722442626953"


In [14]:
airport_code[airport_code.gps_code.str.contains("MSP",na = False)]

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
28131,KMSP,large_airport,Minneapolis-St Paul International/Wold-Chamberlain Airport,841.0,,US,US-MN,Minneapolis,KMSP,MSP,MSP,"-93.221802, 44.882"
33409,MMSP,medium_airport,Ponciano Arriaga International Airport,6035.0,,MX,MX-SLP,San Luis PotosÃ­,MMSP,SLP,,"-100.930999756, 22.254299163800003"
33946,MSPT,small_airport,El Platanar Airport,1354.0,,SV,SV-CU,Suchitoto,MSPT,,,"-89.06279754638672, 13.945300102233887"
53796,YMSP,small_airport,Mount Surprise Airport,,OC,AU,AU-QLD,,YMSP,,,"144.28500366210938, -18.126300811767578"


#### us cities demographics

In [15]:
fname = 'us-cities-demographics.csv'
us_city = pd.read_csv(fname,sep=";",header=0)

In [16]:
us_city.shape

(2891, 12)

#### Airlines

In [17]:
fname = 'iata_airlines.csv'
airlines= pd.read_csv(fname)

In [18]:
airlines.shape

(297, 5)

In [19]:
# Alternative: read the SAS file with Spark instead of pandas (disabled during exploration)
# from pyspark.sql import SparkSession
# spark = SparkSession.builder \
#     .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
#     .enableHiveSupport().getOrCreate()
# df_spark = spark.read.format('com.github.saurfang.sas.spark') \
#     .load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [20]:
#write to parquet
#df_spark.write.parquet("sas_data")
#df_spark=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

##### I94 Immigration Data

In [21]:
# Check columns without missing value.
i94.columns[i94.isnull().mean()==0]

Index(['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate',
       'i94visa', 'count', 'admnum', 'visatype'],
      dtype='object')

In [22]:
# Check which columns have the highest percentage of missing values.
i94.isnull().mean().sort_values(ascending=False)

entdepu     9.998734e-01
occup       9.973756e-01
insnum      9.632763e-01
visapost    6.075775e-01
gender      1.337943e-01
i94addr     4.921079e-02
depdate     4.600859e-02
matflag     4.470769e-02
entdepd     4.470769e-02
airline     2.700857e-02
fltno       6.313638e-03
i94bir      2.590177e-04
biryear     2.590177e-04
dtaddto     1.540542e-04
i94mode     7.718858e-05
entdepa     7.686561e-05
dtadfile    3.229648e-07
i94cit      0.000000e+00
i94mon      0.000000e+00
i94port     0.000000e+00
i94yr       0.000000e+00
i94res      0.000000e+00
visatype    0.000000e+00
arrdate     0.000000e+00
i94visa     0.000000e+00
count       0.000000e+00
admnum      0.000000e+00
cicid       0.000000e+00
dtype: float64

In [23]:
# Check percentage of duplicates for each column
# admnum is a float displayed in scientific notation; convert it to an integer first
i94['admnum'] = i94['admnum'].astype(int)

for i in i94.columns:
    result = i94[i].duplicated().mean()
    print("%s : %s" % (i,result))
    

cicid : 0.0
i94yr : 0.999999677035
i94mon : 0.999999677035
i94cit : 0.999921519562
i94res : 0.999926041069
i94port : 0.999903433535
arrdate : 0.999990311057
i94mode : 0.999998385176
i94addr : 0.999851759173
depdate : 0.999923780315
i94bir : 0.999963504982
i94visa : 0.999999031106
count : 0.999999677035
dtadfile : 0.999961890158
visapost : 0.99982850571
occup : 0.999963827946
entdepa : 0.999995478493
entdepd : 0.999995801458
entdepu : 0.999999031106
matflag : 0.99999935407
biryear : 0.999963504982
dtaddto : 0.999748733413
gender : 0.999998385176
insnum : 0.99938184544
airline : 0.999827213851
admnum : 0.00669635143475
fltno : 0.997689833037
visatype : 0.999994509599


In [24]:
# cicid has no duplicates, so it can be considered a primary key. admnum is almost distinct.
# Let's check which admnum values are duplicated.
i94_dup = i94[i94.duplicated(subset=['admnum'])]
i94_dup.shape

(20734, 28)
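The duplicate percentages above come from `Series.duplicated().mean()`: with the default `keep='first'`, a row counts as a duplicate only if its value already appeared in an earlier row. The stdlib sketch below mirrors that rule (the function name is hypothetical) and checks that the 20,734 duplicated admnum rows are consistent with the 0.0067 fraction printed for admnum earlier.

```python
def duplicated_fraction(values):
    """Share of rows whose value already appeared earlier, mirroring
    pandas Series.duplicated(keep='first').mean()."""
    if not values:
        return 0.0
    seen = set()
    repeats = 0
    for v in values:
        if v in seen:
            repeats += 1  # second or later occurrence counts as a duplicate
        else:
            seen.add(v)  # the first occurrence is not counted
    return repeats / len(values)


print(duplicated_fraction([1, 2, 2, 3, 3, 3]))  # 0.5: three of six rows repeat a value
print(round(20734 / 3096313, 6))  # 20,734 duplicated rows out of 3,096,313 records
```

Because the first occurrence is never counted, a fraction of 0.0 (as for cicid) means every value is unique.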

In [25]:
# Here is a sample about duplicated admnum

i94.loc[i94.admnum==92517099730,:]

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
52486,62220.0,2016.0,4.0,213.0,213.0,MAA,20545.0,1.0,CA,20546.0,38.0,1.0,1.0,20160401,BMB,,G,I,,M,1978.0,10012016,M,,EY,92517099730,171,B1
153389,282406.0,2016.0,4.0,213.0,213.0,MAA,20546.0,1.0,CA,20618.0,38.0,1.0,1.0,20160402,BMB,,U,O,,M,1978.0,10012016,M,,EY,92517099730,171,B1


In [26]:
# Many rows have admnum == 0, which is probably a placeholder for a missing admission number.
result = i94.loc[i94.admnum==0,:]
result.head(5)

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
1717020,3492908.0,2016.0,4.0,113.0,113.0,NYC,20546.0,1.0,CT,,39.0,2.0,1.0,20160419,,,A,,,,1977.0,10012016,M,5704,AZ,0,608.0,B2
2913107,5908904.0,2016.0,4.0,689.0,689.0,XXX,20555.0,1.0,CA,,67.0,2.0,1.0,20160503,RDJ,,A,,,,1949.0,10102016,F,5020,CM,0,441.0,B2
2913108,5908905.0,2016.0,4.0,689.0,689.0,XXX,20555.0,1.0,CA,,66.0,2.0,1.0,20160503,RDJ,,A,,,,1950.0,10102016,F,5020,CM,0,441.0,B2
2915879,5918805.0,2016.0,4.0,464.0,464.0,VIC,20550.0,9.0,WA,,19.0,2.0,1.0,20160513,,,A,,,,1997.0,7042016,F,3545,,0,,WT
2916873,5925193.0,2016.0,4.0,111.0,111.0,BUF,20563.0,1.0,SC,,40.0,2.0,1.0,20160520,,,A,,,,1976.0,7182016,F,4647,AC,0,,WT


In [27]:
# Select all rows whose admnum is duplicated.
i94_all_dup = i94.loc[i94.admnum.isin(list(set(i94_dup.admnum.values))),:]
print(i94_all_dup.shape)

# Exclude the value when admnum is zero.
i94_all_dup = i94_all_dup.loc[i94_all_dup.admnum != 0,:]
print(i94_all_dup.shape)

(40108, 28)
(40040, 28)


In [28]:
# How many distinct admnum values have duplicates?
len(set(i94_all_dup.admnum.values))

19373

In [29]:
# Is each admnum issued to a single person? Keep only rows whose (admnum, biryear) pair is not repeated.
result = i94_all_dup.drop_duplicates(subset=['admnum','biryear'],keep=False)
result.head(5)

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
1245336,2534653.0,2016.0,4.0,209.0,209.0,LOS,20558.0,1.0,CA,20561.0,37.0,1.0,1.0,20160414,,,G,O,,M,1979.0,7122016,M,,JL,56136380833,60.0,WB
1711300,3486368.0,2016.0,4.0,110.0,110.0,DET,20545.0,9.0,MI,,63.0,2.0,1.0,20160419,,,A,,,,1953.0,6292016,F,6758.0,,43868540033,,WT
1711308,3486376.0,2016.0,4.0,110.0,110.0,DET,20545.0,9.0,MI,,39.0,2.0,1.0,20160419,,,A,,,,1977.0,6292016,M,6758.0,,43868557533,,WT
1791742,3649855.0,2016.0,4.0,999.0,110.0,LVG,20545.0,9.0,MI,,,2.0,1.0,20160419,,,A,,,,,6292016,U,7563.0,,43868531933,,WT
1799660,3658908.0,2016.0,4.0,111.0,316.0,HIG,20546.0,1.0,MA,,,2.0,1.0,20160420,,,A,,,,,6302016,U,2132.0,AH,42332806233,2700.0,WT


In [30]:
# No. In this example, the same admnum was issued to two people with different birth years.
# Therefore, admnum cannot be used as the primary key.
result.loc[result.admnum==56136380833,:]

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
1245336,2534653.0,2016.0,4.0,209.0,209.0,LOS,20558.0,1.0,CA,20561.0,37.0,1.0,1.0,20160414,,,G,O,,M,1979.0,7122016,M,,JL,56136380833,60,WB
2989732,6014949.0,2016.0,4.0,254.0,276.0,AGA,20568.0,1.0,GU,,25.0,2.0,1.0,20160617,,,A,,,,1991.0,6062016,F,3934.0,7C,56136380833,3154,GMT
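The `drop_duplicates(subset=['admnum','biryear'], keep=False)` check above can be restated in plain Python: group birth years by admission number and flag any number linked to more than one birth year. The pairs below are taken from the two admnum examples shown in this section; the function name is hypothetical.

```python
from collections import defaultdict

# (admnum, biryear) pairs copied from the outputs above
rows = [
    (56136380833, 1979.0),
    (56136380833, 1991.0),
    (92517099730, 1978.0),
    (92517099730, 1978.0),  # same birth year twice: apparently one traveller recorded twice
]


def admnums_with_conflicting_birth_years(pairs):
    """Return the admission numbers that are associated with more than one birth year."""
    years = defaultdict(set)
    for admnum, biryear in pairs:
        years[admnum].add(biryear)
    return sorted(a for a, ys in years.items() if len(ys) > 1)


print(admnums_with_conflicting_birth_years(rows))  # [56136380833]
```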


In [31]:
# arrdate looks roughly evenly distributed across days, so it can be used as a partition key.
i94['arrdate'].value_counts(dropna=False)

20573.0    128267
20574.0    127155
20572.0    120971
20560.0    114970
20559.0    114803
20567.0    112883
20566.0    110304
20545.0    108407
20558.0    107557
20561.0    106474
20553.0    105930
20565.0    105454
20554.0    104394
20552.0    103660
20546.0    103196
20562.0    100493
20568.0    100203
20547.0     99972
20551.0     99763
20569.0     99652
20571.0     99259
20555.0     98737
20548.0     97653
20564.0     95428
20549.0     91514
20557.0     91173
20550.0     88273
20570.0     88100
20563.0     86068
20556.0     85600
Name: arrdate, dtype: int64
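The arrdate values are SAS date numbers, i.e. days counted from 1960-01-01, the same epoch the `get_day` UDF in Step 4 relies on. A small stdlib sketch (the helper name `sas_to_date` is hypothetical) confirms that the 30 distinct values above, 20545 through 20574, cover exactly April 2016:

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from this day


def sas_to_date(sas_days):
    """Convert a SAS date number (int or float) to a date; None or NaN -> None."""
    if sas_days is None or sas_days != sas_days:  # NaN is the only value not equal to itself
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


# Smallest and largest arrdate in the April 2016 file
print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(20574.0))  # 2016-04-30
```

The same conversion applies to depdate; for example, the depdate 20691.0 in the third sample row corresponds to 2016-08-25.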

In [32]:
i94['airline'].value_counts(dropna=False)

AA     310091
UA     264271
DL     252526
BA     190997
LH     120556
VS     113384
NaN     83627
AF      81113
KE      71047
JL      69075
AM      60307
EK      55800
CM      49990
B6      49265
AV      48921
JJ      46277
LA      43111
QF      41945
NH      40665
KL      39978
SK      39802
DY      37932
Y4      35250
OZ      35043
LX      33727
CX      33096
HA      32154
QR      29189
MU      28717
TK      27567
        ...  
GT          1
N6          1
ATN         1
LZ          1
QA          1
18          1
0AZ         1
843         1
NRL         1
BE          1
B01         1
B1M         1
BC          1
T1G         1
RX          1
AJ          1
N9          1
0MT         1
DR          1
D0          1
L6K         1
U0C         1
OI          1
XLA         1
020         1
13          1
LU          1
IJ          1
DO          1
MM          1
Name: airline, Length: 535, dtype: int64

#### FMTOUT.csv

In [33]:
# Split this file into separate code lists, one per variable (FMTNAME) that the codes refer to
fmtout['FMTNAME'].value_counts(dropna=False)

I94PRTL     660
I94CNTYL    289
I94ADDRL     55
I94MODEL      4
I94VISA       3
Name: FMTNAME, dtype: int64

In [34]:
fmtout.loc[fmtout.FMTNAME=='I94PRTL',:].to_csv('i94port.csv',index=False)
fmtout.loc[fmtout.FMTNAME=='I94CNTYL',:].to_csv('i94cit_res.csv',index=False)
fmtout.loc[fmtout.FMTNAME=='I94ADDRL',:].to_csv('i94addr.csv',index=False)
fmtout.loc[fmtout.FMTNAME=='I94MODEL',:].to_csv('i94mode.csv',index=False)
fmtout.loc[fmtout.FMTNAME=='I94VISA',:].to_csv('i94visa.csv',index=False)

#### Airport codes

In [35]:
# Only consider US airports
airport_code_us = airport_code.loc[airport_code.iso_country=="US",:]

# Check which columns have the highest percentage of missing values.
airport_code_us.isnull().mean().sort_values(ascending=False)

continent       0.999956
iata_code       0.911280
gps_code        0.077910
local_code      0.066837
elevation_ft    0.010502
municipality    0.004482
coordinates     0.000000
iso_region      0.000000
iso_country     0.000000
name            0.000000
type            0.000000
ident           0.000000
dtype: float64

In [36]:
airport_code_us.head(5)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [37]:
# Check whether iata_code equals local_code (first keep only airports that have an iata_code)

result = airport_code_us[airport_code_us.iata_code.notnull()]
print(result.shape)

(2019, 12)


In [38]:
# No, it does not. For example:
result[result.iata_code!=result.local_code].head(1)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
440,07FA,small_airport,Ocean Reef Club Airport,8.0,,US,US-FL,Key Largo,07FA,OCA,07FA,"-80.274803161621, 25.325399398804"


In [39]:
# Also, i94port codes do not match iata_code. For example:
# SFR is the port-of-entry code for San Francisco, while the identical iata_code refers to San Fernando Airport in Los Angeles, not San Francisco International Airport.
# Hence, there is no reliable key to join airport codes to the I94 immigration table, which contains no airport code information.
# If a table mapping ports of entry to airport codes were available, I would use it instead.
# Conclusion: airport codes are not included in the data lake, to avoid bad matches like the one above.
result[result.iata_code=="SFR"]

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
42481,SFR,closed,San Fernando Airport,1168.0,,US,US-CA,Los Angeles,,SFR,,"-118.422, 34.289"


In [40]:
# Another example: Los Angeles has many airports, so the port code LOS could refer to any airport in the city.
# It is almost certainly the international airport (LAX), but the data does not say so.
# The port's address would be needed to determine which airport the port of entry is at.
result = airport_code_us[airport_code_us.municipality=="Los Angeles"]
result['name'].unique()

array(["Los Angeles County Sheriff's Department Heliport",
       'Drew Medical Center Heliport', 'Good Samaritan Hospital Heliport',
       'Devonshire Area Heliport', 'ABC-TV Heliport',
       'L A Co Mens Detention Cntr-Main Jail Heliport',
       'Litton Industries Heliport',
       'Childrens Hospital Los Angeles Heliport',
       'K & T 660 Figueroa Partners Heliport',
       'Dept. Of Water And Power Granada Hills Heliport',
       'Department Of Water & Power Los Angeles Heliport',
       'Los Angeles County/USC Medical Center Heliport',
       'J.H. Snyder Co. Iii Heliport',
       'Metropolitan Water District Heliport', 'Chase Plaza Heliport',
       'Raleigh Enterprises Heliport',
       'Los Angeles County Sheriffs Dept South LA Heliport',
       'Van Nuys County Court Heliport', 'UCLA Wilshire Glendon Heliport',
       'Wilshire Area Heliport', 'Parker Center Heliport',
       'AT&T Center Heliport', 'Lapd Hooper Heliport',
       'City National Bank Heliport',
       'Ran

#### US cities demographics

In [41]:
us_city.head(2)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723


In [42]:
# Check the city St. Paul.
# The city name here is Saint Paul, whereas the port code label uses ST PAUL.
# City names here do not exactly match the port code labels,
# so matching every port city would require more data cleaning.
# Moreover, the demographic data may not be meaningful here,
# because visitors can enter the US at one port and then take a connecting flight to their destination city.
# For now, this dataset is not included.
us_city[us_city.City.str.contains("Paul")]

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
565,Saint Paul,Minnesota,31.5,149547.0,151293.0,300840,10548.0,56514.0,2.58,MN,Hispanic or Latino,27307
734,Saint Paul,Minnesota,31.5,149547.0,151293.0,300840,10548.0,56514.0,2.58,MN,Asian,58174
1195,Saint Paul,Minnesota,31.5,149547.0,151293.0,300840,10548.0,56514.0,2.58,MN,White,191369
1875,Saint Paul,Minnesota,31.5,149547.0,151293.0,300840,10548.0,56514.0,2.58,MN,Black or African-American,54665
2179,Saint Paul,Minnesota,31.5,149547.0,151293.0,300840,10548.0,56514.0,2.58,MN,American Indian and Alaska Native,6858


In [43]:
# Another example:
# The city name here is Pasco, but the port code label is TRI-CITIES- PASCO.
# Since the connection between cities and ports is not clear enough, this dataset is not included in the data lake.
us_city[us_city.City.str.contains("Pasco")]

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
274,Pasco,Washington,28.6,35298.0,32799.0,68097,2052.0,16265.0,3.45,WA,Asian,2215
1102,Pasco,Washington,28.6,35298.0,32799.0,68097,2052.0,16265.0,3.45,WA,Hispanic or Latino,38883
1196,Pasco,Washington,28.6,35298.0,32799.0,68097,2052.0,16265.0,3.45,WA,White,55741
2332,Pasco,Washington,28.6,35298.0,32799.0,68097,2052.0,16265.0,3.45,WA,Black or African-American,2265
2649,Pasco,Washington,28.6,35298.0,32799.0,68097,2052.0,16265.0,3.45,WA,American Indian and Alaska Native,1054


#### Airlines

In [44]:
airlines.sample(5)

Unnamed: 0,AirlineName,IATADesignator,3-DigitCode,ICAODesignator,Country/Territory
117,Egyptair,MS,77,MSR,Egypt
170,LATAM Airlines Argentina,4M,469,DSM,Argentina
287,Volotea,V7,712,VOE,Spain
280,Uzbekistan Airways,HY,250,UZB,Uzbekistan
216,Philippine Airlines,PR,79,PAL,Philippines


In [45]:
# The two-character IATA code seems to match the airline code in the I94 records
airlines[airlines['IATADesignator']=="AA"]

Unnamed: 0,AirlineName,IATADesignator,3-DigitCode,ICAODesignator,Country/Territory
52,American Airlines,AA,1,AAL,United States


In [46]:
# The i94 airline column appears to use IATA codes, so most values can probably be matched against this code list.
# Some values are 3-character codes whose meaning is unknown, but at least the two-character codes can be matched.
airlines['IATADesignator'].value_counts(dropna=False)

LH     2
G5     1
MP     1
PX     1
WX     1
KU     1
OS     1
DR     1
FV     1
AF     1
SK     1
YW     1
RH     1
SC     1
BV     1
5K     1
AW     1
BA     1
ZH     1
UP     1
TN     1
AS     1
KE     1
W8     1
PZ     1
LA     1
MF     1
KY     1
TW     1
GJ     1
      ..
4Z     1
SV     1
G9     1
M3     1
W7     1
OD     1
CV     1
CZ     1
UX     1
WE     1
BK     1
ES*    1
GX     1
TP     1
EB     1
ZE     1
PG     1
M4     1
V3     1
AH     1
WF     1
QY     1
5O     1
5C     1
PY     1
AZ     1
SU     1
XC     1
FU     1
EP     1
Name: IATADesignator, Length: 296, dtype: int64

In [47]:
airlines[airlines['IATADesignator']=='GT']

Unnamed: 0,AirlineName,IATADesignator,3-DigitCode,ICAODesignator,Country/Territory
23,Air Guilin,GT,730,CGH,China (People's Republic of)
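Before IATADesignator can serve as the key of the airlines dimension, two issues visible in the outputs above need handling: LH appears twice in the IATA list, and some I94 airline values (e.g. ATN, 0AZ) are not two-character IATA codes. The sketch below uses a small subset of both lists (record counts copied from the `airline` value counts, designators from the IATA outputs) to measure what share of I94 records would find a match; the function name is hypothetical and the real check would run over the full tables.

```python
# Record counts per I94 airline value (subset of the value_counts output above)
i94_airline_counts = {"AA": 310091, "LH": 120556, "GT": 1, "ATN": 1, "0AZ": 1}

# A few IATADesignator values from iata_airlines.csv; LH is listed twice there
iata_designators = ["AA", "LH", "LH", "GT"]


def match_report(counts, designators):
    """Return (share of records whose airline code is in the IATA list, unmatched codes)."""
    known = set(designators)  # deduplicate the designators before using them as a key
    total = sum(counts.values())
    matched = sum(n for code, n in counts.items() if code in known)
    unmatched = sorted(code for code in counts if code not in known)
    return matched / total, unmatched


share, unmatched = match_report(i94_airline_counts, iata_designators)
print(f"{share:.4%} of records matched; unmatched codes: {unmatched}")
```

Weighting by record counts matters here: most of the 535 distinct airline values occur only once, so the unmatched codes affect very few records.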


#### Cleaning Steps
Document steps necessary to clean the data

#### i94port.csv

In [48]:
# Load the port code list split out of FMTOUT.csv
i94port = pd.read_csv("i94port.csv")

In [49]:
# Split each label into city and state at the comma, trimming the surrounding spaces
i94port['city'] = i94port['LABEL'].apply(lambda x: x.split(',')[0].strip() if ',' in x else '')
i94port['state'] = i94port['LABEL'].apply(lambda x: x.split(',')[1].strip() if ',' in x else '')

In [50]:
i94port.to_csv('i94port.csv',index=False)
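The pipeline plan in Step 3.2 includes a quality check on each dimension table before it is written. A minimal sketch, assuming two checks are enough (the table is non-empty, and its key column has no missing or duplicate values); the function name and the sample port rows are hypothetical, and in the real pipeline the same checks would run against the Spark DataFrame.

```python
def check_dimension(rows, key):
    """Raise ValueError if the table is empty or `key` has missing/duplicate values.

    Returns the number of rows when all checks pass.
    """
    if not rows:
        raise ValueError("dimension table is empty")
    keys = [row.get(key) for row in rows]
    missing = sum(1 for k in keys if k in (None, ""))
    if missing:
        raise ValueError(f"{missing} row(s) have no {key}")
    duplicates = len(keys) - len(set(keys))
    if duplicates:
        raise ValueError(f"{duplicates} duplicate {key} value(s)")
    return len(rows)


# Hypothetical rows shaped like the cleaned i94port.csv (START holds the port code)
ports = [
    {"START": "ATL", "city": "ATLANTA", "state": "GA"},
    {"START": "SFR", "city": "SAN FRANCISCO", "state": "CA"},
]
print(check_dimension(ports, "START"))  # 2
```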

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

A star schema is implemented in this project so that statisticians and data scientists can run simple queries on the data and perform high-level analysis.

The fact table contains i94 immigration data extracted from the i94 immigration dataset:

##### i94

- id: Unique record ID combining year, month, and cicid; used as the primary key.
- year: Year of arrival, extracted from arrival_date. 
- month: Month of arrival, extracted from arrival_date.
- day: Day of arrival, extracted from arrival_date. 
- citizen_code: Visitor's 3-digit citizenship country code, extracted from i94cit.
- resident_code: Visitor's 3-digit country-of-residence code, extracted from i94res.
- port_code: 3-character port of entry code, extracted from i94port.
- arrival_date: Date of arrival, converted from arrdate.
- mode_code: 1-digit mode of entry code, extracted from i94mode.
- address_state: State where the visitor stays (assumed to be the state of the US address), extracted from i94addr.
- departure_date: Departure date, converted from depdate.
- age: Visitor's age, extracted from i94bir.
- visa_code: 1-digit visa category code, extracted from i94visa.
- birth_year: Visitor's birth year, extracted from biryear.
- gender: Visitor's gender, extracted from gender.
- airline_code: Airline code, extracted from airline.
- admission_number: Admission number, extracted from admnum.
- visa_type: Visa type (e.g. B2, F1), extracted from visatype.
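The id column is described as a combination of year, month, and cicid. Because cicid was only checked for uniqueness inside the April 2016 file, prefixing it with the year and month is a reasonable way to keep ids unique once more months are loaded. The sketch below assumes a hypothetical `YYYYMM-cicid` string format; the notebook does not specify the exact format.

```python
def make_record_id(year, month, cicid):
    """Build the i94 fact-table id from year, month, and cicid.

    The 'YYYYMM-cicid' layout is an assumption for illustration. Inputs may be
    floats, as they are when read from the SAS file (e.g. 2016.0, 4.0, 6.0).
    """
    return f"{int(year):04d}{int(month):02d}-{int(cicid)}"


print(make_record_id(2016.0, 4.0, 6.0))  # 201604-6
print(make_record_id(2016, 12, 5908904))  # 201612-5908904
```

Zero-padding the month keeps the ids sortable as plain strings in chronological order.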

These irrelevant variables will be dropped from the I94 immigration dataset:

- i94yr, i94mon: duplicates of the year and month extracted from the arrival date.
- count, dtadfile, visapost, entdepa, entdepd, entdepu, matflag, dtaddto, insnum, fltno: the information they provide is unclear.
- occup: too many missing values.




There are six dimension tables: airlines, ports, modes, countries, states, and visas.

##### airlines   (Extracted from iata_airlines dataset)

- iata_code: 2-character IATA airline code, extracted from IATADesignator.
- airline_name: Airline name, extracted from AirlineName.
- airline_country: Country of the airline, extracted from Country/Territory.

These irrelevant variables will be dropped from the airlines dataset:

3-DigitCode, ICAODesignator: alternative airline codes that are not needed.

##### ports     (Extracted from i94port dataset)

- port_code: 3-character port of entry code, extracted from START.
- city: City where the port of entry is located, extracted from city.
- state: State where the port of entry is located, extracted from state.

##### modes     (Extracted from i94mode dataset)

- mode_code: 1-digit mode of entry code, extracted from START.
- mode: mode of entry, extracted from LABEL.

##### countries     (Extracted from i94cit_res dataset)

- country_code: 3-digit country code, extracted from START.
- country: Country name, extracted from LABEL.

##### states     (Extracted from i94addr dataset)

- address_state:  State abbreviation, extracted from START.
- state: Full state name, extracted from LABEL.   

##### visas     (Extracted from i94visa dataset)

- visa_code: 1-digit visa category code, extracted from START.
- visa: Visa category name, extracted from LABEL. 
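To illustrate how the star schema is meant to be queried, the sketch below uses plain Python dictionaries as stand-ins for the fact table and two dimension tables, and counts arrivals per visa category and per airline country. The fact rows are made up; the AA row comes from the airline sample shown earlier, ATN is one of the unexplained I94 airline values, and the visa labels (1 = Business, 2 = Pleasure, 3 = Student) are assumed from the I94 code list rather than shown in this notebook.

```python
from collections import Counter

# Hypothetical fact rows: only the foreign-key columns needed for this query
i94_fact = [
    {"visa_code": 2, "airline_code": "AA"},
    {"visa_code": 2, "airline_code": "AA"},
    {"visa_code": 3, "airline_code": "ATN"},
]

# Dimension tables as dicts keyed by their primary keys
visas = {1: "Business", 2: "Pleasure", 3: "Student"}  # assumed labels
airlines = {
    "AA": {"airline_name": "American Airlines", "airline_country": "United States"},
}


def arrivals_by(fact_rows, dim, fk, attr=None):
    """Count fact rows per dimension label, joining on foreign key `fk`.

    Keys missing from the dimension are counted as 'Unknown', which behaves
    like a left join followed by a group-by.
    """
    counts = Counter()
    for row in fact_rows:
        entry = dim.get(row[fk])
        if entry is None:
            label = "Unknown"
        elif attr is None:
            label = entry
        else:
            label = entry[attr]
        counts[label] += 1
    return counts


print(arrivals_by(i94_fact, visas, "visa_code"))
print(arrivals_by(i94_fact, airlines, "airline_code", "airline_country"))
```

In Spark the equivalent query would join the fact table to one dimension and then call `groupBy(...).count()`; each question touches only one join, which is the main reason for choosing a star schema.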


#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1. Run output_format.sas to generate FMTOUT.csv.
2. Split FMTOUT.csv into separate I94 code-list CSVs and clean them.
3. Load the CSVs into Spark, clean the data if necessary, and select the relevant columns.
4. Run quality checks on the dimension tables.
5. Write all dimension tables to Parquet files.
6. Load the I94 immigration dataset into Spark, clean the data, and select the relevant columns.
7. Run quality checks on the I94 immigration table, especially on the frequency of arrival dates.
8. Write the I94 immigration table to Parquet files partitioned by arrival year, month, and day.
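Item 8 relies on Spark's `DataFrameWriter.partitionBy`, which stores Parquet files in Hive-style `column=value` subdirectories; in PySpark the write would look like `df.write.partitionBy("year", "month", "day").parquet(output_path)`. The stdlib sketch below does not use Spark; it only mimics that directory naming (the `i94` root and the helper name are hypothetical) to show that each arrival day becomes its own directory, which is why the roughly even arrdate distribution found in Step 2 keeps partitions similar in size.

```python
from collections import Counter
from datetime import date


def partition_dir(arrival, root="i94"):
    """Directory that partitionBy('year', 'month', 'day') would produce for one row,
    assuming integer partition columns (written without zero padding)."""
    return f"{root}/year={arrival.year}/month={arrival.month}/day={arrival.day}"


# Three hypothetical arrival dates from the April 2016 file
arrivals = [date(2016, 4, 1), date(2016, 4, 1), date(2016, 4, 30)]
rows_per_dir = Counter(partition_dir(d) for d in arrivals)
for path, n in sorted(rows_per_dir.items()):
    print(path, n)
```

Queries that filter on year, month, or day can then skip every directory outside the requested range.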



### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [51]:
#spark = SparkSession \
#        .builder \
#        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
#        .getOrCreate()

In [52]:
spark = SparkSession \
         .builder \
         .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11") \
         .enableHiveSupport() \
         .getOrCreate()

#### I94 immigration

In [53]:
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
i94 = spark.read.format('com.github.saurfang.sas.spark').load(fname)

In [54]:
i94.count()

3096313

In [55]:
i94.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [56]:
i94.describe()

DataFrame[summary: string, cicid: string, i94yr: string, i94mon: string, i94cit: string, i94res: string, i94port: string, arrdate: string, i94mode: string, i94addr: string, depdate: string, i94bir: string, i94visa: string, count: string, dtadfile: string, visapost: string, occup: string, entdepa: string, entdepd: string, entdepu: string, matflag: string, biryear: string, dtaddto: string, gender: string, insnum: string, airline: string, admnum: string, fltno: string, visatype: string]

In [57]:
i94.take(5)

[Row(cicid=6.0, i94yr=2016.0, i94mon=4.0, i94cit=692.0, i94res=692.0, i94port='XXX', arrdate=20573.0, i94mode=None, i94addr=None, depdate=None, i94bir=37.0, i94visa=2.0, count=1.0, dtadfile=None, visapost=None, occup=None, entdepa='T', entdepd=None, entdepu='U', matflag=None, biryear=1979.0, dtaddto='10282016', gender=None, insnum=None, airline=None, admnum=1897628485.0, fltno=None, visatype='B2'),
 Row(cicid=7.0, i94yr=2016.0, i94mon=4.0, i94cit=254.0, i94res=276.0, i94port='ATL', arrdate=20551.0, i94mode=1.0, i94addr='AL', depdate=None, i94bir=25.0, i94visa=3.0, count=1.0, dtadfile='20130811', visapost='SEO', occup=None, entdepa='G', entdepd=None, entdepu='Y', matflag=None, biryear=1991.0, dtaddto='D/S', gender='M', insnum=None, airline=None, admnum=3736796330.0, fltno='00296', visatype='F1'),
 Row(cicid=15.0, i94yr=2016.0, i94mon=4.0, i94cit=101.0, i94res=101.0, i94port='WAS', arrdate=20545.0, i94mode=1.0, i94addr='MI', depdate=20691.0, i94bir=55.0, i94visa=2.0, count=1.0, dtadfile=

In [58]:
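# Calculate arrival_date and departure_date.
# SAS stores dates as the number of days since 1960-01-01.
# No returnType is passed to udf(), so it defaults to StringType: each datetime is
# serialized as a java.util.GregorianCalendar string (see the outputs below), and
# year()/month()/day() applied to that string return null.
get_day = udf(lambda x: datetime(1960, 1, 1) + timedelta(days=int(x)) if x is not None else np.nan)

i94 = i94.withColumn("arrival_date", get_day(i94.arrdate))
i94 = i94.withColumn("departure_date", get_day(i94.depdate))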
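A minimal plain-Python sketch of the SAS date conversion that returns a real `date`. Registered as `udf(sas_to_date, DateType())` (with `DateType` from `pyspark.sql.types`), Spark would store proper dates, and missing values would become nulls instead of `np.nan`. Alternatively, the Spark SQL expression `date_add('1960-01-01', int(arrdate))` avoids a Python UDF altogether. The name `sas_to_date` is hypothetical.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # day 0 of SAS date values


def sas_to_date(days):
    """Convert a SAS date (days since 1960-01-01, possibly a float) to a date.

    Returns None for missing values so Spark stores a proper null.
    """
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(None))     # None
```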

In [59]:
i94.show(n=1)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+--------------------+--------------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|       admnum|fltno|visatype|        arrival_date|departure_date|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+--------------------+--------------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null|1.897628485E9| null|      B2|java.util

In [63]:
i94.createOrReplaceTempView("i94_view")

# TABLESAMPLE keeps roughly 0.001% of the rows (about 31 of 3,096,313) so this run stays small;
# drop it to process the full dataset.
# admnum values such as 3736796330 exceed the 32-bit int range, so they are cast to bigint.
i94_result = spark.sql("""
  select
    concat(string(int(i94yr)), '-', string(int(i94mon)), '-', string(int(cicid))) as id,
    year(arrival_date) as year,
    month(arrival_date) as month,
    day(arrival_date) as day,
    int(i94cit) as citizen_code,
    int(i94res) as resident_code,
    i94port as port_code,
    arrival_date,
    departure_date,
    int(i94bir) as age,
    int(i94visa) as visa_code,
    int(biryear) as birth_year,
    gender,
    airline as airline_code,
    bigint(admnum) as admission_number,
    visatype as visa_type
  from i94_view TABLESAMPLE (.001 PERCENT)
""")
  


In [64]:
# Write to Parquet, partitioned by year, month and day
output_data = "data/"
i94_result.write.parquet(output_data+"i94.parquet",mode='overwrite',partitionBy=("year", "month","day"))
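With `partitionBy`, Spark writes one Hive-style `column=value` directory per distinct (year, month, day) combination and uses `__HIVE_DEFAULT_PARTITION__` for null values. That is the path in the commented-out read in the next cell, which indicates that year, month and day were all null for those rows. A small plain-Python illustration of how a row maps to its directory; `partition_dir` is a hypothetical helper, not a Spark API.

```python
HIVE_DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__"


def partition_dir(year, month, day):
    """Return the Hive-style sub-directory a row lands in when partitioned by year, month, day."""
    parts = []
    for name, value in (("year", year), ("month", month), ("day", day)):
        # Spark writes nulls to a special default partition rather than "None".
        parts.append(f"{name}={HIVE_DEFAULT_PARTITION if value is None else value}")
    return "/".join(parts)


print(partition_dir(2016, 4, 1))        # year=2016/month=4/day=1
print(partition_dir(None, None, None))
```

Partitioning by arrival date lets queries that filter on year, month or day skip unrelated directories entirely.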

In [65]:
#test = spark.read.parquet('data/i94.parquet/year=__HIVE_DEFAULT_PARTITION__/month=__HIVE_DEFAULT_PARTITION__/day=__HIVE_DEFAULT_PARTITION__/part-00000-db8444a6-607d-44cf-81f0-07c9a628b510.c000.snappy.parquet')

#### airline codes

In [66]:
fname = 'iata_airlines.csv'
airlines = spark.read.csv(fname, header=True)

In [67]:
airlines.show(5)

+--------------------+--------------+-----------+--------------+------------------+
|         AirlineName|IATADesignator|3-DigitCode|ICAODesignator| Country/Territory|
+--------------------+--------------+-----------+--------------+------------------+
|     Aegean Airlines|            A3|        390|           AEE|            Greece|
|          Aer Lingus|            EI|        053|           EIN|           Ireland|
|      Aero Republica|            P5|        845|           RPB|          Colombia|
|            Aeroflot|            SU|        555|           AFL|Russian Federation|
|Aerolineas Argent...|            AR|        044|           ARG|         Argentina|
+--------------------+--------------+-----------+--------------+------------------+
only showing top 5 rows



In [68]:
airlines.createOrReplaceTempView("airlines_view")

result = spark.sql(""" 
  select 
  
  IATADesignator as iata_code,
  AirlineName as airline_name,
  `Country/Territory` as airline_country
  
  from airlines_view 
 
""")  
  

In [69]:
output_data = "data/"
result.write.parquet(output_data+"airlines.parquet",mode='overwrite')

#### ports

In [70]:
fname = 'i94port.csv'
ports = spark.read.csv(fname, header=True)

In [71]:
ports.createOrReplaceTempView("ports_view")

result = spark.sql(""" 
  select distinct
  
  START as port_code,
  city,
  state
  
  from ports_view
 
""")  

In [72]:
output_data = "data/"
result.write.parquet(output_data+"ports.parquet",mode='overwrite')
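The `city` and `state` columns of i94port.csv are presumably produced by the cleaning in step 2. Assuming the raw port labels follow a `'CITY, ST'` pattern (for example `'ANCHORAGE, AK'`), a plain-Python sketch of that split; labels without a comma keep a null state. `split_port_label` is a hypothetical helper.

```python
def split_port_label(label):
    """Split an I94 port label such as 'ANCHORAGE, AK' into (city, state).

    Assumes the state is the text after the last comma; labels without a
    comma return (label, None).
    """
    label = label.strip()
    if "," not in label:
        return label, None
    city, state = label.rsplit(",", 1)
    return city.strip(), state.strip()


print(split_port_label("ANCHORAGE, AK"))  # ('ANCHORAGE', 'AK')
print(split_port_label("NOT REPORTED"))   # ('NOT REPORTED', None)
```

Splitting on the last comma keeps any commas inside the city part intact.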

#### modes

In [75]:
fname = 'i94mode.csv'
modes = spark.read.csv(fname, header=True)

In [76]:
modes.createOrReplaceTempView("modes_view")

result = spark.sql(""" 
  select distinct
  
  START as mode_code,
  LABEL as mode
  
  from modes_view
 
""")  

In [77]:
output_data = "data/"
result.write.parquet(output_data+"modes.parquet",mode='overwrite')

#### countries

In [78]:
fname = 'i94cit_res.csv'
countries = spark.read.csv(fname, header=True)

countries.createOrReplaceTempView("countries_view")

result = spark.sql(""" 
  select distinct
  
  START as country_code,
  LABEL as country
  
  from countries_view
 
""") 

output_data = "data/"
result.write.parquet(output_data+"countries.parquet",mode='overwrite')

#### states

In [79]:
fname = 'i94addr.csv'
states = spark.read.csv(fname, header=True)

states.createOrReplaceTempView("states_view")

result = spark.sql(""" 
  select distinct
  
  START as address_state,
  LABEL as state
  
  from states_view
 
""") 

output_data = "data/"
result.write.parquet(output_data+"states.parquet",mode='overwrite')

#### visas

In [80]:
fname = 'i94visa.csv'
visas = spark.read.csv(fname, header=True)

visas.createOrReplaceTempView("visas_view")

result = spark.sql(""" 
  select distinct
  
  START as visa_code,
  LABEL as visa
  
  from visas_view
 
""") 

output_data = "data/"
result.write.parquet(output_data+"visas.parquet",mode='overwrite')
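The modes, countries, states and visas cells repeat the same read, select and write pattern, differing only in file name, output name and column aliases. A minimal sketch of driving them from one table of settings; `DIMENSIONS` and `build_dimension_sql` are hypothetical names, and the generated string is meant to be passed to `spark.sql` after `createOrReplaceTempView`, as in the cells above.

```python
# Hypothetical settings: CSV file -> (output name, {source column: alias}).
DIMENSIONS = {
    "i94mode.csv": ("modes", {"START": "mode_code", "LABEL": "mode"}),
    "i94cit_res.csv": ("countries", {"START": "country_code", "LABEL": "country"}),
    "i94addr.csv": ("states", {"START": "address_state", "LABEL": "state"}),
    "i94visa.csv": ("visas", {"START": "visa_code", "LABEL": "visa"}),
}


def build_dimension_sql(view, columns):
    """Return a SELECT DISTINCT statement that renames each source column to its alias."""
    select_list = ",\n  ".join(f"{src} as {alias}" for src, alias in columns.items())
    return f"select distinct\n  {select_list}\nfrom {view}"


# In the notebook, each entry would then be processed roughly as:
#   df = spark.read.csv(fname, header=True)
#   df.createOrReplaceTempView(f"{name}_view")
#   spark.sql(build_dimension_sql(f"{name}_view", cols)) \
#       .write.parquet(output_data + f"{name}.parquet", mode="overwrite")
for fname, (name, cols) in DIMENSIONS.items():
    print(build_dimension_sql(f"{name}_view", cols))
```

Keeping the mappings in one place means adding another code table is a one-line change.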

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
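The key-integrity and count checks can be sketched in plain Python over rows collected from a small table (for example `[r.asDict() for r in df.collect()]` on a dimension). `check_unique_key` and `check_row_count` are hypothetical helpers; on the full fact table the same logic would normally be expressed as Spark aggregations instead of collecting rows.

```python
from collections import Counter


def check_unique_key(rows, key):
    """Fail if any row has a null key or if a key value appears more than once."""
    values = [row[key] for row in rows]
    nulls = sum(v is None for v in values)
    dupes = sorted(v for v, n in Counter(values).items() if n > 1 and v is not None)
    if nulls or dupes:
        raise ValueError(f"{key}: {nulls} null(s), duplicate values {dupes}")
    return True


def check_row_count(actual, expected_min=1):
    """Completeness check: fail if a table has fewer rows than expected."""
    if actual < expected_min:
        raise ValueError(f"expected at least {expected_min} rows, found {actual}")
    return True


# Illustrative rows shaped like the modes dimension.
modes = [{"mode_code": "1", "mode": "Air"}, {"mode_code": "2", "mode": "Sea"}]
print(check_unique_key(modes, "mode_code"))  # True
print(check_row_count(len(modes)))           # True
```

A duplicate key in a dimension table would multiply matching fact rows in a join, so this check matters before the tables are queried together.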
 
Run Quality Checks

In [81]:
# Perform quality checks here
# Check the frequency of arrival dates: we expect the number of arrivals to be roughly even across the days of the month.

i94_result.createOrReplaceTempView("i94_view")

result = spark.sql(""" 
  select arrival_date, count(*)
  from i94_view
  group by arrival_date order by arrival_date
 
""")
result.show()

+--------------------+--------+
|        arrival_date|count(1)|
+--------------------+--------+
|java.util.Gregori...|       2|
|java.util.Gregori...|       3|
|java.util.Gregori...|       1|
|java.util.Gregori...|       2|
|java.util.Gregori...|       1|
|java.util.Gregori...|       2|
|java.util.Gregori...|       3|
|java.util.Gregori...|       1|
|java.util.Gregori...|       1|
|java.util.Gregori...|       1|
|java.util.Gregori...|       2|
|java.util.Gregori...|       3|
|java.util.Gregori...|       2|
|java.util.Gregori...|       3|
|java.util.Gregori...|       3|
|java.util.Gregori...|       3|
|java.util.Gregori...|       2|
|java.util.Gregori...|       2|
|java.util.Gregori...|       4|
|java.util.Gregori...|       1|
+--------------------+--------+
only showing top 20 rows



In [82]:
result.collect()

[Row(arrival_date='java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2016,MONTH=3,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=1,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=0,SECOND=0,MILLISECOND=0,ZONE_OFFSET=?,DST_OFFSET=?]', count(1)=2),
 Row(arrival_date='java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2016,MONTH=3,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=10,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=0,SECOND=0,MILLISECOND=0,ZONE_OFFSET=?,DST_OFFSET=?]', count(1)=3),
 Row(arrival_date='ja

In [83]:
result2 = result.collect()
result2[0][0]

'java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2016,MONTH=3,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=1,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=0,SECOND=0,MILLISECOND=0,ZONE_OFFSET=?,DST_OFFSET=?]'

In [84]:
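# Let's make a more readable view of this frequency table.
# Parse year, month and day out of each GregorianCalendar string.
# Java Calendar months are 0-based (MONTH=3 is April), hence the +1.
# yr/mon/dd avoid shadowing the year and month functions imported from pyspark.
for line in result2:
    mdy = re.findall(r'YEAR=(\d+).+MONTH=(\d+).+DAY_OF_MONTH=(\d+)', line[0])
    count = line[1]
    yr = mdy[0][0]
    mon = str(int(mdy[0][1]) + 1)
    dd = mdy[0][2]

    print(f"{yr}-{mon}-{dd}: {count}")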

2016-4-1: 2
2016-4-10: 3
2016-4-11: 1
2016-4-12: 2
2016-4-13: 1
2016-4-14: 2
2016-4-15: 3
2016-4-16: 1
2016-4-17: 1
2016-4-18: 1
2016-4-2: 2
2016-4-20: 3
2016-4-21: 2
2016-4-22: 3
2016-4-23: 3
2016-4-24: 3
2016-4-28: 2
2016-4-29: 2
2016-4-30: 4
2016-4-4: 1
2016-4-6: 2
2016-4-7: 3
2016-4-8: 1
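Because `arrival_date` is a string here, `order by arrival_date` sorts lexically, which is why 2016-4-10 is listed before 2016-4-2, and days with no sampled arrivals are simply absent. A plain-Python sketch that orders the counts printed above by day and lists the days of the month with no records; `missing_days` is a hypothetical helper, and gaps are expected with a 0.001% sample.

```python
import calendar


def missing_days(counts_by_day, year, month):
    """Return the days of the given month with no records in counts_by_day.

    counts_by_day maps day of month (int) -> number of records.
    """
    _, last_day = calendar.monthrange(year, month)
    return [d for d in range(1, last_day + 1) if counts_by_day.get(d, 0) == 0]


# Counts as printed above, keyed by day of month.
april = {1: 2, 2: 2, 4: 1, 6: 2, 7: 3, 8: 1, 10: 3, 11: 1, 12: 2, 13: 1, 14: 2, 15: 3,
         16: 1, 17: 1, 18: 1, 20: 3, 21: 2, 22: 3, 23: 3, 24: 3, 28: 2, 29: 2, 30: 4}
for day in sorted(april):
    print(f"2016-4-{day}: {april[day]}")
print("days with no records:", missing_days(april, 2016, 4))  # [3, 5, 9, 19, 25, 26, 27]
```

On the full dataset, any day returned by this check would point to missing source data rather than sampling noise.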


In [85]:
#re.findall('YEAR=(\d+).+MONTH=(\d+).+DAY_OF_MONTH=(\d+)',result2[0][0],flags=0)

In [86]:
i94_result.createOrReplaceTempView("i94_view")

result2 = spark.sql(""" 
  select count(*) 
  from i94_view TABLESAMPLE (.01 PERCENT)
 
""")
result2.show()

+--------+
|count(1)|
+--------+
|       0|
+--------+
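The zero count follows from sampling arithmetic. `TABLESAMPLE (p PERCENT)` keeps each row with probability p/100, and `i94_result` is itself a 0.001% sample of about 48 rows (the sum of the daily counts above), so sampling it again at 0.01% is expected to return far less than one row. A count check for completeness would therefore run on the unsampled table. `expected_sample_size` is a hypothetical helper.

```python
def expected_sample_size(total_rows, percent):
    """Expected row count of TABLESAMPLE (percent PERCENT), which keeps each row with probability percent/100."""
    return total_rows * percent / 100


# 3,096,313 source rows sampled at 0.001% in the i94_result query
print(round(expected_sample_size(3_096_313, 0.001), 1))
# The ~48-row i94_result sampled again at 0.01%
print(expected_sample_size(48, 0.01))
```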



#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.
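A starting point for the fact table's entry, kept as a Python list so it can be printed as a Markdown table. The column-to-source mapping is taken from the `i94_result` query above; the descriptions paraphrase the I94 SAS label descriptions and should be checked against that file. `I94_DICTIONARY` and `to_markdown_table` are hypothetical names.

```python
# (column, source column(s), description) for the i94 fact table.
# Descriptions paraphrase the I94 SAS label file and should be verified against it.
I94_DICTIONARY = [
    ("id", "i94yr, i94mon, cicid", "Record key: <year>-<month>-<cicid>"),
    ("year", "arrdate", "Arrival year; partition column"),
    ("month", "arrdate", "Arrival month; partition column"),
    ("day", "arrdate", "Arrival day of month; partition column"),
    ("citizen_code", "i94cit", "Country of citizenship code; joins countries.country_code"),
    ("resident_code", "i94res", "Country of residence code; joins countries.country_code"),
    ("port_code", "i94port", "Port of entry code; joins ports.port_code"),
    ("arrival_date", "arrdate", "Arrival date, derived from SAS days since 1960-01-01"),
    ("departure_date", "depdate", "Departure date, derived from SAS days since 1960-01-01"),
    ("age", "i94bir", "Age of the respondent in years"),
    ("visa_code", "i94visa", "Visa category: 1 Business, 2 Pleasure, 3 Student; joins visas.visa_code"),
    ("birth_year", "biryear", "Four-digit year of birth"),
    ("gender", "gender", "Gender"),
    ("airline_code", "airline", "Airline used to arrive in the US; joins airlines.iata_code"),
    ("admission_number", "admnum", "Admission number"),
    ("visa_type", "visatype", "Class of admission, e.g. B2 or F1"),
]


def to_markdown_table(rows, header=("Column", "Source", "Description")):
    """Render (column, source, description) tuples as a Markdown table."""
    lines = ["| " + " | ".join(header) + " |", "|" + "---|" * len(header)]
    lines += ["| " + " | ".join(row) + " |" for row in rows]
    return "\n".join(lines)


print(to_markdown_table(I94_DICTIONARY))
```

The dimension tables (airlines, ports, modes, countries, states, visas) can be documented the same way from their SELECT statements.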

#### Step 5: Complete Project Write Up

#### Clearly state the rationale for the choice of tools and technologies for the project.

- The project data is stored in Amazon S3, a cloud-based object store that offers high performance and scalability and integrates easily with other AWS services.

- Spark was chosen for massively parallel data processing.

- Python is used in every step because its open-source packages support data analysis and manipulation, and it provides API access to platforms such as Spark, AWS and Linux.

- The ETL process will run on Amazon EMR, a cloud-based cluster service with Hadoop and Spark pre-installed, so the analysis scripts can run without worrying about cluster configuration. EMR also connects efficiently to S3, where the raw data is stored.



####  Propose how often the data should be updated and why.

- Assuming the data is collected monthly in SAS format, it is reasonable to update the data lake monthly. In practice, the ETL job on the cloud takes only a few minutes to finish.


#### Write a description of how you would approach the problem differently under the following scenarios:


#### The data was increased by 100x.
 
- This could correspond to processing 10 years of I94 immigration data. Because Spark distributes processing across a cluster, we can scale out by adding nodes to the EMR cluster. I currently use 4 nodes on EMR; increasing that to 40 should improve throughput by up to roughly 10x, although speed-up is rarely perfectly linear.
 
#### The data populates a dashboard that must be updated on a daily basis by 7am every day.
 
- The ETL pipeline completes in a few minutes, which leaves enough time each morning to run the job and review the workflow and its report before 7am. Running the workflow daily should not be a problem.
 
#### The database needed to be accessed by 100+ people.
 
- Assuming these users only query the S3 data occasionally, we can consider Amazon Athena, which lets users run SQL queries directly on data stored in S3. Athena's API rate limits range from 5 to 100 calls per second depending on the API action, with bursts of 10 to 200 calls per second, which seems a reasonable capacity for about a hundred users.