# ETL Project of I-94 Immigration & City Demographics
### Data Engineering Capstone Project

#### Project Summary

The project builds an ETL pipeline that integrates I-94 immigration data with U.S. city demographic data and transforms the result into a star-schema data model. The resulting database supports analysis that combines I-94 immigration and city demographic information, for example the relationship between immigration frequency and city demographics such as race distribution.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [2]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
import datetime as dt
import re

### Step 1: Scope the Project and Gather Data

#### Scope 
Our plan for the project is as follows. We first explore the data and assess its quality, for example missing values in primary keys. We then build the data model by connecting the immigration data with the city demographic data on city and state keys, and normalize the database. The final tables are checked and stored as Parquet files.
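The primary-key check mentioned above can be sketched in plain Python. This is a minimal illustration rather than the notebook's code: it assumes records are dictionaries and treats a key as missing when it is absent, `None`, a blank string, or a float NaN (how pandas represents empty CSV cells). The sample rows are hypothetical.

```python
import math


def count_missing_keys(records, key_columns):
    """Count, per key column, how many records have no usable value."""

    def is_missing(value):
        if value is None:
            return True
        if isinstance(value, float) and math.isnan(value):
            return True
        return isinstance(value, str) and value.strip() == ""

    counts = {col: 0 for col in key_columns}
    for record in records:
        for col in key_columns:
            if is_missing(record.get(col)):
                counts[col] += 1
    return counts


# hypothetical rows shaped like the city demographics file
rows = [
    {"City": "Quincy", "State": "Massachusetts", "Race": "White"},
    {"City": "Hoover", "State": None, "Race": "Asian"},
    {"City": "  ", "State": "Alabama", "Race": float("nan")},
]
print(count_missing_keys(rows, ["City", "State", "Race"]))
```

A non-zero count for any key column signals records that cannot take part in the city/state join.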

#### Describe and Gather Data 
Our data consist of two parts:

I94 Immigration Data: This data comes from the US National Tourism and Trade Office, and a data dictionary is included in the workspace. [This](https://travel.trade.gov/research/reports/i94/historical/2016.html) is where the data comes from. We focus on the records from January 2016, which are stored in i94_jan16_sub.sas7bdat.

U.S. City Demographic Data: This data comes from OpenDataSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).

In [3]:
# I-94 immigration data illustration
# Read in the data here
fname = 'immigration_data_sample.csv'
df = pd.read_csv(fname)

In [4]:
# illustration of immigration data samples
df.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [5]:
# column names of the immigration sample
df.columns

Index(['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')

In [6]:
# city demographic data
fname = 'us-cities-demographics.csv'
df_demo = pd.read_csv(fname,sep=';')

In [7]:
# illustration of city demographic data
df_demo.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [9]:
# columns of city demographic data
df_demo.columns

Index(['City', 'State', 'Median Age', 'Male Population', 'Female Population',
       'Total Population', 'Number of Veterans', 'Foreign-born',
       'Average Household Size', 'State Code', 'Race', 'Count'],
      dtype='object')

In [10]:
# Spark cluster ready for ETL Process
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

### Step 2: Explore and Assess the Data
#### Explore the Data 
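We focus on the keys on which the immigration data and the demographic data will be joined. The two datasets are linked through location: the i94port code in the immigration data identifies a port of entry, which we map to a city and state that can be matched against the demographic data.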
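The port dictionary spells locations in upper case (e.g. 'ANCHORAGE, AK'), while the demographics file uses mixed case (e.g. 'Silver Spring'), so a city name only matches after normalization. A minimal sketch of building comparable location keys, assuming upper-casing and whitespace stripping are enough (the function names and the padded sample entries are hypothetical; the hyphen rule mirrors the notebook's `find_city`/`find_state`):

```python
def port_location_key(full_name):
    """Return an upper-case (CITY, STATE) key from a port name such as
    'BOSTON, MA' or 'BAKER AAF - BAKER ISLAND, AK'; None if there is no state."""
    if "-" in full_name:
        full_name = full_name.split("-")[1]  # keep the part after the hyphen
    parts = full_name.split(",")
    if len(parts) < 2:
        return None
    return parts[0].strip().upper(), parts[1].strip().upper()


def demographics_location_key(city, state_code):
    """Return the same kind of key from a demographics row."""
    return city.strip().upper(), state_code.strip().upper()


# hypothetical port dictionary entries, padded like the values read from 'Port Map.txt'
sample_ports = {"BOS": "BOSTON, MA            ", "BAR": "BAKER AAF - BAKER ISLAND, AK"}

print(port_location_key(sample_ports["BAR"]))  # ('BAKER ISLAND', 'AK')
print(port_location_key(sample_ports["BOS"]) == demographics_location_key("Boston", "MA"))  # True
```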

#### Cleaning Steps
We clean the keys that the datasets are joined on. A unique U.S. city is identified by its city name together with its state. In the I-94 immigration data, location is given only by the i94port code, so we convert each code to its full port name and extract the city and state code from it. For the city demographic data, we remove records with null keys and duplicate records.
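The demographic cleaning rule can be sketched in plain Python: drop rows whose (City, State, Race) key has a null, then keep only the first row per key, which is the behavior of pandas `drop_duplicates(keep='first')` on non-null keys. This is an illustrative sketch, not the notebook's code; the duplicate Quincy row and the null Race value are made up.

```python
def clean_demographics(rows, key_columns=("City", "State", "Race")):
    """Drop rows with a missing key, then keep the first row for each key."""
    seen = set()
    cleaned = []
    for row in rows:
        key = tuple(row.get(col) for col in key_columns)
        if any(value is None for value in key):
            continue  # a null primary key cannot be joined on
        if key in seen:
            continue  # duplicate (City, State, Race) record
        seen.add(key)
        cleaned.append(row)
    return cleaned


# hypothetical input: a duplicated Quincy row and a row with a null Race
rows = [
    {"City": "Quincy", "State": "Massachusetts", "Race": "White", "Count": 58723},
    {"City": "Quincy", "State": "Massachusetts", "Race": "White", "Count": 58723},
    {"City": "Hoover", "State": "Alabama", "Race": None, "Count": 4759},
]
print(len(clean_demographics(rows)))  # 1
```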

In [11]:
# Performing cleaning tasks
# read i94port location dictionary
port_dict = {}
with open('Port Map.txt', 'r') as f:
    for line in f:
        matchObj = re.search(r"'(.*)'.*'(.*)'", line)
        if matchObj is None:
            continue  # skip lines without a quoted code/name pair
        port_dict[matchObj.group(1)] = matchObj.group(2)

In [12]:
# illustration of the i94port dictionary: first 10 entries
for item in list(port_dict.items())[:10]:
    print(item)

('ALC', 'ALCAN, AK             ')
('ANC', 'ANCHORAGE, AK         ')
('BAR', 'BAKER AAF - BAKER ISLAND, AK')
('DAC', 'DALTONS CACHE, AK     ')
('PIZ', 'DEW STATION PT LAY DEW, AK')
('DTH', 'DUTCH HARBOR, AK      ')
('EGL', 'EAGLE, AK             ')
('FRB', 'FAIRBANKS, AK         ')
('HOM', 'HOMER, AK             ')
('HYD', 'HYDER, AK             ')


In [13]:
# load sas file of immigration data
df_i94 = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat')
df_i94_clean=df_i94.filter(df_i94.i94port.isin(list(port_dict.keys())))

In [14]:
# immigration data illustration
df_i94_clean.show(1)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|      admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------------+-----+--------+
|  7.0|2016.0|   1.0| 101.0| 101.0|    BOS|20465.0|    1.0|     MA|   null|  20.0|    3.0|  1.0|    null|    null| null|      T|   null|   null|   null| 1996.0|    D/S|     M|  null|     LH|3.46608285E8|  424|      F1|
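+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------------+-----+--------+
only showing top 1 row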

In [15]:
# Records Filtered
df_i94_clean.count()

2829058

In [16]:
# compare with the original data: 18,866 records (2,847,924 - 2,829,058) whose i94port is missing or not in the port dictionary were removed
df_i94.count()

2847924

In [17]:
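# define UDF helpers to extract the city and state code from a full port name
def split_city_state(text):
    """Return (city, state_code) for names like 'ANCHORAGE, AK' or
    'BAKER AAF - BAKER ISLAND, AK'; (None, None) if there is no comma."""
    if text is None:
        return None, None
    # for 'NAME - CITY, ST' keep the part after the hyphen, as before
    if '-' in text:
        text = text.split('-')[1]
    parts = text.split(',')
    if len(parts) < 2:
        return None, None  # no ', ST' part: avoid an IndexError inside the UDF
    return parts[0].strip(), parts[1].strip()

def find_city(text):
    return split_city_state(text)[0]

def find_state(text):
    return split_city_state(text)[1]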

In [18]:
# udf registration
get_city=udf(find_city)
get_state=udf(find_state)
get_full=udf(lambda x: port_dict[x])

In [19]:
# map full name of i94 port
df_i94_clean=df_i94_clean.withColumn("i94_full",get_full("i94port"))

In [20]:
# create city column
df_i94_clean=df_i94_clean.withColumn("City",get_city("i94_full"))

In [21]:
# create state column
df_i94_clean=df_i94_clean.withColumn("State Code",get_state("i94_full"))

In [22]:
# illustration after cleaning
df_i94_clean.show(1)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------------+-----+--------+--------------------+------+----------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|      admnum|fltno|visatype|            i94_full|  City|State Code|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------------+-----+--------+--------------------+------+----------+
|  7.0|2016.0|   1.0| 101.0| 101.0|    BOS|20465.0|    1.0|     MA|   null|  20.0|    3.0|  1.0|    null|    null| null|      T|   null|   null|   null| 1996.0|    D/S|     M|  null|     LH|3.46608285E8|  424|      F1|BOSTON, 

In [23]:
# drop duplicates from city demographic data
df_demo_clean=df_demo.drop_duplicates(subset=("City","State","Race"))

In [24]:
# Nothing duplicated
df_demo_clean.shape

(2891, 12)

In [25]:
# No empty primary keys
df_demo_clean[["City","State","Race"]].isna().sum().sum()

0

In [26]:
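# read city demographic data with Spark, applying the same cleaning as above
df_demo_clean = spark.read.csv(fname, sep=';', header=True) \
    .dropDuplicates(["City", "State", "Race"]) \
    .dropna(subset=["City", "State", "Race"])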

In [27]:
# illustrate city demographic data with Spark
df_demo_clean.show(1)

+-------------+--------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
|         City|   State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|              Race|Count|
+-------------+--------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
|Silver Spring|Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|Hispanic or Latino|25924|
+-------------+--------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
only showing top 1 row



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

The conceptual data model combining I94 immigration and city demographics consists of one fact table and two dimension tables. The fact table holds I94 admission information, while the two dimension tables hold applicant information and city demographic information. The admission fact table joins the applicant table on the admission number (admnum) and the city demographic table on city and state code.
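How a fact row is resolved against both dimensions can be sketched with plain dictionaries. This is an illustrative sketch, not the project's query: the column names follow the aliases used in the Step 4 queries, and all rows are hypothetical except the admission number, taken from the sample row shown in Step 2. Because the city table holds one row per race, a single city key matches several dimension rows, so a direct fact-to-city join multiplies rows unless the city data is aggregated or filtered first. Keys are upper-cased on both sides as an assumption, since port names are upper case.

```python
def join_admissions(admissions, applicants, cities):
    """Attach applicant and city-demographic rows to each admission fact row."""
    applicant_by_admnum = {row["admnum"]: row for row in applicants}
    cities_by_key = {}
    for row in cities:
        key = (row["city"].upper(), row["state_code"].upper())
        cities_by_key.setdefault(key, []).append(row)  # one row per race

    joined = []
    for fact in admissions:
        key = (fact["city"].upper(), fact["state_code"].upper())
        joined.append({
            **fact,
            "applicant": applicant_by_admnum.get(fact["admnum"]),
            "city_rows": cities_by_key.get(key, []),
        })
    return joined


admissions = [{"admnum": 346608285.0, "city": "BOSTON", "state_code": "MA", "i94port": "BOS"}]
applicants = [{"admnum": 346608285.0, "gender": "M", "visatype": "F1"}]
cities = [
    {"city": "Boston", "state_code": "MA", "race": "White"},
    {"city": "Boston", "state_code": "MA", "race": "Asian"},
]
result = join_admissions(admissions, applicants, cities)
print(result[0]["applicant"]["visatype"], len(result[0]["city_rows"]))  # F1 2
```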

#### 3.2 Mapping Out Data Pipelines

1. Clean the I94 data and create a Spark dataframe (done in Step 2)
2. Clean the city demographic data and create a Spark dataframe (done in Step 2)
3. Create the admission and applicant tables by extracting the respective columns from the I94 data, and the city table from the demographic data
4. Write the fact and dimension tables to Parquet files

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [28]:
# A look at data source: immigration data
df_i94_clean.show(1)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------------+-----+--------+--------------------+------+----------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|      admnum|fltno|visatype|            i94_full|  City|State Code|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------------+-----+--------+--------------------+------+----------+
|  7.0|2016.0|   1.0| 101.0| 101.0|    BOS|20465.0|    1.0|     MA|   null|  20.0|    3.0|  1.0|    null|    null| null|      T|   null|   null|   null| 1996.0|    D/S|     M|  null|     LH|3.46608285E8|  424|      F1|BOSTON, 

In [29]:
# A look at data source: city demographics
df_demo_clean.show(1)

+-------------+--------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
|         City|   State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|              Race|Count|
+-------------+--------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
|Silver Spring|Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|Hispanic or Latino|25924|
+-------------+--------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
only showing top 1 row



In [30]:
# create temp tables for Spark SQL queries
df_i94_clean.createOrReplaceTempView("i94")
df_demo_clean.createOrReplaceTempView("city")

In [31]:
applicant=spark.sql("""
    select distinct admnum, 
           i94cit, 
           i94res, 
           i94bir, 
           i94visa, 
           visapost, 
           occup, 
           biryear, 
           gender, 
           insnum, 
           visatype
    from i94
""")

applicant.write.parquet("applicant.parquet",mode="append",partitionBy="biryear")

In [38]:
admission=spark.sql("""
    select distinct admnum, 
           City as city, 
           `State Code` as state_code, 
           i94yr, 
           i94mon, 
           i94port, 
           entdepa, 
           entdepd, 
           entdepu, 
           matflag, 
           airline,
           fltno,
           dtaddto,
           arrdate,
           i94mode,
           i94addr,
           depdate,
           count,
           dtadfile
    from i94
""")

admission.write.parquet("admission.parquet",mode="append",partitionBy="i94port")

In [41]:
city=spark.sql("""
    select distinct `City` as city,
           `State` as state,
           `Median Age` as median_age,
           `Male Population` as male_population,
           `Female Population` as female_population,
           `Total Population` as total_population,
           `Number of Veterans` as number_of_veterans,
           `Foreign-born` as foreign_born,
           `Average Household Size` as average_household_size,
           `State Code` as state_code,
           race,
           count
    from city
""")

city.write.parquet("city.parquet",mode="append",partitionBy="state")

#### 4.2 Data Quality Checks
We use data quality checks to ensure the pipeline ran as expected. Typical checks include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
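The three kinds of checks listed above can be sketched as small plain-Python functions that raise on failure. These helpers are hypothetical and operate on lists of row dictionaries; in the notebook the same ideas would be applied to the Spark dataframes (for example by comparing `count()` with the count of distinct keys). Whether `admnum` is truly unique in the applicant table is itself an assumption worth checking, since the query only de-duplicates whole rows.

```python
def check_not_empty(rows, name):
    if not rows:
        raise ValueError(f"{name}: table is empty")


def check_unique(rows, key_columns, name):
    keys = [tuple(row.get(col) for col in key_columns) for row in rows]
    if len(keys) != len(set(keys)):
        raise ValueError(f"{name}: duplicate values in {key_columns}")


def check_types(rows, column, expected_type, name):
    """Non-null values in `column` must be instances of `expected_type`."""
    bad = [row[column] for row in rows
           if row.get(column) is not None and not isinstance(row[column], expected_type)]
    if bad:
        raise TypeError(f"{name}: {len(bad)} value(s) in {column!r} are not "
                        f"{expected_type.__name__}")


def check_row_count(source_count, target_count, name):
    # the target may shrink (filtering, de-duplication) but must be non-empty
    # and can never be larger than its source
    if not 0 < target_count <= source_count:
        raise ValueError(f"{name}: {target_count} rows loaded from {source_count} source rows")


# source/count check with the I94 counts reported in Step 2
check_row_count(2847924, 2829058, "i94")
```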
 
Run Quality Checks

In [43]:
# Quality checks: the city demographic, applicant, and admission tables must all be non-empty
assert df_demo_clean.count()>0
assert applicant.count()>0
assert admission.count()>0

#### 4.3 Data dictionary 

Fact Table: I94 Admission & city

Columns: 

- ADMNUM: Admission Number
- City: city of i94 admission
- State Code: state code of i94 admission
- I94YR: 4 digit year
- I94MON: Numeric month
- I94PORT: I94 port code
- ENTDEPA: Arrival Flag, admitted or paroled into the U.S. 
- ENTDEPD: Departure Flag, Departed, lost I-94 or is deceased
- ENTDEPU: Update Flag, Either apprehended, overstayed, adjusted to perm residence 
- MATFLAG: Match flag, Match of arrival and departure records
- AIRLINE: Airline used to arrive in U.S.
- FLTNO: Flight number of Airline used to arrive in U.S.
- DTADDTO: Character Date Field, Date to which admitted to U.S. (allowed to stay until) 
- ARRDATE: arrival date in the USA
- I94MODE: mode of transportation used to arrive (Air, Land, Sea)
- I94ADDR: state where the applicant is located in the U.S.
- DEPDATE: departure date from the USA
- COUNT: Used for summary statistics  
- DTADFILE: Character Date Field, Date added to I-94 Files
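ARRDATE and DEPDATE are stored as numbers such as 20465.0. A minimal decoding sketch, assuming they are SAS numeric dates (days counted from 1960-01-01), which is consistent with the data: 20465 decodes to a day in January 2016, and the sample row with arrdate 20566.0 and i94mon 4.0 decodes to April 2016. The I94MODE code list is an assumption taken from the I94 label descriptions file; it agrees with the sample row whose i94mode is 3.0 and whose fltno is LAND.

```python
import math
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS numeric dates count days since this date


def sas_to_date(sas_days):
    """Convert a SAS numeric date (e.g. arrdate=20465.0) to a date; None/NaN -> None."""
    if sas_days is None or math.isnan(sas_days):
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


# assumed I94MODE code list (from the I94 label descriptions file)
I94MODE_LABELS = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}


def decode_mode(code):
    """Map a numeric i94mode code such as 1.0 to its label; unknown/None -> None."""
    if code is None or math.isnan(code):
        return None
    return I94MODE_LABELS.get(int(code))


print(sas_to_date(20465.0))  # 2016-01-12
print(decode_mode(3.0))      # Land
```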

Dimension Table 1: I94 applicant information

Columns:

- ADMNUM: Admission Number
- I94CIT: Citizen code
- I94RES: Residency code
- I94BIR: Age of Respondent in Years
- I94VISA: Visa codes collapsed into three categories (Business, Pleasure, Student)
- VISAPOST: Department of State where the visa was issued
- OCCUP: Occupation that will be performed in U.S.
- BIRYEAR: 4 digit year of birth
- GENDER: Non-immigrant sex
- INSNUM: INS number
- VISATYPE: Class of admission legally admitting the non-immigrant to temporarily stay in U.S. 
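I94VISA holds a numeric code for the three categories listed above. The mapping below (1 = Business, 2 = Pleasure, 3 = Student) is an assumption taken from the I94 label descriptions file; it is consistent with the sample row in Step 2, where i94visa is 3.0 and visatype is F1, a student visa. This is a sketch of a lookup, not part of the project's pipeline.

```python
import math

# assumed I94VISA code list (from the I94 label descriptions file)
I94VISA_LABELS = {1: "Business", 2: "Pleasure", 3: "Student"}


def decode_visa_category(code):
    """Map a numeric i94visa code (e.g. 3.0) to its category, else None."""
    if code is None or math.isnan(code):
        return None
    return I94VISA_LABELS.get(int(code))


print(decode_visa_category(3.0))  # Student
```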

Dimension Table 2: City demographic information

Columns: 

- city: name of city
- state: name of state
- Median Age: median age of residents
- Male Population: male population of city
- Female Population: female population of city
- Total Population: total population of city
- Number of Veterans: number of veterans in the city
- Foreign-born: number of foreign born residents
- Average Household Size: average household size in the city
- State Code: state code where the city is located
- Race: race group whose population is given in Count
- Count: population of the given Race in the city
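For the race-distribution analysis mentioned in the project summary, Count can be related to Total Population. A minimal sketch, assuming a share is simply `Count / Total Population` for each race row of one city (the function name is hypothetical). The shares are not guaranteed to sum to 1, because the race groups in this dataset can overlap. The sample values are from the Silver Spring row shown in Step 2.

```python
def race_shares(city_rows):
    """Return {race: Count / Total Population} for the rows of one city."""
    shares = {}
    for row in city_rows:
        total = row["Total Population"]
        if total:  # skip rows with a zero or missing total
            shares[row["Race"]] = row["Count"] / total
    return shares


silver_spring = [
    {"City": "Silver Spring", "State Code": "MD", "Race": "Hispanic or Latino",
     "Count": 25924, "Total Population": 82463},
]
print(round(race_shares(silver_spring)["Hispanic or Latino"], 3))  # 0.314
```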


### Step 5: Complete Project Write Up
In this project, we used Spark to read and transform millions of records, because Spark is designed for large-scale data processing and can read data from multiple sources, such as sas7bdat and CSV files. Spark SQL also lets us manipulate the data with SQL queries.

The US National Tourism and Trade Office publishes the I94 immigration data monthly in SAS format, so our data and database should also be updated monthly.
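A monthly update needs to know which input file to load. The sketch below is hypothetical: it assumes every monthly file follows the pattern of the single file used in this project, `i94_jan16_sub.sas7bdat`, and uses a fixed month list so the result does not depend on the system locale.

```python
MONTH_ABBR = ("jan", "feb", "mar", "apr", "may", "jun",
              "jul", "aug", "sep", "oct", "nov", "dec")


def monthly_i94_file(year, month):
    """Hypothetical file name for one month, following i94_jan16_sub.sas7bdat."""
    if not 1 <= month <= 12:
        raise ValueError(f"month must be 1..12, got {month}")
    return f"i94_{MONTH_ABBR[month - 1]}{year % 100:02d}_sub.sas7bdat"


print(monthly_i94_file(2016, 1))  # i94_jan16_sub.sas7bdat
print([monthly_i94_file(2016, m) for m in (2, 3)])
```

A scheduler would call this with the month being processed and pass the result to the `spark.read` call from Step 2.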

##### Scenarios

1. The data was increased by 100x.

A 100x increase in data volume calls for more efficient processing, so we would adopt big data tools. Distributed data processing frameworks such as Spark and Hadoop are suggested, and a cloud data warehouse such as Amazon Redshift is also planned, because Redshift is optimized for heavy aggregation and read operations.

2. The data populates a dashboard that must be updated on a daily basis by 7am every day.

Scheduling and monitoring the pipeline calls for a dedicated orchestration tool, so we suggest Apache Airflow to control and monitor the data flow from the I94 immigration files to the Parquet files. Airflow should schedule the pipeline daily so that the dashboard is refreshed by 7am, and the data quality checks should run as part of each daily run.

3. The database needed to be accessed by 100+ people

Heavy concurrent access from many users requires well-configured connections and possibly scaling up compute. The simplest way to handle user access is a cloud data warehouse such as Amazon Redshift, which avoids a complicated access setup.