# Project Title
### Data Engineering Capstone Project

#### Project Summary
The goal of this project is to create an ETL pipeline that combines the I94 immigration data with the US cities demographics data to describe immigration events. The resulting database can answer questions that relate immigration to the population of each US city and state, such as: which destination cities have a high population or a large foreign-born population?

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import pandas as pd
import numpy as np
from os import listdir
from pyspark.sql.types import IntegerType
from os.path import isfile, join
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,udf
import re

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project, Spark is used to process the data. The I94 immigration data, organized by destination city (i94port), forms the first dimension table, and the US cities demographics data, organized by city, forms the second dimension table. The two datasets are joined on the destination city to form the fact table. The final database is optimized for querying immigration events together with the population and the number of foreign-born residents of each destination city.

#### Describe and Gather Data 
The I94 immigration data comes from the US National Tourism and Trade Office in SAS7BDAT format. The following fields were used in this project:
* i94yr = 4 digit year
* i94mon = month in numbers
* i94cit = 3 digit code of the origin country
* i94port = 3 character code of the destination US city
* arrdate = arrival date in the USA
* i94mode = 1 digit travel code
* depdate = departure date from the USA
* i94visa = reason for immigration
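The arrdate and depdate fields are stored as numbers such as 20605.0 rather than as calendar dates. Assuming they follow the SAS date convention (days counted from 1960-01-01), a minimal Python sketch for decoding them is shown below; the helper name `sas_to_date` is hypothetical.

```python
from datetime import date, timedelta
import math

# Assumption: I94 arrdate/depdate values are SAS dates, i.e. days since 1960-01-01
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days):
    """Convert a SAS date value to a datetime.date; missing values (None or NaN) return None."""
    if sas_days is None or (isinstance(sas_days, float) and math.isnan(sas_days)):
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20605.0))  # 2016-05-31
```

Under that assumption, the May 2016 file decodes to dates in May 2016, which is a quick sanity check on the convention.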



The cities demographics data comes from Kaggle in CSV format, and the following columns were used:
* City = city name
* State = state name
* Total Population = total population of the city
* Foreign-born = number of foreign-born residents
* State Code = two-letter state code

In [2]:
#Listing All I94 immigration files in the Directory 
dirName = '../../data/18-83510-I94-Data-2016'
fileNames = [f for f in listdir(dirName) if isfile(join(dirName, f))]
print (fileNames)

['i94_apr16_sub.sas7bdat', 'i94_sep16_sub.sas7bdat', 'i94_nov16_sub.sas7bdat', 'i94_mar16_sub.sas7bdat', 'i94_jun16_sub.sas7bdat', 'i94_aug16_sub.sas7bdat', 'i94_may16_sub.sas7bdat', 'i94_jan16_sub.sas7bdat', 'i94_oct16_sub.sas7bdat', 'i94_jul16_sub.sas7bdat', 'i94_feb16_sub.sas7bdat', 'i94_dec16_sub.sas7bdat']


In [3]:
# Save May 2016 I94 immigration data into Pandas
fname = '../../data/18-83510-I94-Data-2016/i94_may16_sub.sas7bdat'
immigration_df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")

In [4]:
# Describe and display the df of May 2016 I94 immigration data
print(immigration_df.shape)
immigration_df.head()

(3444249, 28)


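| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2.0 | 2016.0 | 5.0 | 207.0 | 207.0 | XXX | 20605.0 | | | | ... | U | | 1989.0 | D/S | | | | 1141634000.0 | | F1 |
| 1 | 3.0 | 2016.0 | 5.0 | 209.0 | 209.0 | XXX | 20598.0 | | | | ... | U | | 1989.0 | 05232018 | | | | 1863211000.0 | | E2 |
| 2 | 4.0 | 2016.0 | 5.0 | 213.0 | 213.0 | XXX | 20578.0 | | | | ... | U | | 1938.0 | 11032016 | | | | 4696371000.0 | | B2 |
| 3 | 5.0 | 2016.0 | 5.0 | 213.0 | 213.0 | XXX | 20601.0 | | | | ... | U | | 1987.0 | D/S | | | | 1141260000.0 | | F1 |
| 4 | 13.0 | 2016.0 | 5.0 | 213.0 | 213.0 | CHI | 20577.0 | 1.0 | IL | 20270.0 | ... | | M | 1987.0 | D/S | F | | EK | 64792870000.0 | 235.0 | F1 |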


In [5]:
#immigration_df.info()

In [6]:
# Save the US cities demographics data into Pandas
city_df = pd.read_csv('us-cities-demographics.csv', delimiter=';')
# Describe and display the df of US cities demographics data
print(city_df.shape)
city_df.head()

(2891, 12)


| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |


In [7]:
# Create a single Spark session with the spark-sas7bdat package.
# The same session reads both the SAS files and the CSV file; a second
# SparkSession.builder...getOrCreate() call would only return this existing session.
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()

### Step 2: Explore and Assess the Data
#### Explore the Data 
For the I94 immigration data, we drop every record whose i94port is not a valid destination port according to I94_SAS_Labels_Description.SAS: ports with no code, unknown ports (XXX, etc.), entries with no port code (e.g., CHN), and collapsed codes. We also keep only records with valid country codes in i94cit and i94res, excluding invalid codes (e.g., 239, 700), collapsed codes (e.g., 311), and entries without a country code (e.g., 100, 300). The cleaning code applies the country-code filter to i94cit.
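The filtering rule can be illustrated on a plain Python mapping of code to description. This is a minimal sketch under one assumption: invalid entries can be recognized by marker phrases in their descriptions. The phrases in `INVALID_MARKERS` and the sample entries are hypothetical, modeled on the categories listed above, and should be checked against I94_SAS_Labels_Description.SAS before use.

```python
# Hypothetical marker phrases; verify them against I94_SAS_Labels_Description.SAS
INVALID_MARKERS = ("INVALID", "COLLAPSED", "NO COUNTRY CODE", "NO PORT CODE", "UNKNOWN")


def keep_valid_codes(code_to_description):
    """Return only the codes whose description contains none of the invalid markers."""
    valid = {}
    for code, description in code_to_description.items():
        upper = description.upper()
        if not any(marker in upper for marker in INVALID_MARKERS):
            valid[code] = description
    return valid


# Hypothetical sample entries in the style of the label file
sample = {
    "XXX": "NOT REPORTED/UNKNOWN",
    "CHN": "No PORT Code (CHN)",
    "CHI": "CHICAGO, IL",
    "NYC": "NEW YORK, NY",
}
print(sorted(keep_valid_codes(sample)))  # ['CHI', 'NYC']
```

Matching on description text keeps the rule readable, but it is only as good as the marker list, so a hand-curated list of valid codes is the safer choice if the label wording is inconsistent.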

In [8]:
# Cleaning helpers: the regex removes every whitespace character (including spaces inside names), '+' and single quotes
editing = re.compile(r'[\s+\n\r\t\']')
i94port_cleaned = {}
i94cit_cleaned = {}

#cleaning i94port.txt
with open('i94port.txt', 'r') as f:
    for line in f:
        editedport = editing.sub("", line)
        (key1, val1) = editedport.split("=")
        i94port_cleaned[key1] = val1
        
#cleaning i94cit.txt
with open('i94cit.txt', 'r') as f2:
    for line2 in f2:
        edited2 = editing.sub("", line2)
        (key2, val2) = edited2.split("=")
        i94cit_cleaned[key2] = val2
#list(i94cit_cleaned)
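Because the `editing` regex removes every whitespace character, multi-word descriptions lose their inner spaces (a description such as `NEW YORK, NY` would become `NEWYORK,NY`), which can stop multi-word city names from matching later. A minimal alternative sketch, assuming each line has the form `'KEY' = 'VALUE'` with optional quotes and padding, strips only the outer whitespace and quotes. The helper name `parse_label_line` is hypothetical.

```python
def parse_label_line(line):
    """Split a "'KEY' = 'VALUE'" line into (key, value), keeping spaces inside the value.

    Assumption: the line contains '=' and keys/values are optionally wrapped in single quotes.
    """
    raw_key, raw_value = line.split("=", 1)  # split only on the first '='
    key = raw_key.strip().strip("'").strip()
    value = raw_value.strip().strip("'").strip()
    return key, value


print(parse_label_line("\t'NYC'\t=\t'NEW YORK, NY          '\n"))  # ('NYC', 'NEW YORK, NY')
print(parse_label_line("   582  =  'MEXICO'"))  # ('582', 'MEXICO')
```

If this replaced the `editing.sub(...)` and `split("=")` pair, the later tag removal would need re-checking, since values would then keep their spaces.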

In [10]:
# Remove extra tags from the port descriptions.
# Iterate over the items so each cleaned value is written back to its own key
# (assigning to key1 left over from the previous loop would overwrite only the last port).
tags_to_remove = ['(BPS)', '#ARPT', '(I-91)', '(RT.5)', '(BP-SECTORHQ)', '#INTL']
for key1, val1 in i94port_cleaned.items():
    for tag in tags_to_remove:
        val1 = val1.replace(tag, '')
    i94port_cleaned[key1] = val1

#print(i94port_cleaned)

In [11]:
def clean_i94_data(file):
    '''
    Input: Path to I94 immigration data file
    Output: Spark dataframe with I94 immigration data filtered to valid i94port and i94cit codes
    '''
    
    # Read I94 data into Spark
    df_immig_filtered = spark.read.format('com.github.saurfang.sas.spark').load(file)
    df_immig_filtered = df_immig_filtered.withColumn("i94cit", df_immig_filtered["i94cit"].cast(IntegerType()))
    
    # Keep only rows whose i94port and i94cit codes appear in the cleaned label files
    df_immig_filtered1 = df_immig_filtered.filter(df_immig_filtered.i94port.isin(list(i94port_cleaned.keys())))
    df_immig_filtered2 = df_immig_filtered1.filter(df_immig_filtered1.i94cit.isin(list(i94cit_cleaned.keys())))
    return df_immig_filtered2

# Testing 
# fname is the '../../data/18-83510-I94-Data-2016/i94_may16_sub.sas7bdat' 
#df_test = clean_i94_data(fname)
#df_test.show()

### City Demographics Data
For the US cities demographics data, the total population was recomputed as the sum of the male and female populations, and rows were kept only where the Number of Veterans and the Foreign-born count are less than or equal to the total population. Otherwise the data appears clean, and no major changes are required.
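The consistency rules described above can be written as a single row-level predicate. This plain-Python sketch assumes a row is a dict keyed by the CSV column names with numeric values already parsed; the helper name `is_consistent` is hypothetical. Note that the Spark cell recomputes the total rather than rejecting rows whose parts do not add up, so this only illustrates the rules.

```python
def is_consistent(row):
    """Check one demographics row: male + female == total, and veterans / foreign-born <= total.

    Missing values (None) make the row fail, mirroring how a null comparison drops rows in a Spark filter.
    """
    total = row.get("Total Population")
    parts = (row.get("Male Population"), row.get("Female Population"))
    counts = (row.get("Number of Veterans"), row.get("Foreign-born"))
    if total is None or None in parts or None in counts:
        return False
    return sum(parts) == total and all(count <= total for count in counts)


# Values from the Silver Spring row of the CSV
silver_spring = {
    "Male Population": 40601, "Female Population": 41862, "Total Population": 82463,
    "Number of Veterans": 1562, "Foreign-born": 30908,
}
print(is_consistent(silver_spring))  # True
```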

In [13]:
#saving the city data to spark dataframe
city_df = spark.read.csv("us-cities-demographics.csv",header=True,sep=";")

# Clean City Demographics Data
city_df = city_df.withColumn('Total Population', city_df['Male Population'] + city_df['Female Population'])
city_df = city_df.withColumn("Total Population", city_df["Total Population"].cast(IntegerType()))
city_df = city_df.where((col("Foreign-born") <= col("Total Population")) & (col("Number of Veterans") <= col("Total Population")))
city_df = city_df.select('City', 'State', 'State Code', 'Total Population', 'Foreign-born')
city_df = city_df.withColumnRenamed("Total Population", "Total_Population")
city_df.show()
city_df.count() #2875 out of 2891

+----------------+--------------+----------+----------------+------------+
|            City|         State|State Code|Total_Population|Foreign-born|
+----------------+--------------+----------+----------------+------------+
|   Silver Spring|      Maryland|        MD|           82463|       30908|
|          Quincy| Massachusetts|        MA|           93629|       32935|
|          Hoover|       Alabama|        AL|           84839|        8229|
|Rancho Cucamonga|    California|        CA|          175232|       33878|
|          Newark|    New Jersey|        NJ|          281913|       86253|
|          Peoria|      Illinois|        IL|          118661|        7517|
|        Avondale|       Arizona|        AZ|           80683|        8355|
|     West Covina|    California|        CA|          108489|       37038|
|        O'Fallon|      Missouri|        MO|           85032|        3269|
|      High Point|North Carolina|        NC|          109828|       16315|
|          Folsom|    Cal

2875
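The demographics file also carries Race and Count columns, so a city can appear once per race category. After selecting only city-level columns, those rows become exact duplicates, and each duplicate would multiply the matching immigration rows in the later join. In Spark, `city_df.dropDuplicates()` would remove them; the plain-Python sketch below shows the same order-preserving deduplication on tuples. The helper name and the duplicated sample rows are hypothetical.

```python
def drop_duplicate_rows(rows):
    """Return rows with exact duplicates removed, keeping the first occurrence of each."""
    seen = set()
    unique = []
    for row in rows:
        if row not in seen:
            seen.add(row)
            unique.append(row)
    return unique


# Hypothetical city-level rows left after dropping the Race and Count columns
rows = [
    ("Quincy", "Massachusetts", "MA", 93629, 32935),
    ("Quincy", "Massachusetts", "MA", 93629, 32935),
    ("Hoover", "Alabama", "AL", 84839, 8229),
]
print(len(drop_duplicate_rows(rows)))  # 2
```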

In [14]:
# Check that the state code of each city also appears as a state code in i94port.txt

# Two-letter state codes taken from the end of each port description
i94port_state_codes = {val[-2:] for val in i94port_cleaned.values()}

@udf()
def match_state_code(state_code):
    '''
    Input: State Code from the US cities demographics data
    Output: The same state code if it appears in i94port.txt, otherwise None
    '''
    if state_code in i94port_state_codes:
        return state_code
    return None

# Add the matching i94port state code based on the city's state code
city_df = city_df.withColumn("i94port_State_code", match_state_code(city_df["State Code"]))

# Remove entries with no matching state code (the UDF returns null)
city_df = city_df.filter(city_df.i94port_State_code.isNotNull())

# Show results
city_df.show()

+----------------+--------------+----------+----------------+------------+------------------+
|            City|         State|State Code|Total_Population|Foreign-born|i94port_State_code|
+----------------+--------------+----------+----------------+------------+------------------+
|   Silver Spring|      Maryland|        MD|           82463|       30908|                MD|
|          Quincy| Massachusetts|        MA|           93629|       32935|                MA|
|          Hoover|       Alabama|        AL|           84839|        8229|                AL|
|Rancho Cucamonga|    California|        CA|          175232|       33878|                CA|
|          Newark|    New Jersey|        NJ|          281913|       86253|                NJ|
|          Peoria|      Illinois|        IL|          118661|        7517|                IL|
|        Avondale|       Arizona|        AZ|           80683|        8355|                AZ|
|     West Covina|    California|        CA|          108489

In [20]:
# Match each city name in the city database to its i94port code

@udf()
def create_i94port(city):
    '''
    Input: City name
    Output: Corresponding i94port (3 character code of the destination city), or None if no port matches
    '''
    if city is None:
        return None
    for key, val in i94port_cleaned.items():
        # Compare with the city part of the port description (the text before the comma)
        port_city = val.split(',')[0]
        if city.lower() in port_city.lower():
            return key
    return None

# Add i94port code based on city name
city_df = city_df.withColumn("i94port", create_i94port(city_df["City"]))

# Remove entries with no i94port code (the UDF returns null)
city_df = city_df.filter(city_df.i94port.isNotNull())

# Show results
city_df.show()

+------------+------------+----------+----------------+------------+------------------+-------+
|        City|       State|State Code|Total_Population|Foreign-born|i94port_State_code|i94port|
+------------+------------+----------+----------------+------------+------------------+-------+
|      Newark|  New Jersey|        NJ|          281913|       86253|                NJ|    NEW|
|      Peoria|    Illinois|        IL|          118661|        7517|                IL|    PIA|
|Philadelphia|Pennsylvania|        PA|         1567442|      205339|                PA|    PHI|
|      Laredo|       Texas|        TX|          255789|       68427|                TX|    LCB|
|       Allen|Pennsylvania|        PA|          120207|       19652|                PA|    MCA|
|     Suffolk|    Virginia|        VA|           88161|        2829|                VA|    FOK|
|       Tulsa|    Oklahoma|        OK|          403091|       43751|                OK|    TUL|
|     Seattle|  Washington|        WA|  
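Substring matching on the city name alone is permissive: a short name such as Allen matches any port description that merely contains it, and the state is never compared, so a city can be matched to a same-named port in another state. A stricter rule, sketched below, requires an exact city-name match and the same state code. It assumes each port description has the form `CITY, ST` (possibly several names separated by `/`) with inner spaces preserved; the helper name and the sample port codes are hypothetical.

```python
def match_port(city, state_code, ports):
    """Return the first port code whose description has the same city name and state code.

    Assumption: descriptions look like "CITY, ST" or "CITY1/CITY2, ST".
    """
    if not city or not state_code:
        return None
    target = city.strip().upper()
    for code, description in ports.items():
        if "," not in description:
            continue
        names, state = description.rsplit(",", 1)
        port_cities = {name.strip().upper() for name in names.split("/")}
        if state.strip().upper() == state_code.upper() and target in port_cities:
            return code
    return None


# Hypothetical cleaned port descriptions
ports = {
    "AAA": "ALLENTOWN, PA",
    "BBB": "SUFFOLK COUNTY, NY",
    "CCC": "NEWARK/TETERBORO, NJ",
}
print(match_port("Allen", "PA", ports))    # None
print(match_port("Suffolk", "VA", ports))  # None
print(match_port("Newark", "NJ", ports))   # CCC
```

The stricter rule will miss ports whose names differ slightly from the census city name, so it trades recall for precision.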

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
I94 immigration data is the first dimension table with the following data:
* i94yr = 4 digit year
* i94mon = month in numbers
* i94cit = 3 digit code of the origin country
* i94port = 3 character code of the destination US city
* arrdate = arrival date in the USA
* i94mode = 1 digit travel code
* depdate = departure date from the USA
* i94visa = reason for immigration


The City Demographics Data is the second dimension table, which contains the US cities with their population data. The following columns were extracted from it:
* City = city name
* State = state name
* Total_Population = total population of the city
* Foreign-born = number of foreign-born residents
* i94port_State_code = two-letter state code (matched against i94port.txt during the cleanup step)
* i94port = 3 character code of the destination city (mapped from i94port.txt during the cleanup step)
* i94port = 3 character code of destination city (mapped from immigration data during cleanup step)

The fact table will contain information from the I94 immigration data joined with the City Demographics Data on i94port:
* i94yr = 4 digit year
* i94mon = numeric month
* i94cit = 3 digit code of the origin country
* i94port = 3 character code of destination city
* arrdate = arrival date
* i94mode = 1 digit travel code
* depdate = departure date
* i94visa = reason for immigration
* Total_Population = total number of population in the city
* Foreign-born = number of foreign born

The tables will be saved to Parquet files partitioned by destination city (i94port).

#### 3.2 Mapping Out Data Pipelines
The pipeline steps are described below:
1. Clean the I94 data as shown in Step 2 to create the Spark dataframe immigration_df for each month
2. Clean the City Demographics Data as shown in Step 2 to create the Spark dataframe city_df
3. Create the immigration dimension table by selecting the related columns from immigration_df, and write it to Parquet files partitioned by i94port
4. Create the city demographics dimension table by selecting the related columns from city_df, and write it to Parquet files partitioned by i94port
5. Create the fact table by joining the immigration and city dimension tables on i94port, and write it to Parquet files partitioned by i94port
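Step 1 runs once per monthly file, and `listdir` returns the file names in arbitrary order (as the directory listing in Step 1 shows), so a pipeline driver would likely sort them by month first. A minimal sketch, assuming every name follows the `i94_<mon><yy>_sub.sas7bdat` pattern seen in that listing; the helper name `sort_monthly_files` is hypothetical.

```python
import re

MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]
# Assumed naming pattern, e.g. i94_apr16_sub.sas7bdat
FILE_PATTERN = re.compile(r"i94_([a-z]{3})(\d{2})_sub\.sas7bdat$")


def sort_monthly_files(file_names):
    """Return I94 file names ordered by (year, month); names not matching the pattern are skipped."""
    keyed = []
    for name in file_names:
        match = FILE_PATTERN.match(name)
        if match and match.group(1) in MONTHS:
            year = int(match.group(2))
            month = MONTHS.index(match.group(1)) + 1
            keyed.append(((year, month), name))
    return [name for _, name in sorted(keyed)]


print(sort_monthly_files(["i94_may16_sub.sas7bdat", "i94_jan16_sub.sas7bdat", "notes.txt"]))
# ['i94_jan16_sub.sas7bdat', 'i94_may16_sub.sas7bdat']
```

Each returned name could then be joined with `dirName` and passed to `clean_i94_data` in turn.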

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [21]:
# Path to I94 immigration data 
immigration_data = '/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

# Clean I94 immigration data and store as Spark dataframe
immigration_df = clean_i94_data(immigration_data)

# Extract columns for immigration dimension table
immigration_table = immigration_df.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa"])

# parquet files partitioned by i94port for immigration table
immigration_table.write.mode("append").partitionBy("i94port").parquet("/results/immigration.parquet")

In [22]:
# Extract columns for the city demographics dimension table
cities_table = city_df.select(['City', 'State', 'i94port_State_code', 'Total_Population', 'i94port', 'Foreign-born'])

# Write the city demographics dimension table to Parquet files partitioned by i94port
cities_table.write.mode("append").partitionBy("i94port").parquet("/results/city.parquet")

In [34]:
# Create temporary views of the immigration and city data
immigration_df.createOrReplaceTempView("immigration_view")
city_df.createOrReplaceTempView("city_view")

# Create the fact table by joining the immigration and city views
# (i94cit is a country code, so it is exposed as origin_country; backticks quote the hyphenated column)
fact_table = spark.sql("""
SELECT immigration_view.i94yr AS year,
       immigration_view.i94mon AS month,
       immigration_view.i94cit AS origin_country,
       immigration_view.i94port AS i94port,
       immigration_view.arrdate AS arrival_date,
       immigration_view.i94mode AS travel_mode,
       immigration_view.depdate AS departure_date,
       immigration_view.i94visa AS reason,
       city_view.Total_Population AS population,
       city_view.`Foreign-born` AS foreign_born,
       city_view.i94port_State_code AS state_code
FROM immigration_view
JOIN city_view ON (immigration_view.i94port = city_view.i94port)
""")

# Write fact table to parquet files partitioned by i94port
fact_table.write.mode("append").partitionBy("i94port").parquet("/results/fact.parquet")
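The inner join keeps only immigration rows whose i94port also appears in the city table, and it repeats an immigration row once for every city row that shares its port. The plain-Python sketch below, with hypothetical rows and a hypothetical `inner_join` helper, shows both effects; it only explains the join semantics and is not a replacement for the Spark SQL query.

```python
def inner_join(immigration_rows, city_rows, key="i94port"):
    """Inner-join two lists of dicts on `key`, pairing every matching row as a SQL JOIN does."""
    cities_by_port = {}
    for city in city_rows:
        cities_by_port.setdefault(city[key], []).append(city)
    joined = []
    for event in immigration_rows:
        for city in cities_by_port.get(event[key], []):  # no match -> the event is dropped
            joined.append({**event, **city})
    return joined


# Hypothetical rows: two city rows share port "AAA", none has port "BBB"
immigration_rows = [{"i94port": "AAA", "i94visa": 2}, {"i94port": "BBB", "i94visa": 1}]
city_rows = [{"i94port": "AAA", "City": "Town A"}, {"i94port": "AAA", "City": "Town B"}]
print(len(inner_join(immigration_rows, city_rows)))  # 2
```

If each fact row is meant to represent one immigration event, city_df should therefore hold at most one row per i94port before the join.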


#### 4.2 Data Quality Checks
Data quality checks confirm that the pipeline ran as expected. Useful checks include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
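The integrity-constraint idea can be sketched in plain Python over a list of row dicts: a uniqueness check on a key and a not-null check on a required column. The function names and sample rows are hypothetical. In Spark, similar numbers could come from comparing `df.count()` with `df.dropDuplicates([...]).count()` and from `df.filter(df["i94port"].isNull()).count()`.

```python
def duplicate_key_count(rows, key_columns):
    """Number of rows whose key (tuple of key_columns) was already seen."""
    seen = set()
    duplicates = 0
    for row in rows:
        key = tuple(row[column] for column in key_columns)
        if key in seen:
            duplicates += 1
        else:
            seen.add(key)
    return duplicates


def null_count(rows, column):
    """Number of rows where `column` is missing or None."""
    return sum(1 for row in rows if row.get(column) is None)


# Hypothetical city-table rows
rows = [
    {"City": "Town A", "State": "State X", "i94port": "AAA"},
    {"City": "Town A", "State": "State X", "i94port": "AAA"},
    {"City": "Town C", "State": "State Y", "i94port": None},
]
print(duplicate_key_count(rows, ["City", "State"]), null_count(rows, "i94port"))  # 1 1
```

A check would pass when both numbers are zero.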
 
Run Quality Checks

In [36]:
def quality_check(df, table_description):
    '''
    Input: Spark dataframe, description of the Spark dataframe
    Output: Prints the outcome of the data quality check (record count)
    '''
    result = df.count()
    if result == 0:
        print("Data quality check failed for {}: zero records".format(table_description))
    else:
        print("Data quality check for {} is {} records".format(table_description, result))

# Perform data quality check
quality_check(immigration_df, "immigration_table")
quality_check(city_df, "cities_table")

Data quality check for immigration_table is 2695274 records
Data quality check for cities_table is 706 records


#### 4.3 Data dictionary 
The I94 immigration data from the US National Tourism and Trade Office is the first dimension table, with the following fields:
* i94yr = 4 digit year
* i94mon = month in numbers
* i94cit = 3 digit code of the origin country
* i94port = 3 character code of destination city
* arrdate = arrival date
* i94mode = 1 digit travel code
* depdate = departure date
* i94visa = reason for immigration
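i94mode and i94visa are stored as numeric codes. The mappings below are the ones commonly listed in I94_SAS_Labels_Description.SAS; treat them as an assumption and confirm them against your copy of that file. A small lookup sketch, with a hypothetical `decode` helper:

```python
# Assumed code tables; confirm against I94_SAS_Labels_Description.SAS
I94MODE = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}
I94VISA = {1: "Business", 2: "Pleasure", 3: "Student"}


def decode(code, table):
    """Map a numeric code (int or float such as 1.0, as read from SAS) to its label, or None."""
    if code is None or code != code:  # None or NaN (NaN is never equal to itself)
        return None
    return table.get(int(code))


print(decode(1.0, I94MODE), decode(3.0, I94VISA))  # Air Student
```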

The City Demographics Data is the second dimension table, which contains the US cities with their population data. The following columns were extracted from it:
* City = city name
* State = state name
* Total_Population = total population of the city
* Foreign-born = number of foreign-born residents
* i94port_State_code = two-letter state code (matched against i94port.txt during the cleanup step)
* i94port = 3 character code of destination city (mapped from immigration data during cleanup step)

The fact table will contain information from the I94 immigration data joined with the City Demographics Data on i94port:
* i94yr = 4 digit year
* i94mon = numeric month
* i94cit = 3 digit code of the origin country
* i94port = 3 character code of destination city
* arrdate = arrival date
* i94mode = 1 digit travel code
* depdate = departure date
* i94visa = reason for immigration
* Total_Population = total number of population in the city
* Foreign-born = number of foreign born
* The tables will be saved to Parquet files partitioned by city (i94port).

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
Spark was chosen because it is one of the easiest tools for processing large amounts of data in multiple file formats (here SAS7BDAT and CSV). Spark SQL was used to load the large input files into dataframes and to combine them with standard SQL join operations to form additional tables.
* Propose how often the data should be updated and why.
The data should be updated monthly, because the I94 immigration data is published as one file per month.
* Write a description of how you would approach the problem differently under the following scenarios:
  1. The data was increased by 100x.
     We would run Spark in cluster mode on a multi-node cluster managed by a cluster manager such as YARN, so the processing is distributed across machines.
  2. The data populates a dashboard that must be updated on a daily basis by 7am every day.
     We would schedule the ETL pipeline with Apache Airflow as a daily run that finishes before 7 am.
  3. The database needed to be accessed by 100+ people.
     Access by 100 or so people should not be a problem: we could publish the Parquet files to HDFS and grant users read access to them.