# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project recaps the knowledge that I have gained during the Udacity Data Engineering NanoDegree Program.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Imports and installs that are required for the Project.
import pandas as pd
import psycopg2
import datetime as dt

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date, LongType as Long
from pyspark.sql.functions import col, split, udf



### Step 1: Scope the Project and Gather Data

#### Scope 
In this project the US I94 immigration data are explored, enriched and connected with demographics, airport and temperature data for better analysis. The source of data are files and using Pandas and Spark dataframes we are cleaning and transforming, producing the output tables as parquet files.

#### Describe and Gather Data 

#### I94 Immigration Data

This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace. This is where the data comes from. There's a sample file so you can take a look at the data in csv format before reading it all in.


In [2]:
# Read the sample Imigration data.csv and get an idea of how the fact table would look like.

filename = './immigration_data_sample.csv'
df = pd.read_csv (filename, index_col=[0])
df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [3]:
# Read the i94 SAS data using Spark. 

spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()
df_spark_i94 =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [24]:
# Write from Spark dataframe to Parquet files. Then read those files. 
# This is a demonstration of writing and reading parquet files.

df_spark_i94.write.parquet("sas_data")
df_spark_i94_f=spark.read.parquet("sas_data")
df_spark_i94_f.head(5)

In [6]:
# Read the i94 SAS data using Python dataframe.
# Using the Panda dataframes to read the same data, we can notice the big difference in the performance.
# Reading the file using Spark is faster than doing the same with Python dataframes.

fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_i94 = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")
pd.options.display.max_columns = None
df_i94.head(5)


Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,37.0,2.0,1.0,,,,T,,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,20130811.0,SEO,,G,,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,20160401.0,,,T,O,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,28.0,2.0,1.0,20160401.0,,,O,O,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,20160401.0,,,O,O,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


#### World Temperature Data
This dataset came from Kaggle. You can read more about it here.

In [4]:
# Read the World Temperature Data.

fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = pd.read_csv(fname)
df_temp.head(20)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E
5,1744-04-01,5.788,3.624,Århus,Denmark,57.05N,10.33E
6,1744-05-01,10.644,1.283,Århus,Denmark,57.05N,10.33E
7,1744-06-01,14.051,1.347,Århus,Denmark,57.05N,10.33E
8,1744-07-01,16.082,1.396,Århus,Denmark,57.05N,10.33E
9,1744-08-01,,,Århus,Denmark,57.05N,10.33E


#### U.S. City Demographic Data
This data comes from OpenSoft. You can read more about it here

In [5]:
# Read the Demographic data.

df_demographics = pd.read_csv("./us-cities-demographics.csv", delimiter=";")
df_demographics.head(20)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402
5,Peoria,Illinois,33.1,56229.0,62432.0,118661,6634.0,7517.0,2.4,IL,American Indian and Alaska Native,1343
6,Avondale,Arizona,29.1,38712.0,41971.0,80683,4815.0,8355.0,3.18,AZ,Black or African-American,11592
7,West Covina,California,39.8,51629.0,56860.0,108489,3800.0,37038.0,3.56,CA,Asian,32716
8,O'Fallon,Missouri,36.0,41762.0,43270.0,85032,5783.0,3269.0,2.77,MO,Hispanic or Latino,2583
9,High Point,North Carolina,35.5,51751.0,58077.0,109828,5204.0,16315.0,2.65,NC,Asian,11060


#### Airport Code Table
This is a simple table of airport codes and corresponding cities. It comes from here.

In [6]:
# Read the Airport Code table data.

df_airport_codes = pd.read_csv("./airport-codes_csv.csv")
df_airport_codes.head(20)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"
5,00AS,small_airport,Fulton Airport,1100.0,,US,US-OK,Alex,00AS,,00AS,"-97.8180194, 34.9428028"
6,00AZ,small_airport,Cordes Airport,3810.0,,US,US-AZ,Cordes,00AZ,,00AZ,"-112.16500091552734, 34.305599212646484"
7,00CA,small_airport,Goldstone /Gts/ Airport,3038.0,,US,US-CA,Barstow,00CA,,00CA,"-116.888000488, 35.350498199499995"
8,00CL,small_airport,Williams Ag Airport,87.0,,US,US-CA,Biggs,00CL,,00CL,"-121.763427, 39.427188"
9,00CN,heliport,Kitchen Creek Helibase Heliport,3350.0,,US,US-CA,Pine Valley,00CN,,00CN,"-116.4597417, 32.7273736"


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.


In [7]:
# Explore the Spark dataframe that contains the I94 data.

df_spark_i94.head(5)

[Row(cicid=6.0, i94yr=2016.0, i94mon=4.0, i94cit=692.0, i94res=692.0, i94port='XXX', arrdate=20573.0, i94mode=None, i94addr=None, depdate=None, i94bir=37.0, i94visa=2.0, count=1.0, dtadfile=None, visapost=None, occup=None, entdepa='T', entdepd=None, entdepu='U', matflag=None, biryear=1979.0, dtaddto='10282016', gender=None, insnum=None, airline=None, admnum=1897628485.0, fltno=None, visatype='B2'),
 Row(cicid=7.0, i94yr=2016.0, i94mon=4.0, i94cit=254.0, i94res=276.0, i94port='ATL', arrdate=20551.0, i94mode=1.0, i94addr='AL', depdate=None, i94bir=25.0, i94visa=3.0, count=1.0, dtadfile='20130811', visapost='SEO', occup=None, entdepa='G', entdepd=None, entdepu='Y', matflag=None, biryear=1991.0, dtaddto='D/S', gender='M', insnum=None, airline=None, admnum=3736796330.0, fltno='00296', visatype='F1'),
 Row(cicid=15.0, i94yr=2016.0, i94mon=4.0, i94cit=101.0, i94res=101.0, i94port='WAS', arrdate=20545.0, i94mode=1.0, i94addr='MI', depdate=20691.0, i94bir=55.0, i94visa=2.0, count=1.0, dtadfile=

In [10]:
# Exploring the i94 data. (Pandas dataframe)
df_i94.describe()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,arrdate,i94mode,depdate,i94bir,i94visa,count,biryear,admnum
count,3096313.0,3096313.0,3096313.0,3096313.0,3096313.0,3096313.0,3096074.0,2953856.0,3095511.0,3096313.0,3096313.0,3095511.0,3096313.0
mean,3078652.0,2016.0,4.0,304.9069,303.2838,20559.85,1.07369,20573.95,41.76761,1.845393,1.0,1974.232,70828850000.0
std,1763278.0,0.0,0.0,210.0269,208.5832,8.777339,0.5158963,29.35697,17.42026,0.398391,0.0,17.42026,22154420000.0
min,6.0,2016.0,4.0,101.0,101.0,20545.0,1.0,15176.0,-3.0,1.0,1.0,1902.0,0.0
25%,1577790.0,2016.0,4.0,135.0,131.0,20552.0,1.0,20561.0,30.0,2.0,1.0,1962.0,56035230000.0
50%,3103507.0,2016.0,4.0,213.0,213.0,20560.0,1.0,20570.0,41.0,2.0,1.0,1975.0,59360940000.0
75%,4654341.0,2016.0,4.0,512.0,504.0,20567.0,1.0,20579.0,54.0,2.0,1.0,1986.0,93509870000.0
max,6102785.0,2016.0,4.0,999.0,760.0,20574.0,9.0,45427.0,114.0,3.0,1.0,2019.0,99915570000.0


In [9]:
# Exploring the temperature data.
df_temp.describe()

Unnamed: 0,AverageTemperature,AverageTemperatureUncertainty
count,8235082.0,8235082.0
mean,16.72743,1.028575
std,10.35344,1.129733
min,-42.704,0.034
25%,10.299,0.337
50%,18.831,0.591
75%,25.21,1.349
max,39.651,15.396


In [10]:
# Exploring the airport data.
df_airport_codes.describe()

Unnamed: 0,elevation_ft
count,48069.0
mean,1240.789677
std,1602.363459
min,-1266.0
25%,205.0
50%,718.0
75%,1497.0
max,22000.0


In [11]:
# Exploring the demographics data.
df_demographics.describe()

Unnamed: 0,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,Count
count,2891.0,2888.0,2888.0,2891.0,2878.0,2878.0,2875.0,2891.0
mean,35.494881,97328.43,101769.6,198966.8,9367.832523,40653.6,2.742543,48963.77
std,4.401617,216299.9,231564.6,447555.9,13211.219924,155749.1,0.433291,144385.6
min,22.9,29281.0,27348.0,63215.0,416.0,861.0,2.0,98.0
25%,32.8,39289.0,41227.0,80429.0,3739.0,9224.0,2.43,3435.0
50%,35.3,52341.0,53809.0,106782.0,5397.0,18822.0,2.65,13780.0
75%,38.0,86641.75,89604.0,175232.0,9368.0,33971.75,2.95,54447.0
max,70.5,4081698.0,4468707.0,8550405.0,156961.0,3212500.0,4.98,3835726.0


#### Cleaning Steps
I have created reference files using data from the I94_SAS_Labels_Descriptions file.
Cleaning and using the dataframes to replace column data in the I94 data.

In [7]:
def clean_countries():
    # Cleaning and loading the Countries data into a dataframe
    fname = './reference/i94cntyl.txt'
    df_countries = pd.read_csv(fname, sep = " =  ",names = ["country_code", "country"], header=None, skipinitialspace= True, engine='python')
    df_countries["country"] = df_countries["country"].astype(str).str.replace("'", "")
    df_countries.loc[df_countries["country"].str.contains("INVALID*"), "country"] = "Other"
    df_countries.loc[df_countries["country"].str.contains("No Country*"), "country"] = "Other"
    df_countries.loc[df_countries["country"].str.contains("Collapsed*"), "country"] = "Other"
    df_countries.loc[df_countries["country"].str.contains("Other*"), "country"] = "Other"

    # Create a spark Df from the Pandas Df
    df_spark_countries = spark.createDataFrame(df_countries)
    
    # perform data quality checks
    df_spark_countries.show(5, truncate=False)
    df_spark_countries.printSchema()
    df_spark_countries.count()
    
    return df_spark_countries



In [8]:
def clean_states():

    # Cleaning and loading the States data into a dataframe

    fname = './reference/i94addrl.txt'
    df_states = pd.read_csv(fname, sep = "=",names = ["state_code", "state"], header=None, skipinitialspace= True, engine='python')
    df_states["state"] = df_states["state"].astype(str).str.replace("'", "")
    df_states.iloc[ : , 0 ] = df_states.iloc[ : , 0].str.replace("'", "").str.replace("\t", "")

    # Create a spark Df from the Pandas Df
    df_spark_states = spark.createDataFrame(df_states)
    
    # perform data quality checks
    df_spark_states.show(5, truncate=False)
    df_spark_states.printSchema()
    df_spark_states.count()
    
    return df_spark_states


In [9]:
def clean_ports():

    # Cleaning and loading the Port data into a dataframe

    fname = './reference/i94prtl.txt'
    df_ports = pd.read_csv(fname, sep = "	=	",names = ["port_code", "city"], header=None, skipinitialspace= True, engine='python')
    df_ports["port_code"] = df_ports["port_code"].astype(str).str.replace("'", "")
    df_ports["city"] = df_ports["city"].astype(str).str.replace("'", "")
    df_new = df_ports["city"].str.split(", ", n = 1, expand = True) 

    # making separate state column from new data frame 
    df_ports["state"]= df_new[1].str.strip()

    # replacing the value of city column from new data frame 
    df_ports["city"]= df_new[0] 

    # Create a spark Df from the Pandas Df
    df_spark_ports = spark.createDataFrame(df_ports)
    
    # perform data quality checks
    df_spark_ports.show(5, truncate=False)
    df_spark_ports.printSchema()
    df_spark_ports.count()
    
    return df_spark_ports

In [10]:
def clean_modes():

    # Cleaning and loading the Mode data into a dataframe.

    fname = './reference/i94model.txt'
    df_modes = pd.read_csv(fname, sep = " = ",names = ["mode_code", "mode"], header=None, skipinitialspace= True, engine='python')
    df_modes["mode"] = df_modes["mode"].astype(str).str.replace("'", "")

    # Create a spark Df from the Pandas Df
    df_spark_modes = spark.createDataFrame(df_modes)
    
    # perform data quality checks
    df_spark_modes.show(5, truncate=False)
    df_spark_modes.printSchema()
    df_spark_modes.count()
    
    return df_spark_modes


In [11]:
def clean_visas():
    
    # Cleaning visas and loading into a dataframe.
    fname = './reference/i94visa.txt'
    df_visas = pd.read_csv(fname, sep = " = ",names = ["visa_code", "visa"], header=None, skipinitialspace= True, engine='python')
    df_visas["visa"] = df_visas["visa"].astype(str).str.replace("'", "")

    # Create a spark Df from the Pandas Df
    df_spark_visas = spark.createDataFrame(df_visas)
    
    # perform data quality checks
    df_spark_visas.show(5, truncate=False)
    df_spark_visas.printSchema()
    df_spark_visas.count()
    
    return df_spark_visas


In [12]:
def clean_temperatures():

    # Filtering the Temperature data to only the United States records. 
    # Then drop the unnecessary Country column.
    # Clean from empty values.

    df_us_temp = df_temp[df_temp["Country"] == "United States"]
    df_us_temp = df_us_temp.drop('Country', 1)
    df_us_temp.dropna(inplace=True)
    df_us_temp.head()
    
    return df_us_temp

In [13]:
# Call the functions that clean all the reference tables.

df_spark_countries = clean_countries()
df_spark_states = clean_states()
df_spark_ports = clean_ports()
df_spark_modes = clean_modes()
df_spark_visas = clean_visas()
df_us_temp = clean_temperatures()

+------------+---------------------------------------------------------+
|country_code|country                                                  |
+------------+---------------------------------------------------------+
|582         |MEXICO Air Sea, and Not Reported (I-94, no land arrivals)|
|236         |AFGHANISTAN                                              |
|101         |ALBANIA                                                  |
|316         |ALGERIA                                                  |
|102         |ANDORRA                                                  |
+------------+---------------------------------------------------------+
only showing top 5 rows

root
 |-- country_code: long (nullable = true)
 |-- country: string (nullable = true)

+----------+----------+
|state_code|state     |
+----------+----------+
|AL        |ALABAMA   |
|AK        |ALASKA    |
|AZ        |ARIZONA   |
|AR        |ARKANSAS  |
|CA        |CALIFORNIA|
+----------+----------+
only showing to

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The conceptual data model that I designed, contains the Immigrations fact table with direct relations to the airport, countries and demographics which are the dimensions tables. For further analysis of the temperatures data there is a direct relationship to the demographics table.

I have chosen the star schema, because the queries will focus on analysing the immigration data. Some examples of analytics queries on the current data model would be: 
From which countries do immigrants mostly come from?
What are the types of airports that they use mostly?
What is the total population of the state they arrive to?
What is the average temperature for the most visited locations?


![alt text](dmodel.png "Data Model")




### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [14]:
def create_temperature():

    # Create the Spark Dataframe schema that will host the temperature data.
    # Then create a Spark df from the existing pandas df.

    tempSchema = R([
                            Fld("date",Str()),
                            Fld("average_temperature",Dbl()),
                            Fld("average_temperature_uncertainty",Dbl()),
                            Fld("city",Str()),
                            Fld("latitude",Str()),
                            Fld("longitude",Str())    
                            ])
    df_us_temperature = spark.createDataFrame(df_us_temp,tempSchema)
    df_us_temperature.printSchema()
    
    return df_us_temperature

In [15]:
def create_airport_codes():

    # Clean the airport codes from empty iata_codes, because this is the field that will be used as PK in the dimension table.
    # Filter the records only to the US ones and drop the iso_country column.

    df_airport_codes.dropna(subset=['iata_code'], inplace=True)
    df_airport_codes_flt= df_airport_codes[df_airport_codes["iso_country"] == "US"]
    df_airport_codes_flt = df_airport_codes_flt.drop('iso_country', 1)


    # Create a spark Df from the Pandas Df.

    airportSchema = R([
                        Fld("airport_id",Str()),
                        Fld("type",Str()),
                        Fld("name",Str()),
                        Fld("elevation_ft",Str()),
                        Fld("continent",Str()),
                        Fld("iso_region",Str()),
                        Fld("municipality",Str()),
                        Fld("gps_code",Str()),
                        Fld("iata_code",Str()),
                        Fld("local_code",Str()),
                        Fld("coordinates",Str())
                        ])
    df_spark_airport_codes = spark.createDataFrame(df_airport_codes_flt,schema=airportSchema)

    # Split the iso_region field and keep the State as a new one.
    # Split the coordinates field and create latitude and longitude.
    # Drop coordinates, iso_region and continent.

    df_spark_airport_codes_flt = df_spark_airport_codes\
                                     .withColumn("state", split(col("iso_region"), "-")[1])\
                                     .withColumn("latitude", split(col("coordinates"), ",")[0].cast(Dbl()))\
                                     .withColumn("longitude", split(col("coordinates"), ",")[1].cast(Dbl()))\
                                     .drop("coordinates")\
                                     .drop("iso_region")\
                                     .drop("continent")
    
    # Clean dataframe from possible duplicates on the iata_code.
    df_spark_airport_codes_clean = df_spark_airport_codes_flt.dropDuplicates(["iata_code"])
    df_spark_airport_codes_clean.printSchema()
    
    return df_spark_airport_codes_clean



In [16]:
def create_demographics():

    # Create a schema to host the demographics data.

    demogrSchema = R([
                                Fld("city",Str()),
                                Fld("state",Str()),
                                Fld("median_age",Dbl()),
                                Fld("male_population",Int()),
                                Fld("female_population",Int()),
                                Fld("total_population",Int()),
                                Fld("number_of_veterans",Int()),
                                Fld("number_of_foreign_born",Int()),
                                Fld("average_household_size",Dbl()),
                                Fld("state_code",Str()),
                                Fld("race",Str()),
                                Fld("count",Int()) 
                                ])

    #Load the csv data into a Spark df.
    df_spark_demographics = spark.read.csv("./us-cities-demographics.csv", header='true', sep=";", schema=demogrSchema)

    # Filter the demographics df for the empty state records.
    # Additionally, filter the duplicates on the combination of state,city,race.
    df_demographics_cl = df_spark_demographics.filter(df_spark_demographics.state.isNotNull())\
                               .dropDuplicates(subset=['state', 'city', 'race'])

    df_demographics_cl.printSchema()
    
    return df_demographics_cl


In [17]:
# Call the functions that creates the dimension tables/dataframes

df_us_temperature = create_temperature() 
df_spark_airport_codes_clean = create_airport_codes()
df_demographics_cl = create_demographics()

root
 |-- date: string (nullable = true)
 |-- average_temperature: double (nullable = true)
 |-- average_temperature_uncertainty: double (nullable = true)
 |-- city: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)

root
 |-- airport_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- state: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- male_population: integer (nullable = true)
 |-- female_population: integer (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- number

In [18]:
def create_immigrations():

    # Create the function that will get the date from the SAS format.
    get_date = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(x)).isoformat() if x else None)

    # Transform the date format for the arrdate and depdate.
    df_spark_i94_new = df_spark_i94.withColumn("arrdate", get_date(df_spark_i94.arrdate)).withColumn("depdate", get_date(df_spark_i94.depdate))


    df_spark_i94_new.createOrReplaceTempView("immigrations")
    df_spark_states.createOrReplaceTempView("states")
    df_spark_visas.createOrReplaceTempView("visas")
    df_spark_modes.createOrReplaceTempView("modes") 

    # Get the modes and visas from the reference mappings, set missing states to 99.
    df_immigrations = spark.sql("""
                                            select i.cicid,
                                                    i.i94yr as year,
                                                    i.i94mon as month,
                                                    i.i94cit as birth_country,
                                                    i.i94res as residence_country,
                                                    i.i94port as port,
                                                    i.arrdate as arrival_date,
                                                    coalesce(m.mode, 'Not reported') as arrival_mode,
                                                    coalesce(c.state_code, '99') as us_state,
                                                    i.depdate as departure_date,
                                                    i.i94bir as age,
                                                    coalesce(v.visa, 'Other') as visa_type_code,
                                                    i.dtadfile as date_added,
                                                    i.visapost as visa_issued_department,
                                                    i.occup as occupation,
                                                    i.entdepa as arrival_flag,
                                                    i.entdepd as departure_flag,
                                                    i.entdepu as update_flag,
                                                    i.matflag as match_arrival_dep_flag,
                                                    i.biryear as birth_year,
                                                    i.dtaddto as allowed_date,
                                                    i.insnum as ins_number,
                                                    i.airline as airline,
                                                    i.admnum as admission_number,
                                                    i.fltno as flight_number,
                                                    i.visatype as visa_type
                                                from immigrations i left join states c on i.i94addr=c.state_code
                                                    left join visas v on i.i94visa=v.visa_code
                                                    left join modes m on i.i94mode=m.mode_code
                                            """)

    # Drop unnecessary flags from the dataframe.
    df_immigrations = df_immigrations.drop("arrival_flag","departure_flag","update_flag","match_arrival_dep_flag")
    df_immigrations.printSchema()
    
    return df_immigrations

In [19]:
# Call the function that creates the fact table.

df_immigrations = create_immigrations()


root
 |-- cicid: double (nullable = true)
 |-- year: double (nullable = true)
 |-- month: double (nullable = true)
 |-- birth_country: double (nullable = true)
 |-- residence_country: double (nullable = true)
 |-- port: string (nullable = true)
 |-- arrival_date: string (nullable = true)
 |-- arrival_mode: string (nullable = false)
 |-- us_state: string (nullable = false)
 |-- departure_date: string (nullable = true)
 |-- age: double (nullable = true)
 |-- visa_type_code: string (nullable = false)
 |-- date_added: string (nullable = true)
 |-- visa_issued_department: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- birth_year: double (nullable = true)
 |-- allowed_date: string (nullable = true)
 |-- ins_number: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admission_number: double (nullable = true)
 |-- flight_number: string (nullable = true)
 |-- visa_type: string (nullable = true)



#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [20]:
# Count and show 5 of the Demographics records.
df_demographics_cl.show(5, truncate=False)
df_demographics_cl.count()


+---------------+----------+----------+---------------+-----------------+----------------+------------------+----------------------+----------------------+----------+---------------------------------+------+
|city           |state     |median_age|male_population|female_population|total_population|number_of_veterans|number_of_foreign_born|average_household_size|state_code|race                             |count |
+---------------+----------+----------+---------------+-----------------+----------------+------------------+----------------------+----------------------+----------+---------------------------------+------+
|Mesa           |Arizona   |36.9      |234998         |236835           |471833          |31808             |57492                 |2.68                  |AZ        |Hispanic or Latino               |131425|
|Springdale     |Arkansas  |31.8      |36840          |43614            |80454           |3397              |19969                 |3.04                  |AR        |Am

2891

In [21]:
# Count and show 5 of the airport_codes records.

df_spark_airport_codes_clean.show(5, truncate=False)
df_spark_airport_codes_clean.count()


+----------+--------------+--------------------------------------------------+------------+------------+--------+---------+----------+-----+------------------+---------------+
|airport_id|type          |name                                              |elevation_ft|municipality|gps_code|iata_code|local_code|state|latitude          |longitude      |
+----------+--------------+--------------------------------------------------+------------+------------+--------+---------+----------+-----+------------------+---------------+
|KBGM      |medium_airport|Greater Binghamton/Edwin A Link field             |1636.0      |Binghamton  |KBGM    |BGM      |BGM       |NY   |-75.97979736      |42.20869827    |
|2TE0      |small_airport |Eagle Air Park                                    |15.0        |Brazoria    |2TE0    |BZT      |2TE0      |TX   |-95.579696655273  |28.982200622559|
|KCNU      |medium_airport|Chanute Martin Johnson Airport                    |1002.0      |Chanute     |KCNU    |CNU    

2014

In [22]:
# Count and show 5 of the temperature records. 
df_us_temperature.show(5, truncate=False)
df_us_temperature.count()

+----------+-------------------+-------------------------------+-------+--------+---------+
|date      |average_temperature|average_temperature_uncertainty|city   |latitude|longitude|
+----------+-------------------+-------------------------------+-------+--------+---------+
|1820-01-01|2.1010000000000004 |3.217                          |Abilene|32.95N  |100.53W  |
|1820-02-01|6.926              |2.853                          |Abilene|32.95N  |100.53W  |
|1820-03-01|10.767000000000001 |2.395                          |Abilene|32.95N  |100.53W  |
|1820-04-01|17.988999999999994 |2.202                          |Abilene|32.95N  |100.53W  |
|1820-05-01|21.809             |2.036                          |Abilene|32.95N  |100.53W  |
+----------+-------------------+-------------------------------+-------+--------+---------+
only showing top 5 rows



661524

In [23]:
# Count and show 5 of the immigrations records. 

df_immigrations.show(5, truncate=False)
df_immigrations.count()


+---------+------+-----+-------------+-----------------+----+------------+------------+--------+--------------+----+--------------+----------+----------------------+----------+----------+------------+----------+-------+----------------+-------------+---------+
|cicid    |year  |month|birth_country|residence_country|port|arrival_date|arrival_mode|us_state|departure_date|age |visa_type_code|date_added|visa_issued_department|occupation|birth_year|allowed_date|ins_number|airline|admission_number|flight_number|visa_type|
+---------+------+-----+-------------+-----------------+----+------------+------------+--------+--------------+----+--------------+----------+----------------------+----------+----------+------------+----------+-------+----------------+-------------+---------+
|1360029.0|2016.0|4.0  |116.0        |116.0            |ATL |2016-04-03  |Not reported|99      |2016-04-07    |55.0|Business      |20160408  |null                  |null      |1961.0    |null        |null      |null  

3096313

#### 4.3 Data dictionary 
The I94_SAS_Labels_Descriptions.SAS contains all the data field descriptions about the I94 immigration data. 

I94YR - 4 digit year

I94MON - Numeric month

I94CIT & I94RES - This format shows all the valid and invalid codes for processing

I94PORT - This format shows all the valid and invalid codes for processing

ARRDATE is the Arrival Date in the USA. It is a SAS date numeric field that a 
   permament format has not been applied.
   
I94MODE - There are missing values as well as not reported (9)

I94ADDR - There is lots of invalid codes in this variable and the list below.

DEPDATE is the Departure Date from the USA. It is a SAS date numeric field that 
   a permament format has not been applied.  

   I94BIR - Age of Respondent in Years
   
   I94VISA - Visa codes collapsed into three categories:
   1 = Business
   2 = Pleasure
   3 = Student
   
   COUNT - Used for summary statistics
   
   DTADFILE - Character Date Field - Date added to I-94 Files - CIC does not use
   
   VISAPOST - Department of State where where Visa was issued - CIC does not use
   
   OCCUP - Occupation that will be performed in U.S. - CIC does not use
   
   BIRYEAR - 4 digit year of birth
   
   DTADDTO - Character Date Field - Date to which admitted to U.S. (allowed to stay until) - CIC does not use
   
   GENDER - Non-immigrant sex
   
   INSNUM - INS number
   
   AIRLINE - Airline used to arrive in U.S.
   
   ADMNUM - Admission Number
   
   FLTNO - Flight number of Airline used to arrive in U.S.
   
   VISATYPE - Class of admission legally admitting the non-immigrant to temporarily stay in U.S.

#### 4.4 Export files 
Export the data tables into parquet files.

In [24]:
#Export the demographics table.
df_demographics_cl.write.mode("overwrite").parquet("output/demographics")

In [25]:
#Export the airport codes table.
df_spark_airport_codes_clean.write.mode("overwrite").parquet("output/airport_codes")

In [26]:
#Export the temperature table partitioned by city field.
df_us_temperature.write.mode("overwrite").partitionBy("city").parquet("output/temperatures")

In [27]:
#Export the countries table.
df_spark_countries.write.mode("overwrite").parquet("output/countries")

In [28]:
#Export the immigrations table partitioned by year and month.
df_immigrations.write.mode("overwrite").partitionBy("year","month").parquet("output/immigrations")

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

#### Clearly state the rationale for the choice of tools and technologies for the project.

I have worked using the Pandas and Spark dataframes depending on the volumes and the complexity of the data.
In some cases, it's easier to use Pandas dataframes to perform cleaning / transformation of the data. When it comes to fast processing and Apache Spark dataframes is the best way to go. In addition, Spark gives the flexibility to handle different file formats (SAS) and perform standatd SQL queries on the tables (after creating Temporary Views on them).
Defining the Schema is also a plus for the Apache Spark choice of working. The output files are parquet format, allowing us to partition and upload large datasets to a cloud solution.

#### Propose how often the data should be updated and why.

The demographics data should be updated once a year. Immigration data, temperatures and airport codes should be updated once a month based on their lifecycle nature. In case they need to be updated more often, this should happen on a daily basis. Countries and other reference data can be updated rarely.

#### Write a description of how you would approach the problem differently under the following scenarios:

#### The data was increased by 100x.
If the project is heavy on reading over writing, I would store the data in AWS Redshift clusters, increasing the number of nodes and processing power.
The solution is already implemented in Spark, so in the case of heavy writing I would use EMR (or a similar distributed cloud solution) to store the data into HDFS with Spark distribution.

#### The data populates a dashboard that must be updated on a daily basis by 7am every day.
The ETL steps can be implemented as DAG steps in Apache Airflow or a similar product. In that case we can choose which of the datasets should be updated daily and which of them less often.
The DAG would be scheduled to run at 7am every day. In case of DAG failure, the Dashboard is not updated and a notification/email is sent to the affected teams/individuals.

#### The database needed to be accessed by 100+ people
A proven cloud solution like a DWH (Redshift) can be used so that the data can be consumed by a large number of people. However, considering the amount of queries and the availability in the day we can also run specific queries and store the results in S3 for quick access and consumption by the BI and Analytics apps.