# US Immigration Insights
### Data Engineering Capstone Project

#### Project Summary
This project builds a data-lake-style ETL pipeline that processes, cleans and stores US I94 immigration data using Spark SQL.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import pandas as pd
import os
import glob
from pyspark.sql import SparkSession, SQLContext, GroupedData
from pyspark.sql.functions import *
import re
from pyspark.sql import types as t

### Step 1: Scope the Project and Gather Data

#### Scope 
The scope of the project is to build an ETL pipeline that processes and cleans US I94 immigration data together with country codes, models the result as a star schema, and stores it in parquet files.

#### Describe and Gather Data 

* **World Temperature Data**: comes from Kaggle and contains historical average land temperatures; this notebook uses the by-country file (GlobalLandTemperaturesByCountry.csv).
    * Source: https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data

* **data/18-83510-I94-Data-2016/**: US I94 immigration data from 2016 (Jan-Dec).
    * Source: https://travel.trade.gov/research/reports/i94/historical/2016.html
    * Description: I94_SAS_Labels_Descriptions.txt file contains descriptions for the I94 data.
        * The I94 dataset has one SAS7BDAT file per month of the year (e.g. i94_jan16_sub.sas7bdat).
        * Each file contains about 3M rows.
    * The data is provided as part of the Udacity DEND course.
    
* **data/i94_airport_codes.xlsx**: Airport codes and related cities defined in I94 data description file.
    * Source: https://travel.trade.gov/research/reports/i94/historical/2016.html
    * Description: I94 Airport codes data contains information about different airports around the world.
    * NOTE: I94 data uses its own codes for airports instead of using standard codes (like IATA). Therefore, I94 airport codes have been taken from I94 data description file and processed for ETL use.  

* **data/i94_country_codes.xlsx**: Country codes defined in US I94 Immigration data description file. 
    * Source: https://travel.trade.gov/research/reports/i94/historical/2016.html
    * Description: I94 Country codes data contains information about the countries travelers come to the US from.
    * NOTE: I94 data uses its own codes for countries instead of using ISO-3166 standard codes. Therefore, I94 country codes have been taken from I94 data description file and processed for ETL use.
  
* **data/airport-codes.csv**: Airport codes and related cities.
    * Source: https://datahub.io/core/airport-codes#data
    * Description: Airport codes data contains information about different airports around the world.

* **data/iso-3166-country-codes.json**: World country codes (ISO-3166)
    * Source: https://www.iso.org/iso-3166-country-codes.html
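Because the I94 files use their own country codes, every lookup has to go through a mapping to the ISO codes. The following is a minimal sketch of such a lookup in plain Python, using a few code pairs that appear in i94_country_codes.xlsx later in this notebook. The names `I94_TO_ISO` and `i94_to_iso_numeric` are hypothetical, and the zero-padding to three digits is an assumption (ISO 3166-1 numeric codes are conventionally written with three digits, while this notebook stores them unpadded).

```python
# Hypothetical lookup table built from a few rows of i94_country_codes.xlsx:
# I94 country code -> ISO 3166-1 numeric code.
I94_TO_ISO = {582: 484, 236: 4, 101: 8, 316: 12, 102: 20}


def i94_to_iso_numeric(i94_code):
    """Return the ISO numeric code as a three-digit string, or None if unmapped."""
    iso = I94_TO_ISO.get(i94_code)
    return None if iso is None else f"{iso:03d}"


print(i94_to_iso_numeric(582))  # Mexico
print(i94_to_iso_numeric(236))  # Afghanistan
print(i94_to_iso_numeric(999))  # a code that is not in the table
```

Returning `None` for unmapped codes makes missing mappings visible instead of silently dropping them.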



## Step 2: Explore and Assess the Data
### Explore the Data
Identify data quality issues, like missing values, duplicate data, etc.

### Cleaning Steps

- World Temperature Data:
    * Keep only the average temperature data for 2013, add year, month and Fahrenheit fields, upper-case the country names, and drop duplicates.
- I94 data:
    * Remove rows with null values, then select the relevant columns from the immigration data and drop duplicates.
    
- I94 Airport data: 
    * Remove quote marks and extra white spaces from the data.
- I94 Country Code data: 
    * Remove quote marks and extra white spaces from the data.
- ISO Country Code data:
    * No action required. Antarctica is handled as a special case to avoid duplicate data.
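The quote-mark and whitespace cleanup listed above can be sketched as a small standalone helper. This is only an illustration in plain Python: the function name `clean_i94_port` is hypothetical, and the `"NaN"` placeholder for a missing state mirrors the convention used in the cleaning cell of this notebook.

```python
import re


def clean_i94_port(raw_code, raw_name):
    """Strip quote marks and whitespace, then split 'NAME, ST' into its parts.

    Returns a (code, name, state) tuple.
    """
    code = re.sub("'", "", raw_code).strip()
    parts = re.sub("'", "", raw_name).split(",")
    name = parts[0].strip()
    # Exactly one comma is read as "name, state"; anything else has no state.
    state = parts[1].strip() if len(parts) == 2 else "NaN"
    return code, name, state


print(clean_i94_port("'ALC'", "'ALCAN, AK             '"))
# ('ALC', 'ALCAN', 'AK')
```

Names that contain no comma, or more than one, keep their first part as the name and get the `"NaN"` placeholder as the state.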

In [2]:
#Build spark session
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [3]:
#Build SQL context object
sqlContext = SQLContext(spark.sparkContext)

In [4]:
# Read in the data here
airport=spark.read.format("csv").option("header", "true").load("airport-codes_csv.csv")

## Clean and read I94 Airport code data to Spark

In [30]:
# Read I94 Airport codes data:
airport_codes_i94_df = pd.read_excel("i94_airport_codes.xlsx", header=0, index_col=0)
airport_codes_i94_df.head()

Unnamed: 0_level_0,i94_airport_name
i94port,Unnamed: 1_level_1
'ALC',"'ALCAN, AK '"
'ANC',"'ANCHORAGE, AK '"
'BAR',"'BAKER AAF - BAKER ISLAND, AK'"
'DAC',"'DALTONS CACHE, AK '"
'PIZ',"'DEW STATION PT LAY DEW, AK'"


In [8]:
# Clean the I94 airport data first: strip quote marks and whitespace,
# then split "NAME, STATE" into separate name and state values.
codes = []
names = []
states = []
for index, row in airport_codes_i94_df.iterrows():
    code = re.sub("'", "", index).strip()
    parts = re.sub("'", "", row.iloc[0]).split(",")
    codes.append(code)
    names.append(parts[0].strip())
    # Exactly one comma means "name, state"; otherwise no state can be derived.
    if len(parts) == 2:
        states.append(parts[1].strip())
    else:
        states.append("NaN")

ac = {"i94port_clean": codes,
      "i94_airport_name_clean": names,
      "i94_state_clean": states}

airport_codes_i94_df_clean = pd.DataFrame.from_dict(ac)

In [9]:
airport_codes_i94_schema = t.StructType([
                            t.StructField("i94_port", t.StringType(), False),
                            t.StructField("i94_airport_name", t.StringType(), False),
                            t.StructField("i94_airport_state", t.StringType(), False)
                        ])
new_airport_data = spark.createDataFrame(airport_codes_i94_df_clean, schema=airport_codes_i94_schema)

In [10]:
new_airport_data.printSchema()
new_airport_data.show(5, truncate=False)

root
 |-- i94_port: string (nullable = false)
 |-- i94_airport_name: string (nullable = false)
 |-- i94_airport_state: string (nullable = false)

+--------+------------------------+-----------------+
|i94_port|i94_airport_name        |i94_airport_state|
+--------+------------------------+-----------------+
|ALC     |ALCAN                   |AK               |
|ANC     |ANCHORAGE               |AK               |
|BAR     |BAKER AAF - BAKER ISLAND|AK               |
|DAC     |DALTONS CACHE           |AK               |
|PIZ     |DEW STATION PT LAY DEW  |AK               |
+--------+------------------------+-----------------+
only showing top 5 rows



## Clean and read ISO Country Code data to Spark

In [36]:
country_codes_df = pd.read_csv('iso-3166-country-codes.csv', header=0)
country_codes_df.head()

Unnamed: 0,name,alpha-2,alpha-3,country-code,iso_3166-2,region,sub-region,intermediate-region,region-code,sub-region-code,intermediate-region-code
0,Afghanistan,AF,AFG,4,ISO 3166-2:AF,Asia,Southern Asia,,142.0,34.0,
1,Åland Islands,AX,ALA,248,ISO 3166-2:AX,Europe,Northern Europe,,150.0,154.0,
2,Albania,AL,ALB,8,ISO 3166-2:AL,Europe,Southern Europe,,150.0,39.0,
3,Algeria,DZ,DZA,12,ISO 3166-2:DZ,Africa,Northern Africa,,2.0,15.0,
4,American Samoa,AS,ASM,16,ISO 3166-2:AS,Oceania,Polynesia,,9.0,61.0,


In [11]:
country_code_schema = t.StructType([
                            t.StructField("name", t.StringType(), False),
                            t.StructField("alpha_2", t.StringType(), False),
                            t.StructField("alpha_3", t.StringType(), False),
                            t.StructField("country_code", t.StringType(), False),
                            t.StructField("iso_3166_2", t.StringType(), False),
                            t.StructField("region", t.StringType(), True),
                            t.StructField("sub_region", t.StringType(), True),
                            t.StructField("intermediate_region", t.StringType(), False),
                            t.StructField("region_code", t.StringType(), True),
                            t.StructField("sub_region_code", t.StringType(), True),
                            t.StructField("intermediate_region_code", t.StringType(), False)
                        ])
country_codes_iso_df_spark = spark.createDataFrame(country_codes_df, schema=country_code_schema)

In [12]:
country_codes_iso_df_spark.printSchema()
country_codes_iso_df_spark.show(5, truncate=False)

root
 |-- name: string (nullable = false)
 |-- alpha_2: string (nullable = false)
 |-- alpha_3: string (nullable = false)
 |-- country_code: string (nullable = false)
 |-- iso_3166_2: string (nullable = false)
 |-- region: string (nullable = true)
 |-- sub_region: string (nullable = true)
 |-- intermediate_region: string (nullable = false)
 |-- region_code: string (nullable = true)
 |-- sub_region_code: string (nullable = true)
 |-- intermediate_region_code: string (nullable = false)

+--------------+-------+-------+------------+-------------+-------+---------------+-------------------+-----------+---------------+------------------------+
|name          |alpha_2|alpha_3|country_code|iso_3166_2   |region |sub_region     |intermediate_region|region_code|sub_region_code|intermediate_region_code|
+--------------+-------+-------+------------+-------------+-------+---------------+-------------------+-----------+---------------+------------------------+
|Afghanistan   |AF     |AFG    |4      

## Clean and read i94_country_codes to Spark

In [37]:
country_codes_i94_df = pd.read_excel('i94_country_codes.xlsx', header=0, index_col=0)
country_codes_i94_df.head()

Unnamed: 0_level_0,i94_country_name,iso_country_code
i94cit,Unnamed: 1_level_1,Unnamed: 2_level_1
582,MEXICO',484
236,'AFGHANISTAN',4
101,'ALBANIA',8
316,'ALGERIA',12
102,'ANDORRA',20


In [13]:
# Cleaning I94 Country Code data first
cc = {"i94cit_clean": [],
      "i94_country_name_clean": [],
      "iso_country_code_clean" : []
      }
ccodes = []
cnames = []
ccodes_iso = []

for index, row in country_codes_i94_df.iterrows():
    # Column 0 is the quoted country name, column 1 the ISO country code.
    cname = re.sub("'", "", row.iloc[0]).strip()
    ccode_iso = row.iloc[1]
    ccodes.append(index)
    cnames.append(cname)
    ccodes_iso.append(ccode_iso)

cc["i94cit_clean"] = ccodes
cc["i94_country_name_clean"] = cnames
cc["iso_country_code_clean"] = ccodes_iso
country_codes_i94_df_clean = pd.DataFrame.from_dict(cc)

In [14]:
country_codes_i94_schema = t.StructType([
                            t.StructField("i94_cit", t.StringType(), False),
                            t.StructField("i94_country_name", t.StringType(), False),
                            t.StructField("iso_country_code", t.StringType(), False)
                        ])
country_codes_i94_df_spark = spark.createDataFrame(country_codes_i94_df_clean, schema=country_codes_i94_schema)

In [15]:
country_codes_i94_df_spark.printSchema()
country_codes_i94_df_spark.show(5, truncate=False)

root
 |-- i94_cit: string (nullable = false)
 |-- i94_country_name: string (nullable = false)
 |-- iso_country_code: string (nullable = false)

+-------+----------------+----------------+
|i94_cit|i94_country_name|iso_country_code|
+-------+----------------+----------------+
|582    |MEXICO          |484             |
|236    |AFGHANISTAN     |4               |
|101    |ALBANIA         |8               |
|316    |ALGERIA         |12              |
|102    |ANDORRA         |20              |
+-------+----------------+----------------+
only showing top 5 rows



## Read and clean Temperature Data by Country to Spark

In [16]:
temperatureData=spark.read.format("csv").option("header", "true").load("GlobalLandTemperaturesByCountry.csv")

# Keep only the 2013 world temperature data, convert Celsius to Fahrenheit, and drop duplicates
Temperatures=temperatureData.filter(year(temperatureData["dt"])==2013)\
.withColumn("year",year(temperatureData["dt"]))\
.withColumn("month",month(temperatureData["dt"]))\
.withColumn("avg_temp_fahrenheit",temperatureData["AverageTemperature"]*9/5+32)\
.withColumn("Country", upper(col("Country")))


new_Temperatures=Temperatures.select("year","month",round(col("AverageTemperature"),1).alias("avg_temp_celcius"),\
                                       round(col("avg_temp_fahrenheit"),1).alias("avg_temp_fahrenheit"),
                                       "Country").dropDuplicates()
new_Temperatures_joined = new_Temperatures.join(country_codes_i94_df_spark, \
                                            (country_codes_i94_df_spark.i94_country_name == \
                                                    new_Temperatures.Country))\
                                            .withColumn("i94_cit", col("i94_cit").cast("integer"))
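The Fahrenheit column above is computed from the unrounded Celsius value, and both columns are rounded to one decimal only afterwards. As a result, converting the displayed Celsius value by hand can differ from the displayed Fahrenheit value by 0.1. The following plain-Python sketch illustrates this with a hypothetical raw reading; `celsius_to_fahrenheit` and the value 21.54 are assumptions for illustration only.

```python
def celsius_to_fahrenheit(celsius):
    """Same formula as the Spark expression: C * 9 / 5 + 32."""
    return celsius * 9 / 5 + 32


raw_celsius = 21.54  # hypothetical unrounded reading

shown_celsius = round(raw_celsius, 1)
shown_fahrenheit = round(celsius_to_fahrenheit(raw_celsius), 1)
from_rounded = round(celsius_to_fahrenheit(shown_celsius), 1)

print(shown_celsius, shown_fahrenheit, from_rounded)
# 21.5 70.8 70.7
```

Converting before rounding, as the cell above does, keeps the Fahrenheit column as accurate as the source data allows.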



In [17]:
new_Temperatures_joined.printSchema()
new_Temperatures_joined.show(10)
new_Temperatures_joined.count()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- avg_temp_celcius: double (nullable = true)
 |-- avg_temp_fahrenheit: double (nullable = true)
 |-- Country: string (nullable = true)
 |-- i94_cit: integer (nullable = true)
 |-- i94_country_name: string (nullable = false)
 |-- iso_country_code: string (nullable = false)

+----+-----+----------------+-------------------+-------+-------+----------------+----------------+
|year|month|avg_temp_celcius|avg_temp_fahrenheit|Country|i94_cit|i94_country_name|iso_country_code|
+----+-----+----------------+-------------------+-------+-------+----------------+----------------+
|2013|    7|            21.8|               71.2|ARMENIA|    151|         ARMENIA|              51|
|2013|    9|            null|               null|ARMENIA|    151|         ARMENIA|              51|
|2013|    8|            21.5|               70.8|ARMENIA|    151|         ARMENIA|              51|
|2013|    2|             0.6|              

1710

## Clean and read Immigration Data 

In [18]:
df_spark=spark.read.format('com.github.saurfang.sas.spark').load("../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat")

i94_data=df_spark.filter(df_spark.i94addr.isNotNull())\
.filter(df_spark.i94res.isNotNull())\
.withColumn("i94yr",col("i94yr").cast("integer"))\
.withColumn("i94mon",col("i94mon").cast("integer"))\
.withColumn("i94cit",col("i94cit").cast("integer"))

new_I94_Data=i94_data.select("cicid",col("i94yr").alias("year"),col("i94mon").alias("month"),\
                             "i94port",col("i94addr").alias("state_code"), col("i94port").alias("airport_id"), col("i94cit").alias("country_id"))
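The cell above loads only the April file. To process the full year, the monthly file paths could be generated as sketched below. This assumes that every month follows the same naming pattern as the i94_jan16_sub.sas7bdat and i94_apr16_sub.sas7bdat examples; the names `I94_DIR`, `MONTHS` and `i94_monthly_paths` are hypothetical, and the column layout may differ between months, so each file should be checked before the results are combined.

```python
# Directory taken from the read call above.
I94_DIR = "../../data/18-83510-I94-Data-2016"
MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]


def i94_monthly_paths(base_dir=I94_DIR, year_suffix="16"):
    """Build one SAS7BDAT path per month, assuming a uniform naming pattern."""
    return [f"{base_dir}/i94_{month}{year_suffix}_sub.sas7bdat" for month in MONTHS]


for path in i94_monthly_paths():
    print(path)
```

Each generated path could then be passed to the same `spark.read.format(...).load(...)` call used for the April file.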


In [19]:
new_I94_Data.show(5)
new_I94_Data.printSchema()

+-----+----+-----+-------+----------+----------+----------+
|cicid|year|month|i94port|state_code|airport_id|country_id|
+-----+----+-----+-------+----------+----------+----------+
|  7.0|2016|    4|    ATL|        AL|       ATL|       254|
| 15.0|2016|    4|    WAS|        MI|       WAS|       101|
| 16.0|2016|    4|    NYC|        MA|       NYC|       101|
| 17.0|2016|    4|    NYC|        MA|       NYC|       101|
| 18.0|2016|    4|    NYC|        MI|       NYC|       101|
+-----+----+-----+-------+----------+----------+----------+
only showing top 5 rows

root
 |-- cicid: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- airport_id: string (nullable = true)
 |-- country_id: integer (nullable = true)



# Step 3: Define the Data Model
## 3.1 Conceptual Data Model
### Star Schema

### **Data Dictionary Dimension Tables**
#### new_airport_data
 * i94_port: string (nullable = false)
 * i94_airport_name: string (nullable = false)
 * i94_airport_state: string (nullable = false)

 
#### country_codes_iso_df_spark
 * country_code: string (nullable = false)
 * country_name: string (nullable = false)
 * alpha_2: string (nullable = false)
 * alpha_3: string (nullable = false)
 * iso_3166_2: string (nullable = false)
 * region: string (nullable = true)
 * sub_region: string (nullable = true)
 * intermediate_region: string (nullable = false)
 * region_code: string (nullable = true)
 * sub_region_code: string (nullable = true)
 * intermediate_region_code: string (nullable = false)
 
#### new_I94_Data
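 * cicid: double (nullable = true) - ID number of each individual
 * year: integer (nullable = true) - Year of immigration
 * month: integer (nullable = true) - Month of immigration
 * i94port: string (nullable = true) - Port code of the city where the immigrant entered
 * state_code: string (nullable = true) - State code (from i94addr)
 * airport_id: string (nullable = true) - Airport code (alias of i94port)
 * country_id: integer (nullable = true) - I94 country code (from i94cit)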
 
#### new_Temperatures_joined
 * year: integer (nullable = true) - Temperature year
 * month: integer (nullable = true) - Temperature month
 * avg_temp_celcius: double (nullable = true) - Avg temperature in Celsius
 * avg_temp_fahrenheit: double (nullable = true) - Avg temperature in Fahrenheit
 * Country: string (nullable = true) - Country name
 * i94_cit: integer (nullable = true)
 * i94_country_name: string (nullable = false)
 * iso_country_code: string (nullable = false)


### Fact Table (immigrations_table)
 * cicid: double (nullable = true) - ID number of each individual
 * avg_temp_fahrenheit: double (nullable = true) - Avg temperature in Fahrenheit
 * airport_id: string (nullable = true)
 * country_id: integer (nullable = true)
 * country_name: string (nullable = false)
 
 
 
 
### 3.2 Mapping Out Data Pipelines
1. Dimension tables will be created from cleansed data.
2. Fact table is created as a SQL query with joins to dimension tables.
3. Fact table is converted back to a spark dataframe.
4. Fact table is written as final parquet file.








# Step 4: Run Pipelines to Model the Data 
### 4.1 Create the data model


## Create countries table 

In [21]:
# Join tables
country_codes_i94_df_spark_joined = country_codes_i94_df_spark\
                                        .join(country_codes_iso_df_spark, \
                                            (country_codes_i94_df_spark.iso_country_code == \
                                                    country_codes_iso_df_spark.country_code))

In [22]:
# Create table
country_codes_i94_df_spark_joined.createOrReplaceTempView("countries_table_DF")
countries_table = spark.sql("""
        SELECT DISTINCT i94_cit          AS country_code,
                        i94_country_name AS country_name,
                        iso_country_code AS iso_ccode,
                        alpha_2          AS iso_alpha_2,
                        alpha_3          AS iso_alpha_3,
                        iso_3166_2       AS iso_3166_2_code,
                        name             AS iso_country_name,
                        region           AS iso_region,
                        sub_region       AS iso_sub_region,
                        region_code      AS iso_region_code,
                        sub_region_code  AS iso_sub_region_code
        FROM countries_table_DF          AS countries
        ORDER BY country_name
    
""")
countries_table.printSchema()
countries_table.show(20)
countries_table.count()

root
 |-- country_code: string (nullable = false)
 |-- country_name: string (nullable = false)
 |-- iso_ccode: string (nullable = false)
 |-- iso_alpha_2: string (nullable = false)
 |-- iso_alpha_3: string (nullable = false)
 |-- iso_3166_2_code: string (nullable = false)
 |-- iso_country_name: string (nullable = false)
 |-- iso_region: string (nullable = true)
 |-- iso_sub_region: string (nullable = true)
 |-- iso_region_code: string (nullable = true)
 |-- iso_sub_region_code: string (nullable = true)

+------------+---------------+---------+-----------+-----------+---------------+-------------------+----------+--------------------+---------------+-------------------+
|country_code|   country_name|iso_ccode|iso_alpha_2|iso_alpha_3|iso_3166_2_code|   iso_country_name|iso_region|      iso_sub_region|iso_region_code|iso_sub_region_code|
+------------+---------------+---------+-----------+-----------+---------------+-------------------+----------+--------------------+---------------+-----

229

In [23]:
new_I94_Data.printSchema()
new_I94_Data.show(5)

root
 |-- cicid: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- airport_id: string (nullable = true)
 |-- country_id: integer (nullable = true)

+-----+----+-----+-------+----------+----------+----------+
|cicid|year|month|i94port|state_code|airport_id|country_id|
+-----+----+-----+-------+----------+----------+----------+
|  7.0|2016|    4|    ATL|        AL|       ATL|       254|
| 15.0|2016|    4|    WAS|        MI|       WAS|       101|
| 16.0|2016|    4|    NYC|        MA|       NYC|       101|
| 17.0|2016|    4|    NYC|        MA|       NYC|       101|
| 18.0|2016|    4|    NYC|        MI|       NYC|       101|
+-----+----+-----+-------+----------+----------+----------+
only showing top 5 rows



In [24]:
countries_table.printSchema()
countries_table.show(5)

root
 |-- country_code: string (nullable = false)
 |-- country_name: string (nullable = false)
 |-- iso_ccode: string (nullable = false)
 |-- iso_alpha_2: string (nullable = false)
 |-- iso_alpha_3: string (nullable = false)
 |-- iso_3166_2_code: string (nullable = false)
 |-- iso_country_name: string (nullable = false)
 |-- iso_region: string (nullable = true)
 |-- iso_sub_region: string (nullable = true)
 |-- iso_region_code: string (nullable = true)
 |-- iso_sub_region_code: string (nullable = true)

+------------+------------+---------+-----------+-----------+---------------+----------------+----------+------------------+---------------+-------------------+
|country_code|country_name|iso_ccode|iso_alpha_2|iso_alpha_3|iso_3166_2_code|iso_country_name|iso_region|    iso_sub_region|iso_region_code|iso_sub_region_code|
+------------+------------+---------+-----------+-----------+---------------+----------------+----------+------------------+---------------+-------------------+
|       

In [25]:
new_airport_data.printSchema()
new_airport_data.show(5)

root
 |-- i94_port: string (nullable = false)
 |-- i94_airport_name: string (nullable = false)
 |-- i94_airport_state: string (nullable = false)

+--------+--------------------+-----------------+
|i94_port|    i94_airport_name|i94_airport_state|
+--------+--------------------+-----------------+
|     ALC|               ALCAN|               AK|
|     ANC|           ANCHORAGE|               AK|
|     BAR|BAKER AAF - BAKER...|               AK|
|     DAC|       DALTONS CACHE|               AK|
|     PIZ|DEW STATION PT LA...|               AK|
+--------+--------------------+-----------------+
only showing top 5 rows



In [26]:
new_Temperatures_joined.printSchema()
new_Temperatures_joined.show(5)

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- avg_temp_celcius: double (nullable = true)
 |-- avg_temp_fahrenheit: double (nullable = true)
 |-- Country: string (nullable = true)
 |-- i94_cit: integer (nullable = true)
 |-- i94_country_name: string (nullable = false)
 |-- iso_country_code: string (nullable = false)

+----+-----+----------------+-------------------+-------+-------+----------------+----------------+
|year|month|avg_temp_celcius|avg_temp_fahrenheit|Country|i94_cit|i94_country_name|iso_country_code|
+----+-----+----------------+-------------------+-------+-------+----------------+----------------+
|2013|    7|            21.8|               71.2|ARMENIA|    151|         ARMENIA|              51|
|2013|    9|            null|               null|ARMENIA|    151|         ARMENIA|              51|
|2013|    8|            21.5|               70.8|ARMENIA|    151|         ARMENIA|              51|
|2013|    2|             0.6|              

In [31]:
Joint_imm_tables = new_I94_Data.join(new_Temperatures_joined,\
                           (new_I94_Data.country_id == new_Temperatures_joined.i94_cit) & \
                            (new_I94_Data.month == new_Temperatures_joined.month))\
                        .join(countries_table,\
                             (new_I94_Data.country_id == countries_table.country_code))\
                         .join(new_airport_data,\
                              (new_I94_Data.airport_id == new_airport_data.i94_port))
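The chained joins above are inner joins, so an arrival row reaches the fact table only if it finds a match in every joined table. The plain-Python sketch below illustrates that behavior on two arrival rows taken from the output above. The `inner_join` helper is hypothetical, and the country list is deliberately truncated to a single row so that one arrival has no match.

```python
def inner_join(left_rows, right_rows, left_key, right_key):
    """Naive inner join over lists of dicts; left rows without a match are dropped."""
    index = {}
    for right in right_rows:
        index.setdefault(right[right_key], []).append(right)
    joined = []
    for left in left_rows:
        for right in index.get(left[left_key], []):
            joined.append({**left, **right})
    return joined


arrivals = [
    {"cicid": 7.0, "airport_id": "ATL", "country_id": 254},
    {"cicid": 15.0, "airport_id": "WAS", "country_id": 101},
]
# Truncated on purpose: only one country is present in this toy dimension.
countries = [{"country_code": 101, "country_name": "ALBANIA"}]

fact = inner_join(arrivals, countries, "country_id", "country_code")
print(fact)
```

Only the arrival with `country_id` 101 survives. This is one reason the fact table can hold fewer rows than the source file, so comparing row counts before and after the join is a useful completeness check.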

In [29]:
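# Set the broadcast-join size threshold to 0 so that the large joins are not broadcast (Spark documents -1 as the value that disables broadcast joins entirely).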
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "0")

In [32]:
Joint_imm_tables.createOrReplaceTempView("immigrations_table")
immigrations_table = spark.sql("""
           SELECT    cicid,
                     avg_temp_fahrenheit,
                     airport_id,
                     country_id,
                     country_name
                     
                     
           FROM immigrations_table
""")
immigrations_table.printSchema()
# immigrations_table.show(5)

root
 |-- cicid: double (nullable = true)
 |-- avg_temp_fahrenheit: double (nullable = true)
 |-- airport_id: string (nullable = true)
 |-- country_id: integer (nullable = true)
 |-- country_name: string (nullable = false)



In [None]:
immigrations_table.toDF('cicid', 'avg_temp_fahrenheit', 'airport_id', 'country_id', \
          'country_name').show(5)

In [None]:
# Write fact table to parquet
immigrations_table.write.parquet("immigrations_table")
print("Done with writing fact table to parquet file")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [85]:
# Check for NULL values in 'cicid', 'avg_temp_fahrenheit', 'airport_id', 'country_id', 'country_name'
# If false is returned for every selected column, none of these columns contains NULL values
immigrations_table.select(isnull('cicid'),\
                             isnull('avg_temp_fahrenheit'),\
                             isnull('airport_id'),\
                             isnull('country_id'),\
                             isnull('country_name')).dropDuplicates().show()

+---------------+-----------------------------+--------------------+--------------------+----------------------+
|(cicid IS NULL)|(avg_temp_fahrenheit IS NULL)|(airport_id IS NULL)|(country_id IS NULL)|(country_name IS NULL)|
+---------------+-----------------------------+--------------------+--------------------+----------------------+
|          false|                        false|               false|               false|                 false|
+---------------+-----------------------------+--------------------+--------------------+----------------------+



In [86]:
spark.sql('SELECT COUNT(*) FROM immigrations_table').show()

+--------+
|count(1)|
+--------+
| 2396484|
+--------+
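The two checks above print their results for manual inspection. In an automated pipeline the same checks would need to produce a pass or fail signal. The following is a minimal sketch of that idea in plain Python over a list of dicts; the function name `run_quality_checks` and the sample rows are hypothetical, and with Spark the rows would first have to be collected or the logic expressed as DataFrame operations.

```python
def run_quality_checks(rows, required_columns):
    """Return a list of failure messages; an empty list means every check passed."""
    failures = []
    # Count check: the table must contain at least one row.
    if not rows:
        failures.append("table is empty")
    # Null check: required columns must not contain missing values.
    for column in required_columns:
        null_count = sum(1 for row in rows if row.get(column) is None)
        if null_count:
            failures.append(f"{column}: {null_count} null value(s)")
    return failures


sample = [
    {"cicid": 15.0, "airport_id": "WAS", "country_name": "ALBANIA"},
    {"cicid": 16.0, "airport_id": None, "country_name": "ALBANIA"},
]
print(run_quality_checks(sample, ["cicid", "airport_id", "country_name"]))
# ['airport_id: 1 null value(s)']
```

A scheduler could raise an error whenever the returned list is non-empty, which stops the run before incomplete data is published.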



#### 4.3 Data dictionary 
Create a data dictionary for your data model. 
- See the DataDictionary.md file.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

### 1. I used Python, Pandas and Apache Spark to read, transform, and write the data for further analysis. Pandas is sufficient for the small lookup files, and Spark processes the larger I94 data quickly.

### 2. The ETL script should be run on a quarterly basis (assuming that new I94 data becomes available once per quarter). This gives government and other organizations the most up-to-date data.

### 3. Under the following scenarios, I would approach the problem differently:

- If the data was increased by 100x:
    - Input data should be stored in cloud storage, e.g. AWS S3.
    - A Spark cluster should be used so that the data is processed in parallel.
    - A clustered cloud database, e.g. AWS Redshift, should be used to store the data during processing (staging and final tables).
    - Output data (parquet files) should be stored in cloud storage, e.g. AWS S3, for easy access, or loaded into a cloud database for further analysis.
- If the dashboard must be updated daily by 7am, I would use Apache Airflow to schedule a daily run that updates all tables with data streamed from the source.
- If the data needs to be accessed by 100+ people, I would use a web app running on AWS for increased capacity.

