# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [31]:
# Do all imports and installs here
import pandas as pd
import pyspark
import psycopg2
import re
from collections import defaultdict
from datetime import datetime, timedelta
from pyspark.sql.functions import udf

### Step 1: Scope the Project and Gather Data

#### Scope 
This project's aim is to give a wider view of the US I94 immigration data in the light of temperature data for further analysis of immigration data.We can conclude whether temperature affects the selection of destination cities for immigration.Our data model will be based on aggregating the I94 immigration data by destination city to form our first-dim table then aggregate city temperature data by city to create dim2 table. Both tables will be joined on destination city to form the fact table creating a final database to query on immigration events. 

#### I94 Immigration Data

This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace. This is where the data comes from.Check the sample data for more understanding.

In [32]:
i94_path = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_i94 = pd.read_sas(i94_path, 'sas7bdat', encoding="ISO-8859-1")
df_i94.head(5)

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [33]:
# Read in the data here
df_im = pd.read_csv("immigration_data_sample.csv")
df_im.head(5)

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [34]:
# Read in the temperature data
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = pd.read_csv(fname, sep=',')
# Display first 5 rows of df_temp
df_temp.head()


Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [37]:
#Create a Spark session
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
spark = SparkSession.builder\
        .config("spark.jars.repositories", "https://repos.spark-packages.org/")\
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0,saurfang:spark-sas7bdat:2.0.0-s_2.11")\
        .enableHiveSupport().getOrCreate()
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.



In [38]:
df_i94.describe()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,arrdate,i94mode,depdate,i94bir,i94visa,count,biryear,admnum
count,3096313.0,3096313.0,3096313.0,3096313.0,3096313.0,3096313.0,3096074.0,2953856.0,3095511.0,3096313.0,3096313.0,3095511.0,3096313.0
mean,3078652.0,2016.0,4.0,304.9069,303.2838,20559.85,1.07369,20573.95,41.76761,1.845393,1.0,1974.232,70828850000.0
std,1763278.0,0.0,0.0,210.0269,208.5832,8.777339,0.5158963,29.35697,17.42026,0.398391,0.0,17.42026,22154420000.0
min,6.0,2016.0,4.0,101.0,101.0,20545.0,1.0,15176.0,-3.0,1.0,1.0,1902.0,0.0
25%,1577790.0,2016.0,4.0,135.0,131.0,20552.0,1.0,20561.0,30.0,2.0,1.0,1962.0,56035230000.0
50%,3103507.0,2016.0,4.0,213.0,213.0,20560.0,1.0,20570.0,41.0,2.0,1.0,1975.0,59360940000.0
75%,4654341.0,2016.0,4.0,512.0,504.0,20567.0,1.0,20579.0,54.0,2.0,1.0,1986.0,93509870000.0
max,6102785.0,2016.0,4.0,999.0,760.0,20574.0,9.0,45427.0,114.0,3.0,1.0,2019.0,99915570000.0


#### Cleaning Steps
Document steps necessary to clean the data

In [41]:
# Get port locations from SAS text file
with open("./I94_SAS_Labels_Descriptions.SAS") as f:
    content = f.readlines()
content = [x.strip() for x in content]
ports = content[302:962]
splitted_ports = [p.split("=") for p in ports]
port_codes = [x[0].replace("'","").strip() for x in splitted_ports]
port_locations = [x[1].replace("'","").strip() for x in splitted_ports]
port_cities = [x.split(",")[0] for x in port_locations]
port_states = [x.split(",")[-1] for x in port_locations]
df_pl = pd.DataFrame({"port_code" : port_codes, "port_city": port_cities, "port_state": port_states})
df_pl.head(20)

Unnamed: 0,port_code,port_city,port_state
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKER AAF - BAKER ISLAND,AK
3,DAC,DALTONS CACHE,AK
4,PIZ,DEW STATION PT LAY DEW,AK
5,DTH,DUTCH HARBOR,AK
6,EGL,EAGLE,AK
7,FRB,FAIRBANKS,AK
8,HOM,HOMER,AK
9,HYD,HYDER,AK


I94 immigration data - discarding all data points with the destination city code i94port is not a valid value (XXX, 99, NaN, etc)

Temperature Data - discarding all data points where AverageTemperature is duplicate locations, and add the i94port of the location for each entry.



In [87]:
# Create dictionary of valid i94port codes
re_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')
i94port_valid = {}
with open('i94port_valid.txt') as f:
     for line in f:
         match = re_obj.search(line)
         i94port_valid[match[1]]=[match[2]]

# Clean I94 immigration data
def clean_i94_data(file):
    '''    
    Input: Path to I94 immigration data file
    Output: Spark dataframe of I94 immigration data with valid i94port
    '''    
    # Read I94 data into Spark
    df_immigration = spark.read.format('com.github.saurfang.sas.spark').load(file)

    # Filter out entries where i94port is invalid
    df_immigration = df_immigration.filter(df_immigration.i94port.isin(list(i94port_valid.keys())))

    return df_immigration



In [88]:
# Clean I94 immigration data
def clean_i94_data(file):
    '''    
    Input: Path to I94 immigration data file
    Output: Spark dataframe of I94 immigration data with valid i94port
    '''    
    # Read I94 data into Spark
    df_immigration = spark.read.format('com.github.saurfang.sas.spark').load(file)
    return df_immigration

In [93]:
# Test function
immigration_test_file = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat' 
df_immigration_test = clean_i94_data(immigration_test_file)
df_immigration_test.show(10)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

In [90]:
@udf()
def get_i94port(city):
    '''
    Input: City name
    
    Output: Corresponding i94port
    
    '''
    
    for key in i94port_valid:
        if city.lower() in i94port_valid[key][0].lower():
            return key

# Add iport94 code based on city name
df_temp = df_temp.withColumn("i94port", get_i94port(df_temp.City))

# Remove data points with no iport94 code
df_temp = df_temp.filter(df_temp.i94port != 'null')



In [91]:
# Clean temperature data
df_temp = spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")
# Remove duplicate locations
df_temp = df_temp.dropDuplicates(['City', 'Country'])

In [94]:
df_temp.show(10)

+----------+--------------------+-----------------------------+-----------+-------------+--------+---------+
|        dt|  AverageTemperature|AverageTemperatureUncertainty|       City|      Country|Latitude|Longitude|
+----------+--------------------+-----------------------------+-----------+-------------+--------+---------+
|1743-11-01|               3.264|                        1.665|  Allentown|United States|  40.99N|   74.56W|
|1779-11-01|0.011999999999999985|                        2.714|     Atyrau|   Kazakhstan|  47.42N|   50.92E|
|1825-01-01|  26.069000000000003|                         2.16|    Bintulu|     Malaysia|   2.41N|  113.30E|
|1825-01-01|              26.517|           2.5839999999999996|Butterworth|     Malaysia|   5.63N|  100.09E|
|1845-01-01|              24.995|                        1.871|     Cainta|  Philippines|  15.27N|  120.83E|
|1825-01-01|              24.753|           2.1519999999999997|     Ciamis|    Indonesia|   7.23S|  107.84E|
|1850-01-01|       

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model


Map out the conceptual data model and explain why you chose that model
 
Star model was used due to the simple structure of data which would make it easier to get the analysis results where we depended on i94port to be the primary key by which linked the fact table to the 2 dimension tables.


Step 3: Define the Data Model

3.1 Conceptual Data Model

Fact Table - All info from the I94 immigration data joined with temperature data of the city on i94port

Columns:

    i94port = 3-character code of destination city
    arrdate = arrival date
    i94mode = single digit travel code
    depdate = departure date
    AverageTemperature = average temperature of destination city
    i94visa = reason for immigration
    i94yr = 4-digit year
    i94mon = numeric month
    i94cit = 3-digit code of origin city
    
 Dimension Table -  events from the I94 immigration data.

Columns:

    i94yr = 4-digit year
    i94mon = numeric month
    arrdate = arrival date
    i94mode = 1-digit travel code
    depdate = departure date
    i94visa = reason for immigration
    i94cit = 3-digit code of origin city
    i94port = 3-character code of destination city
 Dimension Table -  city temperature data.

Columns:

    i94port = 3-character code of destination city 
    AverageTemperature = average temperature
    City = city 
    Country = country 
    Latitude= latitude
    Longitude = longitude


#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

Clean I94 data to create a Spark dataframe df_immigration for each month.

Clean temperature data to create Spark dataframe df_temp.

Create immigration dimension table from df_immigration and write to parquet file grouped by i94port.

Create temperature dimension table from df_temp and write to parquet file grouped by i94port.

Create fact table by joining immigration and temperature dimension tables on i94port and write to parquet file grouped by i94port.


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [95]:
# Path to I94 immigration data 
immigration_data = '/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

# Clean I94 immigration data and store as Spark dataframe
df_immigration = clean_i94_data(immigration_data)

# Extract columns for immigration dimension table
immigration_table = df_immigration.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa"])

# Write immigration dimension table to parquet files partitioned by i94port
immigration_table.write.mode("append").partitionBy("i94port").parquet("/results/immigration.parquet")

In [None]:
# Create temporary views of the immigration and temperature data
df_immigration.createOrReplaceTempView("immigration_view")
df_temp.createOrReplaceTempView("temp_view")

# Create the fact table by joining the immigration and temperature views
fact_table = spark.sql('''
SELECT immigration_view.i94yr as year,
       immigration_view.i94mon as month,
       immigration_view.i94cit as city,

   immigration_view.i94port as i94port,
       immigration_view.arrdate as arrival_date,
       immigration_view.depdate as departure_date,
       immigration_view.i94visa as reason,
       temp_view.AverageTemperature as temperature,
       temp_view.Latitude as latitude,
       temp_view.Longitude as longitude
FROM immigration_view
JOIN temp_view ON (immigration_view.i94port = temp_view.i94port)
''')



In [None]:
# Write fact table to parquet files partitioned by i94port
fact_table.write.mode("append").partitionBy("i94port").parquet("/results/fact.parquet")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [97]:
# Perform quality checks here
def quality_check(df, description):
    '''
    Input: Spark dataframe, description of Spark datafram
    Output: Print outcome of data quality check
    '''  
    result = df.count()
    if result == 0:
        print("Data quality check failed for {} with zero records".format(description))
    else:
        print("Data quality check passed for {} with {} records".format(description, result))
    return 0
quality_check(df_immigration, "immigration table")
quality_check(df_temp, "temperature table")

Data quality check passed for immigration table with 3096313 records
Data quality check passed for temperature table with 3490 records


0

#### 4.3 Data dictionary 

Fact Table - All info from the I94 immigration data joined with temperature data of the city on i94port

Columns:

    i94port = 3-character code of destination city
    arrdate = arrival date
    i94mode = single digit travel code
    depdate = departure date
    AverageTemperature = average temperature of destination city
    i94visa = reason for immigration
    i94yr = 4-digit year
    i94mon = numeric month
    i94cit = 3-digit code of origin city
    
 Dimension Table -  events from the I94 immigration data.

Columns:

    i94yr = 4-digit year
    i94mon = numeric month
    arrdate = arrival date
    i94mode = 1-digit travel code
    depdate = departure date
    i94visa = reason for immigration
    i94cit = 3-digit code of origin city
    i94port = 3-character code of destination city
 Dimension Table -  city temperature data.

Columns:

    i94port = 3-character code of destination city 
    AverageTemperature = average temperature
    City = city 
    Country = country 
    Latitude= latitude
    Longitude = longitude



#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
  Tools used were python pandas for data exploration and spark to build pipelines for big data processing to handle different file formats(SAS, csv, etc) and SQL joins for final database. 
* Propose how often the data should be updated and why.
  
  Data should be updated monthly as this is how the tables are set while demographics can be updated annually
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

 * The data was increased by 100x.
   * Use Spark to process the data efficiently utilizing distributed computing using EMR clusters.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
   * Use Airflow and create a DAG that outlines the logic of the pipeline and controls it.
 * The database needed to be accessed by 100+ people.
   * Use RedShift for data storage to be accessible by multiple individuals.