# US I94 immigration data Spark ETL
### Data Engineering Capstone Project

#### Project Summary
* This project aims to empower the analytics team to analyze US I94 immigration data enriched with U.S. city demographic data and airport code data.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [198]:
# Do all imports and installs here
import pandas as pd
import datetime
from pyspark.sql.types import StringType
from pyspark.sql.types import TimestampType
from pyspark.sql.types import DecimalType
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.functions import monotonically_increasing_id

In [199]:
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)

### Step 1: Scope the Project and Gather Data

#### Scope 
* This project aims to empower the analytics team to analyze US I94 immigration data enriched with U.S. city demographic data and airport code data.

* The ETL pipeline is scaled up with Spark, a big data processing framework that achieves high performance when processing analytic workloads on big data sets.

* A set of star schema analytics tables gives users the ability to run simple queries with fewer joins and fast aggregations. These tables also allow the analytics team to keep finding insights in the immigration data.


#### Describe and Gather Data 
* The immigration dataset comes from the US National Tourism and Trade Office [I-94 Record](https://travel.trade.gov/research/reports/i94/historical/2016.html/). The dataset is partitioned by month and year. Each file is in sas7bdat format and contains metadata about an immigration record.

* The demographic dataset comes from OpenDataSoft [US Cities: Demographics](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/). The file is in CSV format and contains data about the demographics of US cities.

* The airport code dataset comes from DataHub [Airport Codes](https://datahub.io/core/airport-codes#data/). The file is in CSV format and is a simple table of airport codes and corresponding cities.

In [138]:
immigration = pd.read_csv('immigration_data_sample.csv')
immigration.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,1.0,20160407,,,G,O,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,1.0,20160428,DOH,,G,O,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,19.0,2.0,1.0,20160406,,,Z,K,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [139]:
airport = pd.read_csv('airport-codes_csv.csv')
airport.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [140]:
city = pd.read_csv('us-cities-demographics.csv', sep=';')
city.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


### Step 1.1: Read data into Spark DataFrame

In [14]:
	
from pyspark.sql import SparkSession

# Build a Spark session with the spark-sas7bdat package so SAS files can be read
spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()

# Load the April 2016 I94 immigration file
df_spark = spark.read.format('com.github.saurfang.sas.spark') \
    .load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [49]:
#write to parquet
#df_spark.write.parquet("sas_data")
#df_spark=spark.read.parquet("sas_data")

#### Step 1.1.1: Read immigration data

In [15]:
immigration_data=spark.read.parquet("sas_data")

In [16]:
immigration_data.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,40.0,1.0,1.0,20160430,SYD,,G,O,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,32.0,1.0,1.0,20160430,SYD,,G,O,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,29.0,1.0,1.0,20160430,SYD,,G,O,,M,1987.0,10292016,M,,DL,94956410000.0,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,29.0,1.0,1.0,20160430,SYD,,G,O,,M,1987.0,10292016,F,,DL,94956450000.0,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,28.0,1.0,1.0,20160430,SYD,,G,O,,M,1988.0,10292016,M,,DL,94956390000.0,40,B1


In [17]:
immigration_data.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [18]:
immigration_data.count()

3096313

#### Step 1.1.2: Read airport data

In [19]:
airport_data = spark.read.csv('airport-codes_csv.csv', header=True)

In [20]:
airport_data.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


#### Step 1.1.3: Read city data

In [21]:
city_data = spark.read.csv('us-cities-demographics.csv', sep=';', header=True)

In [22]:
city_data.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
1. airport_data cleaning: 
 * remove records without iata_code
 * filter on airport country US ONLY
 * dedupe based on iata_code
2. city_data cleaning:
 * dedupe to one city per row 
 * remove records without city name
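
The airport cleaning rules above can be sketched in plain Python to make the filter and dedupe logic explicit. This is a minimal illustration, not the Spark code used in the notebook: the sample rows and the `clean_airports` helper are hypothetical, and the sketch keeps the first row per `iata_code`, whereas Spark's `dropDuplicates` does not guarantee which duplicate survives.

```python
# Hypothetical sample rows; only the fields used by the cleaning rules are shown.
airports = [
    {"ident": "KBGM", "iso_country": "US", "iata_code": "BGM"},
    {"ident": "00A", "iso_country": "US", "iata_code": None},     # no IATA code
    {"ident": "XXXX", "iso_country": "CA", "iata_code": "YYZ"},   # not in the US
    {"ident": "KBGM2", "iso_country": "US", "iata_code": "BGM"},  # duplicate IATA code
]


def clean_airports(rows):
    """Keep US airports that have an IATA code, one row per IATA code."""
    seen = set()
    cleaned = []
    for row in rows:
        # Drop non-US airports and rows without an IATA code
        if row["iso_country"] != "US" or not row["iata_code"]:
            continue
        # Drop rows whose IATA code was already kept
        if row["iata_code"] in seen:
            continue
        seen.add(row["iata_code"])
        cleaned.append(row)
    return cleaned


cleaned_airports = clean_airports(airports)
print([row["ident"] for row in cleaned_airports])
```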

#### 2.1 Clean airport_data

In [24]:
airport_data_clean = airport_data.filter(airport_data.iso_country == 'US').filter(airport_data.iata_code.isNotNull()).dropDuplicates(['iata_code'])

In [25]:
airport_data_clean.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,KBGM,medium_airport,Greater Binghamton/Edwin A Link field,1636,,US,US-NY,Binghamton,KBGM,BGM,BGM,"-75.97979736, 42.20869827"
1,2TE0,small_airport,Eagle Air Park,15,,US,US-TX,Brazoria,2TE0,BZT,2TE0,"-95.579696655273, 28.982200622559"
2,KCNU,medium_airport,Chanute Martin Johnson Airport,1002,,US,US-KS,Chanute,KCNU,CNU,CNU,"-95.4850997925, 37.668800354"
3,KCRS,small_airport,C David Campbell Field Corsicana Municipal Air...,449,,US,US-TX,Corsicana,KCRS,CRS,CRS,"-96.4005966187, 32.0280990601"
4,KFMY,medium_airport,Page Field,17,,US,US-FL,Fort Myers,KFMY,FMY,FMY,"-81.86329650879999, 26.58659935"


In [26]:
airport_data_clean.count()

2014

#### 2.2. Clean city_data

In [57]:
city_data_clean = city_data.dropDuplicates(['City', 'State', 'Median Age', 'Male Population', 'Female Population',
       'Total Population', 'Number of Veterans', 'Foreign-born',
       'Average Household Size', 'State Code']).filter(city_data.City.isNotNull())

In [58]:
city_data_clean.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Johnson City,Tennessee,38.2,31019,34350,65369,5038,2878,2.18,TN,Asian,1877
1,San Clemente,California,45.2,34076,31456,65532,3970,8109,2.64,CA,Black or African-American,251
2,Redwood City,California,37.1,42676,42624,85300,2430,27652,2.64,CA,Black or African-American,2758
3,West Palm Beach,Florida,39.6,49262,57520,106782,4917,30675,2.53,FL,White,60648
4,San Diego,California,34.5,693826,701081,1394907,92489,373842,2.73,CA,White,949388


In [59]:
city_data_clean.count()

596

In [60]:
city_data_clean.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Star-schema
* Fact Table: fact_immigration
* Dimension Tables: dim_airport, dim_city, dim_state, dim_date, dim_model_type, dim_visa_code

The project uses a star schema so that users can run simple queries with fewer joins and fast aggregations. The fact table captures the full immigration records. Using this table, analysts can relate and analyze six dimensions: airport, city, state, date, model type, and visa code.

### Tables:
| table name | columns | description | type |
| ------- | ---------- | ----------- | ---- |
| fact_immigration | 'immigration_id', 'immigration_airport_id', 'immigration_arr_sas_date', 'immigration_dep_sas_date', 'immigration_model_type_id', 'immigration_addr_state_code', 'immigration_visa_code_id', 'immigration_age', 'immigration_match_flag', 'immigration_birth_year', 'immigration_gender', 'immigration_ins_number', 'immigration_airline', 'immigration_admission_number', 'immigration_flight_number', 'immigration_visa_type' | stores all I94 immigration records | fact table |
| dim_airport | 'airport_id', 'airport_name', 'airpot_type', 'airport_state_code', 'airpot_city', 'airport_local_code', 'airport_coordinates' | stores information related to airports | dimension table |
| dim_city | 'city_id', 'city_name', 'city_state_name', 'city_state_code', 'city_median_age', 'city_male_population', 'city_female_population', 'city_total_population', 'city_veterans', 'city_foreign_born', 'city_household_size' | stores demographics data for cities | dimension table |
| dim_state | 'state_code', 'state_name', 'state_male_population', 'state_female_population', 'state_total_population', 'state_veterans', 'state_foreign_born' | stores demographics data for states | dimension table |
| dim_date | 'sas_date', 'date_timestamp', 'date_day', 'date_month', 'date_year', 'date_weekday', 'date_week' | stores date data | dimension table |
| dim_model_type | 'model_type_id', 'model_type_name' | stores the immigration mode of entry (Air, Sea, Land, Not reported) | dimension table |
| dim_visa_code | 'visa_code_id', 'visa_code_name' | stores immigration visa code information | dimension table |
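
To illustrate why a star schema keeps analytics queries simple, here is a minimal sketch using Python's built-in `sqlite3` module in place of Spark SQL. The table and column names follow the data model above, but the rows are hypothetical and only two columns per table are included.

```python
import sqlite3

# In-memory database standing in for the analytics tables
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dim_airport (airport_id TEXT PRIMARY KEY, airport_name TEXT);
    CREATE TABLE fact_immigration (
        immigration_id INTEGER PRIMARY KEY,
        immigration_airport_id TEXT
    );
    -- Hypothetical rows, for illustration only
    INSERT INTO dim_airport VALUES ('AAA', 'Airport A'), ('BBB', 'Airport B');
    INSERT INTO fact_immigration VALUES (1, 'AAA'), (2, 'AAA'), (3, 'BBB');
""")

# One join from the fact table to a dimension, then a simple aggregation
rows = conn.execute("""
    SELECT da.airport_name, COUNT(fi.immigration_id) AS records
    FROM fact_immigration fi
    JOIN dim_airport da ON fi.immigration_airport_id = da.airport_id
    GROUP BY da.airport_name
    ORDER BY records DESC, da.airport_name
""").fetchall()
conn.close()
print(rows)
```

Each additional dimension adds one more join on a single key, which is the property the star schema is chosen for.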



#### 3.2 Mapping Out Data Pipelines
1. The ETL pipeline extracts data from 3 different data sources
2. Processes the data using Spark
3. Transforms the data into star schema analytics tables

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

### 4.1.1 dim_airport table

In [46]:
airport_data_clean.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,KBGM,medium_airport,Greater Binghamton/Edwin A Link field,1636,,US,US-NY,Binghamton,KBGM,BGM,BGM,"-75.97979736, 42.20869827"
1,2TE0,small_airport,Eagle Air Park,15,,US,US-TX,Brazoria,2TE0,BZT,2TE0,"-95.579696655273, 28.982200622559"
2,KCNU,medium_airport,Chanute Martin Johnson Airport,1002,,US,US-KS,Chanute,KCNU,CNU,CNU,"-95.4850997925, 37.668800354"
3,KCRS,small_airport,C David Campbell Field Corsicana Municipal Air...,449,,US,US-TX,Corsicana,KCRS,CRS,CRS,"-96.4005966187, 32.0280990601"
4,KFMY,medium_airport,Page Field,17,,US,US-FL,Fort Myers,KFMY,FMY,FMY,"-81.86329650879999, 26.58659935"


In [47]:
dim_airport = airport_data_clean.select(['iata_code', 'name', 'type', 'iso_region', 'municipality', 'local_code', 'coordinates'])\
.dropDuplicates(['iata_code'])

In [48]:
# iso_region keeps its name here; it is converted to airport_state_code and dropped below
dim_airport = dim_airport.withColumnRenamed('iata_code','airport_id')\
.withColumnRenamed('name','airport_name')\
.withColumnRenamed('type','airpot_type')\
.withColumnRenamed('municipality','airpot_city')\
.withColumnRenamed('local_code','airport_local_code')\
.withColumnRenamed('coordinates','airport_coordinates')

In [50]:
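# Strip the 'US-' prefix from iso_region (e.g. 'US-NY' -> 'NY'); pass missing values through as None
extract_state = udf(lambda x: x[3:] if x else None, StringType())

In [51]:
dim_airport = dim_airport.withColumn('airport_state_code', extract_state('iso_region'))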

In [52]:
dim_airport = dim_airport.drop('iso_region')
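
The state code extraction can also be written as a small, separately testable Python function. This is a sketch under the assumption that `iso_region` values have the form `<country>-<region>`, as in the samples shown earlier; the helper name `state_from_region` is hypothetical. It splits on the hyphen rather than slicing a fixed number of characters.

```python
def state_from_region(iso_region):
    """Return the region part of 'US-NY' style codes, or None if it cannot be parsed."""
    if not iso_region or "-" not in iso_region:
        return None
    # Split only on the first hyphen so the region part is returned whole
    return iso_region.split("-", 1)[1]


print(state_from_region("US-NY"))
print(state_from_region(None))
```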

In [53]:
dim_airport.printSchema()

root
 |-- airport_id: string (nullable = true)
 |-- airport_name: string (nullable = true)
 |-- airpot_type: string (nullable = true)
 |-- airpot_city: string (nullable = true)
 |-- airport_local_code: string (nullable = true)
 |-- airport_coordinates: string (nullable = true)
 |-- airport_state_code: string (nullable = true)



In [54]:
dim_airport = dim_airport.select(['airport_id', 'airport_name', 'airpot_type', 'airport_state_code', 'airpot_city', 'airport_local_code', 'airport_coordinates'])

In [55]:
dim_airport.limit(5).toPandas()

Unnamed: 0,airport_id,airport_name,airpot_type,airport_state_code,airpot_city,airport_local_code,airport_coordinates
0,BGM,Greater Binghamton/Edwin A Link field,medium_airport,NY,Binghamton,BGM,"-75.97979736, 42.20869827"
1,BZT,Eagle Air Park,small_airport,TX,Brazoria,2TE0,"-95.579696655273, 28.982200622559"
2,CNU,Chanute Martin Johnson Airport,medium_airport,KS,Chanute,CNU,"-95.4850997925, 37.668800354"
3,CRS,C David Campbell Field Corsicana Municipal Air...,small_airport,TX,Corsicana,CRS,"-96.4005966187, 32.0280990601"
4,FMY,Page Field,medium_airport,FL,Fort Myers,FMY,"-81.86329650879999, 26.58659935"


### 4.1.2 dim_city table

In [61]:
city_data_clean.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Johnson City,Tennessee,38.2,31019,34350,65369,5038,2878,2.18,TN,Asian,1877
1,San Clemente,California,45.2,34076,31456,65532,3970,8109,2.64,CA,Black or African-American,251
2,Redwood City,California,37.1,42676,42624,85300,2430,27652,2.64,CA,Black or African-American,2758
3,West Palm Beach,Florida,39.6,49262,57520,106782,4917,30675,2.53,FL,White,60648
4,San Diego,California,34.5,693826,701081,1394907,92489,373842,2.73,CA,White,949388


In [62]:
dim_city = city_data_clean.select(['City', 'State', 'State Code','Median Age', 'Male Population', 'Female Population',
       'Total Population', 'Number of Veterans', 'Foreign-born',
       'Average Household Size'])

In [63]:
dim_city = dim_city.withColumnRenamed('City','city_name')\
.withColumnRenamed('State','city_state_name')\
.withColumnRenamed('State Code','city_state_code')\
.withColumnRenamed('Median Age','city_median_age')\
.withColumnRenamed('Male Population','city_male_population')\
.withColumnRenamed('Female Population','city_female_population')\
.withColumnRenamed('Total Population','city_total_population')\
.withColumnRenamed('Number of Veterans','city_veterans')\
.withColumnRenamed('Foreign-born','city_foreign_born')\
.withColumnRenamed('Average Household Size','city_household_size')

In [65]:
dim_city = dim_city.withColumn('city_id',monotonically_increasing_id())
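
The IDs produced by `monotonically_increasing_id` are unique and increasing but not consecutive: Spark places the partition ID in the upper 31 bits and the row number within the partition in the lower 33 bits. The sketch below decodes the `city_id` values that appear in this notebook's output; the helper name `split_spark_id` is hypothetical.

```python
ROW_BITS = 33  # lower 33 bits hold the row number within the partition


def split_spark_id(generated_id):
    """Split an ID from monotonically_increasing_id into (partition, row)."""
    partition = generated_id >> ROW_BITS
    row = generated_id & ((1 << ROW_BITS) - 1)
    return partition, row


for generated_id in (0, 8589934592, 8589934593, 17179869184, 17179869185):
    print(generated_id, split_spark_id(generated_id))
```

Because the values depend on how the data is partitioned, they may differ between runs and are best treated as opaque surrogate keys.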

In [66]:
dim_city = dim_city.select(['city_id', 'city_name', 'city_state_name', 'city_state_code', 'city_median_age',\
                            'city_male_population', 'city_female_population','city_total_population','city_veterans',\
                            'city_foreign_born', 'city_household_size'])

In [68]:
# Cast the numeric columns from string to decimal.
# Note: DecimalType() defaults to precision 10 and scale 0, so fractional values
# (median age, household size) are rounded to whole numbers by these casts.
numeric_columns = ['city_median_age', 'city_male_population', 'city_female_population',
                   'city_total_population', 'city_veterans', 'city_foreign_born',
                   'city_household_size']
for column in numeric_columns:
    dim_city = dim_city.withColumn(column, dim_city[column].cast(DecimalType()))

In [69]:
dim_city.limit(5).toPandas()

Unnamed: 0,city_id,city_name,city_state_name,city_state_code,city_median_age,city_male_population,city_female_population,city_total_population,city_veterans,city_foreign_born,city_household_size
0,0,Johnson City,Tennessee,TN,38,31019,34350,65369,5038,2878,2
1,8589934592,San Clemente,California,CA,45,34076,31456,65532,3970,8109,3
2,8589934593,Redwood City,California,CA,37,42676,42624,85300,2430,27652,3
3,17179869184,West Palm Beach,Florida,FL,40,49262,57520,106782,4917,30675,3
4,17179869185,San Diego,California,CA,35,693826,701081,1394907,92489,373842,3
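
The whole-number median ages and household sizes in this output come from casting to a decimal with scale 0. The effect can be reproduced with Python's `decimal` module, as in the sketch below (the helper `to_scale` is hypothetical). In PySpark, passing an explicit precision and scale, for example `DecimalType(4, 1)` for the median age, would be one way to keep the fractional part; that choice of precision is an assumption, not something the notebook does.

```python
from decimal import Decimal, ROUND_HALF_UP


def to_scale(value, scale):
    """Round a numeric string to `scale` decimal places, rounding half up."""
    quantum = Decimal(10) ** -scale  # scale=0 -> 1, scale=1 -> 0.1
    return Decimal(value).quantize(quantum, rounding=ROUND_HALF_UP)


print(to_scale("38.2", 0), to_scale("38.2", 1))
print(to_scale("2.18", 0), to_scale("2.18", 2))
```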


### 4.1.3 dim_state table

In [147]:
dim_city.printSchema()

root
 |-- city_id: long (nullable = false)
 |-- city_name: string (nullable = true)
 |-- city_state_name: string (nullable = true)
 |-- city_state_code: string (nullable = true)
 |-- city_median_age: decimal(10,0) (nullable = true)
 |-- city_male_population: decimal(10,0) (nullable = true)
 |-- city_female_population: decimal(10,0) (nullable = true)
 |-- city_total_population: decimal(10,0) (nullable = true)
 |-- city_veterans: decimal(10,0) (nullable = true)
 |-- city_foreign_born: decimal(10,0) (nullable = true)
 |-- city_household_size: decimal(10,0) (nullable = true)



In [148]:
dim_state = dim_city.groupBy('city_state_code','city_state_name').sum('city_male_population', 'city_female_population',\
                                                          'city_total_population','city_veterans','city_foreign_born')

In [151]:
dim_state = dim_state.withColumnRenamed('city_state_code','state_code')\
.withColumnRenamed('city_state_name','state_name')\
.withColumnRenamed('sum(city_male_population)','state_male_population')\
.withColumnRenamed('sum(city_female_population)','state_female_population')\
.withColumnRenamed('sum(city_total_population)','state_total_population')\
.withColumnRenamed('sum(city_veterans)','state_veterans')\
.withColumnRenamed('sum(city_foreign_born)','state_foreign_born')

In [152]:
dim_state.limit(5).toPandas()

Unnamed: 0,state_code,state_name,state_male_population,state_female_population,state_total_population,state_veterans,state_foreign_born
0,MT,Montana,87707,93587,181294,13854,5977
1,NC,North Carolina,1466105,1594094,3060199,166146,379327
2,MD,Maryland,627951,684178,1312129,64143,229794
3,CO,Colorado,1454619,1481050,2935669,187896,337631
4,CT,Connecticut,432157,453424,885581,24953,225866
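
The state table is a roll-up of the city table, so each state total covers only the cities present in the demographics file rather than the full state population. The grouping logic can be sketched in plain Python as follows; the rows and the helper `totals_by_state` are hypothetical.

```python
from collections import defaultdict

# Hypothetical city rows: (state_code, state_name, total_population)
cities = [
    ("CA", "California", 100),
    ("CA", "California", 250),
    ("NY", "New York", 400),
]


def totals_by_state(rows):
    """Sum city populations per (state_code, state_name) pair."""
    totals = defaultdict(int)
    for state_code, state_name, population in rows:
        totals[(state_code, state_name)] += population
    return dict(totals)


state_totals = totals_by_state(cities)
print(state_totals)
```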


### 4.1.4 dim_date table

In [74]:
immigration_data.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,40.0,1.0,1.0,20160430,SYD,,G,O,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,32.0,1.0,1.0,20160430,SYD,,G,O,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,29.0,1.0,1.0,20160430,SYD,,G,O,,M,1987.0,10292016,M,,DL,94956410000.0,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,29.0,1.0,1.0,20160430,SYD,,G,O,,M,1987.0,10292016,F,,DL,94956450000.0,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,28.0,1.0,1.0,20160430,SYD,,G,O,,M,1988.0,10292016,M,,DL,94956390000.0,40,B1


In [75]:
arrdate = immigration_data.select(['arrdate'])

In [76]:
depdate = immigration_data.select(['depdate'])

In [77]:
dim_date = arrdate.union(depdate).dropDuplicates(['arrdate'])

In [78]:
dim_date.count()

236

In [79]:
dim_date.limit(5).toPandas()

Unnamed: 0,arrdate
0,20593.0
1,20689.0
2,20673.0
3,20467.0
4,20652.0


In [81]:
# SAS dates count days from 1960-01-01; return None for missing values
epoch = datetime.datetime(1960, 1, 1)
get_timestamp = udf(lambda x: epoch + datetime.timedelta(days=x) if x is not None else None, TimestampType())

In [82]:
dim_date = dim_date.withColumn('date_timestamp', get_timestamp('arrdate'))

In [83]:
dim_date.limit(5).toPandas()

Unnamed: 0,arrdate,date_timestamp
0,20593.0,2016-05-19
1,20689.0,2016-08-23
2,20673.0,2016-08-07
3,20467.0,2016-01-14
4,20652.0,2016-07-17
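
The SAS date conversion can be checked outside Spark with a few lines of standard-library Python. SAS stores dates as the number of days since 1960-01-01; the helper name `sas_to_date` below is hypothetical, and the sample values are taken from the output above.

```python
import datetime

SAS_EPOCH = datetime.date(1960, 1, 1)


def sas_to_date(sas_days):
    """Convert a SAS day count to a date; missing values stay None."""
    if sas_days is None:
        return None
    return SAS_EPOCH + datetime.timedelta(days=int(sas_days))


print(sas_to_date(20593.0))
print(sas_to_date(20467.0))
```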


In [84]:
dim_date = dim_date.withColumn('date_day', F.dayofmonth('date_timestamp'))\
                        .withColumn('date_month', F.month('date_timestamp'))\
                        .withColumn('date_year', F.year('date_timestamp'))\
                        .withColumn('date_weekday', F.dayofweek('date_timestamp'))\
                        .withColumn('date_week', F.weekofyear('date_timestamp'))

In [85]:
dim_date = dim_date.withColumnRenamed('arrdate','sas_date')

In [86]:
dim_date.limit(5).toPandas()

Unnamed: 0,sas_date,date_timestamp,date_day,date_month,date_year,date_weekday,date_week
0,20593.0,2016-05-19,19,5,2016,5,20
1,20689.0,2016-08-23,23,8,2016,3,34
2,20673.0,2016-08-07,7,8,2016,1,31
3,20467.0,2016-01-14,14,1,2016,5,2
4,20652.0,2016-07-17,17,7,2016,1,28
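
When reading `date_weekday` and `date_week`, note that Spark's `dayofweek` numbers days from 1 (Sunday) to 7 (Saturday) and `weekofyear` follows ISO 8601 week numbering. The sketch below derives the same parts with the standard library so the conventions are easy to verify; the helper `date_parts` is hypothetical.

```python
import datetime


def date_parts(d):
    """Return the dim_date parts for a date, using Spark's numbering conventions."""
    _, iso_week, iso_weekday = d.isocalendar()  # ISO weekday: 1 = Monday ... 7 = Sunday
    return {
        "date_day": d.day,
        "date_month": d.month,
        "date_year": d.year,
        "date_weekday": iso_weekday % 7 + 1,  # 1 = Sunday ... 7 = Saturday
        "date_week": iso_week,
    }


print(date_parts(datetime.date(2016, 5, 19)))
```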


### 4.1.5 dim_model_type table

In [127]:
dim_model_type = immigration_data.select(['i94mode']).filter(immigration_data.i94mode.isNotNull()).dropDuplicates()

In [128]:
dim_model_type.limit(5).toPandas()

Unnamed: 0,i94mode
0,1.0
1,3.0
2,2.0
3,9.0


In [130]:
get_model_name = udf(lambda x: 'Air' if x == 1 else ('Sea' if x == 2 else ('Land' if x == 3 else 'Not reported')), StringType())

In [131]:
dim_model_type = dim_model_type.withColumn('model_type_name', get_model_name('i94mode')).withColumnRenamed('i94mode','model_type_id')

In [132]:
dim_model_type.toPandas()

Unnamed: 0,model_type_id,model_type_name
0,1.0,Air
1,3.0,Land
2,2.0,Sea
3,9.0,Not reported
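
The nested conditional in the UDF can also be expressed as a dictionary lookup, which is easier to extend and to test. This is a sketch of an alternative, not the code used above; `MODE_NAMES` and `mode_name` are hypothetical names, and the code-to-name mapping is copied from the UDF.

```python
MODE_NAMES = {1: "Air", 2: "Sea", 3: "Land"}


def mode_name(code):
    """Map an i94mode code to its name; unknown or missing codes are 'Not reported'."""
    if code is None:
        return "Not reported"
    return MODE_NAMES.get(int(code), "Not reported")


print([mode_name(code) for code in (1.0, 2.0, 3.0, 9.0, None)])
```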


### 4.1.6 dim_visa_code table

In [133]:
dim_visa_code = immigration_data.select('I94VISA').dropDuplicates()

In [134]:
get_visa_name = udf(lambda x: 'Business' if x == 1 else ('Pleasure' if x == 2 else 'Student'), StringType())

In [135]:
dim_visa_code = dim_visa_code.withColumn('visa_code_name',get_visa_name('I94VISA'))\
.withColumnRenamed('I94VISA', 'visa_code_id')

In [136]:
dim_visa_code.toPandas()

Unnamed: 0,visa_code_id,visa_code_name
0,1.0,Business
1,3.0,Student
2,2.0,Pleasure


### 4.1.7 fact_immigration table

In [97]:
immigration_data.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,40.0,1.0,1.0,20160430,SYD,,G,O,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,32.0,1.0,1.0,20160430,SYD,,G,O,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,29.0,1.0,1.0,20160430,SYD,,G,O,,M,1987.0,10292016,M,,DL,94956410000.0,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,29.0,1.0,1.0,20160430,SYD,,G,O,,M,1987.0,10292016,F,,DL,94956450000.0,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,28.0,1.0,1.0,20160430,SYD,,G,O,,M,1988.0,10292016,M,,DL,94956390000.0,40,B1


In [98]:
fact_immigration = immigration_data.select(['cicid', 'i94port', 'arrdate', 'depdate','i94mode','i94addr','i94visa', 'i94bir', 'matflag',\
                                           'biryear', 'gender', 'insnum', 'airline', 'admnum','fltno','visatype'])

In [99]:
fact_immigration = fact_immigration.withColumnRenamed('cicid','immigration_id')\
.withColumnRenamed('i94port','immigration_airport_id')\
.withColumnRenamed('arrdate','immigration_arr_sas_date')\
.withColumnRenamed('depdate','immigration_dep_sas_date')\
.withColumnRenamed('i94mode','immigration_model_type_id')\
.withColumnRenamed('i94addr','immigration_addr_state_code')\
.withColumnRenamed('i94visa','immigration_visa_code_id')\
.withColumnRenamed('i94bir','immigration_age')\
.withColumnRenamed('matflag','immigration_match_flag')\
.withColumnRenamed('biryear','immigration_birth_year')\
.withColumnRenamed('gender','immigration_gender')\
.withColumnRenamed('insnum','immigration_ins_number')\
.withColumnRenamed('airline','immigration_airline')\
.withColumnRenamed('admnum','immigration_admission_number')\
.withColumnRenamed('fltno','immigration_flight_number')\
.withColumnRenamed('visatype','immigration_visa_type')

In [100]:
fact_immigration.limit(5).toPandas()

Unnamed: 0,immigration_id,immigration_airport_id,immigration_arr_sas_date,immigration_dep_sas_date,immigration_model_type_id,immigration_addr_state_code,immigration_visa_code_id,immigration_age,immigration_match_flag,immigration_birth_year,immigration_gender,immigration_ins_number,immigration_airline,immigration_admission_number,immigration_flight_number,immigration_visa_type
0,5748517.0,LOS,20574.0,20582.0,1.0,CA,1.0,40.0,M,1976.0,F,,QF,94953870000.0,11,B1
1,5748518.0,LOS,20574.0,20591.0,1.0,NV,1.0,32.0,M,1984.0,F,,VA,94955620000.0,7,B1
2,5748519.0,LOS,20574.0,20582.0,1.0,WA,1.0,29.0,M,1987.0,M,,DL,94956410000.0,40,B1
3,5748520.0,LOS,20574.0,20588.0,1.0,WA,1.0,29.0,M,1987.0,F,,DL,94956450000.0,40,B1
4,5748521.0,LOS,20574.0,20588.0,1.0,WA,1.0,28.0,M,1988.0,M,,DL,94956390000.0,40,B1


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

### 4.2.1 Quality Check on Unique Key

In [192]:
def check_unique_key(table, unique_key):
    if table.count() > table.dropDuplicates([unique_key]).count():
        raise ValueError('Table has duplicates')
    else:
        print('Table pass unique key data quality check')

In [193]:
check_unique_key(fact_immigration, 'immigration_id')

Table pass unique key data quality check


In [194]:
check_unique_key(dim_airport, 'airport_id')

Table pass unique key data quality check


In [195]:
check_unique_key(dim_city, 'city_id')

Table pass unique key data quality check


In [196]:
check_unique_key(dim_state, 'state_code')

Table pass unique key data quality check


In [197]:
check_unique_key(dim_date, 'sas_date')

Table pass unique key data quality check
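
A row-count check is a natural companion to the unique key check: it confirms that a table is not empty and, optionally, that it has as many rows as its source. The helper below is a hypothetical sketch that works on plain integers; with Spark it could be called as `check_row_count('fact_immigration', fact_immigration.count(), expected=immigration_data.count())`.

```python
def check_row_count(table_name, row_count, expected=None):
    """Raise ValueError if the table is empty or does not match the expected count."""
    if row_count == 0:
        raise ValueError(f"{table_name} is empty")
    if expected is not None and row_count != expected:
        raise ValueError(f"{table_name} has {row_count} rows, expected {expected}")
    return True


# 3096313 is the immigration_data row count reported earlier in the notebook
print(check_row_count("fact_immigration", 3096313, expected=3096313))
```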


### 4.2.2 Quality Check on analytics query

In [114]:
fact_immigration.createOrReplaceTempView('fact_immigration')
dim_airport.createOrReplaceTempView('dim_airport')
dim_date.createOrReplaceTempView('dim_date')

In [125]:
number_of_immigration_records = spark.sql('''
                    SELECT 
                    da.airport_name,
                    dd.date_month,
                    COUNT(fm.immigration_id) AS number_of_immigration_records
                    FROM fact_immigration fm
                    LEFT JOIN dim_airport da ON fm.immigration_airport_id = da.airport_id
                    LEFT JOIN dim_date dd ON fm.immigration_arr_sas_date = dd.sas_date
                    WHERE da.airport_name IS NOT NULL
                    GROUP BY  da.airport_name, dd.date_month
                    ORDER BY number_of_immigration_records DESC''')

In [126]:
number_of_immigration_records.limit(10).toPandas()

Unnamed: 0,airport_name,date_month,number_of_immigration_records
0,Miami International Airport,4,343941
1,San Fernando Airport,4,152586
2,Orlando Executive Airport,4,149195
3,Lakefront Airport,4,136122
4,William P Hobby Airport,4,101481
5,Hartsfield Jackson Atlanta International Airport,4,92579
6,Dallas Love Field,4,71809
7,General Edward Lawrence Logan International Ai...,4,57354
8,Seattle Tacoma International Airport,4,47719
9,Point Hope Airport,4,38890


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

* Fact Table: fact_immigration
* Dimension Tables: dim_airport, dim_city, dim_state, dim_date, dim_model_type, dim_visa_code

### Tables:
| table name | columns | description | type |
| ------- | ---------- | ----------- | ---- |
| fact_immigration | 'immigration_id', 'immigration_airport_id', 'immigration_arr_sas_date', 'immigration_dep_sas_date', 'immigration_model_type_id', 'immigration_addr_state_code', 'immigration_visa_code_id', 'immigration_age', 'immigration_match_flag', 'immigration_birth_year', 'immigration_gender', 'immigration_ins_number', 'immigration_airline', 'immigration_admission_number', 'immigration_flight_number', 'immigration_visa_type' | stores all I94 immigration records | fact table |
| dim_airport | 'airport_id', 'airport_name', 'airpot_type', 'airport_state_code', 'airpot_city', 'airport_local_code', 'airport_coordinates' | stores information related to airports | dimension table |
| dim_city | 'city_id', 'city_name', 'city_state_name', 'city_state_code', 'city_median_age', 'city_male_population', 'city_female_population', 'city_total_population', 'city_veterans', 'city_foreign_born', 'city_household_size' | stores demographics data for cities | dimension table |
| dim_state | 'state_code', 'state_name', 'state_male_population', 'state_female_population', 'state_total_population', 'state_veterans', 'state_foreign_born' | stores demographics data for states | dimension table |
| dim_date | 'sas_date', 'date_timestamp', 'date_day', 'date_month', 'date_year', 'date_weekday', 'date_week' | stores date data | dimension table |
| dim_model_type | 'model_type_id', 'model_type_name' | stores the immigration mode of entry (Air, Sea, Land, Not reported) | dimension table |
| dim_visa_code | 'visa_code_id', 'visa_code_name' | stores immigration visa code information | dimension table |

### Step 5: Complete Project Write Up
1. Clearly state the rationale for the choice of tools and technologies for the project.
 * The ETL pipeline is scaled up with Spark, a big data processing framework, to further optimize queries for immigration data analysis. Spark achieves high performance when processing analytic workloads on big data sets. Using in-memory computing, parallel processing, and lazy evaluation, Spark can query, analyze, and transform data at scale across a cluster of multiple machines.
 * A set of star schema analytics tables gives users the ability to run simple queries with fewer joins and fast aggregations. These tables also allow the analytics team to keep finding insights in the immigration data.



2. Write a description of how you would approach the problem differently under the following scenarios:

* The data was increased by 100x.
 * The project already uses Spark, a big data processing framework that achieves high performance when processing analytic workloads on big data sets, so the pipeline can scale out by adding nodes to the cluster. If write-heavy operations are needed, a Cassandra database could be considered.
  
  
* The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * Use Airflow to automate the data pipeline on a daily schedule. Create a DAG that performs the logic of the described pipeline. If a DAG run fails, Airflow's built-in email alerting can notify the engineering team automatically, so they can fix potential issues as soon as possible.
  
  
* The database needed to be accessed by 100+ people.
 * Use a cloud data warehouse such as Redshift, which gives the analytics team a flexible architecture that can scale up or down to meet changing user access and storage demands. Redshift also achieves high performance when handling analytic workloads on big data sets: its Massively Parallel Processing (MPP) architecture parallelizes data loading, backup, and restore operations across multiple machines.