# US I94 Immigration Analytics

### Data Engineering Capstone Project

#### Project Summary

This project builds an ETL pipeline that uses Spark and stores its data on S3, to set up an efficient analytics framework for I94 immigration, global land temperature and US demographics data. We load the data from S3 and use Spark to process it into analytics tables, based on the results of exploratory data analysis (EDA). These fact and dimension tables act as the foundation for further business analysis. Finally, we load the tables back into S3.

The project uses I94 immigration, global land temperature and US demographics data to build an analytics database. By combining the datasets appropriately, we can explore patterns in US immigration over time.

The pipeline loads the I94 immigration event, land temperature and US demographics datasets and cleans and processes them with reusable module functions. It then extracts dimension information from the cleaned source data to create the dimension tables. The fact table stores the immigration events, and the dimension tables store the arrival date, visa type, country average temperature and demographics data. Together they let us analyze:
- How are international travelers distributed across the months of the year?
- How are arrival dates spread across days, weeks and weekdays?
- Which ports of entry and which travel modes do people choose to enter the US?
- What do the ages and visa statuses of international visitors look like?
- Is there a relationship between where international visitors come from and the land temperature there?

The Spark process will be deployed on a cluster using AWS. 

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Import libraries

In [76]:
# All imports and installs
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"

In [77]:
import pandas as pd
pd.options.display.max_columns = 100
import configparser
import datetime as dt

import utils
import clean_data
import create_tables
import quality_checks

# Spark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import avg
from pyspark.sql.functions import isnan, when, count, col, udf, dayofmonth, dayofweek, month, year, weekofyear
from pyspark.sql.functions import monotonically_increasing_id


In [78]:
# config information
config = configparser.ConfigParser()
config.read('config.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
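The cell above expects a `config.cfg` file with an `[AWS]` section. The file itself is not shown, so the sketch below assumes a minimal layout that contains only the two keys the cell reads. The placeholder values and the helper name `export_aws_credentials` are hypothetical.

```python
import configparser
import os

# Hypothetical layout of config.cfg: only the two keys read by the cell above.
# Placeholder values; real credentials should never be committed to version control.
SAMPLE_CONFIG = """
[AWS]
AWS_ACCESS_KEY_ID = <your-access-key-id>
AWS_SECRET_ACCESS_KEY = <your-secret-access-key>
"""


def export_aws_credentials(config_text):
    """Parse INI-style text and export the [AWS] keys as environment variables."""
    config = configparser.ConfigParser()
    config.read_string(config_text)
    for key in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"):
        # option lookups are case-insensitive by default, so upper-case keys work
        os.environ[key] = config["AWS"][key]


export_aws_credentials(SAMPLE_CONFIG)
```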


In [79]:
# create spark session
spark = utils.creat_spark_session()

### Step 1: Scope the Project and Gather Data

#### Scope 
The first step is to load the datasets with Spark. The I94 immigration, demographics and global land temperature datasets are then processed and cleaned based on exploratory data analysis (EDA). Finally, dimension and fact tables are created so that more advanced analysis can be done on the database.

The project stores the data on Amazon S3 and uses Apache Spark to read in the source data from staging tables, extract the columns needed for analysis and populate the fact and dimension tables. Spark then writes the tables back to S3 if needed. For data modeling, the project uses a dimensional model. This makes the data easy for business users to work with and improves the performance of analytical queries. Specifically, we use a star schema, which fits OLAP (online analytical processing) very well.

#### Describe and Gather Data 
The project works with four datasets that reside in S3. The main dataset contains data on immigration to the United States. The supplementary datasets contain data on airport codes, U.S. city demographics and temperature.

- __I94 Immigration Data:__ This data comes from the US National Tourism and Trade Office. The data contains international visitor arrival statistics by world regions and select countries (including top 20), type of visa, mode of transportation, age groups, states visited (first intended address only), and the top ports of entry (for select countries).

In [6]:
# Read in the data here

# I94 SAS Data
# file_name = 'immigration_data_sample.csv'
# df_immigration = pd.read_csv(file_name)
# df_immigration.head()

In [5]:
# Read in the data here
# I94 SAS Data
df_immigration =spark.read.load('./sas_data')

In [6]:
# show first several rows
df_immigration.show(n=5)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

In [9]:
# Read in the data here

# I94 SAS Data
# fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
# df_immigration =spark.read.format('com.github.saurfang.sas.spark').load(fname)

In [7]:
# show first several rows
df_immigration.limit(5).toPandas()

|  | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | i94bir | i94visa | count | dtadfile | visapost | occup | entdepa | entdepd | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 5748517.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | CA | 20582.0 | 40.0 | 1.0 | 1.0 | 20160430 | SYD |  | G | O |  | M | 1976.0 | 10292016 | F |  | QF | 94953870000.0 | 11 | B1 |
| 1 | 5748518.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | NV | 20591.0 | 32.0 | 1.0 | 1.0 | 20160430 | SYD |  | G | O |  | M | 1984.0 | 10292016 | F |  | VA | 94955620000.0 | 7 | B1 |
| 2 | 5748519.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20582.0 | 29.0 | 1.0 | 1.0 | 20160430 | SYD |  | G | O |  | M | 1987.0 | 10292016 | M |  | DL | 94956410000.0 | 40 | B1 |
| 3 | 5748520.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20588.0 | 29.0 | 1.0 | 1.0 | 20160430 | SYD |  | G | O |  | M | 1987.0 | 10292016 | F |  | DL | 94956450000.0 | 40 | B1 |
| 4 | 5748521.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20588.0 | 28.0 | 1.0 | 1.0 | 20160430 | SYD |  | G | O |  | M | 1988.0 | 10292016 | M |  | DL | 94956390000.0 | 40 | B1 |


In [8]:
# write to parquet
# df_immigrationa.write.parquet("sas_data")
# df_immigration=spark.read.parquet("sas_data")

- __World Temperature Data:__ This dataset comes from Kaggle. It contains monthly average land temperatures and their uncertainty by city and country.

In [9]:
# read in temperature data
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temperature = spark.read.csv(fname, header=True, inferSchema=True)
# df_temperature = pd.read_csv(fname)

In [10]:
# display the first five records
df_temperature.show(n=5)

+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01 00:00:00|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows



- __U.S. City Demographic Data:__ This data comes from OpenSoft. It contains information about the demographics of all US cities and census-designated places with a population of 65,000 or more.

In [11]:
# read in US cities demographics data
file_name = 'us-cities-demographics.csv'
# df_demographics = pd.read_csv(file_name, sep=';')
df_demographics = spark.read.csv(file_name, inferSchema=True, header=True, sep=';')

In [12]:
# display the first five records
df_demographics.limit(5).toPandas()

|  | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601 | 41862 | 82463 | 1562 | 30908 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129 | 49500 | 93629 | 4147 | 32935 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040 | 46799 | 84839 | 4819 | 8229 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127 | 87105 | 175232 | 5821 | 33878 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040 | 143873 | 281913 | 5829 | 86253 | 2.73 | NJ | White | 76402 |


- __Airport Code Table:__ This is a simple table of airport codes and corresponding cities.

In [13]:
# read in airport codes data
file_name = 'airport-codes_csv.csv'
# df_airport_codes = pd.read_csv(fname)
df_airport_codes = spark.read.csv(file_name, inferSchema=True, header=True, sep=',')

In [14]:
# display the first five records
df_airport_codes.limit(5).toPandas()

|  | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 00A | heliport | Total Rf Heliport | 11 |  | US | US-PA | Bensalem | 00A |  | 00A | -74.93360137939453, 40.07080078125 |
| 1 | 00AA | small_airport | Aero B Ranch Airport | 3435 |  | US | US-KS | Leoti | 00AA |  | 00AA | -101.473911, 38.704022 |
| 2 | 00AK | small_airport | Lowell Field | 450 |  | US | US-AK | Anchor Point | 00AK |  | 00AK | -151.695999146, 59.94919968 |
| 3 | 00AL | small_airport | Epps Airpark | 820 |  | US | US-AL | Harvest | 00AL |  | 00AL | -86.77030181884766, 34.86479949951172 |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | 237 |  | US | US-AR | Newport |  |  |  | -91.254898, 35.6087 |


### Step 2: Explore and Assess the Data
#### Explore the Data and Cleaning
Identify data quality issues, like missing values, duplicate data, etc.


- __I94 Immigration Data__

The label descriptions of the I94 immigration dataset suggest candidate columns for later data modeling. We only look at the 2016 I94 immigration data, so there is no need to keep the year column. We keep the month column so that we can look for month-related patterns. The citizenship and residence columns (`i94cit`, `i94res`) and the port column (`i94port`) are kept to explore the geographic distribution. Mode (`i94mode`), age (`i94bir`) and visa type (`visatype`) are useful for analyzing visitor demographics. The `visapost` column could show where most international visitors come from. In this first round we also include the `gender` and `airline` columns to see whether they carry any valuable information.

In [15]:
# Explore the I94 immigration data
df_immigration.limit(5).toPandas()

|  | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | i94bir | i94visa | count | dtadfile | visapost | occup | entdepa | entdepd | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 5748517.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | CA | 20582.0 | 40.0 | 1.0 | 1.0 | 20160430 | SYD |  | G | O |  | M | 1976.0 | 10292016 | F |  | QF | 94953870000.0 | 11 | B1 |
| 1 | 5748518.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | NV | 20591.0 | 32.0 | 1.0 | 1.0 | 20160430 | SYD |  | G | O |  | M | 1984.0 | 10292016 | F |  | VA | 94955620000.0 | 7 | B1 |
| 2 | 5748519.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20582.0 | 29.0 | 1.0 | 1.0 | 20160430 | SYD |  | G | O |  | M | 1987.0 | 10292016 | M |  | DL | 94956410000.0 | 40 | B1 |
| 3 | 5748520.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20588.0 | 29.0 | 1.0 | 1.0 | 20160430 | SYD |  | G | O |  | M | 1987.0 | 10292016 | F |  | DL | 94956450000.0 | 40 | B1 |
| 4 | 5748521.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20588.0 | 28.0 | 1.0 | 1.0 | 20160430 | SYD |  | G | O |  | M | 1988.0 | 10292016 | M |  | DL | 94956390000.0 | 40 | B1 |


In [16]:
# overview on columns
df_immigration.columns

['cicid',
 'i94yr',
 'i94mon',
 'i94cit',
 'i94res',
 'i94port',
 'arrdate',
 'i94mode',
 'i94addr',
 'depdate',
 'i94bir',
 'i94visa',
 'count',
 'dtadfile',
 'visapost',
 'occup',
 'entdepa',
 'entdepd',
 'entdepu',
 'matflag',
 'biryear',
 'dtaddto',
 'gender',
 'insnum',
 'airline',
 'admnum',
 'fltno',
 'visatype']

In [17]:
# based on initial analysis, exclude columns which have been decided not to be used
df_immigration_selected = df_immigration[['cicid', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate',
                                          'i94mode', 'i94addr', 'depdate', 'i94bir', 'visatype',
                                          'count', 'visapost', 'occup', 'gender', 'airline']]

In [18]:
df_immigration_selected.limit(5).toPandas()

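|  | cicid | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | i94bir | visatype | count | visapost | occup | gender | airline |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 5748517.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | CA | 20582.0 | 40.0 | B1 | 1.0 | SYD |  | F | QF |
| 1 | 5748518.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | NV | 20591.0 | 32.0 | B1 | 1.0 | SYD |  | F | VA |
| 2 | 5748519.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20582.0 | 29.0 | B1 | 1.0 | SYD |  | M | DL |
| 3 | 5748520.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20588.0 | 29.0 | B1 | 1.0 | SYD |  | F | DL |
| 4 | 5748521.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20588.0 | 28.0 | B1 | 1.0 | SYD |  | M | DL |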


In [19]:
# check the missing values
nrows = df_immigration_selected.count()
df_missing = df_immigration_selected.select([(count(when(isnan(c) | col(c).isNull(), c))/nrows).alias(c) for c in df_immigration_selected.columns]).toPandas()

# display the missing value info
df_missing = pd.melt(df_missing, var_name='cols', value_name='values')
df_missing

|  | cols | values |
|---|---|---|
| 0 | cicid | 0.0 |
| 1 | i94mon | 0.0 |
| 2 | i94cit | 0.0 |
| 3 | i94res | 0.0 |
| 4 | i94port | 0.0 |
| 5 | arrdate | 0.0 |
| 6 | i94mode | 7.7e-05 |
| 7 | i94addr | 0.049282 |
| 8 | depdate | 0.046009 |
| 9 | i94bir | 0.000259 |


In [20]:
# get columns to drop which have missing values over 50%
drop_cols = list(df_missing[df_missing['values']>0.5]['cols'])
drop_cols

['visapost', 'occup']
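The missing-value rule used above can also be checked without Spark. The plain-Python sketch below runs on toy rows and assumes the notebook's convention: a value counts as missing when it is null or NaN, and a column is dropped when more than 50% of its values are missing. The function names are hypothetical and are not part of `utils` or `clean_data`.

```python
import math


def missing_fractions(rows, columns):
    """Share of rows in which each column is None or NaN (like isnan(c) | isNull(c))."""
    def is_missing(value):
        return value is None or (isinstance(value, float) and math.isnan(value))

    n = len(rows)
    return {c: (sum(is_missing(r.get(c)) for r in rows) / n if n else 0.0) for c in columns}


def columns_to_drop(fractions, threshold=0.5):
    """Columns whose missing share is strictly above the threshold."""
    return [c for c, share in fractions.items() if share > threshold]


# Toy rows (not real I94 records) using the notebook's column names.
rows = [
    {"cicid": 1.0, "i94addr": "CA", "visapost": None, "occup": None},
    {"cicid": 2.0, "i94addr": None, "visapost": "SYD", "occup": None},
    {"cicid": 3.0, "i94addr": "WA", "visapost": None, "occup": float("nan")},
    {"cicid": 4.0, "i94addr": "NV", "visapost": None, "occup": None},
]
fractions = missing_fractions(rows, ["cicid", "i94addr", "visapost", "occup"])
print(columns_to_drop(fractions))  # ['visapost', 'occup']
```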

In [21]:
# drop the columns which have missing data > 50% since it won't be helpful for analysis
df_immigration_cols_dropped = df_immigration_selected.drop(*drop_cols)

In [22]:
# check column names left
df_immigration_cols_dropped.columns

['cicid',
 'i94mon',
 'i94cit',
 'i94res',
 'i94port',
 'arrdate',
 'i94mode',
 'i94addr',
 'depdate',
 'i94bir',
 'visatype',
 'count',
 'gender',
 'airline']

In [23]:
# drop duplicate rows, if any
df_immigration_dropped = df_immigration_cols_dropped.drop_duplicates()

# drop rows in which every column is missing
df_immigration_dropped = df_immigration_dropped.dropna(how='all')

In [24]:
# test pipeline function
df_immigration_dropped = clean_data.clean_immigration_data(df_immigration)

In [25]:
# quick view on processed data
df_immigration_dropped.limit(5).toPandas()

|  | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | i94bir | i94visa | count | dtadfile | entdepa | entdepd | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 474.0 | 2016.0 | 4.0 | 103.0 | 103.0 | NEW | 20545.0 | 2.0 |  | 20547.0 | 25.0 | 2.0 | 1.0 | 20160401 | G | O |  | M | 1991.0 | 6292016 | F |  | VES | 55410440000.0 | 91285 | WT |
| 1 | 1508.0 | 2016.0 | 4.0 | 104.0 | 104.0 | NYC | 20545.0 | 1.0 | NY | 20552.0 | 16.0 | 2.0 | 1.0 | 20160401 | G | O |  | M | 2000.0 | 6292016 | F |  | LX | 55416410000.0 | 16 | WT |
| 2 | 1669.0 | 2016.0 | 4.0 | 104.0 | 104.0 | NYC | 20545.0 | 1.0 | FL | 20561.0 | 57.0 | 2.0 | 1.0 | 20160401 | G | O |  | M | 1959.0 | 6292016 | M |  | AA | 55457750000.0 | 39 | WT |
| 3 | 2025.0 | 2016.0 | 4.0 | 104.0 | 104.0 | NYC | 20545.0 | 1.0 | NY | 20549.0 | 51.0 | 2.0 | 1.0 | 20160401 | O | O |  | M | 1965.0 | 6292016 |  |  | SN | 55419980000.0 | 1401 | WT |
| 4 | 2048.0 | 2016.0 | 4.0 | 104.0 | 104.0 | MIA | 20545.0 | 1.0 | FL | 20554.0 | 3.0 | 2.0 | 1.0 | 20160401 | O | O |  | M | 2013.0 | 6292016 |  |  | UX | 55456900000.0 | 97 | WT |


- __World Temperature data__

In [26]:
# quick look at first several rows
df_temperature.limit(5).toPandas()

|  | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 |  |  | Århus | Denmark | 57.05N | 10.33E |


In [27]:
# convert dt column type to string
df_temperature = df_temperature.withColumn("dt",col("dt").cast(StringType())) 

In [28]:
# check the missing values
df_missing = utils.check_missing_values(df_temperature)
df_missing

|  | cols | values |
|---|---|---|
| 0 | dt | 0.0 |
| 1 | AverageTemperature | 0.042345 |
| 2 | AverageTemperatureUncertainty | 0.042345 |
| 3 | City | 0.0 |
| 4 | Country | 0.0 |
| 5 | Latitude | 0.0 |
| 6 | Longitude | 0.0 |


The I94 immigration data only covers 2016 and the temperature data only runs until 2013, so we will only look at the 2013 temperature data. The I94 immigration data also identifies the visitor's country of origin only at country level, and the state where the visitor enters the US at most at state level. In the following steps we therefore use the temperature dataset at country level.

Only a small share of values (about 4%) is missing in the AverageTemperature and AverageTemperatureUncertainty columns. Since we only use the temperature dataset at country level, we can fill the missing values with the group average at the (Country, dt) level.
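`clean_data.clean_temperature_data` is not shown here. The plain-Python sketch below illustrates the fill strategy described above under that caveat. Each missing `AverageTemperature` is replaced by the mean of the non-missing values in its (`Country`, `dt`) group. The helper name and the toy rows are hypothetical. In Spark, the same idea could be expressed by wrapping `avg(...).over(Window.partitionBy('Country', 'dt'))` in `coalesce`.

```python
from collections import defaultdict


def fill_with_group_mean(rows, value_col, group_cols):
    """Replace None in value_col with the mean of non-missing values in the same group."""
    sums, counts = defaultdict(float), defaultdict(int)
    for r in rows:
        if r[value_col] is not None:
            key = tuple(r[g] for g in group_cols)
            sums[key] += r[value_col]
            counts[key] += 1

    filled = []
    for r in rows:
        new_row = dict(r)
        if new_row[value_col] is None:
            key = tuple(r[g] for g in group_cols)
            # groups without any observed value stay missing
            new_row[value_col] = sums[key] / counts[key] if counts[key] else None
        filled.append(new_row)
    return filled


# Toy rows, not real temperature readings.
rows = [
    {"Country": "Denmark", "dt": "2013-01-01", "AverageTemperature": 2.0},
    {"Country": "Denmark", "dt": "2013-01-01", "AverageTemperature": None},
    {"Country": "Denmark", "dt": "2013-01-01", "AverageTemperature": 4.0},
    {"Country": "Spain", "dt": "2013-01-01", "AverageTemperature": None},
]
filled = fill_with_group_mean(rows, "AverageTemperature", ["Country", "dt"])
```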

In [29]:
# drop missing values and duplicates
df_temperature_dropped = clean_data.clean_temperature_data(df_temperature)

In [30]:
# quick view on processed data
df_temperature_dropped.limit(5).toPandas()

|  | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 00:00:00 | 7.76 | 1.973 | Bilbao | SPAIN | 42.59N | 2.18W |
| 1 | 1743-11-01 00:00:00 | 6.44 | 1.605 | Göttingen | GERMANY | 52.24N | 10.51E |
| 2 | 1744-04-01 00:00:00 | 14.251 | 2.169 | Coimbra | PORTUGAL | 40.99N | 8.52W |
| 3 | 1744-04-01 00:00:00 | 16.463 | 1.904 | Palma | SPAIN | 39.38N | 2.08E |
| 4 | 1744-04-01 00:00:00 | 8.807 | 2.252 | Sterling Heights | UNITED STATES | 42.59N | 82.91W |


- __U.S. City Demographic Data:__ A check of the overall missing data in the US city demographics dataset shows that very little is missing (at most 0.5% of the whole dataset). We therefore remove the rows with missing data, as well as any duplicate rows.

In [31]:
# quick look at first several rows
df_demographics.limit(5).toPandas()

|  | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601 | 41862 | 82463 | 1562 | 30908 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129 | 49500 | 93629 | 4147 | 32935 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040 | 46799 | 84839 | 4819 | 8229 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127 | 87105 | 175232 | 5821 | 33878 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040 | 143873 | 281913 | 5829 | 86253 | 2.73 | NJ | White | 76402 |


In [32]:
# check the missing values
df_missing = utils.check_missing_values(df_demographics)
df_missing

|  | cols | values |
|---|---|---|
| 0 | City | 0.0 |
| 1 | State | 0.0 |
| 2 | Median Age | 0.0 |
| 3 | Male Population | 0.001038 |
| 4 | Female Population | 0.001038 |
| 5 | Total Population | 0.0 |
| 6 | Number of Veterans | 0.004497 |
| 7 | Foreign-born | 0.004497 |
| 8 | Average Household Size | 0.005534 |
| 9 | State Code | 0.0 |


In [33]:
# drop missing values and duplicates
df_demographics_dropped = clean_data.clean_demographics_data(df_demographics)
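`clean_data.clean_demographics_data` is not shown. Per the description above, it removes rows with missing values and duplicate rows. The plain-Python sketch below shows that rule with a hypothetical helper and toy rows. In Spark, the equivalent calls would be `dropna(how='any')` followed by `dropDuplicates()`.

```python
def drop_missing_and_duplicates(rows):
    """Keep rows with no missing values, dropping exact duplicates (first one wins)."""
    seen = set()
    cleaned = []
    for r in rows:
        if any(v is None for v in r.values()):
            continue  # like dropna(how='any')
        key = tuple(sorted(r.items()))  # order-independent fingerprint of the row
        if key in seen:
            continue  # like dropDuplicates()
        seen.add(key)
        cleaned.append(r)
    return cleaned


# Toy rows, not real census figures.
rows = [
    {"City": "City A", "State Code": "MA", "Number of Veterans": 100},
    {"City": "City A", "State Code": "MA", "Number of Veterans": 100},
    {"City": "City B", "State Code": "FL", "Number of Veterans": None},
    {"City": "City C", "State Code": "TX", "Number of Veterans": 250},
]
cleaned = drop_missing_and_duplicates(rows)
```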

In [34]:
# quick view on processed data
df_demographics_dropped.limit(5).toPandas()

|  | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Quincy | Massachusetts | 41.0 | 44129 | 49500 | 93629 | 4147 | 32935 | 2.39 | MA | White | 58723 |
| 1 | Wilmington | North Carolina | 35.5 | 52346 | 63601 | 115947 | 5908 | 7401 | 2.24 | NC | Asian | 3152 |
| 2 | Tampa | Florida | 35.3 | 175517 | 193511 | 369028 | 20636 | 58795 | 2.47 | FL | Hispanic or Latino | 95154 |
| 3 | Gastonia | North Carolina | 36.9 | 35527 | 39023 | 74550 | 3537 | 5715 | 2.67 | NC | Asian | 2788 |
| 4 | Tyler | Texas | 33.9 | 50422 | 53283 | 103705 | 4813 | 8225 | 2.59 | TX | American Indian and Alaska Native | 1057 |


- __Airport Code Data:__ An initial assessment shows that the airport code dataset is difficult to join with the other three datasets, especially the I94 immigration data, which has no city-level information. There is no good common column to join on for further analysis. We therefore decided not to use this dataset in the data model and do not process it beyond the quick checks below.

In [35]:
# quick look at first several rows
df_airport_codes.limit(5).toPandas()

|  | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 00A | heliport | Total Rf Heliport | 11 |  | US | US-PA | Bensalem | 00A |  | 00A | -74.93360137939453, 40.07080078125 |
| 1 | 00AA | small_airport | Aero B Ranch Airport | 3435 |  | US | US-KS | Leoti | 00AA |  | 00AA | -101.473911, 38.704022 |
| 2 | 00AK | small_airport | Lowell Field | 450 |  | US | US-AK | Anchor Point | 00AK |  | 00AK | -151.695999146, 59.94919968 |
| 3 | 00AL | small_airport | Epps Airpark | 820 |  | US | US-AL | Harvest | 00AL |  | 00AL | -86.77030181884766, 34.86479949951172 |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | 237 |  | US | US-AR | Newport |  |  |  | -91.254898, 35.6087 |


In [36]:
# filter only US airports
df_airport_us = df_airport_codes[df_airport_codes['iso_country']=='US']

In [37]:
# check the missing values
df_missing = utils.check_missing_values(df_airport_us)
df_missing

|  | cols | values |
|---|---|---|
| 0 | ident | 0.0 |
| 1 | type | 0.0 |
| 2 | name | 0.0 |
| 3 | elevation_ft | 0.010502 |
| 4 | continent | 0.0 |
| 5 | iso_country | 0.0 |
| 6 | iso_region | 0.0 |
| 7 | municipality | 0.004482 |
| 8 | gps_code | 0.07791 |
| 9 | iata_code | 0.91128 |


### Step 3: Define the Data Model
In this project, we model the data as a star schema, a design built to support OLAP (online analytical processing). Fact tables record business events (like an order, a phone call or a book review) as quantifiable metrics (like the quantity of an item, the duration of a call or a book rating). Dimension tables record the context of each business event (e.g., who, what, where, why). They hold attribute values such as the location of the store where an item was purchased or the name of the customer who made the call.
#### 3.1 Conceptual Data Model
The data model has one fact table that records the key information about each immigration event, and several dimension tables that contain the details:
- The arrival date dimension table breaks each arrival date down into day, week, month, year and weekday.
- The country dimension table holds the country code mapping and the country's average temperature. This supports analysis of how temperature levels or patterns relate to immigration trends.
- The US demographics dimension table takes its information from the demographics dataset and connects to the fact table through a state-level mapping. This supports analysis of the relationship between immigration patterns and US demographics.
- The visa dimension table holds the visa categories from the immigration dataset and links back to the fact table through the visa type key.
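The dataclasses below sketch the star layout: one fact table whose foreign keys point at the dimension tables. Field names follow the column names used later in the notebook. The types, the `describe` helper and the sample values are assumptions for illustration. For brevity, only the country and visa dimensions are modeled.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class CountryDim:
    country_code: float                   # I94 residence code, e.g. 692.0
    country_name: str
    average_temperature: Optional[float]  # None when no temperature data matches


@dataclass(frozen=True)
class VisaDim:
    visa_type_key: int
    visatype: str


@dataclass(frozen=True)
class ImmigrationFact:
    record_id: float
    arrdate: str                   # ISO date, joins to the arrival date dimension
    country_residence_code: float  # joins to CountryDim.country_code
    state_code: Optional[str]      # joins to the demographics dimension
    visa_type_key: int             # joins to VisaDim.visa_type_key


def describe(fact, countries, visas):
    """Resolve a fact row's foreign keys against two dimension tables (a star join)."""
    country = countries.get(fact.country_residence_code)
    visa = visas.get(fact.visa_type_key)
    return (
        country.country_name if country else None,
        visa.visatype if visa else None,
    )


# Illustrative dimension rows and a toy fact row.
countries = {692.0: CountryDim(692.0, "ECUADOR", 20.54)}
visas = {1: VisaDim(1, "B2")}
fact = ImmigrationFact(1.0, "2016-04-01", 692.0, "FL", 1)
print(describe(fact, countries, visas))  # ('ECUADOR', 'B2')
```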

#### 3.2 Mapping Out Data Pipelines
The pipeline builds the data model in the following steps:
1. Load, process and clean the immigration dataset. From the cleaned immigration data frame, extract the visa and arrival date dimension tables.
2. Load the global temperature data, then create the country dimension table and the immigration fact table.
3. Load and clean the demographics data, then extract and create the demographics dimension table.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

#### Immigration date dimension table

In [38]:
# convert the SAS date (days since 1960-01-01) in the arrdate column to an ISO date string
get_date = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(x)).isoformat() if x is not None else None)

# create date dimension table using arrdate
df_arrdate_dim = df_immigration_dropped.select(['arrdate']).withColumn("arrdate", get_date(df_immigration_dropped.arrdate)).distinct()

# add other dimension of date
df_arrdate_dim = df_arrdate_dim.withColumn('arrival_day', dayofmonth('arrdate'))
df_arrdate_dim = df_arrdate_dim.withColumn('arrival_week', weekofyear('arrdate'))
df_arrdate_dim = df_arrdate_dim.withColumn('arrival_month', month('arrdate'))
df_arrdate_dim = df_arrdate_dim.withColumn('arrival_year', year('arrdate'))
df_arrdate_dim = df_arrdate_dim.withColumn('arrival_weekday', dayofweek('arrdate'))

# add a unique (but not necessarily consecutive) id column
df_arrdate_dim = df_arrdate_dim.withColumn('id', monotonically_increasing_id())
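SAS stores dates as the number of days since 1960-01-01, which is what the `get_date` UDF above relies on. The plain-Python sketch below mirrors that conversion and the derived columns so the logic can be checked without Spark. The helper names are hypothetical. The sketch assumes Spark's conventions that `weekofyear` follows ISO 8601 weeks and `dayofweek` numbers Sunday as 1 through Saturday as 7. This agrees with the sample output: 2016-04-22, a Friday, has week 16 and weekday 6.

```python
import datetime as dt

SAS_EPOCH = dt.date(1960, 1, 1)


def sas_to_date(sas_days):
    """Convert a SAS date (days since 1960-01-01) to a date, keeping None as None."""
    if sas_days is None:
        return None
    return SAS_EPOCH + dt.timedelta(days=sas_days)


def date_attributes(d):
    """Derive the arrival date dimension columns from a date."""
    _, iso_week, iso_weekday = d.isocalendar()  # ISO weekday: Monday=1 ... Sunday=7
    return {
        "arrival_day": d.day,
        "arrival_week": iso_week,
        "arrival_month": d.month,
        "arrival_year": d.year,
        # shift to Spark's dayofweek numbering: Sunday=1 ... Saturday=7
        "arrival_weekday": iso_weekday % 7 + 1,
    }


print(sas_to_date(20574.0))  # 2016-04-30
print(date_attributes(sas_to_date(20566)))
```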

In [39]:
# write to parquet file
partition_columns = ['arrival_year', 'arrival_month', 'arrival_week']
df_arrdate_dim.write.parquet("tables/" + "immigration_arrdate", partitionBy=partition_columns, mode="overwrite")

In [40]:
# test pipeline function
output_data = 'tables/'
df_arrdate_dim = create_tables.create_arrdate_dimension(df_immigration_dropped, output_data)

In [41]:
# quickly view the data
df_arrdate_dim.limit(5).toPandas()

|  | arrdate | arrival_day | arrival_week | arrival_month | arrival_year | arrival_weekday | id |
|---|---|---|---|---|---|---|---|
| 0 | 2016-04-22 | 22 | 16 | 4 | 2016 | 6 | 8589934592 |
| 1 | 2016-04-15 | 15 | 15 | 4 | 2016 | 6 | 25769803776 |
| 2 | 2016-04-18 | 18 | 16 | 4 | 2016 | 2 | 42949672960 |
| 3 | 2016-04-09 | 9 | 14 | 4 | 2016 | 7 | 68719476736 |
| 4 | 2016-04-11 | 11 | 15 | 4 | 2016 | 2 | 85899345920 |


#### Country dimension table

In [42]:
# country codes mapping
country_cd = pd.read_csv('i94res.csv')
country_cd.head()

|  | Code | Name |
|---|---|---|
| 0 | 582 | MEXICO Air Sea, and Not Reported (I-94, no lan... |
| 1 | 236 | AFGHANISTAN |
| 2 | 101 | ALBANIA |
| 3 | 316 | ALGERIA |
| 4 | 102 | ANDORRA |


In [43]:
# aggregate the temperature data
agg_temp = df_temperature_dropped.select(['Country', 'AverageTemperature']).groupby('Country').avg()
agg_temp = agg_temp.withColumnRenamed('avg(AverageTemperature)', 'average_temperature').toPandas()
agg_temp.head()

|  | Country | average_temperature |
|---|---|---|
| 0 | SOUTH AFRICA | 16.360849 |
| 1 | ARMENIA | 8.375597 |
| 2 | BAHAMAS | 24.786978 |
| 3 | BURMA | 26.01684 |
| 4 | CAMBODIA | 26.918136 |


In [44]:
# extract the i94res column
df_country_dim = df_immigration_dropped.select(['i94res']).distinct().withColumnRenamed('i94res', 'country_code')
df_country_dim.limit(5).toPandas()

|  | country_code |
|---|---|
| 0 | 692.0 |
| 1 | 299.0 |
| 2 | 576.0 |
| 3 | 735.0 |
| 4 | 206.0 |


In [45]:
# add country name
# map the country name using country_cd 
@udf('string')
def map_country_name(code):
    # look up the name for this I94 residence code; return None if the code is unmapped
    names = country_cd[country_cd['Code'] == code]['Name']

    if not names.empty:
        return names.iloc[0]

    return None


df_country_dim = df_country_dim.withColumn('country_name', map_country_name(df_country_dim.country_code))
df_country_dim.limit(5).toPandas()

|  | country_code | country_name |
|---|---|---|
| 0 | 692.0 | ECUADOR |
| 1 | 299.0 | MONGOLIA |
| 2 | 576.0 | EL SALVADOR |
| 3 | 735.0 | MONTENEGRO |
| 4 | 206.0 | HONG KONG |


In [46]:
# add country average temperature
# map the country average temperature using agg_temp
@udf('string')
def map_average_temperature(country_name):
    average_temperature = agg_temp[agg_temp['Country']==country_name]['average_temperature']
    
    if not average_temperature.empty:
        return str(average_temperature.iloc[0])
    
    return None

df_country_dim = df_country_dim.withColumn('average_temperature', map_average_temperature(df_country_dim.country_name))
df_country_dim.limit(5).toPandas()

|  | country_code | country_name | average_temperature |
|---|---|---|---|
| 0 | 692.0 | ECUADOR | 20.5391705374 |
| 1 | 299.0 | MONGOLIA | -3.36548531952 |
| 2 | 576.0 | EL SALVADOR | 25.2628525509 |
| 3 | 735.0 | MONTENEGRO |  |
| 4 | 206.0 | HONG KONG | 21.4236961538 |
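The two UDFs above filter a pandas DataFrame once per row. Both lookup tables are small, so an alternative is a left join against them, which keeps unmatched codes as nulls (as with MONTENEGRO above). The plain-Python sketch below shows that left-join logic with dictionaries. The helper name is hypothetical, and the sample inputs are taken from the outputs above for illustration only. In Spark, the equivalent would be `join(..., how='left')` against the code mapping and the aggregated temperatures.

```python
def build_country_dim(country_codes, code_to_name, name_to_avg_temp):
    """Left-join distinct residence codes to country names, then names to temperatures."""
    rows = []
    for code in sorted(set(country_codes)):
        name = code_to_name.get(int(code))  # codes arrive as floats, e.g. 692.0
        rows.append({
            "country_code": code,
            "country_name": name,
            "average_temperature": name_to_avg_temp.get(name),  # None if unmatched
        })
    return rows


code_to_name = {692: "ECUADOR", 735: "MONTENEGRO"}
name_to_avg_temp = {"ECUADOR": 20.5391705374}
dim = build_country_dim([692.0, 735.0, 692.0], code_to_name, name_to_avg_temp)
```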


In [47]:
output_data = 'tables/'
df_country_dim.write.parquet(output_data + "country", mode="overwrite")

In [48]:
country_cd = spark.read.csv('i94res.csv', header=True, inferSchema=True)

In [49]:
# test pipeline function
output_data = 'tables/'
df_country_dim = create_tables.create_country_dimension(spark, df_immigration_dropped, df_temperature_dropped, output_data, country_cd)

In [50]:
# quickly check the data
df_country_dim.limit(5).toPandas()

|  | country_code | country_name | average_temperature |
|---|---|---|---|
| 0 | 151.0 | ARMENIA | 8.375597 |
| 1 | 512.0 | BAHAMAS | 24.786978 |
| 2 | 373.0 | SOUTH AFRICA | 16.360849 |
| 3 | 735.0 | MONTENEGRO |  |
| 4 | 243.0 | BURMA | 26.01684 |


#### Visa dimension table

In [51]:
# create visa dimension dataframe using visatype column
df_visa_dim = df_immigration_dropped.select(['visatype']).distinct()

# add a unique id column
df_visa_dim = df_visa_dim.withColumn('visa_type_key', monotonically_increasing_id())


In [52]:
output_data = 'tables/'
df_visa_dim.write.parquet(output_data + "visa", mode="overwrite")

In [53]:
# test pipeline function
output_data = 'tables/'
df_visa_dim = create_tables.create_visa_dimension(df_immigration_dropped, output_data)

In [54]:
# quickly view the data
df_visa_dim.limit(5).toPandas()

|  | visatype | visa_type_key |
|---|---|---|
| 0 | F2 | 103079215104 |
| 1 | GMB | 352187318272 |
| 2 | B2 | 369367187456 |
| 3 | F1 | 498216206336 |
| 4 | CPL | 601295421440 |
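`monotonically_increasing_id()` guarantees unique ids but not consecutive ones, which is why keys like 103079215104 appear above. The ids also depend on how the data is partitioned, so re-running the pipeline can assign different keys to the same visa type. If stable, compact keys are preferred, one option is to sort the distinct values and number them. The plain-Python sketch below uses a hypothetical helper. For a small dimension in Spark, `row_number()` over `Window.orderBy('visatype')` would be a comparable approach.

```python
def assign_surrogate_keys(values):
    """Map each distinct non-null value to a consecutive integer key in sorted order."""
    distinct = sorted({v for v in values if v is not None})
    return {value: key for key, value in enumerate(distinct, start=1)}


keys = assign_surrogate_keys(["B2", "WT", "B1", "B2", None, "F1"])
print(keys)  # {'B1': 1, 'B2': 2, 'F1': 3, 'WT': 4}
```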


#### Demographic dimension table

In [55]:
# create demographics dimension table
df_demographics_dim = df_demographics_dropped.withColumnRenamed('Median Age', 'median_age') \
    .withColumnRenamed('Male Population', 'male_population') \
    .withColumnRenamed('Female Population', 'female_population') \
    .withColumnRenamed('Total Population', 'total_population') \
    .withColumnRenamed('Number of Veterans', 'number_of_veterans') \
    .withColumnRenamed('Foreign-born', 'foreign_born') \
    .withColumnRenamed('Average Household Size', 'average_household_size') \
    .withColumnRenamed('State Code', 'state_code')

# add a unique id column
df_demographics_dim = df_demographics_dim.withColumn('id', monotonically_increasing_id())


In [56]:
output_data = 'tables/'
df_demographics_dim.write.parquet(output_data + "demographics", mode="overwrite")

In [58]:
# test pipeline function
output_data = 'tables/'
df_demographics_dim = create_tables.create_demographics_dimension(df_demographics_dropped, output_data)

In [59]:
# quickly check the data
df_demographics_dim.limit(5).toPandas()

|  | City | State | median_age | male_population | female_population | total_population | number_of_veterans | foreign_born | average_household_size | state_code | Race | Count | id |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Quincy | Massachusetts | 41.0 | 44129 | 49500 | 93629 | 4147 | 32935 | 2.39 | MA | White | 58723 | 0 |
| 1 | Wilmington | North Carolina | 35.5 | 52346 | 63601 | 115947 | 5908 | 7401 | 2.24 | NC | Asian | 3152 | 1 |
| 2 | Tampa | Florida | 35.3 | 175517 | 193511 | 369028 | 20636 | 58795 | 2.47 | FL | Hispanic or Latino | 95154 | 2 |
| 3 | Gastonia | North Carolina | 36.9 | 35527 | 39023 | 74550 | 3537 | 5715 | 2.67 | NC | Asian | 2788 | 3 |
| 4 | Tyler | Texas | 33.9 | 50422 | 53283 | 103705 | 4813 | 8225 | 2.59 | TX | American Indian and Alaska Native | 1057 | 4 |


#### Immigration fact table

In [60]:
# grab visa data
df_visa = df_visa_dim.toPandas()
df_visa

|  | visatype | visa_type_key |
|---|---|---|
| 0 | F2 | 103079215104 |
| 1 | GMB | 352187318272 |
| 2 | B2 | 369367187456 |
| 3 | F1 | 498216206336 |
| 4 | CPL | 601295421440 |
| 5 | I1 | 704374636544 |
| 6 | WB | 738734374912 |
| 7 | M1 | 747324309504 |
| 8 | B1 | 807453851648 |
| 9 | WT | 884763262976 |


In [62]:
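import datetime as dt
from pyspark.sql.functions import udf

# map a visatype code to its visa_type_key, looked up in the pandas copy of the visa dimension
@udf('string')
def map_visa_key(visa_type):
    keys = df_visa[df_visa['visatype'] == visa_type]['visa_type_key']

    if not keys.empty:
        return str(keys.iloc[0])

    # unknown visa type
    return None

# convert an arrival date in SAS format (days since 1960-01-01) to an ISO date string;
# test for None explicitly so that day 0 (1960-01-01) is not treated as missing
get_date = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(days=x)).isoformat() if x is not None else None)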
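The `get_date` UDF relies on SAS storing dates as the number of days since 1960-01-01. The plain-Python sketch below isolates that conversion so it can be tested without Spark. `sas_date_to_iso` is a hypothetical helper, not part of the project. It checks `is None` rather than truthiness so that day 0 (1960-01-01) is not mistaken for a missing value.

```python
import datetime as dt
from typing import Optional

SAS_EPOCH = dt.date(1960, 1, 1)  # SAS day 0

def sas_date_to_iso(sas_days: Optional[float]) -> Optional[str]:
    """Convert a SAS date (days since 1960-01-01) to an ISO 'YYYY-MM-DD' string."""
    if sas_days is None:
        return None
    # arrdate is read as a double, so truncate it to whole days
    return (SAS_EPOCH + dt.timedelta(days=int(sas_days))).isoformat()

print(sas_date_to_iso(20545.0))  # 2016-04-01
print(sas_date_to_iso(0.0))      # 1960-01-01
```

Wrapped with `udf(sas_date_to_iso, 'string')`, the same function could replace the lambda.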

In [63]:
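# rename columns to align with the data model
# (the I94 record id column is 'cicid'; withColumnRenamed silently does nothing if the old name is absent)
df_fact = df_immigration_dropped.withColumnRenamed('cicid', 'record_id') \
    .withColumnRenamed('i94res', 'country_residence_code') \
    .withColumnRenamed('i94addr', 'state_code')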

In [65]:
# add visa_type key
df_fact = df_fact.withColumn('visa_type_key', map_visa_key(df_fact.visatype))
    
# convert the SAS arrival date into an ISO date string (YYYY-MM-DD)
df_fact = df_fact.withColumn("arrdate", get_date(df_fact.arrdate))
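`map_visa_key` filters the whole pandas frame `df_visa` once for every fact row. The visa dimension holds only a handful of rows, so a dictionary built once gives a cheaper lookup. In Spark itself, a left join of `df_fact` with `df_visa_dim` on `visatype` would avoid the Python UDF altogether. The sketch below shows only the dictionary version in plain Python. `build_visa_lookup` and `map_visa_key_fast` are hypothetical names, and the rows are copied from the visa output above.

```python
from typing import Dict, Iterable, Optional, Tuple

def build_visa_lookup(rows: Iterable[Tuple[str, int]]) -> Dict[str, str]:
    """Build a visatype -> visa_type_key map once, keeping the first key per visa type."""
    lookup: Dict[str, str] = {}
    for visatype, key in rows:
        # mirror map_visa_key: keep the first match and return the key as a string
        lookup.setdefault(visatype, str(key))
    return lookup

visa_rows = [('F2', 103079215104), ('B2', 369367187456), ('WT', 884763262976)]
visa_lookup = build_visa_lookup(visa_rows)

def map_visa_key_fast(visatype: Optional[str]) -> Optional[str]:
    """Constant-time replacement for the per-row DataFrame filter; unknown codes map to None."""
    return visa_lookup.get(visatype)

print(map_visa_key_fast('B2'))  # 369367187456
print(map_visa_key_fast('XX'))  # None
```

In the notebook, `visa_rows` could be `zip(df_visa['visatype'], df_visa['visa_type_key'])`, and `map_visa_key_fast` could be wrapped with `@udf('string')` like the original.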

In [66]:
output_data = 'tables/'
df_fact.write.parquet(output_data + "immigration_fact", mode="overwrite")

In [69]:
# test pipeline function
df_fact = create_tables.create_immigration_fact(spark, df_immigration_dropped, output_data)

#### 4.2 Data Quality Checks
Run data quality checks to ensure the pipeline ran as expected.

In [71]:
import importlib
importlib.reload(clean_data)
importlib.reload(utils)
importlib.reload(create_tables)
importlib.reload(quality_checks)

<module 'quality_checks' from '/home/workspace/quality_checks.py'>

In [None]:
# Perform quality checks

tables = {
    'immigration_fact': df_fact,
    'visa_dim': df_visa_dim,
    'arrdate_dim': df_arrdate_dim,
    'demographics_dim': df_demographics_dim,
    'country_dim': df_country_dim
}

# check that every table loaded successfully
for table_name, df_table in tables.items():
    quality_checks.loading_checks(df_table, table_name)

In [None]:
# check that each modelled table has the same row count as its source table
quality_checks.count_checks(df_immigration_dropped, df_fact)
quality_checks.count_checks(df_demographics_dropped, df_demographics_dim)
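The `quality_checks` module itself is not shown in this notebook. The sketch below is a hypothetical version of what `loading_checks` and `count_checks` might do, and the real module may differ. It accepts anything with a `count()` method, so it works with Spark DataFrames and can also be exercised with a small stand-in object. `DataQualityError` and `FakeTable` are illustrative names.

```python
class DataQualityError(Exception):
    """Raised when a data quality check fails."""

def loading_checks(df_table, table_name: str) -> int:
    """Fail if the table is empty; otherwise return its row count."""
    n_rows = df_table.count()  # on a Spark DataFrame, count() is an action and runs a job
    if n_rows == 0:
        raise DataQualityError(f"{table_name} loaded with 0 rows")
    print(f"{table_name}: loaded {n_rows} rows")
    return n_rows

def count_checks(df_source, df_table) -> None:
    """Fail if the modelled table does not have the same row count as its source."""
    source_rows, table_rows = df_source.count(), df_table.count()
    if source_rows != table_rows:
        raise DataQualityError(f"row count mismatch: source={source_rows}, table={table_rows}")

class FakeTable:
    """Minimal stand-in exposing count(), used only to exercise the checks."""
    def __init__(self, n_rows: int):
        self.n_rows = n_rows

    def count(self) -> int:
        return self.n_rows

loading_checks(FakeTable(5), 'visa_dim')  # prints "visa_dim: loaded 5 rows"
count_checks(FakeTable(5), FakeTable(5))  # passes silently
```

Raising an exception, rather than only printing, makes a failed check stop the pipeline. This matters once the steps are scheduled by an orchestrator.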


#### 4.3 Data dictionary 
The following is a summary of the data dictionary for the data model.

#### Fact table
| Feature                | Description                                                                     |
|------------------------|---------------------------------------------------------------------------------|
| record_id              | Unique record ID                                                                |
| country_residence_code | 3 digit code for immigrant country of residence                                 |
| visa_type_key          | A numerical key that links to the visa_type dimension table                     |
| state_code             | US state of arrival                                                             |
| i94yr                  | 4 digit year                                                                    |
| i94mon                 | Numeric month                                                                   |
| i94port                | Port of admission                                                               |
| arrdate                | Arrival Date in the USA                                                         |
| i94mode                | Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)           |
| depdate                | Departure Date from the USA                                                     |
| i94bir                 | Age of Respondent in Years                                                      |
| i94visa                | Visa codes collapsed into three categories                                      |
| count                  | Field used for summary statistics                                               |
| dtadfile               | Character Date Field - Date added to I-94 Files                                 |
| visapost               | Department of State where the visa was issued                                   |
| occup                  | Occupation that will be performed in the U.S.                                   |
| entdepa                | Arrival Flag - admitted or paroled into the U.S.                                |
| entdepd                | Departure Flag - Departed, lost I-94 or is deceased                             |
| entdepu                | Update Flag - Either apprehended, overstayed, adjusted to permanent residence   |
| matflag                | Match flag - Match of arrival and departure records                             |
| biryear                | 4 digit year of birth                                                           |
| dtaddto                | Character Date Field - Date to which admitted to U.S. (allowed to stay until)   |
| gender                 | Non-immigrant sex                                                               |

#### Country dimension table
| Feature             | Description                    |
|---------------------|--------------------------------|
| country_code        | Unique country code            |
| country_name        | Name of country                |
| average_temperature | Average temperature of country |

#### Visa dimension table
| Feature       | Description                    |
|---------------|--------------------------------|
| visa_type_key | Unique id for each visa type   |
| visatype      | Visa type code (e.g., B1, F1)  |

#### Arrival date dimension table
| Feature         | Description          |
|-----------------|----------------------|
| id              | Unique id            |
| arrdate         | Arrival date into US |
| arrival_year    | Arrival year         |
| arrival_month   | Arrival month        |
| arrival_day     | Arrival day          |
| arrival_week    | Arrival week         |
| arrival_weekday | Arrival weekday      |

#### Demographics dimension table
| Feature                | Description                                                   |
|------------------------|---------------------------------------------------------------|
| id                     | Record id                                                     |
| state_code             | US state code                                                 |
| City                   | City Name                                                     |
| State                  | US State where city is located                                |
| median_age             | Median age of the population                                  |
| male_population        | Count of male population                                      |
| female_population      | Count of female population                                    |
| total_population       | Count of total population                                     |
| number_of_veterans     | Count of veterans                                             |
| foreign_born           | Count of foreign-born residents (not U.S. citizens at birth)  |
| average_household_size | Average city household size                                   |
| Race                   | Respondent race                                               |
| Count                  | Count of the city's residents of the given race               |

#### Step 5: Complete Project Write Up
* What's the goal? What queries will you want to run? How would Spark or Airflow be incorporated? Why did you choose the model you chose?  
    - This project uses I94 immigration, global land temperature and US demographics data to build an analytics database. By combining the datasets appropriately, we can explore patterns and insights in US immigration over time.  

    - The pipeline loads the I94 immigration events, land temperature and US demographics datasets, cleans and processes them with modular functions, then extracts dimension information from the cleaned source data to create the dimension tables. With the fact table storing the immigration events and the dimension tables storing arrival date, visa type, country average temperature and demographics data, we can analyze:
        - How are international travelers distributed across the months of the year?
        - How are international visitors' arrival dates spread over time?
        - Which ports and travel modes do people use to enter the US?
        - What do the age and visa status of international visitors look like?
        - Is there any relationship between visitors' countries of origin and those countries' land temperatures?  

* The rationale for the choice of tools and technologies for the project.
    - Technology and tools  
The project stores the data on Amazon S3 and uses Apache Spark to read the source data from staging tables, extract the columns needed for analysis, and populate the fact and dimension tables; where needed, Spark then writes the results back to S3. For data modeling, the project uses a dimensional model, which makes the data easy for business users to work with and improves the performance of analytical queries. Specifically, it uses a star schema, which suits OLAP (online analytical processing) workloads well.

    - Why S3 and Spark  
Spark supports both batch and streaming processing, so it handles the large datasets in this project and leaves room to add near-real-time sources, or more types and larger volumes of data, later. Storing the data on S3 removes the need to invest in costly hardware and lets storage scale as needed. The tables are written as Parquet files, whose columnar format suits large datasets and analytical queries that read only a subset of columns. Finally, Spark reads data from S3 efficiently and ships with libraries for SQL analytics and machine learning (Spark SQL, MLlib), which makes it a good fit for processing large datasets efficiently.

* How often the data should be updated and why.  
The I94 immigration data used in this project is updated monthly, so it makes sense to refresh the data model monthly as well.

* More scenarios:
    - The data was increased by 100x.  
      Spark is designed for large datasets and scales out across a cluster, so a 100x increase is manageable; the cluster configuration (number of nodes, instance size, memory) would need to be scaled up accordingly.  
    - The data populates a dashboard that must be updated on a daily basis by 7am every day.  
      Apache Airflow can schedule the pipeline to run daily, early enough to finish before 7am, with retries and failure alerts so problems are caught before the deadline.
    - The database needed to be accessed by 100+ people.  
      The tables can be loaded into a cloud data warehouse such as Amazon Redshift, which is designed to serve many concurrent analytical users.
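The first analysis question, how international travelers are distributed across months, shows how the star schema is meant to be queried: join the fact table to the arrival date dimension and aggregate. The sketch below runs that query against a tiny in-memory SQLite database so it is self-contained. Table and column names follow the data dictionary above, but the rows are made up for illustration and are not real I94 records.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
cur = conn.cursor()

# minimal versions of two star-schema tables (columns taken from the data dictionary)
cur.execute("CREATE TABLE immigration_fact (record_id INTEGER, arrdate TEXT, i94mode INTEGER)")
cur.execute("CREATE TABLE arrdate_dim (arrdate TEXT, arrival_year INTEGER, arrival_month INTEGER)")

# illustrative rows only, not real I94 data
cur.executemany("INSERT INTO immigration_fact VALUES (?, ?, ?)",
                [(1, '2016-04-01', 1), (2, '2016-04-01', 1),
                 (3, '2016-04-15', 3), (4, '2016-05-02', 2)])
cur.executemany("INSERT INTO arrdate_dim VALUES (?, ?, ?)",
                [('2016-04-01', 2016, 4), ('2016-04-15', 2016, 4), ('2016-05-02', 2016, 5)])

# arrivals per month: join the fact table to the date dimension and aggregate
monthly = cur.execute("""
    SELECT d.arrival_year, d.arrival_month, COUNT(*) AS arrivals
    FROM immigration_fact AS f
    JOIN arrdate_dim AS d ON f.arrdate = d.arrdate
    GROUP BY d.arrival_year, d.arrival_month
    ORDER BY d.arrival_year, d.arrival_month
""").fetchall()
print(monthly)  # [(2016, 4, 3), (2016, 5, 1)]
conn.close()
```

In Spark, the same statement could be run with `spark.sql` after registering each Parquet table with `createOrReplaceTempView`. Grouping by `i94port` or `i94mode` instead of the month answers the port and travel-mode question in the same way.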