# Data Engineering Capstone Project

*Udacity Project for Data Engineer Nanodegree*

##  Project Summary
***
The objective of this project is to create an ETL pipeline using the data of I94 immigration data and city temperature data to create anaytic database 

The project follows the follow steps:

Step 1: Scope the Project and Gather Data
Step 2: Explore and Assess the Data
Step 3: Define the Data Model
Step 4: Run ETL to Model the Data
Step 5: Complete Project Write Up



In [1]:
import configparser
from datetime import datetime,timedelta
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pandas as pd
import psycopg2
import re
from collections import defaultdict
from pyspark.sql.functions import udf

## Step 1: Scope the Project and Gather Data

## Scope
This projects aims to perform exploratory data analysis on the US I94 immigration,demographics and temperature to create anaytic database.

## I94 Immigration Data
This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace. This is where the data comes from. There's a sample file so you can take a look at the data in csv format before reading it all in. You do not have to use the entire dataset, just use what you need to accomplish the goal you set at the beginning of the project.


In [2]:
#Build spark session
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11").enableHiveSupport().getOrCreate()

In [3]:
# Read in the data
path='../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
immigration1_df =spark.read.format("path").option("header","true").option("inferSchema","true").load(path)

In [4]:
# display the first ten records
immigration1_df.show(10)

+-------+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|    _c0|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|
+-------+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|2027561|4084316.0|2016.0|   4.0| 209.0| 209.0|    HHW|20566.0|    1.0|     HI|20573.0|  61.0|    2.0|  1.0|20160422|    null| null|      G|      O|   null|      M| 1955.0|07202016|     F|  null|     JL|5.6582674633E10|00782|      WT|
|2171295|4422636.0|2016.0|   4.0| 582.0| 582.0|    MCA|20567

### World Temperature Data

This dataset came from Kaggle. You can read more about it here.https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data


In [5]:
filename='../../data2/GlobalLandTemperaturesByCity.csv'
temperature_df =spark.read.format("filename").option("header","true").option("inferSchema","true").load(filename)

In [6]:
temperature_df.show(20)

+-------------------+-------------------+-----------------------------+-----+-------+--------+---------+
|                 dt| AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+-------------------+-------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01 00:00:00|              6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01 00:00:00|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01 00:00:00|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01 00:00:00|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01 00:00:00|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-04-01 00:00:00| 5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|
|1744-05-01 00:00:00|             10.644|           1.2

In [7]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = pd.read_csv(fname)

In [8]:
df_temp.head(20)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E
5,1744-04-01,5.788,3.624,Århus,Denmark,57.05N,10.33E
6,1744-05-01,10.644,1.283,Århus,Denmark,57.05N,10.33E
7,1744-06-01,14.051,1.347,Århus,Denmark,57.05N,10.33E
8,1744-07-01,16.082,1.396,Århus,Denmark,57.05N,10.33E
9,1744-08-01,,,Århus,Denmark,57.05N,10.33E


In [9]:
temp_us = df_temp[df_temp["Country"] == "United States"]
temp_us.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
47555,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W
47556,1820-02-01,6.926,2.853,Abilene,United States,32.95N,100.53W
47557,1820-03-01,10.767,2.395,Abilene,United States,32.95N,100.53W
47558,1820-04-01,17.989,2.202,Abilene,United States,32.95N,100.53W
47559,1820-05-01,21.809,2.036,Abilene,United States,32.95N,100.53W


### U.S. City Demographic Data

This data comes from OpenSoft. You can read more about it here.

In [10]:
demog_df = pd.read_csv("./us-cities-demographics.csv", delimiter=";")

In [11]:
demog_df.head(20)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402
5,Peoria,Illinois,33.1,56229.0,62432.0,118661,6634.0,7517.0,2.4,IL,American Indian and Alaska Native,1343
6,Avondale,Arizona,29.1,38712.0,41971.0,80683,4815.0,8355.0,3.18,AZ,Black or African-American,11592
7,West Covina,California,39.8,51629.0,56860.0,108489,3800.0,37038.0,3.56,CA,Asian,32716
8,O'Fallon,Missouri,36.0,41762.0,43270.0,85032,5783.0,3269.0,2.77,MO,Hispanic or Latino,2583
9,High Point,North Carolina,35.5,51751.0,58077.0,109828,5204.0,16315.0,2.65,NC,Asian,11060


### Airport Code 
This is a simple table of airport codes and corresponding cities. It comes from here.

In [12]:
airport_df = pd.read_csv("airport-codes_csv.csv")

In [13]:
airport_df.head(20)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"
5,00AS,small_airport,Fulton Airport,1100.0,,US,US-OK,Alex,00AS,,00AS,"-97.8180194, 34.9428028"
6,00AZ,small_airport,Cordes Airport,3810.0,,US,US-AZ,Cordes,00AZ,,00AZ,"-112.16500091552734, 34.305599212646484"
7,00CA,small_airport,Goldstone /Gts/ Airport,3038.0,,US,US-CA,Barstow,00CA,,00CA,"-116.888000488, 35.350498199499995"
8,00CL,small_airport,Williams Ag Airport,87.0,,US,US-CA,Biggs,00CL,,00CL,"-121.763427, 39.427188"
9,00CN,heliport,Kitchen Creek Helibase Heliport,3350.0,,US,US-CA,Pine Valley,00CN,,00CN,"-116.4597417, 32.7273736"


## Step 2: Explore and Assess the Data

### Explore the Data
Identify data quality issues, like missing values, duplicate data, etc

In [14]:
df_temp.describe()

Unnamed: 0,AverageTemperature,AverageTemperatureUncertainty
count,1988429.0,1988429.0
mean,17.70804,1.033803
std,9.987524,1.127493
min,-34.932,0.036
25%,11.898,0.345
50%,20.06,0.599
75%,25.608,1.353
max,39.156,15.396


In [15]:
temperature_df = spark.read.format("csv").option("header", "true").load(" .../../data2/GlobalLandTemperaturesByCity.csv")

In [16]:
temperature_df = temperature_df.filter(temperature_df.AverageTemperature != 'NaN')

In [19]:
temperature_df.show()

+-------------------+-------------------+-----------------------------+-----+-------+--------+---------+
|                 dt| AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+-------------------+-------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01 00:00:00|              6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1744-04-01 00:00:00| 5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|
|1744-05-01 00:00:00|             10.644|           1.2830000000000001|Århus|Denmark|  57.05N|   10.33E|
|1744-06-01 00:00:00| 14.050999999999998|                        1.347|Århus|Denmark|  57.05N|   10.33E|
|1744-07-01 00:00:00|             16.082|                        1.396|Århus|Denmark|  57.05N|   10.33E|
|1744-09-01 00:00:00| 12.780999999999999|                        1.454|Århus|Denmark|  57.05N|   10.33E|
|1744-10-01 00:00:00|               7.95|              

In [20]:
with open("I94_SAS_Labels_Descriptions.SAS") as f:
    content = f.readlines()
content = [x.strip() for x in content]
ports = content[302:962]
splitted_ports = [port.split("=") for port in ports]
port_codes = [x[0].replace("'","").strip() for x in splitted_ports]
port_locations = [x[1].replace("'","").strip() for x in splitted_ports]
port_cities = [x.split(",")[0] for x in port_locations]
port_states = [x.split(",")[-1] for x in port_locations]
df_port_locations = pd.DataFrame({"port_code" : port_codes, "port_city": port_cities, "port_state": port_states})
df_port_locations.head(20)

Unnamed: 0,port_code,port_city,port_state
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKER AAF - BAKER ISLAND,AK
3,DAC,DALTONS CACHE,AK
4,PIZ,DEW STATION PT LAY DEW,AK
5,DTH,DUTCH HARBOR,AK
6,EGL,EAGLE,AK
7,FRB,FAIRBANKS,AK
8,HOM,HOMER,AK
9,HYD,HYDER,AK


In [26]:
path = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
immigration_df = pd.read_sas(i94_path, 'sas7bdat', encoding="ISO-8859-1")​path = 'immigration_data_sample.csv'


In [28]:
print(f"First port in SAS file: {df_port_locations['port_city'].values[0]}, last port {df_port_locations['port_city'].values[-1]}")
irregular_ports_df = df_port_locations[df_port_locations["port_city"] == df_port_locations["port_state"]]
irregular_ports = list(set(irregular_ports_df["port_code"].values))
print(irregular_ports)

First port in SAS file: ALCAN, last port No PORT Code (OSN)
['CXO', 'FRG', '888', 'XXX', 'ISP', 'CHN', 'JMZ', 'PCW', 'SCH', 'OTS', 'OGS', 'AMT', 'CLX', 'GAC', 'PLB', 'UNK', 'AG', 'NK', 'NC8', 'WTR', 'W55', '5T6', 'PHF', 'X96', 'ZZZ', 'MAA', 'WA5', 'DEC', 'JFA', 'SUS', 'WAS', 'NYL', 'A2A', 'LIT', '74S', 'JSJ', 'Y62', 'ASI', 'FTB', 'BUS', 'MAP', 'BKF', 'BCM', 'CPX', 'MTH', 'EGE', 'JIG', 'VMB', 'T01', 'PHN', 'CP', 'DRV', '060', 'OLM', 'HRL', 'AUH', 'NGL', 'AKT', 'TIW', 'DAY', 'GPI', 'IAG', 'GMT', 'SP0', 'XNA', 'STN', 'X44', 'ATW', 'YGF', '.GA', 'JBQ', 'FSC', 'ADU', 'PFN', 'OSN', 'OAI', 'RYY']


In [31]:
# drop all irregular ports from i94 data
print(f"i94 data contains {len(immg_df)} rows before cleaning.")
immigration_df_filtered = immg_df[~immigration_df["i94port"].isin(irregular_ports)]
print(f"i94 data contains {len(immg_df_filtered)} rows after")
immigration_df_filtered.drop(columns=["insnum", "entdepu", "occup", "visapost"], inplace=True)
_filtered.dropna(inplace=True)
print(f"i94 data contains {len(immg_df_filtered)} rows after removing NaN values.")

i94 data contains 1000 rows before cleaning.
i94 data contains 973 rows after
i94 data contains 735 rows after removing NaN values.


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  errors=errors)
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  


In [32]:
airport_df.dropna(subset=['iata_code'], inplace=True)
airport_df.head(20)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
223,03N,small_airport,Utirik Airport,4.0,OC,MH,MH-UTI,Utirik Island,K03N,UTK,03N,"169.852005, 11.222"
440,07FA,small_airport,Ocean Reef Club Airport,8.0,,US,US-FL,Key Largo,07FA,OCA,07FA,"-80.274803161621, 25.325399398804"
594,0AK,small_airport,Pilot Station Airport,305.0,,US,US-AK,Pilot Station,,PQS,0AK,"-162.899994, 61.934601"
673,0CO2,small_airport,Crested Butte Airpark,8980.0,,US,US-CO,Crested Butte,0CO2,CSE,0CO2,"-106.928341, 38.851918"
1088,0TE7,small_airport,LBJ Ranch Airport,1515.0,,US,US-TX,Johnson City,0TE7,JCY,0TE7,"-98.62249755859999, 30.251800537100003"
1402,13MA,small_airport,Metropolitan Airport,418.0,,US,US-MA,Palmer,13MA,PMX,13MA,"-72.31140136719999, 42.223300933800004"
1438,13Z,seaplane_base,Loring Seaplane Base,0.0,,US,US-AK,Loring,13Z,WLR,13Z,"-131.636993408, 55.6012992859"
1555,16A,small_airport,Nunapitchuk Airport,12.0,,US,US-AK,Nunapitchuk,PPIT,NUP,16A,"-162.440454, 60.905591"
1574,16K,seaplane_base,Port Alice Seaplane Base,0.0,,US,US-AK,Port Alice,16K,PTC,16K,"-133.597, 55.803"
1722,19AK,small_airport,Icy Bay Airport,50.0,,US,US-AK,Icy Bay,19AK,ICY,19AK,"-141.662002563, 59.96900177"


## Step 3: Define the Data Model

### 3.1 Conceptual Data Model

The FACT TABLE from the I94 immigration data. The columns below will be extracted from the I94 immigration data :

cicid,i94yr	,i94mon	,i94cit,i94res,	i94port,arrdate,i94mode	,i94addr,depdate,i94bir	,i94visa,count,	dtadfile,	visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum	fltno	visatype


DIMENSION TABLE
-Temperature dataframe:
AverageTemperature 
City 
Country 
Latitude
Longitude 

-Airport dataframe
iata_code,name ,type ,local_code ,coordinates,city

-Demographics dataframe
city - state - media_age - male_population - female_population - total_population - num_veterans - foreign_born - average_household_size - state_code - race - count


### 3.2 Mapping Out Data Pipelines

The pipeline steps are described below:
Create tables by executing create_tables.py.
Join city to airports data.
Insert data.

## Step 4: Run Pipelines to Model the Data

### 4.1 Create the data model

Build the data pipelines to create the data model.

In [42]:

# Extract columns for temperature dimension table
temp_table = df_temp.select(["AverageTemperature", "City", "Country", "Latitude", "Longitude", "i94port"])

# Write temperature dimension table to parquet files partitioned by i94port
temp_table.write.mode("append").partitionBy("i94port").parquet("/results/temperature.parquet")

In [43]:
for index, row in df_i94_filtered.iterrows():
    cur.execute(immigration_insert, list(row.values))
    conn.commit()

In [44]:
for index, row in df_demographics.iterrows():
    cur.execute(demographic_insert, list(row.values))
    conn.commit()

### 4.2 Data Quality Checks
The data quality check will ensure there are adequate number of entries in each table.

In [45]:
def quality_check(df, description):
    '''
    Input: Spark dataframe, description of Spark datafram

    '''
    
    result = df.count()
    if result == 0:
        print("Data quality check failed for {} with zero records".format(description))
    else:
        print("Data quality check passed for {} with {} records".format(description, result))
    return 0

# Perform data quality check
quality_check(immigration_df, "immigration table")
quality_check(df_temp, "temperature table")

Data quality check passed for immigration table with 3088544 records

### 4.3 Data dictionary
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Step 5: Complete Project Write Up

-Rationale for the choice of tools and technologies for the project
 Apache spark was used because of:
 it's ability to handle multiple file formats with large amounts of data.
 Apache Spark offers a lightning-fast unified analytics engine for big data.
 Spark has easy-to-use APIs for operating on large datasets
-Propose how often the data should be updated and why.
 The current I94 immigration data is updated monthly, and hence the data will be updated monthly.
-Write a description of how you would approach the problem differently under the following scenarios:
 The data was increased by 100x.
 Spark can handle the increase but we would consider increasing the number of nodes in our cluster.
 The data populates a dashboard that must be updated on a daily basis by 7am every day.
 In this scenario, Apache Airflow will be used to schedule and run data pipelines.
 The database needed to be accessed by 100+ people.
 In this scenario, we would move our analytics database into Amazon Redshift.

