# Project Title
### Data Engineering Capstone Project

#### Project Summary
The objective of this project is to build an ETL pipeline for I94 immigration data. The main dataset covers immigration to the United States; supplementary datasets cover airport codes, U.S. city demographics, and world temperatures.
The resulting data model is the basis of an analytical database of immigration events, used to find patterns in immigration to the United States.


The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data

In [31]:
# Do all imports and installs here
import pandas as pd
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, trim, col, substring,countDistinct


In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
To create the analytics database, we performed the following steps:

1. Use Spark to load the data into dataframes (csv files and sas file)
2. Explore and Assess the Data
    - explore and clean `U.S. City Demographic` Data
    - explore and clean `Airport Code Table` Data
    - explore and clean `I94 Immigration` Data
    - explore and clean `World Temperature` Data
    - explore and clean `country` Data (new csv file attached)
    
3. Define the Data Model
    - define the fact and dimension tables
4. Run ETL to Model the Data
    - create dimension tables
        - create immigration calendar, built from the arrival and departure dates in the I94 immigration dataset
        - create immigration visatype, built from the `visatype` column of the I94 immigration dataset
        - create US_city_demographic, built from the `U.S. City Demographic` data
        - create immigration country, which holds country names and average temperatures and is built from the `country` and `World Temperature` data
        - create US_airport, which holds airport information for the United States
    - create fact tables
        - create immigration_fact, which references the dimension tables used for analysis

The project uses [Apache Spark](https://spark.apache.org/) for processing and writes its output as [parquet files](https://databricks.com/glossary/what-is-parquet#:~:text=Back%20to%20glossary,like%20CSV%20or%20TSV%20files.), which scale well and can be uploaded to AWS S3. The staging tables are defined as Spark data frames.

The whole project is implemented in this notebook; it reads and writes local files.

#### Describe and Gather Data 



---
- **U.S. City Demographic Data:** This data comes from OpenDataSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).


In [4]:
# Read Demographic Data
demographic_file_name = "us-cities-demographics.csv"
demographic_df = spark.read.format("csv").option("header", "true").option("delimiter", ";").load(demographic_file_name)
demographic_df.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


---
- **Airport Code Table:** This is a simple table of airport codes and corresponding cities. It comes from [here](https://datahub.io/core/airport-codes#data).

In [5]:
# Read Airport Code data
airport_file_name = "airport-codes_csv.csv"
airport_df = spark.read.format("csv").option("header", "true").load(airport_file_name)
airport_df.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


---
- **I94 Immigration Data:** This data comes from the US National Travel and Tourism Office; the source files are available [here](https://travel.trade.gov/research/reports/i94/historical/2016.html).
The International Visitor Arrivals Program provides the U.S. government and the public with the official monthly and annual overseas visitor arrivals to the United States. For more information, visit: [Visitor Arrivals Program (I-94 Data)](https://travel.trade.gov/research/programs/i94/index.asp#:~:text=The%20International%20Visitor%20Arrivals%20Program,arrivals%20to%20the%20United%20States.&text=Subscribers%20include%3A%20airlines%2C%20airports%2C,and%20state%20governments%20and%20consultants)

In [6]:
# read I94 Immigration data
I94_immigration_file_name = "../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat"
I94_immigration_df = spark.read.format("com.github.saurfang.sas.spark").load(I94_immigration_file_name)
I94_immigration_df.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


---
- **World Temperature Data:** This dataset came from Kaggle. You can read more about it [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).

In [7]:
# Read World Temperature Data
world_temperature_file_name = "../../data2/GlobalLandTemperaturesByCity.csv"
temperature_df = spark.read.format("csv").option("header", "true").load(world_temperature_file_name)
temperature_df.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


---
- **Country Data:** This data also comes from the US National Travel and Tourism Office, from the same [I94 source](https://travel.trade.gov/research/reports/i94/historical/2016.html).
It contains only the country code and the country name.

In [8]:
# Read country data
country_file_name = "country_data.csv"
country_df = spark.read.format("csv").option("header", "true").option("delimiter", "|").load(country_file_name)
country_df.limit(5).toPandas()

Unnamed: 0,country_code,country_name
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.



---
##### 1. U.S. City Demographic Data
- Explore the data:

In [9]:
demographic_df.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



**data dictionary**

|column name|description|
|----|---|
|City| City name|
|State| State name|
|Median Age| Population median age|
|Male Population| count of male population|
|Female Population|count of female population|
|Total Population|count of total population|
|Number of Veterans| count of veterans|
|Foreign-born| count of residents in the city who were born outside the United States|
|Average Household Size|average household size in the city|
|State Code| code of the US state|
|Race| race data|
|Count| count of population by race|

In [10]:
print ('Data description of U.S. City Demographic Data')
demographic_df.describe().toPandas()

Data description of U.S. City Demographic Data


Unnamed: 0,summary,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,count,2891,2891,2891.0,2888.0,2888.0,2891.0,2878.0,2878.0,2875.0,2891,2891,2891.0
1,mean,,,35.49488066413016,97328.4262465374,101769.6308864266,198966.77931511588,9367.832522585128,40653.59867963864,2.742542608695655,,,48963.77447250087
2,stddev,,,4.401616730099886,216299.93692873296,231564.5725714828,447555.9296335903,13211.21992386408,155749.1036650984,0.4332910878973046,,,144385.58856460615
3,min,Abilene,Alabama,22.9,100135.0,100260.0,100247.0,10001.0,10024.0,2.0,AK,American Indian and Alaska Native,100055.0
4,max,Yuma,Wisconsin,70.5,99967.0,99430.0,99897.0,9988.0,9929.0,4.98,WI,White,99948.0
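
The `min` and `max` rows above look inverted for the numeric columns (for example, `100135.0` is reported as the minimum of `Male Population` and `99967.0` as the maximum). That is consistent with the schema: every column was read as a string, so the values are compared character by character. A minimal plain-Python sketch of the effect, using a few `Male Population` values from the sample rows and the summary above:

```python
# Population values as they are read from the CSV: strings, not numbers
male_population = ["40601", "138040", "100135", "99967"]

# String comparison is lexicographic: "1..." sorts before "4..." and "9..."
string_min = min(male_population)
string_max = max(male_population)

# Converting to int first gives the numeric extremes
numeric_min = min(int(value) for value in male_population)
numeric_max = max(int(value) for value in male_population)

print(string_min, string_max)    # lexicographic extremes
print(numeric_min, numeric_max)  # numeric extremes
```

In Spark, casting a column before aggregating (for example `col("Male Population").cast("int")`) or reading the file with the `inferSchema` option would avoid this; neither is applied in this notebook, so the `min` and `max` rows of the `describe()` outputs should be read with that in mind.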


- **Clean the data:**
Remove duplicates on the subset `['City', 'State', 'State Code', 'Race']` and drop rows that are missing any of `['Number of Veterans', 'Foreign-born', 'Average Household Size']`.

In [11]:
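# Drop rows that are missing any of the veteran, foreign-born, or household statistics
demographic_df = demographic_df.dropna(subset=['Number of Veterans', 'Foreign-born', 'Average Household Size'])
# Keep one row per city, state, and race
demographic_cleaned_df = demographic_df.drop_duplicates(subset=['City', 'State', 'State Code', 'Race'])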
demographic_cleaned_df.describe().toPandas()

Unnamed: 0,summary,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,count,2875,2875,2875.0,2875.0,2875.0,2875.0,2875.0,2875.0,2875.0,2875,2875,2875.0
1,mean,,,35.43467826086957,97445.02365217391,101846.92139130436,199291.94504347825,9361.714434782609,40691.81043478261,2.742542608695653,,,48863.79060869565
2,stddev,,,4.250501324453714,216757.16015572677,232051.3382982911,448714.3704256733,13216.7544736134,155825.87501034045,0.4332910878973047,,,144631.49939191734
3,min,Abilene,Alabama,22.9,100135.0,100260.0,100247.0,10001.0,10024.0,2.0,AK,American Indian and Alaska Native,100055.0
4,max,Yuma,Wisconsin,48.8,99967.0,99430.0,99897.0,9988.0,9929.0,4.98,WI,White,99948.0


---
##### 2. Airport Code Table
- Explore the data:

In [12]:
airport_df.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



**data dictionary**

|column name|description|
|---|---|
|ident|ident code|
|type| type of airport|
|name| name of the airport|
|elevation_ft| elevation in feet|
|continent| continent|
|iso_country| ISO country code|
|iso_region| ISO region code (country-state)|
|municipality| city|
|gps_code| GPS code|
|iata_code|IATA airport code|
|local_code| local code|
|coordinates|longitude and latitude, in that order|

In [13]:
# Count the rows before cleaning
airport_df.count()

55075

In [14]:
print ('Data description of Airport Code Table')
airport_df.describe().toPandas()

Data description of Airport Code Table


Unnamed: 0,summary,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,count,55075,55075,55075,48069.0,55075,55075,55075,49399,41030,9189,28686,55075
1,mean,2.3873375337777779E8,,,1240.7896773388254,,,,,2.1920446610204083E8,0.0,8.580556178571428E7,
2,stddev,9.492375382267495E8,,,1602.3634593484142,,,,,9.1123224377024E8,0.0,5.747026415216715E8,
3,min,00A,balloonport,"""""""Der Dingel"""" Airfield""",-1.0,AF,AD,AD-04,'S Gravenvoeren,0000,-,-,"-0.004722000099718571, 9.425000190734863"
4,max,spgl,small_airport,Çá¸¾á¸á¸ á¸®á¸Ç{+91-9680118734} GiRLFRieNd...,999.0,SA,ZZ,ZZ-U-A,Å½ocene,ZYYY,ZZV,ZZV,"99.9555969238, 8.47115039825"


- **Clean the data:**
Drop rows that are missing `municipality` or `local_code`.

In [15]:
airport_cleaned_df = airport_df.dropna(subset=['municipality','local_code'])
airport_cleaned_df.describe().toPandas()

Unnamed: 0,summary,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,count,28017,28017,28017,27630.0,28017,28017,28017,28017,24545,2936,28017,28017
1,mean,2.4983764688372093E8,,,1218.8138255519364,,,,,2.2853231572340426E8,,8.649200628E7,
2,stddev,9.701128365731996E8,,,1568.7057415076556,,,,,9.296601891065612E8,,5.769634728494486E8,
3,min,00A,balloonport,"""Agriturismo """"Podere Santa Apollonia""""""",-1.0,AF,AE,AE-DU,Abaeté,00A,AAF,00A,"-0.412269, 47.580399"
4,max,crt2,small_airport,Šumvald UL,999.0,SA,ZW,ZW-MV,Šumvald,ZNC,ZZV,ZZV,"99.40945, 56.088096"


---
##### 3. I94 Immigration Data
- Explore the data:

In [16]:
I94_immigration_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)

**data dictionary**

|column name|description|
|---|---|
|cicid| identity|
|i94yr| 4 digit year|
|i94mon|Numeric month|
|i94cit| Country code (citizenship) |
|i94res| Country code (residence) |
|i94port| Airport code|
|arrdate| Arrival date in the USA, stored as a SAS numeric date (days since 1960-01-01) with no permanent format applied. |
|i94mode| 1 = 'Air', 2 = 'Sea', 3 = 'Land', 9 = 'Not reported' |
|i94addr| U.S. state abbreviations |
|depdate| Departure date from the USA, stored as a SAS numeric date (days since 1960-01-01) with no permanent format applied.|
|i94bir| Age of Respondent in Years |
|i94visa| Visa codes collapsed into three categories:  1 = Business, 2 = Pleasure, 3 = Student|
|count| count  of persons|
|dtadfile| Character Date Field - Date added to I-94 Files - CIC does not use|
|visapost| Department of State post where the visa was issued - CIC does not use|
|occup| Occupation that will be performed in U.S. - CIC does not use |
|entdepa| Arrival Flag - admitted or paroled into the U.S. - CIC does not use |
|entdepd| Departure Flag - Departed, lost I-94 or is deceased - CIC does not use|
|entdepu| Update Flag - Either apprehended, overstayed, adjusted to perm residence - CIC does not use|
|matflag| Match flag - Match of arrival and departure records |
|biryear| 4 digit year of birth |
|dtaddto| Character Date Field - Date to which admitted to U.S. (allowed to stay until) - CIC does not use|
|gender| Non-immigrant sex |
|insnum| INS number |
|airline| Airline used to arrive in U.S.|
|admnum| Admission Number|
|fltno| Flight number of Airline used to arrive in U.S.|
|visatype| Class of admission legally admitting the non-immigrant to temporarily stay in U.S.|
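
`arrdate` and `depdate` are SAS numeric dates, which count days from 1960-01-01. The sketch below shows the conversion in plain Python, using two `arrdate` values from the sample rows shown earlier. `sas_date_to_date` is a hypothetical helper name, not something defined in this notebook, and missing values are passed through as `None` (an assumption about how nulls should be handled).

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS numeric dates count days from this day


def sas_date_to_date(sas_days):
    """Convert a SAS numeric date (days since 1960-01-01) to a datetime.date."""
    if sas_days is None:
        return None  # assumption: keep missing dates as missing
    return SAS_EPOCH + timedelta(days=int(sas_days))


# Two arrdate values taken from the sample rows of the I94 data
print(sas_date_to_date(20545.0))
print(sas_date_to_date(20573.0))
```

Both values fall in April 2016, which agrees with the `i94yr` and `i94mon` columns of the same rows.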

In [17]:
# Count the rows
I94_immigration_df.count()

3096313

In [18]:
I94_immigration_df.describe().toPandas()

Unnamed: 0,summary,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,count,3096313.0,3096313.0,3096313.0,3096313.0,3096313.0,3096313,3096313.0,3096074.0,2943721,...,392,2957884,3095511.0,3095836,2682044,113708,3012686,3096313.0,3076764,3096313
1,mean,3078651.879075533,2016.0,4.0,304.9069344733559,303.28381949757664,,20559.84854179794,1.0736897761487614,51.652482269503544,...,,,1974.2323855415148,8291120.333841449,,4131.050016327899,59.477601493233784,70828850110.90295,1360.2463696420555,
2,stddev,1763278.099749858,4.2828296132610967e-14,0.0,210.02688853063327,208.5832129278886,,8.777339474881552,0.5158963131657236,42.979062313709846,...,,,17.420260534588262,1656502.4244925014,,8821.743471773656,172.63339952061747,22154415947.557632,5852.676345633782,
3,min,6.0,2016.0,4.0,101.0,101.0,5KE,20545.0,1.0,..,...,U,M,1902.0,/ 183D,F,0,*FF,0.0,00000,B1
4,max,6102785.0,2016.0,4.0,999.0,760.0,YSL,20574.0,9.0,ZU,...,Y,M,2019.0,D/S,X,YM0167,ZZ,99915565930.0,ZZZ,WT


- **Clean the data:**
No changes are needed; the data is loaded into the fact table as it is.

---
##### 4. Temperature Data
- **Explore the data:**

In [19]:
temperature_df.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



**data dictionary**

|column name|description|
|----|---|
|dt| Date of the monthly record; the earliest date in this file is 1743-11-01|
|AverageTemperature|average land temperature for the city, in degrees Celsius| 
|AverageTemperatureUncertainty|the 95% confidence interval around the average|
|City| City name|
|Country| Country name|
|Latitude| Latitude information|
|Longitude| Longitude information|

In [20]:
print ('Data description of Temperature Data')
temperature_df.describe().toPandas()

Data description of Temperature Data


Unnamed: 0,summary,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,count,8599212,8235082.0,8235082.0,8599212,8599212,8599212,8599212
1,mean,,16.727432636250835,1.0285747414536532,,,,
2,stddev,,10.353442482534422,1.1297332887133706,,,,
3,min,1743-11-01,-0.0009999999999994,0.034,A Coruña,Afghanistan,0.80N,0.00W
4,max,2013-09-01,9.999,9.998,Ürümqi,Zimbabwe,8.84S,99.91E


- **Clean the data:**
Remove duplicates on the subset `['dt', 'City', 'Country']` and drop rows that have no `AverageTemperature` value.

In [21]:
temperature_df = temperature_df.dropna(subset=['AverageTemperature'])
temperature_cleaned_df = temperature_df.drop_duplicates(subset=['dt', 'City', 'Country'])
temperature_cleaned_df.describe().toPandas()

Unnamed: 0,summary,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,count,8190783,8190783.0,8190783.0,8190783,8190783,8190783,8190783
1,mean,,16.749488549507422,1.0294152102430236,,,,
2,stddev,,10.346850324690337,1.1308965732638567,,,,
3,min,1743-11-01,-0.0009999999999994,0.034,A Coruña,Afghanistan,0.80N,0.00W
4,max,2013-09-01,9.999,9.998,Ürümqi,Zimbabwe,8.84S,99.91E


---
##### 5. Country Data
- Explore the data: 

In [23]:
country_df.printSchema()

root
 |-- country_code: string (nullable = true)
 |-- country_name: string (nullable = true)



**data dictionary**

|column name|description|
|---|---|
|country_code| country code (related to i94res)|
|country_name| country name|

In [24]:
country_df.describe().toPandas()

Unnamed: 0,summary,country_code,country_name
0,count,289.0,289
1,mean,389.2456747404845,
2,stddev,210.2035352697659,
3,min,0.0,AFGHANISTAN
4,max,999.0,ZIMBABWE


- **Clean the data:**
No changes are needed; the data is used as it is to build the `country` dimension table.



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

![data](images/datamodel.png)


The fact table, `immigration_fact`, is based on the immigration data and keeps that dataset's granularity (no roll-up is performed).

The `calendar` dimension table is derived from the event dates (arrival and departure); it is useful for grouping the data by time period.

The `US_airport` dimension table describes `i94port` (airport code) with the airport's name and location.

The `country` dimension table contains the average temperature by country; it joins to the country code in the `immigration_fact` table.

The `US_city_demographic` dimension table is produced by pivoting the per-race rows into columns, so that each row describes one city (`state_code` and `city` are the table keys). A further roll-up to the state level would allow the data to be compared against the immigration fact table. The data dictionary has detailed information about each column.

#### 3.2 Mapping Out Data Pipelines
The pipeline has the following steps (steps 1 and 2 work on the staging data frames):
1. Load data into spark dataframes
    - load `U.S. City Demographic` Data
    - load `Airport Code Table` Data
    - load `I94 Immigration` Data
    - load `World Temperature` Data
    - load `country` Data (new csv file attached)
2. Clean data (check for duplicates and missing values)
    - clean `U.S. City Demographic` Data
    - clean `Airport Code Table` Data
    - clean `I94 Immigration` Data
    - clean `World Temperature` Data
    - clean `country` Data (new csv file attached)   
3. Create dimension tables
    - create the immigration calendar table from the arrival and departure dates in the I94 immigration dataset
    - create the immigration visatype table from the `visatype` column of the I94 immigration dataset
    - create the US_city_demographic table from the `U.S. City Demographic` data
    - create the immigration country table, which holds country names and average temperatures and is built from the `country` and `World Temperature` data
    - create the US_airport table, which holds airport information for the United States
4. Create fact tables
    - create the immigration_fact table, which references the dimension tables used for analysis
5. Check data
    - perform data quality checks

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [26]:
# Create views to use Spark SQL 
I94_immigration_df.createOrReplaceTempView("I94_immigration_view")
demographic_cleaned_df.createOrReplaceTempView("demographic_cleaned_view")
temperature_cleaned_df.createOrReplaceTempView("temperature_cleaned_view")
airport_cleaned_df.createOrReplaceTempView("airport_cleaned_view")
country_df.createOrReplaceTempView("country_view")

In [27]:
#define directory to output the data
output_path = "tables/"

In [28]:
def create_table(sql_script, output_path, prefix, keys, partition_columns=None, mode="overwrite"):
    """
    description: run a Spark SQL query, drop duplicate keys, and write the result as parquet files
    :param sql_script: Spark SQL query that defines the table
    :param output_path: root path for all output tables
    :param prefix: folder under output_path where this table is saved
    :param keys: columns that must be unique in the table
    :param partition_columns: columns used to partition the parquet files (optional)
    :param mode: save mode, one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore' ('overwrite' by default)
    :return: the Spark data frame that was written
    """
    # build the data frame and keep a single row per key
    df = spark.sql(sql_script).drop_duplicates(subset=keys)
    # pass the caller's save mode through instead of hard-coding "overwrite"
    df.write.parquet(os.path.join(output_path, prefix), partitionBy=partition_columns, mode=mode)
    return df
    

--- 
create the **calendar** dimension table

In [29]:
# script for table definition for time dimension table
sql_calendar_table = """
    SELECT  
        distinct 
        date_add(to_date('1960-01-01'),arrdate) as event_date,
        year(date_add(to_date('1960-01-01'),arrdate)) as event_year,
        month(date_add(to_date('1960-01-01'),arrdate)) as event_month,
        day(date_add(to_date('1960-01-01'),arrdate)) as event_day,
        weekofyear(date_add(to_date('1960-01-01'),arrdate)) as event_week_of_year,
        dayOfWeek(date_add(to_date('1960-01-01'),arrdate)) as event_day_of_week
    FROM I94_immigration_view
    WHERE arrdate IS NOT NULL 
    UNION 
        SELECT  
        distinct 
        date_add(to_date('1960-01-01'),depdate) as event_date,
        year(date_add(to_date('1960-01-01'),depdate)) as event_year,
        month(date_add(to_date('1960-01-01'),depdate)) as event_month,
        day(date_add(to_date('1960-01-01'),depdate)) as event_day,
        weekofyear(date_add(to_date('1960-01-01'),depdate)) as event_week_of_year,
        dayOfWeek(date_add(to_date('1960-01-01'),depdate)) as event_day_of_week
    FROM I94_immigration_view
    WHERE depdate IS NOT NULL 
"""


In [32]:
# create table and show sample data for time dimension table
calendar_dimension_df = create_table(sql_calendar_table, output_path, 'calendar', ['event_date'],['event_year','event_month'])
calendar_dimension_df.limit(5).toPandas()

Unnamed: 0,event_date,event_year,event_month,event_day,event_week_of_year,event_day_of_week
0,2016-03-01,2016,3,1,9,3
1,2016-04-25,2016,4,25,17,2
2,2016-08-31,2016,8,31,35,4
3,2016-05-03,2016,5,3,18,3
4,2016-08-15,2016,8,15,33,2
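
The derived calendar columns can be cross-checked outside Spark. The sketch below recomputes them with Python's `datetime`, assuming that `weekofyear` follows ISO 8601 week numbering and that `dayOfWeek` counts from 1 = Sunday to 7 = Saturday, which is how Spark SQL documents these functions. `calendar_row` is a hypothetical helper used only for this check.

```python
from datetime import date


def calendar_row(event_date):
    """Recompute the calendar dimension columns for one date."""
    # ISO 8601 week number (weeks start on Monday)
    week_of_year = event_date.isocalendar()[1]
    # isoweekday() is 1 = Monday .. 7 = Sunday; shift to 1 = Sunday .. 7 = Saturday
    day_of_week = event_date.isoweekday() % 7 + 1
    return {
        "event_date": event_date,
        "event_year": event_date.year,
        "event_month": event_date.month,
        "event_day": event_date.day,
        "event_week_of_year": week_of_year,
        "event_day_of_week": day_of_week,
    }


# First sample row of the calendar table above
print(calendar_row(date(2016, 3, 1)))
```

For 2016-03-01 this gives week 9 and day of week 3, the same values as the first sample row above.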


--- 
create the **visatype** dimension table

In [33]:
# sql script table definition for visatype dimension table
sql_visatype_table = """
    SELECT  
        row_number() OVER (ORDER BY visatype asc) as visatype_id,
        visatype
    FROM I94_immigration_view
    group by visatype
"""

In [34]:
# create table and show sample data for visatype dimension table
visatype_dimension_df = create_table(sql_visatype_table, output_path, 'visatype', ['visatype_id'])
visatype_dimension_df.limit(5).toPandas()

Unnamed: 0,visatype_id,visatype
0,1,B1
1,2,B2
2,3,CP
3,4,CPL
4,5,E1
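
The `visatype_id` surrogate key is a row number over the distinct visa types in ascending order. A plain-Python sketch of the same idea follows, limited to the five visa types shown in the sample output above (the full table contains more). `assign_surrogate_keys` is a hypothetical helper name.

```python
def assign_surrogate_keys(values):
    """Number the distinct values from 1, in ascending order."""
    distinct_sorted = sorted(set(values))
    return {value: index for index, value in enumerate(distinct_sorted, start=1)}


# Repeated visa types, as they would appear across immigration records
observed = ["B2", "B1", "E1", "B2", "CPL", "CP", "B1"]
visatype_ids = assign_surrogate_keys(observed)
print(visatype_ids)
```

Because the ids depend on the sort order of the values that are present, a visa type that first appears in a later load could shift existing ids. If the table were rebuilt from new data, persisting the mapping would be one way to keep the ids stable.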


--- 
create the **US_city_demographic** dimension table

In [35]:
df1 = spark.sql("""
    SELECT  
        distinct Race
    FROM demographic_cleaned_view

""")
df1.toPandas()

Unnamed: 0,Race
0,Black or African-American
1,Hispanic or Latino
2,White
3,Asian
4,American Indian and Alaska Native


In [36]:
# sql script table definition for US_city_demographic dimension table
sql_US_city_demographic_table ="""
    SELECT 
        state_code,
        city,
        state,
        median_age,
        male_population,
        female_population,
        total_population,
        number_veterans,
        foreign_born,
        average_household_size,
        `Black or African-American` as race_black_or_african_american,
        `Hispanic or Latino` as race_hispanic_or_latino,
        `White` as race_white,
        `Asian` as race_asian,
        `American Indian and Alaska Native` as race_america_indian_and_alaska_native
        
    FROM 
    (
        SELECT  
            `State Code` as state_code,
            `City` as city,
            `State` as state,
            `Median Age` as median_age,
            `Male Population` as male_population,
            `Female Population` as female_population,
            `Total Population` as total_population,
            `Number of Veterans` as number_veterans ,
            `Foreign-born` as foreign_born,
            `Average Household Size` as average_household_size,
            `Race` as race,
            `Count` as count
        FROM demographic_cleaned_view
    )
    PIVOT(
    sum (count) for race in ('Black or African-American',
                            'Hispanic or Latino',
                            'White',
                            'Asian',
                            'American Indian and Alaska Native')
    )
"""

In [37]:
# create table and show sample data for US_city_demographic dimension table
US_city_demographic_df = create_table(sql_US_city_demographic_table, output_path, 'US_city_demographic',['state_code', 'city' ])
US_city_demographic_df.limit(5).toPandas()

Unnamed: 0,state_code,city,state,median_age,male_population,female_population,total_population,number_veterans,foreign_born,average_household_size,race_black_or_african_american,race_hispanic_or_latino,race_white,race_asian,race_america_indian_and_alaska_native
0,IA,Ames,Iowa,23.0,33814,31238,65052,2265,8606,2.16,1103.0,2024.0,56157.0,8979.0,
1,IN,Indianapolis,Indiana,34.1,410615,437808,848423,42186,72456,2.53,253932.0,83426.0,553665.0,29307.0,8656.0
2,LA,Shreveport,Louisiana,35.2,93138,103856,196994,14287,5658,2.53,112923.0,5081.0,79319.0,4033.0,1647.0
3,MO,Kansas City,Missouri,35.9,228430,246931,475361,24710,37787,2.35,147739.0,46037.0,296623.0,17061.0,5796.0
4,NC,Fayetteville,North Carolina,30.7,101051,100914,201965,28089,12863,2.5,90625.0,25080.0,102075.0,8949.0,6603.0
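
The pivot turns one row per city and race into one row per city with one column per race. A minimal plain-Python sketch of that reshaping follows, using the race counts for Ames and Indianapolis from the sample output above. `pivot_race_counts` is a hypothetical helper, and a race with no source row is left as `None` (the empty cell for Ames above).

```python
RACES = [
    "Black or African-American",
    "Hispanic or Latino",
    "White",
    "Asian",
    "American Indian and Alaska Native",
]


def pivot_race_counts(rows):
    """Turn (state_code, city, race, count) rows into one dict of race counts per city."""
    wide = {}
    for state_code, city, race, count in rows:
        city_counts = wide.setdefault((state_code, city), dict.fromkeys(RACES))
        if race not in city_counts:
            continue  # races outside the pivot list are ignored
        # sum counts per race, treating a missing value as zero
        city_counts[race] = (city_counts[race] or 0) + count
    return wide


long_rows = [
    ("IA", "Ames", "Black or African-American", 1103),
    ("IA", "Ames", "Hispanic or Latino", 2024),
    ("IA", "Ames", "White", 56157),
    ("IA", "Ames", "Asian", 8979),
    ("IN", "Indianapolis", "Black or African-American", 253932),
    ("IN", "Indianapolis", "Hispanic or Latino", 83426),
    ("IN", "Indianapolis", "White", 553665),
    ("IN", "Indianapolis", "Asian", 29307),
    ("IN", "Indianapolis", "American Indian and Alaska Native", 8656),
]
wide = pivot_race_counts(long_rows)
print(wide[("IA", "Ames")])
```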


--- 
create the **country** dimension table

In [48]:
# sql script table definition for country dimension table
sql_country_table ="""
    SELECT 
        cv.country_code,
        cv.country_name,
        avg(AverageTemperature) AS AverageTemperature,
        AVG(AverageTemperatureUncertainty) AS AverageTemperatureUncertainty
    FROM country_view cv
    LEFT OUTER JOIN temperature_cleaned_view t ON (upper(t.Country) = cv.country_name)
    GROUP BY cv.country_code,
        cv.country_name
   
"""

In [49]:
# create table and show sample data for country dimension table
country_dimension_df = create_table(sql_country_table, output_path, 'country',['country_code'])
country_dimension_df.limit(5).toPandas()

Unnamed: 0,country_code,country_name,AverageTemperature,AverageTemperatureUncertainty
0,296,UNITED ARAB EMIRATES,26.572681,0.838493
1,467,COOK ISLANDS,,
2,691,COLOMBIA,22.683264,0.71384
3,944,No Country Code (944),,
4,124,NORWAY,3.612553,1.741349


--- 
create the **airport** dimension table

In [50]:
# sql script table definition for US airport dimension table
sql_airport_table ="""
    SELECT 
         local_code  as i94port
        ,type 
        ,name 
        ,elevation_ft 
        ,substring(iso_region,4,2) as state_code
        ,municipality  as city
        ,gps_code 
        ,coordinates 
    FROM airport_cleaned_view 
    WHERE iso_country  ='US'
    and local_code is not null 
   
"""

In [51]:
# create table and show sample data for airport dimension table
US_airport_dimension_df = create_table(sql_airport_table, output_path, 'US_airport', ['i94port'])
US_airport_dimension_df.limit(5).toPandas()

Unnamed: 0,i94port,type,name,elevation_ft,state_code,city,gps_code,coordinates
0,06VA,small_airport,Mount Horeb Field,1160,VA,Grottoes,06VA,"-78.85530090332031, 38.249000549316406"
1,08C,small_airport,Riverview Airport,603,MI,Jenison,K08C,"-85.80500030517578, 42.9359016418457"
2,0LA0,heliport,West Hackberry Heliport,10,LA,Hackberry,0LA0,"-93.40019989013672, 30.008499145507812"
3,0MD6,small_airport,Walters Airport,750,MD,Mount Airy,0MD6,"-77.10579681396484, 39.38119888305664"
4,0OH7,small_airport,Apple Airport,1000,OH,Piqua,0OH7,"-84.1718978881836, 40.1432991027832"


--- 
create the **Immigration** fact table

In [52]:
# create views for the dimension tables
visatype_dimension_df.createOrReplaceTempView("visatype_dimension_view")

In [53]:
# sql script table definition for immigration fact table

sql_I94_immigration_fact_table = """
    SELECT  
        im.cicid as record_id,
        im.i94cit,
        im.i94res as country_code,
        im.i94port,
        date_add(to_date('1960-01-01'),im.arrdate) as arrival_date,
        im.i94mode,
        im.i94addr as state_code,
        date_add(to_date('1960-01-01'),im.depdate) as departure_date,
        im.i94bir,
        im.i94visa,
        im.count,
        im.dtadfile,
        im.visapost,
        im.occup,
        im.entdepa,
        im.entdepd,
        im.entdepu,
        im.matflag,
        im.biryear,
        im.dtaddto,
        im.gender,
        im.insnum,
        im.airline,
        im.admnum,
        im.fltno,
        vt.visatype_id
    FROM I94_immigration_view im
    LEFT OUTER JOIN visatype_dimension_view vt ON (vt.visatype = im.visatype)
    
"""

In [54]:
# create table and show sample data for immigration fact table
immigration_fact_df = create_table(sql_I94_immigration_fact_table, output_path, 'immigration_fact',['record_id'])
immigration_fact_df.limit(5).toPandas()

Unnamed: 0,record_id,i94cit,country_code,i94port,arrival_date,i94mode,state_code,departure_date,i94bir,i94visa,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype_id
0,299.0,103.0,103.0,NYC,2016-04-01,1.0,NY,2016-04-06,54.0,2.0,...,,M,1962.0,6292016,,,OS,55425870000.0,87,17
1,305.0,103.0,103.0,NYC,2016-04-01,1.0,NY,2016-04-11,63.0,2.0,...,,M,1953.0,6292016,,,OS,55425820000.0,87,17
2,496.0,103.0,103.0,CHI,2016-04-01,1.0,IL,2016-04-04,64.0,1.0,...,,M,1952.0,6292016,,,OS,55428620000.0,65,16
3,558.0,103.0,103.0,SFR,2016-04-01,1.0,CA,2016-04-03,42.0,1.0,...,,M,1974.0,6292016,M,,LH,55433310000.0,454,16
4,596.0,103.0,103.0,NAS,2016-04-01,1.0,FL,2016-04-03,24.0,2.0,...,,M,1992.0,6292016,M,,UP,55406110000.0,221,17
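
The `arrival_date` and `departure_date` columns are built by adding the SAS numeric date (a count of days since 1960-01-01) to that epoch. The sketch below reproduces the same arithmetic with the standard library; the function name `sas_days_to_date` is hypothetical, and missing values are assumed to arrive as `None`.

```python
from datetime import date, timedelta

# SAS numeric dates count days from this epoch.
SAS_EPOCH = date(1960, 1, 1)


def sas_days_to_date(sas_days):
    """Convert a SAS numeric date to a calendar date; None stays None."""
    if sas_days is None:
        return None
    # The source column is a float (e.g. 20545.0), so cast before adding.
    return SAS_EPOCH + timedelta(days=int(sas_days))


arrival = sas_days_to_date(20545.0)
print(arrival)
```

The value 20545 corresponds to 2016-04-01, the arrival date shown in the sample rows.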


#### 4.2 Data Quality Checks
The following data quality checks are performed:
 * Count checks to ensure that every table contains records (completeness)
 * Uniqueness checks on the key column(s) of each table, so that every key value identifies exactly one row

Run Quality Checks

In [55]:
def quality_check_count(table_name, df):
    """
    description: check that the table contains at least one record
    :param table_name: table name (df name) to check 
    :param df: spark data frame to evaluate
    """
    count = df.count()

    if count == 0:
        print(f"Data quality check failed for the table: '{table_name}' (0 records found)")
    else:
        print(f"Data quality check passed for the table: '{table_name}' with {count:,} records.")

In [56]:
def quality_check_keys(table_name, df, keys):
    """
    description: check if the key is unique (count of the table is equal to distinct values for the key)
    :param table_name: table name (df name) to check 
    :param df: spark data frame to evaluate
    :param keys: keys to evaluate if they are unique
    """
    count = df.count()
    unique_values_count = df.select(countDistinct(*keys)).toPandas().iloc[0][0]

    if count != unique_values_count:
        print(f"Data quality check for unique key failed for the table: '{table_name}',  the table has {count} rows and the key has {unique_values_count} distinct values")
    else:
        print(f"Data quality check for unique key passed for the table: '{table_name}',  the table has {count} rows and distinct values for the key (unique) ")
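
One caveat with this check: in Spark SQL, `count(DISTINCT a, b)` only counts rows in which all listed columns are non-null, so a null in any key column makes the check fail even when the rows are otherwise unique. The plain-Python sketch below mimics that rule on hypothetical `(state_code, city)` keys; it is an illustration of the counting rule, not Spark code.

```python
def count_distinct_non_null(rows):
    """Count distinct key tuples, ignoring tuples that contain a None."""
    return len({row for row in rows if all(value is not None for value in row)})


# Hypothetical keys; the second row has a missing city.
keys = [("NY", "New York"), ("NY", None), ("CA", "Los Angeles")]

total_rows = len(keys)
distinct_keys = count_distinct_non_null(keys)
print(total_rows, distinct_keys)
```

Here the row count is 3 but the distinct count is 2, so the uniqueness check would report a failure caused by the null rather than by a duplicate.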
        

In [57]:
# tables on which to perform the quality checks
final_dfs = {
    'calendar': [calendar_dimension_df, ['event_date']],
    'visatype': [visatype_dimension_df, ['visatype_id']],
    'US_city_demographic': [US_city_demographic_df, ['state_code','city']],
    'country_dimension': [country_dimension_df, ['country_code']],
    'US_airport': [US_airport_dimension_df, ['i94port']],
    'immigration_fact': [immigration_fact_df, ['record_id']],
}


In [58]:
# perform the `quality_check_count`
for table_name, value in final_dfs.items():
    quality_check_count(table_name, value[0])

Data quality check passed for the table: 'calendar' with 235 records.
Data quality check passed for the table: 'visatype' with 17 records.
Data quality check passed for the table: 'US_city_demographic' with 588 records.
Data quality check passed for the table: 'country_dimension' with 289 records.
Data quality check passed for the table: 'US_airport' with 21,151 records.
Data quality check passed for the table: 'immigration_fact' with 3,096,313 records.


In [59]:
# perform the `quality_check_keys`
for table_name, value in final_dfs.items():
    quality_check_keys(table_name, value[0], value[1])

Data quality check for unique key passed for the table: 'calendar',  the table has 235 rows and distinct values for the key (unique) 
Data quality check for unique key passed for the table: 'visatype',  the table has 17 rows and distinct values for the key (unique) 
Data quality check for unique key passed for the table: 'US_city_demographic',  the table has 588 rows and distinct values for the key (unique) 
Data quality check for unique key passed for the table: 'country_dimension',  the table has 289 rows and distinct values for the key (unique) 
Data quality check for unique key passed for the table: 'US_airport',  the table has 21151 rows and distinct values for the key (unique) 
Data quality check for unique key passed for the table: 'immigration_fact',  the table has 3096313 rows and distinct values for the key (unique) 
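
The checks above only print their result, so a scheduled pipeline would continue even after a failure. A possible variant, sketched here under the assumption that the row count and distinct key count have already been computed, raises an exception instead. `DataQualityError` and `assert_quality` are hypothetical names.

```python
class DataQualityError(Exception):
    """Raised when a table fails a data quality check (hypothetical name)."""


def assert_quality(table_name, row_count, distinct_key_count):
    """Fail fast if the table is empty or its key is not unique."""
    if row_count == 0:
        raise DataQualityError(f"'{table_name}' has 0 records")
    if row_count != distinct_key_count:
        raise DataQualityError(
            f"'{table_name}' has {row_count} rows but "
            f"{distinct_key_count} distinct key values"
        )
    return True


# Counts taken from the check output for the visatype table.
visatype_ok = assert_quality("visatype", 17, 17)
print(visatype_ok)
```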


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

##### Dimension tables
- **calendar**

|column name|description|
|----|---|
|event_date| date of the event (key of the table)|
|event_year| year of the event_date|
|event_month| month of the event_date |
|event_day| day of month|
|event_week_of_year| week of the year|
|event_day_of_week| day of week|
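
As an illustration of how the calendar attributes relate to `event_date`, the sketch below derives them with the standard library. It assumes ISO 8601 week numbering and a day-of-week convention of 1 = Sunday through 7 = Saturday (the convention of Spark's `weekofyear` and `dayofweek` functions); the notebook may use a different convention, and `calendar_row` is a hypothetical helper.

```python
from datetime import date


def calendar_row(event_date):
    """Derive the calendar dimension attributes from a single date."""
    _, iso_week, iso_weekday = event_date.isocalendar()
    return {
        "event_date": event_date,
        "event_year": event_date.year,
        "event_month": event_date.month,
        "event_day": event_date.day,
        "event_week_of_year": iso_week,
        # Assumption: 1 = Sunday ... 7 = Saturday.
        # ISO weekdays run 1 = Monday ... 7 = Sunday, hence the remapping.
        "event_day_of_week": iso_weekday % 7 + 1,
    }


row = calendar_row(date(2016, 4, 1))
print(row)
```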

- **visatype**

|column name|description|
|----|---|
|visatype_id| identifier of the visa type (key of the table)|
|visatype| visa type description|

- **US_city_demographic**

|column name|description|
|----|---|
|state_code| code of the US state (part of the table's key)|
|city| city name (part of the table's key)|
|state| state name|
|median_age| Population median age|
|male_population| count of male population|
|female_population|count of female population|
|total_population|count of total population|
|number_of_veterans| count of veterans|
|foreign_born| count of residents in the city who were born outside the United States|
|average_Household Size| average household size in the city|
|race_black_or_african_american| count of race: Black or African-American|
|race_hispanic_or_latino| count of race: Hispanic or Latino|
|race_white| count of race: White|
|race_asian|count of race: Asian|
|race_america_indian_and_alaska_native|count of race: American Indian and Alaska Native|

- **country**

|column name|description|
|----|---|
|country_code| code of the country (key of the table)|
|country_name| name of the country|
|AverageTemperature| average temperature of the country in degrees Celsius|
|AverageTemperatureUncertainty|the 95% confidence interval around the average|

- **US_airport**

|column name|description|
|----|---|
|i94port| local code of the airport (key of the table)|
|type|  type of airport|
|name|name of the airport|
|elevation_ft| elevation in feet|
|state_code|U.S. states Abbreviations|
|city| city name|
|gps_code| gps code|
|coordinates| longitude and latitude|

##### Fact tables
- **immigration_fact**

|column name|description|
|---|---|
|record_id| record identifier (key of the table)|
|i94cit| country code of citizenship|
|country_code| Country code   (reference to `country` table) |
|i94port| airport Code (reference to `US_airport` table)|
|arrival_date| arrival date in the USA, stored as a date (reference to `calendar` table) |
|i94mode| 1 = 'Air', 2 = 'Sea', 3 = 'Land', 9 = 'Not reported' |
|state_code| U.S. states Abbreviations (reference to `US_city_demographic` table) |
|departure_date| departure date from the USA, stored as a date (reference to `calendar` table)|
|i94bir| Age of Respondent in Years |
|i94visa| Visa codes collapsed into three categories:  1 = Business, 2 = Pleasure, 3 = Student|
|count| count  of persons|
|dtadfile| Character Date Field - Date added to I-94 Files - CIC does not use|
|visapost| Department of State post where the visa was issued - CIC does not use|
|occup| Occupation that will be performed in U.S. - CIC does not use |
|entdepa| Arrival Flag - admitted or paroled into the U.S. - CIC does not use |
|entdepd| Departure Flag - Departed, lost I-94 or is deceased - CIC does not use|
|entdepu| Update Flag - Either apprehended, overstayed, adjusted to perm residence - CIC does not use|
|matflag| Match flag - Match of arrival and departure records |
|biryear| 4 digit year of birth |
|dtaddto| Character Date Field - Date to which admitted to U.S. (allowed to stay until) - CIC does not use|
|gender| Non-immigrant sex |
|insnum| INS number |
|airline| Airline used to arrive in U.S.|
|admnum| Admission Number|
|fltno| Flight number of Airline used to arrive in U.S.|
|visatype_id| Class of admission legally admitting the non-immigrant to temporarily stay in U.S. (reference to `visatype` table) |
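
The coded columns `i94mode` and `i94visa` can be translated to labels with simple lookup tables built from the descriptions above. A minimal sketch; the `decode` helper and the `"Unknown"` fallback label are assumptions added for illustration.

```python
# Labels taken from the data dictionary entries for i94mode and i94visa.
I94_MODE = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}
I94_VISA = {1: "Business", 2: "Pleasure", 3: "Student"}


def decode(code, mapping):
    """Translate a numeric code (stored as a float in the fact table) to a label."""
    if code is None:
        return None
    # "Unknown" is a hypothetical fallback for codes outside the dictionary.
    return mapping.get(int(code), "Unknown")


mode_label = decode(1.0, I94_MODE)
visa_label = decode(2.0, I94_VISA)
print(mode_label, visa_label)
```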

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.