# ETL Pipeline: Immigration Data and Temperature Data
### Data Engineering Capstone Project

#### Project Summary
In this project, an ETL pipeline is built. Immigration data and temperature data are used to set up a database that is optimized for queries analyzing the impact of temperature on immigration.

The new dataset allows answering questions regarding immigration and temperature:
* Which are the top cities in the US where most immigrants enter?
* What month do most immigrants enter?
* Is there an increase or decrease of immigration over the years?
* Does temperature have an impact on immigration?

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
pd.set_option('display.max_columns', None)

import re

### Step 1: Scope the Project and Gather Data

#### Scope 

Dimension tables: 
- I94 Immigration Data will be aggregated by destination city. Additional data from the I94_SAS_Labels_Descriptions.SAS file is used to convert port codes to cities.
- World Temperature Data will be aggregated by city. 

These tables will be joined into a fact table and stored in a database to query immigration events.

A star schema is used because of its simplicity: the fact table is placed in the middle and the dimension tables around it.

#### Describe and Gather Data 

- I94 Immigration Data: This dataset comes from the US National Tourism and Trade Office. It contains the I-94 forms filled out by travelers entering the United States.

- World Temperature Data: This dataset comes from Kaggle. It contains monthly average temperatures of major cities from 1743 to 2013.

In [2]:
# Read in the data here
#fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
#df_immigration = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")
# Reading the full SAS dataset with pandas is extremely slow, therefore only the sample data is read here
df_immigration = pd.read_csv('immigration_data_sample.csv')

In [3]:
df_immigration.shape

(1000, 29)

In [4]:
df_immigration.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,1.0,20160407,,,G,O,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,1.0,20160428,DOH,,G,O,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,19.0,2.0,1.0,20160406,,,Z,K,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [5]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temperature = pd.read_csv(fname)

In [6]:
df_temperature.shape

(8599212, 7)

In [7]:
df_temperature.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [8]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [9]:
# Write to Parquet (overwrite so the cell can be re-run) and read it back
df_spark.write.mode("overwrite").parquet("sas_data")
df_spark = spark.read.parquet("sas_data")

In [10]:
df_spark.count()

3096313

In [11]:
df_spark.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

The SAS7BDAT file has the same structure as the sample data but many more rows (3,096,313 instead of 1,000). In this notebook, Spark appeared to handle this volume faster than pandas.

### Step 2: Explore and Assess the Data

#### I94 Immigration Data

##### Explore the Data 

In [12]:
df_immigration.describe()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,arrdate,i94mode,depdate,i94bir,i94visa,count,dtadfile,entdepu,biryear,insnum,admnum
count,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,951.0,1000.0,1000.0,1000.0,1000.0,0.0,1000.0,35.0,1000.0
mean,1542097.0,3040461.0,2016.0,4.0,302.928,298.262,20559.68,1.078,20575.037855,42.382,1.859,1.0,20160420.0,,1973.618,3826.857143,69372370000.0
std,915287.9,1799818.0,0.0,0.0,206.485285,202.12039,8.995027,0.485955,24.211234,17.903424,0.386353,0.0,49.51657,,17.903424,221.742583,23381340000.0
min,10925.0,13208.0,2016.0,4.0,103.0,103.0,20545.0,1.0,20547.0,1.0,1.0,1.0,20160400.0,,1923.0,3468.0,0.0
25%,721442.2,1412170.0,2016.0,4.0,135.0,131.0,20552.0,1.0,20561.0,30.75,2.0,1.0,20160410.0,,1961.0,3668.0,55993010000.0
50%,1494568.0,2941176.0,2016.0,4.0,213.0,213.0,20560.0,1.0,20570.0,42.0,2.0,1.0,20160420.0,,1974.0,3887.0,59314770000.0
75%,2360901.0,4694151.0,2016.0,4.0,438.0,438.0,20567.25,1.0,20580.0,55.0,2.0,1.0,20160420.0,,1985.25,3943.0,93436230000.0
max,3095749.0,6061994.0,2016.0,4.0,746.0,696.0,20574.0,9.0,20715.0,93.0,3.0,1.0,20160800.0,,2015.0,4686.0,95021510000.0


##### Cleaning Steps

- Remove rows whose i94port has no reference in the I94_SAS_Labels_Descriptions.SAS file.

In [13]:
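# Get ports and cities dictionaries from the Labels file:
#   ports:  port code -> city name (e.g. 'ABE' -> 'ABERDEEN')
#   cities: title-cased city name -> port code (e.g. 'Aberdeen' -> 'ABE')
with open("./I94_SAS_Labels_Descriptions.SAS") as f:
    lines = f.readlines()
pattern = re.compile(r"\'(.*)\'.*\'([A-Z\-a-z]+)(.*)\'")
ports = {}
cities = {}
# lines[302:962] is the range of the i94port code definitions in the Labels file
for target in lines[302:962]:
    res = pattern.search(target)
    if res is None:  # skip lines in the range that are not port definitions
        continue
    ports[res.group(1)] = res.group(2)
    cities[res.group(2).title()] = res.group(1)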
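The pattern's city group `([A-Z\-a-z]+)` stops at the first space, so a multi-word name such as "NEW YORK" would be stored as "NEW" and could never match the `City` column of the temperature data. The sketch below uses hypothetical sample lines in an assumed `'CODE' = 'CITY, STATE'` format of the Labels file and compares the notebook's pattern with an alternative pattern (an assumption, not the one used in this project) that captures everything up to the comma.

```python
import re

# Pattern used in the notebook: the city group only allows letters and '-',
# so it stops at the first space.
original = re.compile(r"\'(.*)\'.*\'([A-Z\-a-z]+)(.*)\'")
# Hypothetical alternative: capture the 3-character code, then everything up to the comma.
alternative = re.compile(r"'([A-Z0-9]{3})'\s*=\s*'([^,']*)")


def parse(lines, pattern):
    """Build port -> city and title-cased city -> port dictionaries."""
    ports, cities = {}, {}
    for line in lines:
        res = pattern.search(line)
        if res is None:  # skip lines that are not port definitions
            continue
        city = res.group(2).strip()
        ports[res.group(1)] = city
        cities[city.title()] = res.group(1)
    return ports, cities


# Hypothetical sample lines in the assumed 'CODE' = 'CITY, STATE' format.
sample = [
    "\t'ABE'\t=\t'ABERDEEN, WA         '\n",
    "\t'NYC'\t=\t'NEW YORK, NY         '\n",
    "/* not a port definition */\n",
]

print(parse(sample, original)[1])     # {'Aberdeen': 'ABE', 'New': 'NYC'}
print(parse(sample, alternative)[1])  # {'Aberdeen': 'ABE', 'New York': 'NYC'}
```

Whether the alternative pattern changes the final tables depends on how city names are written in both files; it has not been checked against the full Labels file here.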

In [14]:
# Check for correct ports
df_immigration_clean = df_immigration[(df_immigration.i94port.isin(list(ports.keys())))]

In [15]:
df_immigration_clean.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,1.0,20160407,,,G,O,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,1.0,20160428,DOH,,G,O,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,19.0,2.0,1.0,20160406,,,Z,K,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


#### World Temperature Data

##### Explore the Data 

In [16]:
df_temperature.describe()

Unnamed: 0,AverageTemperature,AverageTemperatureUncertainty
count,8235082.0,8235082.0
mean,16.72743,1.028575
std,10.35344,1.129733
min,-42.704,0.034
25%,10.299,0.337
50%,18.831,0.591
75%,25.21,1.349
max,39.651,15.396


##### Cleaning Steps

- Remove rows where AverageTemperature is NaN
- Keep only cities that have a port code in the I94_SAS_Labels_Descriptions.SAS file
- Add the i94port of each city

In [17]:
# Remove rows where temperature is nan:
df_temperature = df_temperature[df_temperature['AverageTemperature'].notna()]

In [18]:
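# Keep only cities that have an i94port code:
df_temperature = df_temperature[(df_temperature.City.isin(list(cities.keys())))]

In [19]:
# Add the i94port code of each city (assign avoids modifying a filtered slice in place)
df_temperature = df_temperature.assign(i94port=df_temperature.City.map(cities))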

In [20]:
df_temperature.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,i94port
38481,1743-11-01,8.758,1.886,Aberdeen,United Kingdom,57.05N,1.48W,ABE
38486,1744-04-01,6.07,2.934,Aberdeen,United Kingdom,57.05N,1.48W,ABE
38487,1744-05-01,7.751,1.494,Aberdeen,United Kingdom,57.05N,1.48W,ABE
38488,1744-06-01,10.62,1.574,Aberdeen,United Kingdom,57.05N,1.48W,ABE
38489,1744-07-01,12.35,1.591,Aberdeen,United Kingdom,57.05N,1.48W,ABE

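The output above shows that Aberdeen, United Kingdom received the port code ABE, because cities are matched by name only. Since the ports are destinations in the USA, one option (a hypothetical extra cleaning step, not part of this pipeline) is to keep only rows whose `Country` is United States before adding the port code. A minimal pandas sketch with made-up rows:

```python
import pandas as pd

# Hypothetical stand-ins for the cleaned temperature data and the cities dictionary.
df_temperature = pd.DataFrame({
    "AverageTemperature": [8.0, 12.0],
    "City": ["Aberdeen", "Albuquerque"],
    "Country": ["United Kingdom", "United States"],
})
cities = {"Aberdeen": "ABE", "Albuquerque": "ABQ"}

# Keep only US rows whose city has a port code, then add the port code.
mask = (df_temperature["Country"] == "United States") & df_temperature["City"].isin(list(cities))
us_temperature = df_temperature[mask].assign(i94port=lambda d: d["City"].map(cities))

print(us_temperature)
```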

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

Explaining the data dictionary:

I94 Immigration data as dimension table:

| name | data |
| --- | --- |
| i94yr | 4 digit year |
| i94mon | numeric month |
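| i94cit | 3-digit code of the country of citizenship |
| i94port | 3-character code of the port of entry (destination city) |
| arrdate | arrival date in the USA (SAS date: days since 1960-01-01) |
| depdate | departure date from the USA (SAS date: days since 1960-01-01) |
| i94visa | visa category (1 = Business, 2 = Pleasure, 3 = Student) |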
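
The `arrdate` and `depdate` columns hold SAS numeric dates, i.e. the number of days since 1960-01-01. The first row of the sample data is consistent with this: `arrdate` is 20566 and `dtadfile` is 20160422. A minimal pandas sketch for converting such values into calendar dates, using the arrival and departure values of that row:

```python
import pandas as pd

# arrdate and depdate of the first sample row
sas_dates = pd.Series([20566.0, 20573.0])

# SAS dates count days since 1960-01-01
dates = pd.to_datetime(sas_dates, unit="D", origin=pd.Timestamp("1960-01-01"))
print(dates.dt.date.tolist())  # [datetime.date(2016, 4, 22), datetime.date(2016, 4, 29)]
```
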

Temperature Data as dimension table:

| name | data |
| --- | --- |
| AverageTemperature | average temperature |
| City | city name |
| Country | country name |
| i94port | 3-character code of the port of entry (destination city) |

I94 Immigration data joined with the Temperature Data on i94port as fact table:

| name | data |
| --- | --- |
| i94yr | 4 digit year |
| i94mon | numeric month |
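| i94cit | 3-digit code of the country of citizenship |
| i94port | 3-character code of the port of entry (destination USA city) |
| arrdate | arrival date in the USA (SAS date: days since 1960-01-01) |
| depdate | departure date from the USA (SAS date: days since 1960-01-01) |
| i94visa | visa category (1 = Business, 2 = Pleasure, 3 = Student) |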
| AverageTemperature | average temperature of destination city |
| City | city name |
| Country | country name |

#### 3.2 Mapping Out Data Pipelines


* The I94 Immigration data is switched from the sample to the full dataset, loaded with Spark.
* The I94 Immigration data is cleaned as before (only rows with a valid i94port are kept).
* The Temperature Data was already cleaned previously (NaN values removed, cities without a port removed, i94port added).
* Temperature Data is converted to spark dataset.
* Create I94 Immigration Data dimension table and store.
* Create Temperature Data dimension table and store.
* Create fact table.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [21]:
output_path = "./tables"

In [22]:
# We use the spark version of the dataset, not the sample
df_immigration = df_spark

In [23]:
# Remove rows with wrong ports
df_immigration = df_immigration.filter(df_immigration.i94port.isin(list(ports.keys())))

In [24]:
# Only use columns as defined in our dictionary
table_immigration = df_immigration.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "depdate", "i94visa"])

In [25]:
table_immigration.head()

Row(i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94port='LOS', arrdate=20574.0, depdate=20582.0, i94visa=1.0)

In [26]:
# Store I94 Immigration Data dimension table
table_immigration.write.mode("overwrite").partitionBy("i94port").parquet(f'{output_path}/dimension_immigration')
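
With the immigration dimension table in place, the first two project questions (top destination cities, busiest month) reduce to simple aggregations. A minimal pandas sketch on hypothetical rows with a subset of the `table_immigration` columns; the port codes, counts and the `ports` excerpt are made up:

```python
import pandas as pd

# Hypothetical rows with a subset of the table_immigration columns.
immigration = pd.DataFrame({
    "i94yr": [2016.0] * 5,
    "i94mon": [4.0, 4.0, 4.0, 5.0, 5.0],
    "i94port": ["NYC", "NYC", "LOS", "NYC", "MIA"],
})
# Hypothetical excerpt of the ports dictionary (port code -> city name).
ports = {"NYC": "NEW YORK", "LOS": "LOS ANGELES", "MIA": "MIAMI"}

# Which destination cities receive the most arrivals?
top_ports = immigration["i94port"].value_counts().head(10)
top_cities = top_ports.rename(index=ports)

# In which month do most arrivals happen?
per_month = immigration.groupby(["i94yr", "i94mon"]).size()

print(top_cities)
print(per_month)
```

On the Spark table, the same per-port count would be `table_immigration.groupBy("i94port").count().orderBy("count", ascending=False)`.
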

In [27]:
# Only use columns as defined in our dictionary
table_temperature = df_temperature[["AverageTemperature", "City", "Country", "i94port"]]

In [28]:
# convert Temperature Data from Pandas to Spark
table_temperature_spark = spark.createDataFrame(table_temperature)

In [29]:
# Use Temperature Data as Spark version
table_temperature = table_temperature_spark

In [30]:
table_temperature.head()

Row(AverageTemperature=8.758, City='Aberdeen', Country='United Kingdom', i94port='ABE')

In [31]:
# Store Temperature Data dimension table 
table_temperature.write.mode("overwrite").partitionBy("i94port").parquet(f'{output_path}/dimension_temperature')

In [34]:
# Create Immigration-Temperature Data fact table
table_fact = table_immigration.join(table_temperature, table_immigration.i94port == table_temperature.i94port).drop(table_temperature.i94port)

In [35]:
table_fact.head()

Row(i94yr=2016.0, i94mon=4.0, i94cit=582.0, i94port='ABQ', arrdate=20572.0, depdate=20642.0, i94visa=3.0, AverageTemperature=-3.42, City='Albuquerque', Country='United States')

In [37]:
# Store Immigration-Temperature Data fact table
table_fact.write.mode("overwrite").partitionBy("i94port").parquet(f'{output_path}/facts_immigration')
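
`table_temperature` keeps no date column, so the join above pairs every immigration row with every monthly temperature record of its port, and the `AverageTemperature` in a fact row is only one of many matched records. The Scope section planned to aggregate the temperature data by city. One way to do that (sketched here in pandas with hypothetical values, not the pipeline as written) is to keep `dt` long enough to average the temperature per port and calendar month, then join on both `i94port` and `i94mon`:

```python
import pandas as pd

# Hypothetical stand-ins for the cleaned temperature data and the immigration table.
temperature = pd.DataFrame({
    "dt": ["2000-04-01", "2001-04-01", "2000-05-01"],
    "AverageTemperature": [10.0, 12.0, 20.0],
    "City": ["Albuquerque"] * 3,
    "Country": ["United States"] * 3,
    "i94port": ["ABQ"] * 3,
})
immigration = pd.DataFrame({
    "i94yr": [2016.0, 2016.0],
    "i94mon": [4.0, 4.0],
    "i94port": ["ABQ", "ABQ"],
})

# One row per port and calendar month (mean over all years);
# the month is cast to float to match the i94mon column.
temperature["i94mon"] = pd.to_datetime(temperature["dt"]).dt.month.astype(float)
monthly = (temperature
           .groupby(["i94port", "i94mon", "City", "Country"], as_index=False)
           ["AverageTemperature"].mean())

# Join on port AND month: each immigration row matches at most one temperature row.
fact = immigration.merge(monthly, on=["i94port", "i94mon"], how="left")
print(fact)
```

In Spark, the same steps would use `F.month(F.to_date("dt"))`, `groupBy(...).agg(F.avg("AverageTemperature"))` and `join(..., on=["i94port", "i94mon"])` from `pyspark.sql.functions as F`.
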

#### 4.2 Data Quality Checks
The following checks verify that the pipeline ran as expected:
 * Count check: each dataset must contain at least one record (completeness).
 * Null check: the AverageTemperature column of the temperature data must not contain null values.
 
Run Quality Checks

In [40]:
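def quality_check_values(df, description):
    '''Checks that a pandas or Spark DataFrame contains at least one record.'''
    
    # pandas DataFrame.count() returns per-column counts, so use len() for pandas;
    # Spark DataFrame.count() returns the number of rows.
    result = len(df) if isinstance(df, pd.DataFrame) else df.count()
    
    if result < 1:
        raise ValueError("Data quality check failed: {} has no records".format(description))
    print("Data quality check passed: {} has {} records".format(description, result))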

In [41]:
# Do Data Quality Checks
quality_check_values(df_immigration, "immigration data")
quality_check_values(df_temperature, "temperature data")

In [42]:
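from pyspark.sql.functions import col

def quality_check_null_values(df, column, description):
    '''Checks that a column of a pandas or Spark DataFrame contains no null values.'''
    
    if isinstance(df, pd.DataFrame):
        result = int(df[column].isna().sum())
    else:
        result = df.where(col(column).isNull()).count()
    
    if result > 0:
        raise ValueError("Data quality check failed: {} has {} null values".format(description, result))
    print("Data quality check passed: {} has no null values".format(description))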

In [43]:
# Do more Data Quality Checks
quality_check_null_values(df_temperature, "AverageTemperature", "temperature data AverageTemperature")

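A further check in the same style (hypothetical, not run in this notebook) could enforce referential integrity: every i94port in a table must be a key of the `ports` dictionary built from the Labels file. A pandas sketch with made-up data; the function name `quality_check_valid_values` is an illustration, not part of the project:

```python
import pandas as pd


def quality_check_valid_values(df, column, valid_values, description):
    '''Checks that every value in a pandas DataFrame column is in valid_values.'''
    invalid = sorted(set(df.loc[~df[column].isin(list(valid_values)), column]))
    if invalid:
        raise ValueError("Data quality check failed: {} has invalid values {}".format(description, invalid))
    print("Data quality check passed: {} has only valid values".format(description))


# Hypothetical data: one row uses a port code that is not in the ports dictionary.
ports = {"ABE": "ABERDEEN", "ABQ": "ALBUQUERQUE"}
good = pd.DataFrame({"i94port": ["ABE", "ABQ"]})
bad = pd.DataFrame({"i94port": ["ABE", "XXX"]})

quality_check_valid_values(good, "i94port", ports.keys(), "good sample")
try:
    quality_check_valid_values(bad, "i94port", ports.keys(), "bad sample")
except ValueError as err:
    print(err)  # Data quality check failed: bad sample has invalid values ['XXX']
```
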
### Step 5: Complete Project Write Up

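__The data was increased by 100x.__

The immigration data is the largest table, with about 3 million rows for one month; 100x would mean roughly 300 million rows. Unlike pandas, Spark is designed to process datasets of this size in parallel across a cluster. I would store the data in S3, process it with Spark on a cluster that can be scaled out with more worker nodes, and keep the tables partitioned (for example by i94port, as they already are) so that queries only read the partitions they need.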

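__The pipelines would be run on a daily basis by 7 am every day.__

The immigration data is updated monthly and the temperature data less frequently. If the pipeline has to run every day at 7 am, Airflow would be a good solution: each step (loading, cleaning, writing the dimension tables, building the fact table and running the quality checks) becomes a task in a DAG scheduled with the cron expression `0 7 * * *`, and Airflow takes care of task dependencies, retries and monitoring of failed runs.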

__The database needed to be accessed by 100+ people.__

I would recommend loading the tables into an Amazon Redshift cluster, which is built for many users running SQL queries concurrently. PostgreSQL, Amazon Aurora or Apache Cassandra would be alternative database solutions.
