# Project Title
### Data Engineering Capstone Project

#### Project Summary
The aim of the project is to build an ETL pipeline that uses I94 immigration data and city temperature data to create an optimized database for analyzing immigration events. In addition, the relationship between immigration behavior and destination temperatures will be analyzed.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import re
import pandas as pd
import psycopg2
from collections import defaultdict
from datetime import datetime, timedelta
from pyspark.sql.functions import udf

### Step 1: Scope the Project and Gather Data

#### Scope 

For the project, we will aggregate the I94 immigration data by destination city to form the first dimension table. We will then aggregate the city temperature data to create the second dimension table. Both tables will be joined on destination city (the i94port code) to form the fact table. Lastly, we will create a final database for querying immigration events, in order to determine whether temperature affects the choice of destination city for immigration. We will use Spark to process the data.

#### Describe and Gather Data 
The I94 immigration data comes from the US National Tourism and Trade Office website. It is provided in SAS7BDAT format, a binary database storage format.

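Information included:

* i94yr = 4-digit year
* i94mon = numeric month
* i94cit = 3-digit code of the country of citizenship
* i94port = 3-character code of the destination US city (port of entry)
* arrdate = arrival date in the USA
* i94mode = 1-digit mode-of-travel code (1 = Air, 2 = Sea, 3 = Land, 9 = Not reported)
* depdate = departure date from the USA
* i94visa = reason for immigration, as a visa category code (1 = Business, 2 = Pleasure, 3 = Student)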
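The arrdate and depdate values in the April 2016 file are numbers such as 20545.0, not calendar dates. The sketch below assumes they are SAS date values, meaning a count of days since 1960-01-01 (the SAS epoch). Under that assumption, they can be converted with the `date` and `timedelta` classes. `sas_date_to_iso` is a hypothetical helper, not part of the original notebook.

```python
from datetime import date, timedelta

# Assumption: arrdate/depdate are SAS dates, i.e. days since 1960-01-01
SAS_EPOCH = date(1960, 1, 1)

def sas_date_to_iso(sas_days):
    """Convert a SAS date value (int or float, possibly missing) to 'YYYY-MM-DD'."""
    # Missing depdate values arrive as None (Spark) or NaN (pandas)
    if sas_days is None or sas_days != sas_days:  # NaN is the only value not equal to itself
        return None
    return (SAS_EPOCH + timedelta(days=int(sas_days))).isoformat()

# arrdate values that appear in the April 2016 sample rows, plus a missing value
for value in [20545.0, 20551.0, 20573.0, None]:
    print(value, "->", sas_date_to_iso(value))
```

Under this convention 20545 maps to 2016-04-01, which agrees with i94mon = 4 in the same rows. That agreement is a useful sanity check on the epoch assumption.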

The temperature data set comes from Kaggle. It is in csv format.

Information included:

* AverageTemperature = average temperature
* City = city name
* Country = country name
* Latitude = latitude
* Longitude = longitude
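Latitude and Longitude are stored as strings with a hemisphere suffix, for example 57.05N and 10.33E in the sample rows. They therefore cannot be compared or plotted numerically as-is. The sketch below shows one possible conversion, assuming every value ends in N, S, E or W. `parse_coordinate` is a hypothetical helper.

```python
def parse_coordinate(value):
    """Convert a coordinate string such as '57.05N' or '121.97W' to a signed float."""
    if not value:
        return None  # missing coordinate
    number, hemisphere = float(value[:-1]), value[-1].upper()
    # By convention, south latitudes and west longitudes are negative
    if hemisphere in ("S", "W"):
        return -number
    if hemisphere in ("N", "E"):
        return number
    raise ValueError(f"unexpected hemisphere suffix in {value!r}")

print(parse_coordinate("47.42N"), parse_coordinate("121.97W"))
```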

#### Immigration Data

In [2]:
# Read April 2016 I94 immigration data
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_im = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")

In [3]:
# Display first 5 rows of df_im
df_im.head()

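| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 | | | | ... | U | | 1979.0 | 10282016 | | | | 1897628000.0 | | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL | | ... | Y | | 1991.0 | D/S | M | | | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... | | M | 1961.0 | 09302016 | M | | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 1988.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 2012.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |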


#### Temperature Data

In [4]:
# Read in the temperature data
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = pd.read_csv(fname, sep=',')

In [5]:
# Display first 5 rows of df_temp
df_temp.head()

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 | | | Århus | Denmark | 57.05N | 10.33E |


In [6]:
# Create Spark session with SAS7BDAT jar
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()

### Step 2: Explore and Assess the Data
#### Exploring the data

I94 immigration data: drop all rows where the destination city code i94port is not a valid value (XXX, 99, NaN, etc.). The valid codes are listed in I94_SAS_Labels_Description.SAS.

Temperature data: drop all rows where AverageTemperature is NaN, drop duplicate locations, and add the i94port code of each location to every entry.

In [7]:
# Create dictionary of valid i94port codes: {code: [port name]}
re_obj = re.compile(r"'(.*)'.*'(.*)'")
i94port_valid = {}
with open('i94port_valid.txt') as f:
    for line in f:
        match = re_obj.search(line)
        if match is None:
            continue  # skip blank or malformed lines instead of raising TypeError
        i94port_valid[match[1]] = [match[2]]
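The cell above relies on the format of i94port_valid.txt, which is not shown here. The sketch below assumes each line holds a quoted port code and a quoted port name, in the style of I94_SAS_Labels_Description.SAS. It runs the same regular expression on hypothetical sample lines. Although the first `(.*)` is greedy, a line with exactly four quotes still splits into code and name correctly. A port name containing an apostrophe would shift the groups.

```python
import re

# Same pattern as the cell above: first quoted token = code, last quoted token = name
re_obj = re.compile(r"'(.*)'.*'(.*)'")

# Hypothetical sample lines in the assumed format of i94port_valid.txt
sample_lines = [
    "'ATL'\t=\t'ATLANTA, GA'",
    "'SEA'\t=\t'SEATTLE, WA'",
    "a line with no quoted fields",
]

i94port_valid = {}
for line in sample_lines:
    match = re_obj.search(line)
    if match is None:
        continue  # lines without the two quoted fields are skipped
    # Keep the one-element list shape used by the notebook
    i94port_valid[match[1]] = [match[2].strip()]

print(i94port_valid)
```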

In [8]:
# Clean I94 immigration data
def clean_i94_data(file):
    '''    
    Input: Path to I94 immigration data file
    Output: Spark dataframe of I94 immigration data with valid i94port
    '''    
    # Read I94 data into Spark
    df_immigration = spark.read.format('com.github.saurfang.sas.spark').load(file)

    # Filter out entries where i94port is invalid
    df_immigration = df_immigration.filter(df_immigration.i94port.isin(list(i94port_valid.keys())))

    return df_immigration

In [9]:
# Test function
immigration_test_file = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat' 
df_immigration_test = clean_i94_data(immigration_test_file)
df_immigration_test.show()

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SEO| null|      G|   null|      Y|   null| 1991.0|     D/S|     M|  null|   null|  3.73679633E9|00296|      F1|
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|  1.0|20160401|    nul

In [10]:
# Clean temperature data
df_temp = spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")

In [11]:
# Filter out data points with a missing (NaN) average temperature;
# Spark reads empty CSV fields as null, so drop the nulls explicitly
df_temp = df_temp.dropna(subset=['AverageTemperature'])

In [12]:
# Remove duplicate locations
df_temp = df_temp.dropDuplicates(['City', 'Country'])
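`dropDuplicates(['City', 'Country'])` keeps one arbitrary monthly reading per city. The temperature that reaches the dimension table could therefore come from any month as far back as the 1740s. The Scope section describes aggregating the temperature data. One way to do that is to average all non-missing readings per (City, Country). The sketch below is an alternative to the notebook's approach and shows the idea in plain Python on made-up toy rows. In Spark, the equivalent would be a `groupBy('City', 'Country')` with an `avg` aggregation on AverageTemperature cast to double. `mean_temperature_by_city` is a hypothetical helper.

```python
from collections import defaultdict

def mean_temperature_by_city(rows):
    """Average AverageTemperature per (City, Country), skipping missing readings.

    rows: iterable of dicts with 'City', 'Country' and 'AverageTemperature' keys,
    where the temperature is a numeric string or None, as read from the CSV.
    """
    totals = defaultdict(lambda: [0.0, 0])  # (City, Country) -> [sum, count]
    for row in rows:
        temp = row["AverageTemperature"]
        if temp in (None, "", "NaN"):
            continue  # same rule as the NaN filter: ignore missing readings
        key = (row["City"], row["Country"])
        totals[key][0] += float(temp)
        totals[key][1] += 1
    return {key: total / count for key, (total, count) in totals.items()}

# Toy rows for illustration only, not taken from the dataset
rows = [
    {"City": "Seattle", "Country": "United States", "AverageTemperature": "4.0"},
    {"City": "Seattle", "Country": "United States", "AverageTemperature": "8.0"},
    {"City": "Seattle", "Country": "United States", "AverageTemperature": None},
    {"City": "Perth", "Country": "Australia", "AverageTemperature": "15.5"},
]
print(mean_temperature_by_city(rows))
```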

In [13]:
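from pyspark.sql.types import StringType

@udf(returnType=StringType())
def get_i94port(city):
    '''
    Input: City name
    
    Output: Corresponding i94port code, or None if no port name contains the city name
    
    '''
    # Null city values would otherwise fail on .lower()
    if city is None:
        return None
    # First port whose name contains the city name wins
    for key in i94port_valid:
        if city.lower() in i94port_valid[key][0].lower():
            return key
    return None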
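`get_i94port` matches when the city name appears anywhere inside the port name, and the first matching key wins. As a result, short or common city names can be mapped to the wrong port. The sketch below contrasts that rule with a stricter one that compares the city against the place name before the comma. It uses a hypothetical two-entry dictionary in the same `{code: [name]}` shape as `i94port_valid`. Both function names are hypothetical.

```python
def match_port_substring(city, ports):
    """Notebook's rule: the city name appears anywhere in the port name (first hit wins)."""
    for code, names in ports.items():
        if city.lower() in names[0].lower():
            return code
    return None

def match_port_exact(city, ports):
    """Stricter rule: the city must equal the place name before the comma."""
    target = city.strip().lower()
    for code, names in ports.items():
        place = names[0].split(",")[0].strip().lower()
        if place == target:
            return code
    return None

# Hypothetical entries in the same {code: [name]} shape as i94port_valid
ports = {"NYC": ["NEW YORK, NY"], "SEA": ["SEATTLE, WA"]}

for city in ["York", "Seattle"]:
    print(city, match_port_substring(city, ports), match_port_exact(city, ports))
```

The stricter rule trades recall for precision. It will miss ports whose names are spelled or abbreviated differently from the Kaggle city names, so the choice depends on whether false matches or missed matches are more harmful to the analysis.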

In [14]:
# Add i94port code based on city name
df_temp = df_temp.withColumn("i94port", get_i94port(df_temp.City))

In [15]:
# Remove data points with no i94port code (the UDF returns null when no port matches)
df_temp = df_temp.filter(df_temp.i94port.isNotNull())

In [16]:
# Show results
df_temp.show()

+----------+-------------------+-----------------------------+---------+--------------------+--------+---------+-------+
|        dt| AverageTemperature|AverageTemperatureUncertainty|     City|             Country|Latitude|Longitude|i94port|
+----------+-------------------+-----------------------------+---------+--------------------+--------+---------+-------+
|1852-07-01|             15.488|                        1.395|    Perth|           Australia|  31.35S|  114.97E|    PER|
|1828-01-01|             -1.977|                        2.551|  Seattle|       United States|  47.42N|  121.97W|    SEA|
|1743-11-01|              2.767|                        1.905| Hamilton|              Canada|  42.59N|   80.73W|    HAM|
|1849-01-01|  7.399999999999999|                        2.699|  Ontario|       United States|  34.56N|  116.76W|    ONT|
|1821-11-01|              2.322|                        2.375|  Spokane|       United States|  47.42N|  117.24W|    SPO|
|1843-01-01| 18.874000000000002|

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

Fact Table - This will contain information from the I94 immigration data joined with the city temperature data on i94port.

Columns:

* i94yr = 4-digit year
* i94mon = numeric month
* i94cit = 3-digit code of the country of citizenship
* i94port = 3-character code of the destination city (port of entry)
* arrdate = arrival date
* i94mode = 1-digit mode-of-travel code
* depdate = departure date
* i94visa = reason for immigration (visa category code)
* AverageTemperature = average temperature of the destination city
* Latitude = latitude of the destination city
* Longitude = longitude of the destination city

1st Dimension Table - This will contain events from the I94 immigration data.

Columns:

* i94yr = 4-digit year
* i94mon = numeric month
* i94cit = 3-digit code of the country of citizenship
* i94port = 3-character code of the destination city (port of entry)
* arrdate = arrival date
* i94mode = 1-digit mode-of-travel code
* depdate = departure date
* i94visa = reason for immigration (visa category code)

2nd Dimension Table - This will contain city temperature data.

Columns:

* i94port = 3-character code of the destination city (mapped from the list of valid i94port codes)
* AverageTemperature = average temperature
* City = city name
* Country = country name
* Latitude = latitude
* Longitude = longitude
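The model is a small star schema: two tables keyed by i94port, and a fact table built by joining them. The sketch below reproduces the join with Python's built-in sqlite3 module on an in-memory database, purely to illustrate the shape of the model. The real pipeline uses Spark and Parquet instead. The table names and immigration rows are hypothetical toy values. The two temperature rows reuse Seattle and Spokane values from the cleaned output above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Immigration dimension: one row per I94 record
cur.execute("""CREATE TABLE immigration (
    i94yr INTEGER, i94mon INTEGER, i94cit INTEGER, i94port TEXT,
    arrdate INTEGER, i94mode INTEGER, depdate INTEGER, i94visa INTEGER)""")
# Temperature dimension: one row per city, keyed by i94port
cur.execute("""CREATE TABLE temperature (
    i94port TEXT, AverageTemperature REAL, City TEXT, Country TEXT,
    Latitude TEXT, Longitude TEXT)""")

# Toy immigration rows for illustration only
cur.executemany("INSERT INTO immigration VALUES (?, ?, ?, ?, ?, ?, ?, ?)", [
    (2016, 4, 101, "SEA", 20545, 1, 20567, 2),
    (2016, 4, 254, "SPO", 20551, 1, None, 3),
    (2016, 4, 101, "XXX", 20573, None, None, 2),  # invalid port, no temperature row
])
cur.executemany("INSERT INTO temperature VALUES (?, ?, ?, ?, ?, ?)", [
    ("SEA", -1.977, "Seattle", "United States", "47.42N", "121.97W"),
    ("SPO", 2.322, "Spokane", "United States", "47.42N", "117.24W"),
])

# Fact table: inner join on i94port, so records without a temperature row drop out
fact_rows = cur.execute("""
    SELECT i.i94yr, i.i94mon, i.i94cit, i.i94port, i.arrdate, i.i94mode,
           i.depdate, i.i94visa, t.AverageTemperature
    FROM immigration AS i
    JOIN temperature AS t ON i.i94port = t.i94port
    ORDER BY i.i94port
""").fetchall()
for row in fact_rows:
    print(row)
```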

#### 3.2 Mapping Out Data Pipelines

Pipeline Steps:

1. Clean I94 data as described in step 2 to create Spark dataframe df_immigration for each month.
2. Clean temperature data as described in step 2 to create Spark dataframe df_temp (this was already performed).
3. Create immigration dimension table by selecting relevant columns from df_immigration and write to parquet file partitioned by i94port.
4. Create temperature dimension table by selecting relevant columns from df_temp and write to parquet file partitioned by i94port.
5. Create fact table by joining immigration and temperature dimension tables on i94port and write to parquet file partitioned by i94port.
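Steps 3 to 5 write each table partitioned by i94port. Spark's `partitionBy` writes one Hive-style subdirectory per distinct value, named `column=value` (for example `i94port=ATL/`). It encodes the partition column in the path rather than in the data files. A query that filters on i94port therefore only needs to read the matching directories. The sketch below imitates that layout with the standard library, using CSV instead of Parquet. `write_partitioned` is a hypothetical helper and the rows are toy values.

```python
import csv
import os
import tempfile
from collections import defaultdict

def write_partitioned(rows, partition_col, root):
    """Write dict rows to root/<partition_col>=<value>/part-0.csv, one directory per value."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[partition_col]].append(row)
    for value, group in groups.items():
        part_dir = os.path.join(root, f"{partition_col}={value}")
        os.makedirs(part_dir, exist_ok=True)
        # Like Spark, leave the partition column out of the file: the path encodes it
        fields = [k for k in group[0] if k != partition_col]
        with open(os.path.join(part_dir, "part-0.csv"), "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=fields, extrasaction="ignore")
            writer.writeheader()
            writer.writerows(group)
    return sorted(os.listdir(root))

# Toy immigration rows for illustration only
rows = [
    {"i94port": "ATL", "i94yr": 2016, "i94mon": 4},
    {"i94port": "NYC", "i94yr": 2016, "i94mon": 4},
    {"i94port": "NYC", "i94yr": 2016, "i94mon": 4},
]
with tempfile.TemporaryDirectory() as root:
    print(write_partitioned(rows, "i94port", root))
```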

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

#### Step 1

In [17]:
# Path to I94 immigration data 
immigration_data = '/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

In [18]:
# Clean I94 immigration data and store as Spark dataframe
df_immigration = clean_i94_data(immigration_data)

In [19]:
# Extract columns for immigration dimension table
immigration_table = df_immigration.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa"])

In [20]:
# Write immigration dimension table to parquet files partitioned by i94port
immigration_table.write.mode("append").partitionBy("i94port").parquet("/results/immigration.parquet")

#### Step 2

In [21]:
# Extract columns for temperature dimension table
temp_table = df_temp.select(["AverageTemperature", "City", "Country", "Latitude", "Longitude", "i94port"])

In [22]:
# Write temperature dimension table to parquet files partitioned by i94port
temp_table.write.mode("append").partitionBy("i94port").parquet("/results/temperature.parquet")

#### Step 3

In [23]:
# Create temporary views of the immigration and temperature data
df_immigration.createOrReplaceTempView("immigration_view")
df_temp.createOrReplaceTempView("temp_view")

In [24]:
# Create the fact table by joining the immigration and temperature views
fact_table = spark.sql('''
SELECT immigration_view.i94yr as year,
       immigration_view.i94mon as month,
       immigration_view.i94cit as citizenship_code,
       immigration_view.i94port as i94port,
       immigration_view.arrdate as arrival_date,
       immigration_view.depdate as departure_date,
       immigration_view.i94mode as travel_mode,
       immigration_view.i94visa as reason,
       temp_view.AverageTemperature as temperature,
       temp_view.Latitude as latitude,
       temp_view.Longitude as longitude
FROM immigration_view
JOIN temp_view ON (immigration_view.i94port = temp_view.i94port)
''')

In [25]:
# Write fact table to parquet files partitioned by i94port
fact_table.write.mode("append").partitionBy("i94port").parquet("/results/fact.parquet")

#### 4.2 Data Quality Checks

 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Running Quality Checks

In [26]:
def quality_check(df, description):
    '''
    Input: Spark dataframe, description of Spark dataframe
    Output: Print outcome of data quality check
    '''
    
    result = df.count()
    if result == 0:
        print("Data quality check failed for {} with zero records".format(description))
    else:
        print("Data quality check passed for {} with {} records".format(description, result))
    return 0

In [None]:
# Perform data quality checks
quality_check(df_immigration, "immigration table")
quality_check(df_temp, "temperature table")
quality_check(fact_table, "fact table")
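`quality_check` covers only the completeness idea from the list above, by checking for a non-zero count. The unique-key and source/count checks can be sketched in the same spirit. The version below works on plain Python rows so it runs without Spark. In Spark, the first check would be a `groupBy('i94port').count()` filtered to counts above 1. Both function names are hypothetical. The second check matters because a duplicated i94port in the temperature table silently multiplies immigration records in the fact-table join.

```python
from collections import Counter

def check_unique_key(rows, key):
    """Integrity check: return the key values that occur more than once."""
    counts = Counter(row[key] for row in rows)
    return sorted(value for value, n in counts.items() if n > 1)

def expected_fact_count(immigration_rows, temperature_rows, key="i94port"):
    """Source/count check: number of rows an inner join on `key` should produce."""
    temp_counts = Counter(row[key] for row in temperature_rows)
    # Each immigration row appears once per matching temperature row (0 if none)
    return sum(temp_counts[row[key]] for row in immigration_rows)

# Toy rows for illustration only
immigration_rows = [{"i94port": "SEA"}, {"i94port": "SEA"}, {"i94port": "ATL"}]
temperature_rows = [
    {"i94port": "SEA", "City": "Seattle"},
    {"i94port": "ATL", "City": "Atlanta"},
    {"i94port": "ATL", "City": "Another City"},  # second city mapped to the same port
]

print("duplicate keys:", check_unique_key(temperature_rows, "i94port"))
print("expected fact rows:", expected_fact_count(immigration_rows, temperature_rows))
```

Comparing `expected_fact_count` with the actual fact-table count, and requiring an empty duplicate-key list, turns both bullets into checks that can fail loudly.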

#### 4.3 Data dictionary 

Fact Table - This will contain information from the I94 immigration data joined with the city temperature data on i94port.

Columns:

* i94yr = 4-digit year
* i94mon = numeric month
* i94cit = 3-digit code of the country of citizenship
* i94port = 3-character code of the destination city (port of entry)
* arrdate = arrival date
* i94mode = 1-digit mode-of-travel code
* depdate = departure date
* i94visa = reason for immigration (visa category code)
* AverageTemperature = average temperature of the destination city
* Latitude = latitude of the destination city
* Longitude = longitude of the destination city

1st Dimension Table - This will contain events from the I94 immigration data.

Columns:

* i94yr = 4-digit year
* i94mon = numeric month
* i94cit = 3-digit code of the country of citizenship
* i94port = 3-character code of the destination city (port of entry)
* arrdate = arrival date
* i94mode = 1-digit mode-of-travel code
* depdate = departure date
* i94visa = reason for immigration (visa category code)

2nd Dimension Table - This will contain city temperature data.

Columns:

* i94port = 3-character code of the destination city (mapped from the list of valid i94port codes)
* AverageTemperature = average temperature
* City = city name
* Country = country name
* Latitude = latitude
* Longitude = longitude

### Step 5: Complete Project Write Up
* Rationale: For this project, we used Spark because it can easily handle multiple file formats (SAS, CSV, etc.) containing large amounts of data. Spark SQL was used to load the input files into dataframes, which were then combined with standard SQL join operations to create the tables.

* Data update schedule: Since the raw I94 files are provided one per month, the data should continue to be pulled monthly.
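A monthly schedule needs one input path per month. The April file is named `i94_apr16_sub.sas7bdat`. The sketch below assumes, based only on that one file name, that the other months follow the same `i94_<mon><yy>_sub.sas7bdat` pattern in the same directory. Under that assumption, the monthly inputs can be generated rather than hard-coded. `monthly_i94_paths` is a hypothetical helper.

```python
import os

# Lowercase month abbreviations, matching the April file name i94_apr16_sub.sas7bdat
MONTHS = ("jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec")

def monthly_i94_paths(year, data_dir="../../data/18-83510-I94-Data-2016"):
    """Return the assumed path of each monthly I94 file for `year`, January first."""
    yy = f"{year % 100:02d}"  # 2016 -> '16'
    return [os.path.join(data_dir, f"i94_{mon}{yy}_sub.sas7bdat") for mon in MONTHS]

for path in monthly_i94_paths(2016)[:3]:
    print(path)
```

In a monthly job, each path would be passed to `clean_i94_data`, and the resulting immigration table would be written in append mode, as in Step 4.1.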

#### Write a description of how you would approach the problem differently under the following scenarios:
1. The data was increased by 100x: load it into Amazon Redshift.
2. The data populates a dashboard that must be updated on a daily basis by 7am every day: use Apache Airflow to automate and schedule the pipeline.
3. The database needed to be accessed by 100+ people: use Amazon Redshift, because AWS offers good auto-scaling capabilities and performance.