# ETL Pipeline for Immigration and Temperature data
### Data Engineering Capstone Project

#### Project Summary
The goal of this project is to create an ETL pipeline using I94 immigration data and city temperature data to form a database that is optimized for queries on immigration events. This database can then be used to answer questions relating to immigration behavior regarding destination area temperature, such as: Do people prefer to immigrate to warmer places?

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import re
import numpy as np
import pandas as pd

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project, we will extract I94 immigration events, keyed by destination city (i94port), to form our first dimension table. Next, we will clean the city temperature data and map each city to an i94port code to form the second dimension table. The two datasets will be joined on destination city (i94port) to form the fact table. The final database is optimized for queries on immigration events, to determine whether temperature affects the choice of destination city. Spark will be used to process the data.

#### Describe and Gather Data 
The I94 immigration [data](https://travel.trade.gov/research/reports/i94/historical/2016.html) comes from the US National Travel and Tourism Office. It is provided in SAS7BDAT [format](https://cran.r-project.org/web/packages/sas7bdat/vignettes/sas7bdat.pdf), which is a binary database storage format. Some relevant attributes include:

* i94yr = 4 digit year
* i94mon = numeric month
* i94cit = 3 digit code of origin country (country of citizenship)
* i94port = 3 character code of destination USA city
* arrdate = arrival date in the USA
* i94mode = 1 digit travel code
* depdate = departure date from the USA
* i94visa = reason for immigration
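Several of these attributes are stored as numbers rather than readable values. SAS stores dates as days since 1960-01-01, and the sample rows are consistent with that: arrdate 20636.0 corresponds to 2016-07-01, which matches the dtadfile value 20160701 in the Spark output. The sketch below converts these date numbers and decodes i94mode and i94visa. The code meanings are our reading of I94_SAS_Labels_Description.SAS and should be checked against that file; the helper names are hypothetical.

```python
from datetime import date, timedelta
from typing import Optional

# SAS date values count days since this epoch
SAS_EPOCH = date(1960, 1, 1)

# Code meanings as we read them in I94_SAS_Labels_Description.SAS (verify before use)
I94MODE = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}
I94VISA = {1: "Business", 2: "Pleasure", 3: "Student"}


def sas_to_date(value: Optional[float]) -> Optional[date]:
    """Convert a SAS date number (e.g. arrdate, depdate) to a datetime.date."""
    if value is None or value != value:  # None or NaN (NaN != NaN)
        return None
    return SAS_EPOCH + timedelta(days=int(value))


def decode(codes: dict, value: Optional[float]) -> Optional[str]:
    """Look up a float-encoded code such as 1.0 in a code table."""
    if value is None or value != value:
        return None
    return codes.get(int(value))


# First sample row: arrdate 20636.0, depdate 20640.0, i94mode 1.0, i94visa 2.0
arrival = sas_to_date(20636.0)
departure = sas_to_date(20640.0)
print(arrival, departure, (departure - arrival).days)
print(decode(I94MODE, 1.0), decode(I94VISA, 2.0))
```

In Spark, the same date conversion could probably be expressed with the built-in SQL function date_add, e.g. `date_add(to_date('1960-01-01'), cast(arrdate as int))` inside `expr()`; this has not been run in this notebook.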

The temperature [data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data) comes from Kaggle. It is provided in csv format. Some relevant attributes include:

* AverageTemperature = average monthly temperature (in degrees Celsius)
* City = city name
* Country = country name
* Latitude = latitude
* Longitude = longitude
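Latitude and Longitude are strings with a hemisphere suffix (for example 57.05N and 10.33E for Århus), so they cannot be compared numerically as-is. A small, hypothetical helper that converts them to signed decimal degrees, assuming every value follows the number-plus-N/S/E/W pattern seen in the sample rows:

```python
import re

# Matches strings such as "57.05N" or "121.97W" (assumed format)
_COORD_RE = re.compile(r"^\s*(\d+(?:\.\d+)?)\s*([NSEW])\s*$")


def coord_to_decimal(text: str) -> float:
    """Convert a hemisphere-suffixed coordinate to signed decimal degrees.

    North and East are positive; South and West are negative.
    """
    match = _COORD_RE.match(text.upper())
    if match is None:
        raise ValueError(f"Unrecognised coordinate: {text!r}")
    value = float(match.group(1))
    return -value if match.group(2) in "SW" else value


print(coord_to_decimal("57.05N"), coord_to_decimal("10.33E"))   # Århus
print(coord_to_decimal("31.35S"), coord_to_decimal("114.97E"))  # Perth
```

The same function could be wrapped with `udf()` (declaring a double return type) to add numeric coordinate columns in Spark.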

In [2]:
# Read July 2016 I94 immigration data into Pandas for exploration
file_loc = '../../data/18-83510-I94-Data-2016/i94_jul16_sub.sas7bdat'
july_df = pd.read_sas(file_loc, format='sas7bdat', encoding="ISO-8859-1")

# Display the first 5 rows of the dataframe
july_df.head()

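|  | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1.0 | 2016.0 | 7.0 | 254.0 | 276.0 | LOS | 20636.0 | 1.0 | CA | 20640.0 | ... |  | M | 1978.0 | 9282016 | M |  | OZ | 63092900000.0 | 202 | WT |
| 1 | 2.0 | 2016.0 | 7.0 | 140.0 | 140.0 | NYC | 20636.0 | 1.0 | NY | 20657.0 | ... |  | M | 1971.0 | 9282016 | F |  | DL | 63092900000.0 | 9858 | WT |
| 2 | 3.0 | 2016.0 | 7.0 | 135.0 | 135.0 | ORL | 20636.0 | 1.0 | FL | 20657.0 | ... |  | M | 2006.0 | 9282016 | M |  | VS | 63092900000.0 | 71 | WT |
| 3 | 4.0 | 2016.0 | 7.0 | 124.0 | 124.0 | TAM | 20636.0 | 1.0 | FL | 20645.0 | ... |  | M | 1999.0 | 9282016 | M |  | LH | 63092900000.0 | 482 | WT |
| 4 | 5.0 | 2016.0 | 7.0 | 130.0 | 130.0 | LOS | 20636.0 | 1.0 | CA | 20662.0 | ... |  | M | 2015.0 | 9282016 | M |  | SU | 63092900000.0 | 106 | WT |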


In [3]:
# Display the last 5 rows of the dataframe
july_df.tail()

|  | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 4265026 | 686686.0 | 2016.0 | 7.0 | 745.0 | 745.0 | THO | 20638.0 | 3.0 | NJ | 20644.0 | ... |  | M | 1951.0 | 1022017 | F | 233734.0 |  | 1636851000.0 |  | B2 |
| 4265027 | 689855.0 | 2016.0 | 7.0 | 745.0 | 745.0 | THO | 20638.0 | 3.0 | NJ | 20644.0 | ... |  | M | 1946.0 | 1022017 | M |  |  | 1636906000.0 | LAND | B2 |
| 4265028 | 1033772.0 | 2016.0 | 7.0 | 111.0 | 749.0 | NIA | 20640.0 | 3.0 | MD | 20690.0 | ... |  | M | 1990.0 | 12072016 | F |  |  | 1165249000.0 | LAND | B2 |
| 4265029 | 1033774.0 | 2016.0 | 7.0 | 111.0 | 749.0 | NIA | 20640.0 | 3.0 | MD | 20690.0 | ... |  | M | 2007.0 | 12072016 | F |  |  | 1166088000.0 | LAND | B2 |
| 4265030 | 2718366.0 | 2016.0 | 7.0 | 574.0 | 749.0 | NIA | 20647.0 | 3.0 | NY | 20655.0 | ... |  | M | 1956.0 | 12142016 | M |  |  | 99156440000.0 | LAND | B2 |


In [4]:
# Display the shape of the dataframe
july_df.shape

(4265031, 28)

In [5]:
# Read the temperature data into Pandas for exploration
file_loc = '../../data2/GlobalLandTemperaturesByCity.csv'
temp_df = pd.read_csv(file_loc, sep=',')

# Display the first 5 rows of the dataframe
temp_df.head()

|  | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 |  |  | Århus | Denmark | 57.05N | 10.33E |


In [6]:
# Display the last 5 rows of the dataframe
temp_df.tail()

|  | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 8599207 | 2013-05-01 | 11.464 | 0.236 | Zwolle | Netherlands | 52.24N | 5.26E |
| 8599208 | 2013-06-01 | 15.043 | 0.261 | Zwolle | Netherlands | 52.24N | 5.26E |
| 8599209 | 2013-07-01 | 18.775 | 0.193 | Zwolle | Netherlands | 52.24N | 5.26E |
| 8599210 | 2013-08-01 | 18.025 | 0.298 | Zwolle | Netherlands | 52.24N | 5.26E |
| 8599211 | 2013-09-01 |  |  | Zwolle | Netherlands | 52.24N | 5.26E |


In [7]:
# Display the shape of the dataframe
temp_df.shape

(8599212, 7)

In [8]:
# Create a Spark session that can read SAS files via the spark-sas7bdat package
spark = SparkSession \
    .builder \
    .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport().getOrCreate()

### Step 2: Explore, Assess and Clean the Data

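For the I94 immigration data, we want to drop all entries whose destination city code (i94port) is not a valid value (e.g., XXX, 99, etc.), as described in I94_SAS_Labels_Description.SAS. In practice, we keep only entries whose i94port appears in i94port_valid.txt, a list of valid codes derived from that file.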

For the temperature data, we want to drop all entries where AverageTemperature is missing (NaN), keep one entry per location (City, Country), add the i94port code that matches each city name, and then drop the entries for which no code is found.
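Keeping one entry per location means the retained AverageTemperature is a single monthly reading from an arbitrary year (in the cleaned output, Perth's row is dated 1852-07-01). If the aim is to compare destinations, an average is likely more meaningful; since the immigration sample is from July, one option is the mean July temperature per city. This swaps in a different aggregation from the one the notebook uses. It is sketched here in plain Python on hypothetical rows; in Spark it would roughly correspond to filtering on `month(to_date(dt))` and then `groupBy('City', 'Country').agg(avg('AverageTemperature'))`, which has not been run here.

```python
from collections import defaultdict
from statistics import mean
from typing import Dict, Iterable, Optional, Tuple

Row = Tuple[str, Optional[float], str, str]  # (dt, AverageTemperature, City, Country)


def mean_month_temperature(rows: Iterable[Row], month: int = 7) -> Dict[Tuple[str, str], float]:
    """Average AverageTemperature per (City, Country) for one calendar month.

    Rows with a missing temperature are skipped, mirroring the NaN filter.
    dt is expected in the dataset's YYYY-MM-DD form.
    """
    readings = defaultdict(list)
    for dt, temperature, city, country in rows:
        if temperature is None:
            continue
        if int(dt[5:7]) == month:
            readings[(city, country)].append(temperature)
    return {location: mean(values) for location, values in readings.items()}


# Hypothetical sample rows in the shape of GlobalLandTemperaturesByCity.csv
sample = [
    ("2012-07-01", 18.0, "Zwolle", "Netherlands"),
    ("2013-07-01", 19.0, "Zwolle", "Netherlands"),
    ("2013-08-01", 17.5, "Zwolle", "Netherlands"),
    ("2013-09-01", None, "Zwolle", "Netherlands"),
]
print(mean_month_temperature(sample))
```

Averaging over all years also smooths out the very sparse, less reliable early readings that an arbitrary row might pick up.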

In [9]:
# Clean I94 immigration data

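# Create dictionary of valid i94port codes: {code: [port name]}
# Each line of i94port_valid.txt is expected to hold a quoted code followed by a quoted name
re_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')
i94port_valid = {}
with open('i94port_valid.txt') as f:
    for line in f:
        match = re_obj.search(line)
        if match:  # skip blank or malformed lines
            i94port_valid[match[1]] = [match[2]]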

def clean_i94_data(file):
    """
    Input: Path to I94 immigration data file
    Output: Spark dataframe of I94 immigration data with valid i94port
    """
    
    # Read I94 data into Spark
    df_immigration = spark.read.format('com.github.saurfang.sas.spark').load(file)

    # Filter out entries where i94port is invalid
    df_immigration = df_immigration.filter(df_immigration.i94port.isin(list(i94port_valid.keys())))

    return df_immigration


In [10]:
# Test function
file_loc = '../../data/18-83510-I94-Data-2016/i94_jul16_sub.sas7bdat'
df_immigration_test = clean_i94_data(file_loc)
df_immigration_test.show()

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|  1.0|2016.0|   7.0| 254.0| 276.0|    LOS|20636.0|    1.0|     CA|20640.0|  38.0|    2.0|  1.0|20160701|    null| null|      G|      O|   null|      M| 1978.0|09282016|     M|  null|     OZ|6.3092898033E10|00202|      WT|
|  2.0|2016.0|   7.0| 140.0| 140.0|    NYC|20636.0|    1.0|     NY|20657.0|  45.0|    2.0|  1.0|20160701|   

In [11]:
# Clean temperature data
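df_temp = spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")

# Filter out entries with a missing average temperature
# (empty CSV fields are read as null, so test for null as well as the 'NaN' string)
df_temp = df_temp.filter(df_temp.AverageTemperature.isNotNull() & (df_temp.AverageTemperature != 'NaN'))

# Remove duplicate locations (keeps one arbitrary row per City/Country pair)
df_temp = df_temp.dropDuplicates(['City', 'Country'])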

@udf()
def get_i94port(city):
    """
    Input: City name
    Output: First i94port whose port name contains the city name, or None
    """
    if city is None:
        return None
    for key in i94port_valid:
        if city.lower() in i94port_valid[key][0].lower():
            return key
    return None
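The substring test in get_i94port can match a city name that merely occurs inside a longer port name. In the output of this step, for example, Ica (Peru) is assigned CHI, presumably because "ica" occurs inside "chicago". A stricter, hypothetical variant compares the city only with the part of the port name before the first comma, assuming port names follow a "CITY, STATE" pattern. It is shown as plain functions with a small illustrative dictionary; either function could be wrapped with `udf()` in the same way as get_i94port.

```python
from typing import Dict, List, Optional


def get_i94port_substring(city: Optional[str], ports: Dict[str, List[str]]) -> Optional[str]:
    """Original matching rule: city name appears anywhere in the port name."""
    if city is None:
        return None
    for code, names in ports.items():
        if city.lower() in names[0].lower():
            return code
    return None


def get_i94port_exact(city: Optional[str], ports: Dict[str, List[str]]) -> Optional[str]:
    """Stricter rule: city must equal the text before the first comma."""
    if city is None:
        return None
    target = city.strip().lower()
    for code, names in ports.items():
        if names[0].split(",")[0].strip().lower() == target:
            return code
    return None


# Hypothetical entries in the same {code: [name]} shape as i94port_valid
ports = {"CHI": ["CHICAGO, IL"], "SEA": ["SEATTLE, WA"]}

for city in ["Ica", "Chicago", "Seattle"]:
    print(city, get_i94port_substring(city, ports), get_i94port_exact(city, ports))
```

Exact matching will miss ports whose names are spelled differently or carry extra words, so it trades some recall for precision.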

In [12]:
# Add i94port code based on city name
df_temp = df_temp.withColumn("i94port", get_i94port(df_temp.City))

In [13]:
# Remove entries with no i94port code
df_temp = df_temp.filter(df_temp.i94port.isNotNull())

In [14]:
# Show results
df_temp.show()

+----------+-------------------+-----------------------------+---------+--------------------+--------+---------+-------+
|        dt| AverageTemperature|AverageTemperatureUncertainty|     City|             Country|Latitude|Longitude|i94port|
+----------+-------------------+-----------------------------+---------+--------------------+--------+---------+-------+
|1852-07-01|             15.488|                        1.395|    Perth|           Australia|  31.35S|  114.97E|    PER|
|1828-01-01|             -1.977|                        2.551|  Seattle|       United States|  47.42N|  121.97W|    SEA|
|1743-11-01|              2.767|                        1.905| Hamilton|              Canada|  42.59N|   80.73W|    HAM|
|1849-01-01|  7.399999999999999|                        2.699|  Ontario|       United States|  34.56N|  116.76W|    ONT|
|1821-11-01|              2.322|                        2.375|  Spokane|       United States|  47.42N|  117.24W|    SPO|
|1843-01-01| 18.874000000000002|

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The first dimension table will contain events from the I94 immigration data. 
The columns below will be extracted from the immigration dataframe:

* i94yr = 4 digit year
* i94mon = numeric month
* i94cit = 3 digit code of origin country (country of citizenship)
* i94port = 3 character code of destination city
* arrdate = arrival date
* i94mode = 1 digit travel code
* depdate = departure date
* i94visa = reason for immigration

The second dimension table will contain city temperature data. 
The columns below will be extracted from the temperature dataframe:

* i94port = 3 character code of destination city (mapped from immigration data during cleanup step)
* AverageTemperature = average temperature
* City = city name
* Country = country name
* Latitude = latitude
* Longitude = longitude

The fact table will contain information from the I94 immigration data joined with the city temperature data on i94port:

* i94yr = 4 digit year
* i94mon = numeric month
* i94cit = 3 digit code of origin city
* i94port = 3 character code of destination city
* arrdate = arrival date
* i94mode = 1 digit travel code
* depdate = departure date
* i94visa = reason for immigration
* AverageTemperature = average temperature of destination city

The tables will be saved to Parquet files partitioned by city (i94port).

#### 3.2 Mapping Out Data Pipelines
The pipeline steps are described below:

1. Clean I94 data as described in Step 2 to create Spark dataframe df_immigration for each month
2. Clean temperature data as described in Step 2 to create Spark dataframe df_temp (already performed)
3. Create immigration dimension table by selecting relevant columns from df_immigration and write it to Parquet files partitioned by i94port
4. Create temperature dimension table by selecting relevant columns from df_temp and write it to Parquet files partitioned by i94port
5. Create fact table by joining the immigration and temperature data on i94port and write it to Parquet files partitioned by i94port
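Step 1 runs once per month of data, but only the July file is processed below. Assuming the other monthly files follow the same naming pattern as i94_jul16_sub.sas7bdat (not verified here), the list of inputs could be generated as follows and each path passed to clean_i94_data in turn. The function name is hypothetical.

```python
from typing import List

# Directory from the notebook; the naming of months other than July is assumed
DATA_DIR = "../../data/18-83510-I94-Data-2016"

# Fixed English abbreviations (avoids locale-dependent calendar names)
MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]


def monthly_i94_files(year: int = 2016, data_dir: str = DATA_DIR) -> List[str]:
    """Return one assumed input path per month, January to December."""
    yy = f"{year % 100:02d}"
    return [f"{data_dir}/i94_{month}{yy}_sub.sas7bdat" for month in MONTHS]


files = monthly_i94_files()
print(files[6])  # the July file used in this notebook

# In the notebook, each path could then be cleaned in turn, e.g.
# for path in files:
#     df_immigration = clean_i94_data(path)
```

Because the Parquet writes below use append mode, processing one month at a time adds that month's rows to the existing output.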

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [15]:
# Location of the I94 immigration data
file_loc = '../../data/18-83510-I94-Data-2016/i94_jul16_sub.sas7bdat'

# Clean I94 immigration data and store as Spark dataframe
df_immigration = clean_i94_data(file_loc)

# Show results
df_immigration.show()

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|  1.0|2016.0|   7.0| 254.0| 276.0|    LOS|20636.0|    1.0|     CA|20640.0|  38.0|    2.0|  1.0|20160701|    null| null|      G|      O|   null|      M| 1978.0|09282016|     M|  null|     OZ|6.3092898033E10|00202|      WT|
|  2.0|2016.0|   7.0| 140.0| 140.0|    NYC|20636.0|    1.0|     NY|20657.0|  45.0|    2.0|  1.0|20160701|   

In [16]:
# Get the row count
df_immigration.count()

4247749

In [17]:
# Create immigration dimension table by extracting columns from df_immigration
immigration_table = df_immigration.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa"])

# Show results
immigration_table.show()

+------+------+------+-------+-------+-------+-------+-------+
| i94yr|i94mon|i94cit|i94port|arrdate|i94mode|depdate|i94visa|
+------+------+------+-------+-------+-------+-------+-------+
|2016.0|   7.0| 254.0|    LOS|20636.0|    1.0|20640.0|    2.0|
|2016.0|   7.0| 140.0|    NYC|20636.0|    1.0|20657.0|    2.0|
|2016.0|   7.0| 135.0|    ORL|20636.0|    1.0|20657.0|    2.0|
|2016.0|   7.0| 124.0|    TAM|20636.0|    1.0|20645.0|    2.0|
|2016.0|   7.0| 130.0|    LOS|20636.0|    1.0|20662.0|    2.0|
|2016.0|   7.0| 135.0|    SFR|20636.0|    1.0|20648.0|    2.0|
|2016.0|   7.0| 115.0|    OAK|20636.0|    1.0|20646.0|    2.0|
|2016.0|   7.0| 438.0|    HHW|20636.0|    1.0|20644.0|    2.0|
|2016.0|   7.0| 131.0|    LVG|20636.0|    1.0|20657.0|    2.0|
|2016.0|   7.0| 140.0|    LOS|20636.0|    1.0|20656.0|    2.0|
|2016.0|   7.0| 131.0|    LVG|20636.0|    1.0|20663.0|    2.0|
|2016.0|   7.0| 140.0|    LOS|20636.0|    1.0|20656.0|    2.0|
|2016.0|   7.0| 135.0|    SFR|20636.0|    1.0|20642.0| 

In [18]:
# Get the row count
immigration_table.count()

4247749

In [19]:
# Write immigration dimension table to parquet files partitioned by i94port
immigration_table.write.mode("append").partitionBy("i94port").parquet("/home/workspace/results/immigration.parquet")

In [20]:
# Create temperature dimension table by extracting columns from df_temp
temp_table = df_temp.select(["AverageTemperature", "City", "Country", "Latitude", "Longitude", "i94port"])

# Show results
temp_table.show()

+-------------------+---------+--------------------+--------+---------+-------+
| AverageTemperature|     City|             Country|Latitude|Longitude|i94port|
+-------------------+---------+--------------------+--------+---------+-------+
|             15.488|    Perth|           Australia|  31.35S|  114.97E|    PER|
|             -1.977|  Seattle|       United States|  47.42N|  121.97W|    SEA|
|              2.767| Hamilton|              Canada|  42.59N|   80.73W|    HAM|
|  7.399999999999999|  Ontario|       United States|  34.56N|  116.76W|    ONT|
|              2.322|  Spokane|       United States|  47.42N|  117.24W|    SPO|
| 18.874000000000002|Abu Dhabi|United Arab Emirates|  24.92N|   54.98E|    MAA|
|             25.229|    Anaco|           Venezuela|   8.84N|   64.05W|    ANA|
|              9.904|      Ica|                Peru|  13.66S|   75.14W|    CHI|
|              9.833|  Nogales|       United States|  31.35N|  111.20W|    NOG|
|  8.129999999999999|  Atlanta|       Un

In [21]:
# Get the row count
temp_table.count()

207

In [22]:
# Write temperature dimension table to parquet files partitioned by i94port
temp_table.write.mode("append").partitionBy("i94port").parquet("/home/workspace/results/temperature.parquet")

In [23]:
# Create temporary views of the immigration and temperature data
df_immigration.createOrReplaceTempView("immigration_view")
df_temp.createOrReplaceTempView("temp_view")

In [24]:
# Create the fact table by joining the immigration and temperature views
fact_table = spark.sql('''
SELECT immigration_view.i94yr as year,
       immigration_view.i94mon as month,
       immigration_view.i94cit as city,
       immigration_view.i94port as i94port,
       immigration_view.arrdate as arrival_date,
       immigration_view.depdate as departure_date,
       immigration_view.i94visa as reason,
       temp_view.AverageTemperature as temperature,
       temp_view.Latitude as latitude,
       temp_view.Longitude as longitude
FROM immigration_view
JOIN temp_view ON (immigration_view.i94port = temp_view.i94port)
''')

# Show results
fact_table.show()

+------+-----+-----+-------+------------+--------------+------+-----------------+--------+---------+
|  year|month| city|i94port|arrival_date|departure_date|reason|      temperature|latitude|longitude|
+------+-----+-----+-------+------------+--------------+------+-----------------+--------+---------+
|2016.0|  7.0|690.0|    SNA|     20636.0|       20645.0|   2.0|7.168999999999999|  29.74N|   97.85W|
|2016.0|  7.0|690.0|    SNA|     20636.0|       20645.0|   2.0|7.168999999999999|  29.74N|   97.85W|
|2016.0|  7.0|690.0|    SNA|     20636.0|       20645.0|   2.0|7.168999999999999|  29.74N|   97.85W|
|2016.0|  7.0|690.0|    SNA|     20636.0|       20645.0|   2.0|7.168999999999999|  29.74N|   97.85W|
|2016.0|  7.0|129.0|    SNA|     20636.0|       20650.0|   2.0|7.168999999999999|  29.74N|   97.85W|
|2016.0|  7.0|690.0|    SNA|     20636.0|       20645.0|   2.0|7.168999999999999|  29.74N|   97.85W|
|2016.0|  7.0|690.0|    SNA|     20636.0|       20645.0|   2.0|7.168999999999999|  29.74N| 

In [None]:
# Get the row count
fact_table.count()

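### This cell took too long to run and was stopped. Spark recomputes the whole lineage (SAS read, CSV read, Python UDF and join) on every action, which is likely why; see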
### https://stackoverflow.com/questions/45142105/count-on-spark-dataframe-is-extremely-slow

In [26]:
# Write fact table to parquet files partitioned by i94port
fact_table.write.mode("append").partitionBy("i94port").parquet("/home/workspace/results/fact.parquet")

#### 4.2 Data Quality Checks
The data quality check verifies that each table contains at least one record (a non-zero row count).

In [27]:
def quality_check(df, description):
    """
    Input: Spark dataframe, description of the dataframe
    Output: print outcome of data quality check
    """
    result = df.count()
    if result == 0:
        print(f"Data quality check failed for {description} with zero records")
    else:
        print(f"Data quality check passed for {description} with {result} records")
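A non-zero count catches an empty table but not bad keys. The sketch below adds two further checks, written over plain Python rows so the logic is easy to test: the partition and join key i94port should never be null, and the temperature table should have at most one row per (City, Country). In Spark these would correspond to counts such as `df.filter(df.i94port.isNull()).count()` and `temp_table.groupBy('City', 'Country').count().filter('count > 1').count()`. The function name and row format are hypothetical.

```python
from collections import Counter
from typing import Dict, Iterable, List, Sequence


def key_checks(rows: Iterable[Dict], not_null: Sequence[str], unique: Sequence[str]) -> List[str]:
    """Return a list of failure messages; an empty list means all checks passed."""
    rows = list(rows)
    failures = []
    if not rows:
        failures.append("table has zero records")
    # Count nulls in each column that must always be populated
    for column in not_null:
        nulls = sum(1 for row in rows if row.get(column) is None)
        if nulls:
            failures.append(f"{nulls} record(s) with null {column}")
    # Count key combinations that occur more than once
    if unique:
        keys = Counter(tuple(row.get(c) for c in unique) for row in rows)
        dupes = sum(1 for n in keys.values() if n > 1)
        if dupes:
            failures.append(f"{dupes} duplicated key(s) on {tuple(unique)}")
    return failures


# Hypothetical temperature rows: one good, one missing i94port, one duplicate city
temp_rows = [
    {"City": "Seattle", "Country": "United States", "i94port": "SEA"},
    {"City": "Spokane", "Country": "United States", "i94port": None},
    {"City": "Seattle", "Country": "United States", "i94port": "SEA"},
]
print(key_checks(temp_rows, not_null=["i94port"], unique=["City", "Country"]))
```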


In [28]:
# Quality check on immigration table
quality_check(immigration_table, "immigration table")

Data quality check passed for immigration table with 4247749 records


In [None]:
# Quality check on temperature table
quality_check(temp_table, "temperature table")

### This cell took too long to run; for the temperature table row count (207), refer to cell 21
### https://stackoverflow.com/questions/45142105/count-on-spark-dataframe-is-extremely-slow

In [None]:
# Quality check on fact table
quality_check(fact_table, "fact table")

### This cell took too long to run and was stopped; see
### https://stackoverflow.com/questions/45142105/count-on-spark-dataframe-is-extremely-slow

#### 4.3 Data dictionary 
The first dimension table will contain events from the I94 immigration data. The columns below will be extracted from the immigration dataframe:

* i94yr = 4 digit year
* i94mon = numeric month
* i94cit = 3 digit code of origin country (country of citizenship)
* i94port = 3 character code of destination city
* arrdate = arrival date
* i94mode = 1 digit travel code
* depdate = departure date
* i94visa = reason for immigration

The second dimension table will contain city temperature data. The columns below will be extracted from the temperature dataframe:

* i94port = 3 character code of destination city (mapped from immigration data during cleanup step)
* AverageTemperature = average temperature
* City = city name
* Country = country name
* Latitude = latitude
* Longitude = longitude

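The fact table will contain information from the I94 immigration data joined with the city temperature data on i94port. The columns are renamed in the join query:

* year = 4 digit year (from i94yr)
* month = numeric month (from i94mon)
* city = 3 digit code of origin country (from i94cit)
* i94port = 3 character code of destination city
* arrival_date = arrival date (from arrdate)
* departure_date = departure date (from depdate)
* reason = reason for immigration (from i94visa)
* temperature = average temperature of destination city (from AverageTemperature)
* latitude = latitude of destination city (from Latitude)
* longitude = longitude of destination city (from Longitude)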

### Step 5: Complete Project Write Up
1. Clearly state the rationale for the choice of tools and technologies for the project.

    * For this project, I used pandas for the initial investigation of the data, and then Apache Spark to read, transform and write the data for further analysis, as it can handle multiple file formats (including SAS, via the spark-sas7bdat package) and large amounts of data.


2. Propose how often the data should be updated and why.

    * Since the I94 data is provided as monthly files, the pipeline should be run monthly. This keeps the data up to date for government agencies and organisations.

Write a description of how you would approach the problem differently under the following scenarios:

3. The data was increased by 100x.

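    * If the data was increased by 100x, I would run the Spark job on a multi-node cluster (for example, Hadoop YARN on Amazon EMR) with the data kept in distributed storage such as HDFS or Amazon S3, so that processing scales out across machines rather than being limited to a single workspace.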


4. The data populates a dashboard that must be updated on a daily basis by 7am every day.

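    * To update the data on a daily basis, I would use Apache Airflow to schedule a daily DAG that pulls new data from the source and reruns the ETL early enough to finish before 7am, with retries and failure alerts so that a late or failed run is noticed before the dashboard is used.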


5. The database needed to be accessed by 100+ people.

    * If the database needs to be accessed by 100+ people, I would probably host it on Amazon Web Services (AWS), for example behind a serverless web app with auto scaling enabled so that it can scale up when required.