# Project Title
### Data Engineering Capstone Project

#### Project Summary
The goal of this project is to build an ETL pipeline for the I94 Immigration, U.S. City Demographic, and World Temperature datasets. The result is an analytical database that can be used to extract insights.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Import Libraries

In [1]:
import os
import pandas as pd
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from sql_queries import *
from utilities import *
from etl_functions import *
from dotenv import load_dotenv

### Load secrets

In [2]:
load_dotenv("../.env")
POSTGRES_USER = os.getenv("POSTGRES_USER")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD")
POSTGRES_DB = os.getenv("POSTGRES_DB")
POSTGRES_HOST = os.getenv("POSTGRES_HOST")

### Set pandas options

In [3]:
pd.set_option('display.max_columns', 30)

### Set Spark configuration

In [4]:
conf = SparkConf(
).set("spark.driver.extraClassPath", "drivers/postgresql-42.2.18.jar"
).set("spark.executor.memory", "3g"
).set("spark.driver.extraJavaOptions", "-XX:+UseG1GC"
).set("spark.executor.extraJavaOptions", "-XX:+UseG1GC"
).set("spark.sql.autoBroadcastJoinThreshold","-1")
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data
### Scope 

The scope of this project is to use Spark to:
* Load the 'I94 Immigration', 'U.S. City Demographic' and 'World Temperature' datasets.
* Explore the 'I94 Immigration', 'U.S. City Demographic' and 'World Temperature' datasets.
* Clean the 'I94 Immigration', 'U.S. City Demographic' and 'World Temperature' datasets.
* Create dimension tables from the 'I94 Immigration', 'U.S. City Demographic' and 'World Temperature' datasets.
* Create a fact table based on 'I94 Immigration'.

### End Goal

To build an analytical database that can be used to extract insights.

### Tools:

Spark for data transformation, and PostgreSQL running from a Docker image for storage.

### Datasets 
* I94 Immigration Data: This data comes from the US National Tourism and Trade Office. Each row describes a foreign visitor to the U.S., with facts such as the visitor's visa class, birth country, age, and arrival date. This project loads only a subset of the data.

* U.S. City Demographic: This data comes from the US Census Bureau's 2015 American Community Survey. Each row describes a city, with demographic facts such as male/female population, foreign-born population, and median age.

* World Temperature: This dataset comes from Kaggle, published by a user named "Berkeley Earth". Each row gives a city's average temperature for one recording date.

### Load I94 Immigration


In [5]:
df_immigration = spark.read.parquet('data/i94_data/part-00000-b9542815-7a8d-45fc-9c67-c9c5007ad0d4-c000.snappy.parquet')

In [6]:
# show schema
df_immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [7]:
# number of records
df_immigration.count()

219268

In [8]:
# show five records
df_immigration.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,37.0,2.0,1.0,,,,T,,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,20130811.0,SEO,,G,,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,20160401.0,,,T,O,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,28.0,2.0,1.0,20160401.0,,,O,O,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,20160401.0,,,O,O,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


#### Data description
* i94yr - 4 digit year
* i94mon - numeric month
* i94cit - 3 digit code describing country of birth
* i94res - 3 digit code describing country of residence
* i94port - 3 letter code describing port of admission
* arrdate - date of arrival to the US
* i94mode - 1 digit code describing methods of transportation (1 = air, 2 = sea, 3 = land, 9 = not reported)
* i94addr - 2 letter code describing the U.S. state where the visitor arrived
* depdate - date of departure from the US
* i94bir - visitor's age
* i94visa - purpose of travel
* count - used for summary statistics
* dtadfile - date of recording
* visapost - Department of State post where the visa was issued
* occup - occupation that will be performed in the U.S.
* entdepa - arrival flag (admitted or paroled into the U.S)
* entdepd - departure flag (departed, lost I-94 or is deceased)
* entdepu - update flag (either apprehended, overstayed, adjusted to perm residence)
* matflag - match flag (match of arrival and departure records) 
* biryear - year of birth
* dtaddto - date until which the visitor is allowed to stay
* gender - visitor's sex
* insnum - ins number
* airline - airline used to arrive in US
* admnum - admission number 
* fltno - flight number of the airline used to arrive in the US
* visatype - US visa class
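
The `arrdate` and `depdate` columns hold numbers such as `20545.0` rather than calendar dates. The sketch below assumes they are SAS numeric dates, which count days since 1960-01-01. The sample rows are consistent with that reading: `arrdate` 20545 converts to 2016-04-01, matching the `dtadfile` value `20160401` on the same rows. `sas_to_date` is a hypothetical helper name, not a function from `etl_functions`.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS counts dates as days since this day


def sas_to_date(sas_days):
    """Convert a SAS numeric date to a datetime.date; None stays None."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(20691.0))  # 2016-08-25
```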

### Load World Temperature Data 

In [10]:
df_temperature = spark.read.parquet('data/global_temperature_data')

In [11]:
# show schema
df_temperature.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [12]:
# number of records
df_temperature.count()

8599212

In [13]:
# show five records
df_temperature.limit(5).show()

+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01 03:00:00|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01 03:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01 03:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01 03:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01 03:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+



#### Data description
* dt - date of recording
* AverageTemperature - average temperature in °C
* AverageTemperatureUncertainty - the 95% confidence interval around the average
* City - city name
* Country - country name
* Latitude - latitude
* Longitude - longitude

### Load U.S. City Demographic Data

In [14]:
df_demographics = spark.read.csv("data/us-cities-demographics.csv", inferSchema=True, header=True, sep=';')

In [15]:
# show schema
df_demographics.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



In [16]:
# number of records
df_demographics.count()

2891

In [17]:
# show five records
df_demographics.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


#### Data description
* City - city name
* State - state name
* Median Age - median age of the city
* Male Population - male population of the city
* Female Population - female population of the city
* Total Population - total population of the city
* Number of Veterans - veterans population of the city
* Foreign-born - number of foreign-born residents in the city
* Average Household Size - average household size in the city
* State Code - US state code 
* Race - respondent race
* Count - number of individuals of that race in the city

### Step 2: Explore and Assess the Data

#### Exploring I94 Immigration

In [18]:
display_missing_values(df_immigration)

Unnamed: 0,0,%
cicid,0,0.0
i94yr,0,0.0
i94mon,0,0.0
i94cit,0,0.0
i94res,0,0.0
i94port,0,0.0
arrdate,0,0.0
i94mode,1,0.000456
i94addr,7911,3.607914
depdate,7863,3.586023


#### Cleaning Steps I94 Immigration
* Drop the columns 'occup', 'entdepu' and 'insnum' because they are mostly null
* Assign each row a transportation_method from i94mode in I94_SAS_Labels_Descriptions.SAS
* Assign each row a visa_category from i94visa in I94_SAS_Labels_Descriptions.SAS
* Cast i94cit and i94res to string
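
The labeling and casting steps above can be sketched in plain Python. This is not the `i94_cleaning` implementation, which runs on Spark and is defined in `etl_functions`. The transportation codes come from the data description, the visa categories are limited to the three that appear in the sample output, and the fallback to "not reported" for a missing `i94mode` is inferred from the first cleaned row. `label_row` and `_to_code` are hypothetical names.

```python
TRANSPORTATION_METHODS = {1: "air", 2: "sea", 3: "land", 9: "not reported"}
# Only the categories visible in the sample output are listed here.
VISA_CATEGORIES = {1: "business", 2: "pleasure", 3: "student"}


def _to_code(value):
    """Turn a float code such as 2.0 into the integer 2; None stays None."""
    return None if value is None else int(value)


def label_row(row):
    """Return a copy of row with labels added and country codes cast to strings."""
    labeled = dict(row)
    labeled["transportation_method"] = TRANSPORTATION_METHODS.get(
        _to_code(row.get("i94mode")), "not reported"
    )
    labeled["visa_category"] = VISA_CATEGORIES.get(_to_code(row.get("i94visa")))
    for column in ("i94cit", "i94res"):
        code = _to_code(row.get(column))
        labeled[column] = None if code is None else str(code)
    return labeled


row = {"i94cit": 692.0, "i94res": 692.0, "i94mode": None, "i94visa": 2.0}
print(label_row(row))
```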

In [19]:
df_clean_immigration = i94_cleaning(df_immigration)

In [20]:
df_clean_immigration.limit(10).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,entdepa,entdepd,matflag,biryear,dtaddto,gender,airline,admnum,fltno,visatype,transportation_method,visa_category
0,6.0,2016.0,4.0,692,692,XXX,20573.0,,,,37.0,2.0,1.0,,,T,,,1979.0,10282016,,,1897628000.0,,B2,not reported,pleasure
1,7.0,2016.0,4.0,254,276,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,20130811.0,SEO,G,,,1991.0,D/S,M,,3736796000.0,296.0,F1,air,student
2,15.0,2016.0,4.0,101,101,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,20160401.0,,T,O,M,1961.0,09302016,M,OS,666643200.0,93.0,B2,air,pleasure
3,16.0,2016.0,4.0,101,101,NYC,20545.0,1.0,MA,20567.0,28.0,2.0,1.0,20160401.0,,O,O,M,1988.0,09302016,,AA,92468460000.0,199.0,B2,air,pleasure
4,17.0,2016.0,4.0,101,101,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,20160401.0,,O,O,M,2012.0,09302016,,AA,92468460000.0,199.0,B2,air,pleasure
5,18.0,2016.0,4.0,101,101,NYC,20545.0,1.0,MI,20555.0,57.0,1.0,1.0,20160401.0,,O,O,M,1959.0,09302016,,AZ,92471040000.0,602.0,B1,air,business
6,19.0,2016.0,4.0,101,101,NYC,20545.0,1.0,NJ,20558.0,63.0,2.0,1.0,20160401.0,,O,K,M,1953.0,09302016,,AZ,92471400000.0,602.0,B2,air,pleasure
7,20.0,2016.0,4.0,101,101,NYC,20545.0,1.0,NJ,20558.0,57.0,2.0,1.0,20160401.0,,O,K,M,1959.0,09302016,,AZ,92471610000.0,602.0,B2,air,pleasure
8,21.0,2016.0,4.0,101,101,NYC,20545.0,1.0,NY,20553.0,46.0,2.0,1.0,20160401.0,,O,O,M,1970.0,09302016,,AZ,92470800000.0,602.0,B2,air,pleasure
9,22.0,2016.0,4.0,101,101,NYC,20545.0,1.0,NY,20562.0,48.0,1.0,1.0,20160401.0,,O,O,M,1968.0,09302016,,AZ,92478490000.0,608.0,B1,air,business


In [21]:
df_clean_immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: string (nullable = true)
 |-- i94res: string (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)
 |-- transportation_method: str

#### Exploring World Temperature Data

In [22]:
display_missing_values(df_temperature)

Unnamed: 0,0,%
dt,0,0.0
AverageTemperature,364130,4.234458
AverageTemperatureUncertainty,364130,4.234458
City,0,0.0
Country,0,0.0
Latitude,0,0.0
Longitude,0,0.0


#### Cleaning Steps World Temperature Data
* Drop rows with missing average temperature
* Drop duplicate rows
* Assign each city a country code from I94_SAS_Labels_Descriptions.SAS

In [23]:
df_clean_temperature = world_temperature_cleaning(df_temperature)

In [25]:
df_clean_temperature = df_clean_temperature.dropna(subset=['AverageTemperature'])

#### Exploring Demographics

In [25]:
display_missing_values(df_demographics)

Unnamed: 0,0,%
City,0,0.0
State,0,0.0
Median Age,0,0.0
Male Population,3,0.10377
Female Population,3,0.10377
Total Population,0,0.0
Number of Veterans,13,0.449671
Foreign-born,13,0.449671
Average Household Size,16,0.553442
State Code,0,0.0


#### Cleaning Steps Demographics
* Replace missing values with the column average across the dataset
* Drop duplicate rows so that each City and State pair appears once


In [26]:
df_clean_demographics = demographics_cleaning(df_demographics)

In [29]:
from pyspark.sql.functions import avg

def fill_null(df, columns):
    # Fill nulls in each listed column with that column's mean
    for column in columns:
        mean = df.agg(avg(column).alias(column)).first().asDict()
        df = df.fillna(mean)
    return df

def demographics_cleaning(df):
    columns = ['Male Population', 'Female Population', 'Number of Veterans', 'Foreign-born', 'Average Household Size']
    df = fill_null(df, columns)
    # Keep one row per city; the per-race rows repeat the city-level figures
    df = df.drop_duplicates(subset=['City', 'State'])
    return df

In [30]:
# Use the cleaning function so the null fill is not discarded
df_clean_demographics = demographics_cleaning(df_demographics)

In [27]:
df_clean_demographics.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Cincinnati,Ohio,32.7,143654,154883,298537,13699,16896,2.08,OH,White,162245
1,Kansas City,Kansas,33.4,74606,76655,151261,8139,25507,2.71,KS,Black or African-American,40177
2,Lynchburg,Virginia,28.7,38614,41198,79812,4322,4364,2.48,VA,Black or African-American,23271
3,Auburn,Washington,37.1,36837,39743,76580,5401,14842,2.73,WA,Asian,12341
4,Dayton,Ohio,32.8,66631,73966,140597,8465,7381,2.26,OH,Asian,1885


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

![Data Model](images/Data_Model.png "Data Model")

#### 3.2 Mapping Out Data Pipelines
1. Create the fact table
2. Create all dimensional tables
3. Load all datasets
4. World Temperature dataset
    1. Clean World Temperature
    2. Link World Temperature to a country code from I94_SAS_Labels_Descriptions.SAS
    3. Load World Temperature into Dim_country
5. Demographics dataset
    1. Clean Demographics
    2. Recalculate the Demographics fields by state instead of by city
    3. Link Demographics to a state code from I94_SAS_Labels_Descriptions.SAS
    4. Load Demographics into Dim_state
6. I94 Immigration dataset
    1. Clean Immigration
    2. Load Immigration into Dim_airline, Dim_transportation, Dim_visa_category, Dim_allowed_stay, Dim_record_date, Dim_departure_date and Dim_arrival_date
    3. Load Immigration into Fact_immigration
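
Step 5.2 rolls the city-level demographics up to one row per state. The notebook does not show how each field is recalculated, so the following is a sketch under stated assumptions: population counts are summed, and median age is approximated by a population-weighted average of the city medians (which is not a true state median). `aggregate_by_state` is a hypothetical helper, and the input is assumed to be already de-duplicated to one row per city.

```python
from collections import defaultdict


def aggregate_by_state(city_rows):
    """Roll de-duplicated city rows up to one summary dict per state code."""
    totals = defaultdict(lambda: {"total": 0, "male": 0, "female": 0, "weighted_age": 0.0})
    for row in city_rows:
        state = totals[row["State Code"]]
        population = row["Total Population"]
        state["total"] += population
        state["male"] += row["Male Population"]
        state["female"] += row["Female Population"]
        # Weight each city's median age by its population.
        state["weighted_age"] += row["Median Age"] * population
    return {
        code: {
            "total_population": state["total"],
            "male_population": state["male"],
            "female_population": state["female"],
            "median_age": round(state["weighted_age"] / state["total"], 2),
        }
        for code, state in totals.items()
    }


# The two Ohio rows from the cleaned demographics sample shown earlier.
ohio = [
    {"State Code": "OH", "Median Age": 32.7, "Male Population": 143654,
     "Female Population": 154883, "Total Population": 298537},
    {"State Code": "OH", "Median Age": 32.8, "Male Population": 66631,
     "Female Population": 73966, "Total Population": 140597},
]
print(aggregate_by_state(ohio)["OH"]["total_population"])  # 439134
```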

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [3]:
create_tables(POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_DB, POSTGRES_HOST)

In [55]:
populate_tables(spark, df_clean_immigration, df_clean_demographics, df_clean_temperature, 
                POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_DB, POSTGRES_HOST)

#### 4.2 Data Quality Checks
Run Quality Checks

In [None]:
quality_checks(tables_names, POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_DB, POSTGRES_HOST)
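
The body of `quality_checks` is not shown in this notebook. One common check is that every loaded table contains at least one row; the sketch below illustrates that idea only. It uses an in-memory `sqlite3` database as a stand-in for PostgreSQL so that it runs without a server, and `check_tables_not_empty` is a hypothetical name.

```python
import sqlite3


def check_tables_not_empty(connection, table_names):
    """Return {table: row_count}; raise ValueError if any table is empty."""
    counts = {}
    cursor = connection.cursor()
    for table in table_names:
        # Table names cannot be bound as query parameters, so pass trusted names only.
        cursor.execute(f"SELECT COUNT(*) FROM {table}")
        counts[table] = cursor.fetchone()[0]
    empty = [table for table, count in counts.items() if count == 0]
    if empty:
        raise ValueError("Quality check failed, empty tables: " + ", ".join(empty))
    return counts


# Stand-in database with one populated table.
connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE dim_transportation (transportation_method_id INTEGER, transportation_method TEXT)"
)
connection.execute("INSERT INTO dim_transportation VALUES (1, 'air')")
print(check_tables_not_empty(connection, ["dim_transportation"]))
# {'dim_transportation': 1}
```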

#### 4.3 Data dictionary 

##### Fact_immigration 
* immigrant_id
* visa_category
* birth_country_code
* birth_year
* immigrant_age
* immigrant_gender
* residential_country_code
* admission_port
* arrival_state_code
* transportation_method_id
* arrival_date
* departure_date
* record_date
* arrival_status
* departure_status
* allowed_stay
* flight_number
* airline
* admission_number
* num_year 
* num_month

##### dim_country 
* country_code 
* country_name 
* average_temperature

##### dim_state 
* state_code
* state_name 
* median_age 
* male_population
* female_population
* total_population 
* veteran_population 
* foreign_born_population 
* average_hosehold_size 

##### dim_transportation
* transportation_method_id
* transportation_method

##### dim_visa_category
* visatype 
* purpose_of_travel

##### dim_arrival_date
* arrival_date
* arrival_date_year
* arrival_date_month 
* arrival_date_day 
* arrival_date_week 
* arrival_date_weekday

##### dim_departure_date
* departure_date
* departure_date_year
* departure_date_month 
* departure_date_day 
* departure_date_week 
* departure_date_weekday

##### dim_record_date
* record_date
* record_date_year
* record_date_month 
* record_date_day 
* record_date_week 
* record_date_weekday

##### dim_allowed_stay
* allowed_stay
* allowed_stay_year
* allowed_stay_month 
* allowed_stay_day 
* allowed_stay_week 
* allowed_stay_weekday
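
The four date dimensions share one shape: the date itself plus its year, month, day, week, and weekday. A minimal sketch of deriving those fields follows. The notebook does not say which week and weekday conventions are used, so ISO 8601 numbering (weeks starting on Monday, Monday = 1) is an assumption, and `date_dimension_row` is a hypothetical helper.

```python
from datetime import date


def date_dimension_row(day, prefix):
    """Split a date into the year/month/day/week/weekday fields of a date dimension."""
    _, iso_week, iso_weekday = day.isocalendar()
    return {
        prefix: day,
        f"{prefix}_year": day.year,
        f"{prefix}_month": day.month,
        f"{prefix}_day": day.day,
        f"{prefix}_week": iso_week,        # ISO week number (assumed convention)
        f"{prefix}_weekday": iso_weekday,  # Monday = 1 ... Sunday = 7 (assumed)
    }


row = date_dimension_row(date(2016, 4, 1), "arrival_date")
print(row["arrival_date_week"], row["arrival_date_weekday"])  # 13 5
```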


### Step 5: Complete Project Write Up
* The rationale for the choice of tools and technologies for the project.
    * Apache Spark:
        * It can handle large files in multiple formats.
        * It runs analytics quickly and efficiently, even at scale.
        * It is easy to use.
* Propose how often the data should be updated and why.
    * Monthly, because the I94 Immigration data is partitioned by month.
* Write a description of how you would approach the problem differently under the following scenarios:
    * The data was increased by 100x.
        * Increase the number of Spark nodes to handle the additional load.
    * The data populates a dashboard that must be updated on a daily basis by 7am every day.
        * Use a workflow scheduler such as Apache Airflow or Prefect to build and schedule the pipelines.
    * The database needed to be accessed by 100+ people.
        * Move the database to the cloud; a managed service such as Amazon Redshift should be sufficient.