# US Immigration Events ETL

### Data Engineering Capstone Project

#### Project Summary
This is a Udacity Data Engineering Capstone project that showcases the skills acquired during the Nanodegree program.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [2]:
# Do all imports and installs here
import re
from collections import defaultdict
from datetime import datetime, timedelta

import pandas as pd
import psycopg2
import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import (col, date_format, dayofmonth, hour, month,
                                   to_date, to_timestamp, udf, weekofyear, year)

### Step 1: Scope the Project and Gather Data

#### Scope 

The scope of this project is to build an ETL process for US immigration events. The data is gathered, assessed, cleaned, and built into data models stored in a data warehouse, where it can be used for a variety of analytics purposes.

Tools used: Python, Spark, AWS S3, AWS EMR, AWS Redshift, Airflow.


#### Describe and Gather Data 

Data is provided by Udacity.

- **i94 SAS datasets 2016** The data consists of 12 SAS files of US immigration events in 2016 (6 GB).
- **US cities demographics** The data contains demographic information about US cities.
- **i94 SAS Labels Descriptions** The label descriptions are processed into the following datasets:
    - countries_codes.txt
    - modes.txt
    - ports.txt
    - us_states.txt
    - visa_intents_types.txt

### Step 2: Explore and Assess the Data

#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

##### US Immigration data

The I94 immigration data comes from the US National Tourism and Trade Office. It is provided in SAS7BDAT format, a binary database storage format.

In [3]:
I94_sample_path = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
I94_sample = pd.read_sas(I94_sample_path, format='sas7bdat', encoding="ISO-8859-1")

In [4]:
I94_sample.head()

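| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 | | | | ... | U | | 1979.0 | 10282016 | | | | 1897628000.0 | | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL | | ... | Y | | 1991.0 | D/S | M | | | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... | | M | 1961.0 | 09302016 | M | | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 1988.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 2012.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |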


Dataset info

In [9]:
I94_sample.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3096313 entries, 0 to 3096312
Data columns (total 28 columns):
cicid       float64
i94yr       float64
i94mon      float64
i94cit      float64
i94res      float64
i94port     object
arrdate     float64
i94mode     float64
i94addr     object
depdate     float64
i94bir      float64
i94visa     float64
count       float64
dtadfile    object
visapost    object
occup       object
entdepa     object
entdepd     object
entdepu     object
matflag     object
biryear     float64
dtaddto     object
gender      object
insnum      object
airline     object
admnum      float64
fltno       object
visatype    object
dtypes: float64(13), object(15)
memory usage: 661.4+ MB


Missing values

In [21]:
pd.DataFrame((I94_sample.isnull().sum() / len(I94_sample))*100, columns = ['% missing data']).sort_values('% missing data', ascending=False)

| column | % missing data |
|---|---|
| entdepu | 99.98734 |
| occup | 99.737559 |
| insnum | 96.327632 |
| visapost | 60.757746 |
| gender | 13.379429 |
| i94addr | 4.921079 |
| depdate | 4.600859 |
| matflag | 4.470769 |
| entdepd | 4.470769 |
| airline | 2.700857 |


The columns with the most missing data are entdepu (~99.99%), occup (~99.74%), insnum (~96.33%), and visapost (~60.76%).
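One way to act on these percentages is to flag columns whose missing share exceeds a threshold. The sketch below does this in plain Python on a list of row dicts so it runs without Spark or pandas; treating only `None` as missing, the 90% default threshold, and the function names `missing_percentages` and `columns_to_drop` are all illustrative assumptions, not part of the original pipeline.

```python
from typing import Dict, List, Optional


def missing_percentages(rows: List[Dict[str, Optional[object]]]) -> Dict[str, float]:
    """Percentage of missing (None) values per column, using the first row's keys as columns."""
    if not rows:
        return {}
    total = len(rows)
    return {
        column: 100.0 * sum(1 for row in rows if row.get(column) is None) / total
        for column in rows[0]
    }


def columns_to_drop(rows: List[Dict[str, Optional[object]]], threshold: float = 90.0) -> List[str]:
    """Columns whose missing percentage is above `threshold`, most-missing first."""
    pct = missing_percentages(rows)
    flagged = [column for column, p in pct.items() if p > threshold]
    return sorted(flagged, key=lambda column: pct[column], reverse=True)


# Toy rows (not real data): occup is always missing, entdepu is missing in 3 of 4 rows
toy_rows = [
    {"cicid": 6.0, "entdepu": "U", "occup": None, "gender": None},
    {"cicid": 7.0, "entdepu": None, "occup": None, "gender": "M"},
    {"cicid": 15.0, "entdepu": None, "occup": None, "gender": "M"},
    {"cicid": 16.0, "entdepu": None, "occup": None, "gender": "F"},
]
print(columns_to_drop(toy_rows, threshold=50.0))  # ['occup', 'entdepu']
```

Applied to the table above with the default 90% threshold, this rule would flag entdepu, occup, and insnum while keeping visapost (about 61% missing). The fact table in the data dictionary still keeps these columns, so dropping them is an alternative, not the project's choice.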

##### Demographics data

In [138]:
demographics_data = pd.read_csv('us-cities-demographics.csv',sep=';')

In [139]:
demographics_data.head(5)

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |


Examine the data for a single city (Quincy):

In [179]:
demographics_data[demographics_data['City'] =='Quincy']

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 289 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | Hispanic or Latino | 2566 |
| 426 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | American Indian and Alaska Native | 351 |
| 2322 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | Black or African-American | 3917 |
| 2578 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | Asian | 30473 |
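These rows show that each city appears once per race, with the city-level columns repeated unchanged and only `Race` and `Count` varying. That repetition is why the data model splits the data into a city summary table and a city race table. A plain-Python sketch of the split, assuming rows are dicts keyed by the CSV column names; the function name `split_demographics` is hypothetical, and the output keys follow the data dictionary.

```python
from typing import Dict, List, Tuple

# CSV column -> data dictionary column for the city-level attributes
CITY_COLUMNS = {
    "Median Age": "median_age",
    "Male Population": "male_pop",
    "Female Population": "female_pop",
    "Total Population": "total_pop",
    "Number of Veterans": "num_vets",
    "Foreign-born": "foreign_born",
    "Average Household Size": "avg_household_size",
}


def split_demographics(rows: List[dict]) -> Tuple[List[dict], List[dict]]:
    """Split per-race demographics rows into (city summary rows, city race rows)."""
    summary: Dict[Tuple[str, str], dict] = {}
    race_rows: List[dict] = []
    for row in rows:
        key = (row["State Code"], row["City"])
        # City-level values repeat on every race row, so keep only the first copy
        if key not in summary:
            summary[key] = {"state_code": key[0], "city": key[1],
                            **{new: row[old] for old, new in CITY_COLUMNS.items()}}
        race_rows.append({"state_code": key[0], "city": key[1],
                          "race": row["Race"], "count": row["Count"]})
    return list(summary.values()), race_rows


# Two of the Quincy rows shown above
quincy = {"City": "Quincy", "State Code": "MA", "Median Age": 41.0,
          "Male Population": 44129.0, "Female Population": 49500.0,
          "Total Population": 93629, "Number of Veterans": 4147.0,
          "Foreign-born": 32935.0, "Average Household Size": 2.39}
rows = [{**quincy, "Race": "White", "Count": 58723},
        {**quincy, "Race": "Asian", "Count": 30473}]
summary_rows, race_rows = split_demographics(rows)
print(len(summary_rows), len(race_rows))  # 1 2
```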


Missing values

In [25]:
pd.DataFrame((demographics_data.isnull().sum() / len(demographics_data))*100, columns = ['% missing data']).sort_values('% missing data', ascending=False)

| column | % missing data |
|---|---|
| Average Household Size | 0.553442 |
| Number of Veterans | 0.449671 |
| Foreign-born | 0.449671 |
| Male Population | 0.10377 |
| Female Population | 0.10377 |
| City | 0.0 |
| State | 0.0 |
| Median Age | 0.0 |
| Total Population | 0.0 |
| State Code | 0.0 |


Countries

In [3]:
countries_data = pd.read_csv('countries_codes.txt', sep='=',header=None,index_col=None, names = ['code','name'])
countries_data['name'] = countries_data['name'].apply(lambda x: x.replace("'","").strip())
countries_data.head()

| | code | name |
|---|---|---|
| 0 | 582 | MEXICO Air Sea, and Not Reported (I-94, no lan... |
| 1 | 236 | AFGHANISTAN |
| 2 | 101 | ALBANIA |
| 3 | 316 | ALGERIA |
| 4 | 102 | ANDORRA |


US States

In [35]:
us_states_data = pd.read_csv('us_states.txt', sep='=',header=None,index_col=None, names = ['code','name'])
us_states_data['code'] = us_states_data['code'].apply(lambda x: x.replace("'","").strip())
us_states_data['name'] = us_states_data['name'].apply(lambda x: x.replace("'","").strip())
us_states_data.head()

| | code | name |
|---|---|---|
| 0 | AL | ALABAMA |
| 1 | AK | ALASKA |
| 2 | AZ | ARIZONA |
| 3 | AR | ARKANSAS |
| 4 | CA | CALIFORNIA |


Arrival Mode types

In [40]:
modes_data = pd.read_csv('modes.txt', sep='=',header=None,index_col=None, names = ['code','mode'])
modes_data['mode'] = modes_data['mode'].apply(lambda x: x.replace("'","").strip())
modes_data

| | code | mode |
|---|---|---|
| 0 | 1 | Air |
| 1 | 2 | Sea |
| 2 | 3 | Land |
| 3 | 9 | Not reported |
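The countries, states, and modes cells above repeat the same parse-and-strip logic. A small helper could handle any of these `code = 'name'` label files. This plain-Python sketch assumes each line holds one `=` separating code and name, and that either side may be wrapped in single quotes, which is what the `replace("'", "")` calls suggest; the name `parse_label_lines` and the sample lines are hypothetical.

```python
from typing import Dict, Iterable


def parse_label_lines(lines: Iterable[str]) -> Dict[str, str]:
    """Parse lines of the form  code = 'name'  into a {code: name} dict."""
    labels: Dict[str, str] = {}
    for line in lines:
        if "=" not in line:
            continue  # skip blank or malformed lines
        code, _, name = line.partition("=")  # split on the first '=' only
        # Same cleanup as the pandas cells: drop single quotes, trim whitespace
        labels[code.replace("'", "").strip()] = name.replace("'", "").strip()
    return labels


states = parse_label_lines(["'AL'='ALABAMA'", "'AK' = 'ALASKA'", ""])
print(states)  # {'AL': 'ALABAMA', 'AK': 'ALASKA'}
```

With a real file the helper would be called as `parse_label_lines(open("us_states.txt"))` (or inside a `with` block). Codes are kept as strings, so numeric codes such as the arrival modes would need an `int()` cast to join against numeric fact columns, and, as with the pandas version, apostrophes inside names are removed too.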


US ports

In [173]:
ports_df = pd.read_csv('ports.txt', sep='=', header=None, index_col=None, names=['port_code', 'port_name'])

# Strip single quotes and surrounding whitespace
ports_df['port_code'] = ports_df['port_code'].apply(lambda x: x.replace("'", "").strip())
ports_df['port_name'] = ports_df['port_name'].apply(lambda x: x.replace("'", "").strip())

# Remove invalid and non-US port entries, identified by these name fragments
invalid_ports = 'collapsed|no port|unknown|identifi'
ports_df = ports_df[~ports_df['port_name'].str.lower().str.contains(invalid_ports)].copy()

# Split port_name into port_city and port_state on the last comma
ports_df[['port_city', 'port_state']] = ports_df['port_name'].str.rsplit(",", n=1, expand=True)
ports_df['port_state'] = ports_df['port_state'].str.strip()

i94visa

In [38]:
i94_data = pd.read_csv('i94visa.txt', sep='=',header=None,index_col=None, names = ['code','visa_type'])
i94_data

| | code | visa_type |
|---|---|---|
| 0 | 1 | Business |
| 1 | 2 | Pleasure |
| 2 | 3 | Student |


Visa types

In [43]:
visa_types_data = pd.read_csv('visa_types.txt', sep='|',header=None,index_col=None, names = ['code','visa_type'])
visa_types_data.head()

| | code | visa_type |
|---|---|---|
| 0 | A1 | Ambassador, public minister, career diplomati... |
| 1 | A2 | Other foreign government official or employee... |
| 2 | A3 | Attendant, servant, or personal employee of A... |
| 3 | B1 | Temporary visitor for business (including Pea... |
| 4 | B2 | Temporary visitor for pleasure. |


#### Cleaning Steps


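Cleaning is done with Pandas for the initial exploration and the small label datasets, and with Spark on AWS EMR for the large immigration dataset.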
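One cleaning step the immigration data needs is turning `arrdate` and `depdate` into real dates. The sketch below assumes these are SAS numeric dates (days since 1960-01-01). That assumption is consistent with the sample, where `arrdate` 20545 falls on 2016-04-01 in a file of April 2016 records. It is plain Python for clarity; in the Spark job the same day offset would be applied to the columns. The function name `sas_to_date` is hypothetical.

```python
from datetime import date, timedelta
from typing import Optional

SAS_EPOCH = date(1960, 1, 1)  # SAS numeric dates count days from this day


def sas_to_date(sas_days: Optional[float]) -> Optional[date]:
    """Convert a SAS numeric date (days since 1960-01-01) to a date; None or NaN stays None."""
    if sas_days is None or sas_days != sas_days:  # NaN is the only value not equal to itself
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(20691.0))  # 2016-08-25
```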

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

The data model is a star schema. The fact table contains the US immigration events that happened in 2016. The dimension tables hold the reference data for the fact table's dimensions: countries, states, cities, visa types, visa intent types, arrival mode types, and ports.
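To show how the star schema is meant to be queried, the sketch below builds a tiny in-memory SQLite version of the fact table and two dimensions (arrival modes and visa intents), then aggregates the facts by dimension attributes. SQLite stands in for Redshift only so the example runs anywhere. The column subset and the four toy fact rows (loosely modeled on the sample, with the `i94visa` values assumed to match their visa types) are illustrative.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dim_arrival_modes (arrival_mode_code INTEGER PRIMARY KEY, arrival_mode_type TEXT);
CREATE TABLE dim_visa_intents_types (visa_intents_code INTEGER PRIMARY KEY, visa_intents_type TEXT);
CREATE TABLE fact_immigration_events (
    immigration_id INTEGER PRIMARY KEY,
    cicid INTEGER,
    i94port TEXT,
    i94mode INTEGER REFERENCES dim_arrival_modes (arrival_mode_code),
    i94visa INTEGER REFERENCES dim_visa_intents_types (visa_intents_code)
);
INSERT INTO dim_arrival_modes VALUES (1, 'Air'), (2, 'Sea'), (3, 'Land'), (9, 'Not reported');
INSERT INTO dim_visa_intents_types VALUES (1, 'Business'), (2, 'Pleasure'), (3, 'Student');
INSERT INTO fact_immigration_events VALUES
    (1, 7, 'ATL', 1, 3),
    (2, 15, 'WAS', 1, 2),
    (3, 16, 'NYC', 1, 2),
    (4, 17, 'NYC', 1, 2);
""")

# Typical star-schema query: aggregate the fact table, labeled with dimension attributes
query = """
SELECT m.arrival_mode_type, v.visa_intents_type, COUNT(*) AS events
FROM fact_immigration_events f
JOIN dim_arrival_modes m ON f.i94mode = m.arrival_mode_code
JOIN dim_visa_intents_types v ON f.i94visa = v.visa_intents_code
GROUP BY m.arrival_mode_type, v.visa_intents_type
ORDER BY events DESC
"""
for row in conn.execute(query):
    print(row)
# ('Air', 'Pleasure', 3)
# ('Air', 'Student', 1)
```

The fact table stays narrow (codes only) while the descriptive labels live in the dimensions, so relabeling a mode or intent touches one small table rather than every event row.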

#### 3.2 Mapping Out Data Pipelines
The steps that pipeline the data into the data model are listed under the ETL pipeline in 4.1.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

ETL pipeline:
- Raw data is moved from the source to the S3 input bucket.
- An ETL job copies the raw data from the S3 input bucket to the processing bucket.
- A Spark job is triggered to read the data from the processing bucket and process it on an AWS EMR cluster. In this step the data is cleaned, transformed, and repartitioned, then written to the processed bucket.
- An ETL job picks up the data from the processed bucket and stages it into AWS Redshift staging tables.
- An ETL job performs an UPSERT (update existing rows, insert new ones) from the staging tables into the production tables in Redshift.
- ETL pipeline execution is completed.
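A common way to implement the staging-to-production UPSERT is delete-then-insert inside one transaction: delete production rows whose key appears in staging, then insert every staging row. The sketch runs this pattern against SQLite so it is self-contained; on Redshift the delete step is usually written as `DELETE ... USING` the staging table. The function name `upsert_from_staging` and the table `staging_states` are hypothetical, and emptying the staging table afterwards is an added assumption.

```python
import sqlite3


def upsert_from_staging(conn: sqlite3.Connection, target: str, staging: str, key: str) -> None:
    """Delete target rows whose key is in staging, insert all staging rows, then empty staging."""
    # Table and column names are interpolated, so they must come from trusted code, not user input
    with conn:  # a single transaction: commit on success, roll back on any error
        conn.execute(f"DELETE FROM {target} WHERE {key} IN (SELECT {key} FROM {staging})")
        conn.execute(f"INSERT INTO {target} SELECT * FROM {staging}")
        conn.execute(f"DELETE FROM {staging}")


conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dim_states (state_code TEXT PRIMARY KEY, state_name TEXT);
CREATE TABLE staging_states (state_code TEXT, state_name TEXT);
INSERT INTO dim_states VALUES ('AL', 'Alabama'), ('AK', 'ALASKA');
INSERT INTO staging_states VALUES ('AL', 'ALABAMA'), ('AZ', 'ARIZONA');
""")
upsert_from_staging(conn, "dim_states", "staging_states", "state_code")
print(conn.execute("SELECT * FROM dim_states ORDER BY state_code").fetchall())
# [('AK', 'ALASKA'), ('AL', 'ALABAMA'), ('AZ', 'ARIZONA')]
```

Running both statements in one transaction means readers never see the production table with the old rows deleted but the new ones not yet inserted.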




#### 4.2 Data Quality Checks
Airflow DAG:
- Once the ETL pipeline execution is completed, the DAG runs data quality checks on all production tables in Redshift.
- The data quality checks include null checks and a set of analytics queries.
- DAG job execution is completed.
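The null checks can be expressed as queries that must return zero, alongside a check that each table is not empty. A sketch using only DB-API cursor calls, so the same function could be given a Redshift connection (for example from psycopg2, which the notebook imports) or, as here, SQLite for a self-contained run. The function name `run_quality_checks` and the toy `dim_ports` rows are hypothetical.

```python
import sqlite3
from typing import List


def run_quality_checks(conn, table: str, not_null_columns: List[str]) -> List[str]:
    """Return failure messages for `table`; an empty list means every check passed."""
    failures = []
    cur = conn.cursor()
    # Check 1: the table must not be empty
    cur.execute(f"SELECT COUNT(*) FROM {table}")
    if cur.fetchone()[0] == 0:
        failures.append(f"{table}: table is empty")
    # Check 2: key columns must not contain NULLs
    for column in not_null_columns:
        cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {column} IS NULL")
        nulls = cur.fetchone()[0]
        if nulls:
            failures.append(f"{table}.{column}: {nulls} NULL value(s)")
    return failures


conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dim_ports (port_code TEXT, port_city TEXT, port_state TEXT);
INSERT INTO dim_ports VALUES ('ATL', 'ATLANTA', 'GA'), ('NYC', 'NEW YORK', NULL);
""")
print(run_quality_checks(conn, "dim_ports", ["port_code", "port_state"]))
# ['dim_ports.port_state: 1 NULL value(s)']
```

Inside an Airflow task, a non-empty failure list would typically be turned into a raised exception, which marks the task, and therefore the DAG run, as failed.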


#### 4.3 Data dictionary 
The data dictionary below describes each field of the data model.

**Fact Table** - Immigration data Events: fact_immigration_events

- immigration_id = primary key
- cicid = unique key within a month
- i94yr = 4-digit year
- i94mon = numeric month
- i94cit = 3-digit code of the immigrant's country of citizenship
- i94res = 3-digit code of the immigrant's country of residence outside the US
- i94port = port of entry, 3-character code of the destination US city
- arrdate = arrival date in the USA (SAS numeric date)
- i94addr = US state of the immigrant's address (2-letter state code)
- i94mode = mode of arrival, 1-digit travel code
- depdate = departure date from the USA (SAS numeric date)
- i94bir = age of the immigrant in years
- i94visa = visa intent code (1 = Business, 2 = Pleasure, 3 = Student)
- count = used for summary statistics; always 1
- dtadfile = date field in the format YYYYMMDD
- visapost = three-letter code of the place where the visa was issued
- occup = occupation
- entdepa = one-letter arrival code
- entdepd = one-letter departure code
- entdepu = one-letter update code
- matflag = M if the arrival and departure records match
- biryear = birth year
- dtaddto = MMDDYYYY date until which the immigrant is admitted (the sample also contains the value D/S)
- gender = gender
- insnum = Immigration and Naturalization Services number; many are reused
- airline = airline of entry for the immigrant
- admnum = admission number; many are reused, but not as much as insnum
- fltno = flight number of the immigrant
- visatype = visa type as a short visa code (WT, B2, WB, etc.)
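The sample rows show `dtaddto` holding either an MMDDYYYY string (such as 10282016) or the literal D/S (duration of status, which appears on the F1 student row). A sketch of how a cleaning step could parse the field, assuming D/S and any unparseable value should map to no fixed date; the function name `parse_dtaddto` is hypothetical.

```python
from datetime import date, datetime
from typing import Optional


def parse_dtaddto(value: Optional[str]) -> Optional[date]:
    """Parse an MMDDYYYY admitted-until date; 'D/S' and unparseable values return None."""
    if value is None or value.strip().upper() == "D/S":
        return None  # duration of status has no fixed end date
    try:
        return datetime.strptime(value.strip(), "%m%d%Y").date()
    except ValueError:
        return None  # malformed value: treat as unknown


print(parse_dtaddto("10282016"))  # 2016-10-28
print(parse_dtaddto("D/S"))       # None
```

Returning `None` for D/S loses the distinction between "no end date" and "unknown"; keeping a separate boolean column for D/S would preserve it if analyses need that.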

**Dimension Table** - Countries: dim_countries

- country_code = country code
- country_name = country name

**Dimension Table** - States: dim_states

- state_code = state code
- state_name = state name

**Dimension Table** - Cities: dim_cities_demographics_summary

- city = 3-character code of the destination city (mapped from the cleaned-up immigration data)
- median_age = median age
- male_pop = male population
- female_pop = female population
- total_pop = total population
- num_vets = number of veterans
- foreign_born = foreign-born population
- avg_household_size = average household size
- state_code = state code, can be null
- PRIMARY KEY (state_code, city)

**Dimension Table** - City race demographics: dim_cities_demographics_race

- state_code = state code, can be null
- city = 3-character code of the destination city (mapped from the cleaned-up immigration data)
- race = White, Hispanic or Latino, Asian, Black or African-American, or American Indian and Alaska Native
- count = number of people of that race in the city

**Dimension Table** - Ports of arrival: dim_ports

- port_code = port code
- port_city = the city where the port is located
- port_state = the state where the port is located

**Dimension Table** - Arrival modes: dim_arrival_modes

How immigrants arrived. Joins to fact_immigration_events.i94mode.

- arrival_mode_code = 1, 2, 3, or 9
- arrival_mode_type = Air, Sea, Land, or Not reported, respectively

**Dimension Table** - Visa intents: dim_visa_intents_types

The purpose of the visa. Joins to fact_immigration_events.i94visa.

- visa_intents_code = 1, 2, or 3
- visa_intents_type = Business, Pleasure, or Student, respectively

**Dimension Table** - Visa types: dim_visa_types

The type of visa the immigrant is coming in on. Joins to fact_immigration_events.visatype.

- visa_code = A1, A2, A3, B1, etc.
- visa_type = visa description (Ambassador, public minister, career diplomatic, etc.)


### Step 5: Complete Project Write Up


##### Tools and technologies for the project.
- Python: the programming language used in the project.
- Pandas: Python data library used for initial data exploration and cleaning.
- Spark: PySpark is used to batch-process the large immigration dataset on AWS EMR.
- AWS S3: Amazon Simple Storage Service, used to store the raw and processed data.
- AWS EMR: Amazon's managed cluster service, used to run the Spark jobs.
- AWS Redshift: Amazon Redshift, used as the data warehouse for analytical queries.
- Airflow: orchestrates the ETL and data quality check jobs.

##### How often the data should be updated.
- The fact table should be updated daily, since immigration events happen every day.
- Dimension tables are more static and can be updated weekly or monthly.

##### How to approach the problem differently under the following scenarios:

* **The data was increased by 100x:** Input data should be stored in S3 and processed with Spark on EMR. Redshift can handle about 2.8 billion rows: a single Redshift node holds about 160 GB, and a cluster can scale up to 128 compute nodes.
 

* **The data populates a dashboard that must be updated on a daily basis by 7am every day.** An Airflow DAG can be scheduled to run the pipeline every day, early enough that the dashboard is refreshed by 7am.

* **The database needed to be accessed by 100+ people.** Redshift supports up to 500 connections, so 100+ people can connect to it.
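The storage side of the 100x scenario can be sanity-checked with the figures quoted above: 6 GB of raw SAS files scaled 100x, against roughly 160 GB per node. Raw SAS size is only a proxy for warehouse size (Redshift compresses columnar data, and clusters need free space), so treat this as an order-of-magnitude check, not a sizing plan.

```python
import math

RAW_DATA_GB = 6          # 2016 i94 SAS files, from the data description
SCALE_FACTOR = 100       # the 100x scenario
NODE_STORAGE_GB = 160    # per-node figure quoted in the scenario
MAX_COMPUTE_NODES = 128  # maximum cluster size quoted in the scenario

scaled_gb = RAW_DATA_GB * SCALE_FACTOR
nodes_needed = math.ceil(scaled_gb / NODE_STORAGE_GB)  # 600 / 160 = 3.75, rounded up
print(scaled_gb, nodes_needed, nodes_needed <= MAX_COMPUTE_NODES)  # 600 4 True
```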