# Project Title
### Data Engineering Capstone Project

#### Project Summary
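This project builds an ETL pipeline in Apache Spark that cleans the US I94 immigration data, combines it with US city demographics data and the I94 code descriptions, and writes the result to Parquet as a fact table plus a set of dimension tables that the analytics team can query.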

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, TimestampType, FloatType, StringType

# Create (or reuse) a Spark session with Hive support
spark = SparkSession.builder \
    .appName('Capstone_Proj_1') \
    .config("spark.jars.repositories", "https://repos.spark-packages.org/") \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

- The plan is to build an ETL pipeline that fetches the US I94 immigration data and related reference data from several sources, cleans it, and writes it to Parquet so that the analytics team can analyze it more easily.
- We'll also use US city/state demographics data. The analytics team can use it to see whether demographics correlate with the volume of people travelling to particular cities or states.
- The tools used were Apache Spark, Pandas and Python. Spark was chosen because it distributes the workload across the nodes of a cluster, so processing is much faster than with a single-machine tool like Pandas.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

- I94 Immigration Data: This data comes from the US National Travel and Tourism Office (NTTO): https://www.trade.gov/national-travel-and-tourism-office

- U.S. City Demographic Data: This data comes from OpenDataSoft: https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/

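- I94_SAS_Labels_Descriptions.SAS: Provided by Udacity. It describes the code values used in the immigration data, such as country, port, travel mode, state and visa codes.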

In [3]:
# Read in the data here
df_immigration = spark.read.parquet(r"data/sas_data")

df_demographics = spark.read.options(delimiter=';').csv(r"data/us-cities-demographics.csv", header=True)

country_label = spark.read.options(delimiter=';').csv(r"data/I94_country_label.csv", header=True)

port_label = spark.read.options(delimiter=';').csv(r"data/I94_port_label.csv", header=True)

mode_label = spark.read.options(delimiter=';').csv(r"data/I94_mode_label.csv", header=True)

state_label = spark.read.options(delimiter=';').csv(r"data/I94_state_label.csv", header=True)

visa_label = spark.read.options(delimiter=';').csv(r"data/I94_visa_label.csv", header=True)
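The label files are small two-column lookup tables with a header row, which is why they are read with `delimiter=';'` and `header=True`. Some names contain commas (for example `MEXICO Air Sea, a...` in the country labels), which is presumably why a semicolon is used as the delimiter. As a minimal stdlib-only sketch, such a file can also be parsed into a Python dictionary for quick spot checks; the `load_label_map` helper and the sample text are illustrative, modelled on the `visa_label` output further down, and are not the actual file contents.

```python
import csv
import io


def load_label_map(text, key_type=str):
    """Parse a semicolon-delimited, two-column label file into a dict.

    Hypothetical helper: assumes a header row followed by `code;name` rows.
    """
    reader = csv.reader(io.StringIO(text), delimiter=";")
    next(reader)  # skip the header row, e.g. "visa_code;visa_name"
    return {key_type(code): name for code, name in reader}


# Illustrative sample modelled on the visa_label output; not the real file.
SAMPLE_VISA_LABELS = "visa_code;visa_name\n1;Business\n2;Pleasure\n3;Student\n"

visa_names = load_label_map(SAMPLE_VISA_LABELS, key_type=int)
print(visa_names[2])  # Pleasure
```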

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
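Document steps necessary to clean the data

- Keep only the immigration columns needed for the data model and give them descriptive names.
- Drop the 152,592 records with no destination state code ('i94addr'), then the 392,319 remaining records with no gender.
- Convert the SAS arrival and departure dates into standard dates.
- Cast numeric code columns to integers so the fact and dimension tables share key types.
- Aggregate the city-level demographics data per state.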

In [4]:
df_immigration.count()

3096313

In [12]:
df_immigration.show(5)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

In [13]:
df_demographics.show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5|          38040| 

In [14]:
country_label.show(5)

+------------+--------------------+
|country_code|        country_name|
+------------+--------------------+
|         582|MEXICO Air Sea, a...|
|         236|         AFGHANISTAN|
|         101|             ALBANIA|
|         316|             ALGERIA|
|         102|             ANDORRA|
+------------+--------------------+
only showing top 5 rows



In [15]:
port_label.show(5)

+---------+--------------------+
|port_code|           port_name|
+---------+--------------------+
|      ALC|           ALCAN, AK|
|      ANC|       ANCHORAGE, AK|
|      BAR|BAKER AAF - BAKER...|
|      DAC|   DALTONS CACHE, AK|
|      PIZ|DEW STATION PT LA...|
+---------+--------------------+
only showing top 5 rows



In [16]:
mode_label.show(5)

+---------+------------+
|mode_code|   mode_name|
+---------+------------+
|        1|         Air|
|        2|         Sea|
|        3|        Land|
|        9|Not reported|
+---------+------------+



In [17]:
state_label.show(5)

+----------+----------+
|state_code|state_name|
+----------+----------+
|        AL|   ALABAMA|
|        AK|    ALASKA|
|        AZ|   ARIZONA|
|        AR|  ARKANSAS|
|        CA|CALIFORNIA|
+----------+----------+
only showing top 5 rows



In [18]:
visa_label.show(5)

+---------+---------+
|visa_code|visa_name|
+---------+---------+
|        1| Business|
|        2| Pleasure|
|        3|  Student|
+---------+---------+



In [19]:
# Performing cleaning tasks here


df_immigration = df_immigration.select('i94yr', 'i94mon', 'i94res', 'i94port', 'arrdate', 'i94mode', 'i94addr',
                                       'depdate', 'i94bir', 'i94visa', 'biryear', 'gender', 'airline', 'visatype')

df_immigration = df_immigration.withColumnRenamed('i94yr', 'year') \
                    .withColumnRenamed('i94mon', 'month') \
                    .withColumnRenamed('i94res', 'ctry_of_res_code') \
                    .withColumnRenamed('i94port', 'port_code') \
                    .withColumnRenamed('arrdate', 'arrvl_dt_sas') \
                    .withColumnRenamed('i94mode', 'mode_code') \
                    .withColumnRenamed('i94addr', 'dest_state_code') \
                    .withColumnRenamed('depdate', 'depart_dt_sas') \
                    .withColumnRenamed('i94bir', 'age') \
                    .withColumnRenamed('i94visa', 'visa_code') \
                    .withColumnRenamed('biryear', 'dob') \
                    .withColumnRenamed('visatype', 'visa_type')
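The select-and-rename above can also be driven by a single mapping, which keeps the source-to-target column names in one place (handy when writing the data dictionary). A hedged sketch: `COLUMN_MAP` and `rename_exprs` are illustrative names, and the strings they produce are meant to be passed to PySpark's `DataFrame.selectExpr`.

```python
# Source column -> target column, in the same order as the cell above.
COLUMN_MAP = {
    "i94yr": "year",
    "i94mon": "month",
    "i94res": "ctry_of_res_code",
    "i94port": "port_code",
    "arrdate": "arrvl_dt_sas",
    "i94mode": "mode_code",
    "i94addr": "dest_state_code",
    "depdate": "depart_dt_sas",
    "i94bir": "age",
    "i94visa": "visa_code",
    "biryear": "dob",
    "gender": "gender",
    "airline": "airline",
    "visatype": "visa_type",
}


def rename_exprs(column_map):
    """Build 'source AS target' SQL expressions for DataFrame.selectExpr."""
    return [f"{src} AS {dst}" for src, dst in column_map.items()]


print(rename_exprs(COLUMN_MAP)[:2])  # ['i94yr AS year', 'i94mon AS month']
```

With that list, `df_immigration.selectExpr(*rename_exprs(COLUMN_MAP))` would select and rename the columns in one step.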

In [20]:
df_immigration.filter(F.col("dest_state_code").isNull()).count()

152592

In [21]:
# We'll filter out the records with missing state code

df_immigration = df_immigration.filter(F.col("dest_state_code").isNotNull())

In [22]:

df_immigration.groupBy("gender").count().show()

+------+-------+
|gender|  count|
+------+-------+
|     F|1242009|
|  null| 392319|
|     M|1308290|
|     U|    325|
|     X|    778|
+------+-------+



In [23]:
# We'll filter out the records with missing gender data

df_immigration = df_immigration.filter(F.col("gender").isNotNull())

In [24]:
# Everything looks good here 

df_immigration.groupBy("mode_code").count().show()

+---------+-------+
|mode_code|  count|
+---------+-------+
|      1.0|2485666|
|      3.0|  47618|
|      2.0|  11792|
|      9.0|   6326|
+---------+-------+



In [25]:
# Everything looks good here 

df_immigration.groupBy("visa_code").count().show()

+---------+-------+
|visa_code|  count|
+---------+-------+
|      1.0| 394817|
|      3.0|  40238|
|      2.0|2116347|
+---------+-------+



In [26]:
# Everything looks good here, no nulls

df_immigration.filter(F.col("ctry_of_res_code").isNull()).count()

0

In [27]:
# Everything looks good here, no nulls

df_immigration.filter(F.col("port_code").isNull()).count()

0

In [28]:
# Convert the SAS arrival date (days since 1960-01-01) into a standard date column

df_immigration.createOrReplaceTempView("immigration_table")
df_immigration = spark.sql("SELECT *, date_add(to_date('1960-01-01'), int(arrvl_dt_sas)) AS arrvl_dt FROM immigration_table")

In [29]:
# Convert the SAS departure date (days since 1960-01-01) into a standard date column

df_immigration.createOrReplaceTempView("immigration_table")
df_immigration = spark.sql("SELECT *, date_add(to_date('1960-01-01'), int(depart_dt_sas)) AS depart_dt FROM immigration_table")
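SAS stores dates as the number of days since 1 January 1960, which is what `date_add(to_date('1960-01-01'), ...)` relies on. The same arithmetic in plain Python is a minimal sketch for sanity-checking individual values: `arrdate` 20574.0 in the first record becomes the `arrvl_dt` 2016-04-30 shown further down. The helper name `sas_to_date` is hypothetical.

```python
from datetime import date, timedelta

# SAS date values count days from this epoch.
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_value):
    """Convert a SAS date (days since 1960-01-01, possibly a float) to a date.

    Returns None for a missing value, mirroring how Spark's date_add
    yields null for a null input.
    """
    if sas_value is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_value))


print(sas_to_date(20574.0))  # 2016-04-30
print(sas_to_date(20582.0))  # 2016-05-08
```

In PySpark, each conversion could also be written without a temp view, e.g. `df_immigration.withColumn("arrvl_dt", F.expr("date_add(to_date('1960-01-01'), int(arrvl_dt_sas))"))`.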

In [30]:
# drop SAS format date columns

df_immigration = df_immigration.drop("arrvl_dt_sas","depart_dt_sas")

In [31]:
df_immigration.show(5)

+------+-----+----------------+---------+---------+---------------+----+---------+------+------+-------+---------+----------+----------+
|  year|month|ctry_of_res_code|port_code|mode_code|dest_state_code| age|visa_code|   dob|gender|airline|visa_type|  arrvl_dt| depart_dt|
+------+-----+----------------+---------+---------+---------------+----+---------+------+------+-------+---------+----------+----------+
|2016.0|  4.0|           438.0|      LOS|      1.0|             CA|40.0|      1.0|1976.0|     F|     QF|       B1|2016-04-30|2016-05-08|
|2016.0|  4.0|           438.0|      LOS|      1.0|             NV|32.0|      1.0|1984.0|     F|     VA|       B1|2016-04-30|2016-05-17|
|2016.0|  4.0|           438.0|      LOS|      1.0|             WA|29.0|      1.0|1987.0|     M|     DL|       B1|2016-04-30|2016-05-08|
|2016.0|  4.0|           438.0|      LOS|      1.0|             WA|29.0|      1.0|1987.0|     F|     DL|       B1|2016-04-30|2016-05-14|
|2016.0|  4.0|           438.0|      LOS|

In [32]:
# Everything looks good here, no nulls

df_immigration.filter(F.col("arrvl_dt").isNull()).count()

0

In [33]:
## Create the date dimension table from the arrival dates in df_immigration

dim_date = df_immigration.select(F.col("arrvl_dt"),
                            F.year(df_immigration["arrvl_dt"]).alias("year"),
                            F.month(df_immigration["arrvl_dt"]).alias("month"),
                            F.weekofyear(df_immigration["arrvl_dt"]).alias("week_of_year"),
                            F.dayofyear(df_immigration["arrvl_dt"]).alias("day_of_year"),
                            F.dayofmonth(df_immigration["arrvl_dt"]).alias("day_of_month"),
                            F.dayofweek(df_immigration["arrvl_dt"]).alias("day_of_week"),
                            F.quarter(df_immigration["arrvl_dt"]).alias("quarter")
                           ).dropDuplicates(["arrvl_dt"])
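For reference, the `dim_date` attributes follow Spark's conventions: `weekofyear` returns the ISO 8601 week number, and `dayofweek` numbers days from 1 (Sunday) to 7 (Saturday). A plain-Python sketch of the same row, useful for spot-checking the output below (the function name `date_attributes` is illustrative):

```python
from datetime import date


def date_attributes(d):
    """Compute the dim_date columns for one date, following Spark's conventions."""
    return {
        "arrvl_dt": d,
        "year": d.year,
        "month": d.month,
        # ISO 8601 week number, as returned by Spark's weekofyear
        "week_of_year": d.isocalendar()[1],
        "day_of_year": d.timetuple().tm_yday,
        "day_of_month": d.day,
        # Spark's dayofweek is 1 = Sunday ... 7 = Saturday;
        # isoweekday() is 1 = Monday ... 7 = Sunday, so shift it by one.
        "day_of_week": d.isoweekday() % 7 + 1,
        "quarter": (d.month - 1) // 3 + 1,
    }


print(date_attributes(date(2016, 4, 25)))
```

For 2016-04-25 this gives week 17, day of year 116, day of week 2 and quarter 2, matching the first row of the `dim_date.show(5)` output.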

In [44]:
dim_date.show(5)

+----------+----+-----+------------+-----------+------------+-----------+-------+
|  arrvl_dt|year|month|week_of_year|day_of_year|day_of_month|day_of_week|quarter|
+----------+----+-----+------------+-----------+------------+-----------+-------+
|2016-04-25|2016|    4|          17|        116|          25|          2|      2|
|2016-04-22|2016|    4|          16|        113|          22|          6|      2|
|2016-04-30|2016|    4|          17|        121|          30|          7|      2|
|2016-04-26|2016|    4|          17|        117|          26|          3|      2|
|2016-04-04|2016|    4|          14|         95|           4|          2|      2|
+----------+----+-----+------------+-----------+------------+-----------+-------+
only showing top 5 rows



In [35]:
# Build the demographics dimension table by aggregating the city-level data over the state code column

dim_demographics = df_demographics.groupBy(df_demographics["State Code"].alias("state_code")).agg(
    F.avg("Median Age").cast(FloatType()).alias("medn_age"),
    F.sum("Male Population").cast(IntegerType()).alias("male_pop"),
    F.sum("Female Population").cast(IntegerType()).alias("female_pop"),
    F.sum("Total Population").cast(IntegerType()).alias("total_pop"),
    F.sum("Number of Veterans").cast(IntegerType()).alias("vet_cnt"),
    F.sum("Foreign-born").cast(IntegerType()).alias("foreign_born"),
    F.avg("Average Household Size").cast(FloatType()).alias("avg_hsehld_size")
)
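One caveat with this aggregation: the demographics file has `Race` and `Count` columns, and each city appears once per race value with its city-level columns (`Male Population`, `Total Population`, and so on) repeated on every row. Summing those columns over all rows therefore counts each city several times, which appears to be why the AZ `total_pop` below (22,497,710) is far larger than the population of the entire state. A minimal fix, assuming the city-level values are identical across a city's race rows, is to keep one row per city before aggregating; in PySpark that would be `df_demographics.dropDuplicates(["City", "State Code"])`. The plain-Python sketch below, using made-up sample rows, shows the effect:

```python
from collections import defaultdict


def state_totals(rows, column):
    """Sum a city-level column per state, counting each city only once.

    rows: dicts with "City", "State Code" and the column to sum.
    Rows repeat once per race, so deduplicate on (City, State Code) first.
    """
    seen = set()
    totals = defaultdict(int)
    for row in rows:
        key = (row["City"], row["State Code"])
        if key in seen:
            continue  # same city, different Race row: skip the duplicate
        seen.add(key)
        totals[row["State Code"]] += int(row[column])
    return dict(totals)


# Made-up sample: city A has two race rows, city B has one.
sample = [
    {"City": "A", "State Code": "XX", "Race": "White", "Total Population": "100"},
    {"City": "A", "State Code": "XX", "Race": "Asian", "Total Population": "100"},
    {"City": "B", "State Code": "XX", "Race": "White", "Total Population": "50"},
]
print(state_totals(sample, "Total Population"))  # {'XX': 150}
```

Without the deduplication, the same sample would sum to 250 for `XX`.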

In [45]:
dim_demographics.show(5)

+----------+---------+--------+----------+---------+-------+------------+---------------+
|state_code| medn_age|male_pop|female_pop|total_pop|vet_cnt|foreign_born|avg_hsehld_size|
+----------+---------+--------+----------+---------+-------+------------+---------------+
|        AZ|  35.0375|11137275|  11360435| 22497710|1322525|     3411565|       2.774375|
|        SC|   33.825| 1265291|   1321685|  2586976| 163334|      134019|      2.4695833|
|        LA|   34.625| 3134990|   3367985|  6502975| 348855|      417095|          2.465|
|        MN|35.579628| 3478803|   3565362|  7044165| 321738|     1069888|       2.496852|
|        NJ|35.254387| 3423033|   3507991|  6931024| 146632|     2327750|      2.9608772|
+----------+---------+--------+----------+---------+-------+------------+---------------+
only showing top 5 rows



In [46]:
# Everything looks good here. No duplicate values of state_code, 
# aggregation was performed successfully

dim_demographics.groupBy("state_code").count().where("count > 1").show(5)

+----------+-----+
|state_code|count|
+----------+-----+
+----------+-----+



In [39]:
# Create Dimension table state

dim_state = state_label.alias('dim_state')

dim_state.printSchema()

root
 |-- state_code: string (nullable = true)
 |-- state_name: string (nullable = true)



In [40]:
# Create Dimension table country

dim_country = country_label.alias('dim_country')

In [41]:
# change datatype of numeric code value to appropriate integer value
dim_country = dim_country.withColumn("country_code", F.col("country_code").cast(IntegerType()))

dim_country.printSchema()

root
 |-- country_code: integer (nullable = true)
 |-- country_name: string (nullable = true)



In [42]:
# Create Dimension table port

dim_port = port_label.alias('dim_port')

dim_port.printSchema()

root
 |-- port_code: string (nullable = true)
 |-- port_name: string (nullable = true)



In [47]:
# Create Dimension table mode

dim_mode = mode_label.alias('dim_mode')

dim_mode.show(5)

+---------+------------+
|mode_code|   mode_name|
+---------+------------+
|        1|         Air|
|        2|         Sea|
|        3|        Land|
|        9|Not reported|
+---------+------------+



In [48]:
# change datatype of numeric code value to appropriate integer value

dim_mode = dim_mode.withColumn("mode_code", F.col("mode_code").cast(IntegerType()))

dim_mode.printSchema()

root
 |-- mode_code: integer (nullable = true)
 |-- mode_name: string (nullable = true)



In [49]:
# Create Dimension table visa

dim_visa = visa_label.alias('dim_visa')

dim_visa.show(5)

+---------+---------+
|visa_code|visa_name|
+---------+---------+
|        1| Business|
|        2| Pleasure|
|        3|  Student|
+---------+---------+



In [50]:
# change datatype of numeric code value to appropriate integer value

dim_visa = dim_visa.withColumn("visa_code", F.col("visa_code").cast(IntegerType()))

dim_visa.printSchema()

root
 |-- visa_code: integer (nullable = true)
 |-- visa_name: string (nullable = true)



In [51]:
# Create Fact table immigration

fact_immigration = df_immigration.alias('fact_immigration')

fact_immigration.show(5)

+------+-----+----------------+---------+---------+---------------+----+---------+------+------+-------+---------+----------+----------+
|  year|month|ctry_of_res_code|port_code|mode_code|dest_state_code| age|visa_code|   dob|gender|airline|visa_type|  arrvl_dt| depart_dt|
+------+-----+----------------+---------+---------+---------------+----+---------+------+------+-------+---------+----------+----------+
|2016.0|  4.0|           438.0|      LOS|      1.0|             CA|40.0|      1.0|1976.0|     F|     QF|       B1|2016-04-30|2016-05-08|
|2016.0|  4.0|           438.0|      LOS|      1.0|             NV|32.0|      1.0|1984.0|     F|     VA|       B1|2016-04-30|2016-05-17|
|2016.0|  4.0|           438.0|      LOS|      1.0|             WA|29.0|      1.0|1987.0|     M|     DL|       B1|2016-04-30|2016-05-08|
|2016.0|  4.0|           438.0|      LOS|      1.0|             WA|29.0|      1.0|1987.0|     F|     DL|       B1|2016-04-30|2016-05-14|
|2016.0|  4.0|           438.0|      LOS|

In [52]:
fact_immigration = fact_immigration.withColumn("year", F.col("year").cast(IntegerType())) \
                                    .withColumn("month", F.col("month").cast(IntegerType())) \
                                    .withColumn("ctry_of_res_code", F.col("ctry_of_res_code").cast(IntegerType())) \
                                    .withColumn("mode_code", F.col("mode_code").cast(IntegerType())) \
                                    .withColumn("age", F.col("age").cast(IntegerType())) \
                                    .withColumn("visa_code", F.col("visa_code").cast(IntegerType())) \
                                    .withColumn("dob", F.col("dob").cast(IntegerType()))

fact_immigration.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- ctry_of_res_code: integer (nullable = true)
 |-- port_code: string (nullable = true)
 |-- mode_code: integer (nullable = true)
 |-- dest_state_code: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- visa_code: integer (nullable = true)
 |-- dob: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- visa_type: string (nullable = true)
 |-- arrvl_dt: date (nullable = true)
 |-- depart_dt: date (nullable = true)



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model




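- The conceptual data model, a fact table 'fact_immigration' surrounded by the dimension tables 'dim_date', 'dim_demographics', 'dim_country', 'dim_port', 'dim_mode', 'dim_state' and 'dim_visa', is described in the Readme.md file.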
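To illustrate how the analytics team might query this model, here is a hedged sketch that builds a Spark SQL query joining `fact_immigration` to its dimensions. The join keys are inferred from the column names in this notebook (for example `ctry_of_res_code` to `dim_country.country_code`); the authoritative model is the one in Readme.md. `STAR_JOINS` and `star_query` are illustrative names, and each table would first need to be registered as a view (e.g. with `createOrReplaceTempView`) before the query is run with `spark.sql`.

```python
# Join keys inferred from the notebook's column names (an assumption, not
# taken from Readme.md): (fact column, dimension table, dimension column).
STAR_JOINS = [
    ("arrvl_dt", "dim_date", "arrvl_dt"),
    ("ctry_of_res_code", "dim_country", "country_code"),
    ("port_code", "dim_port", "port_code"),
    ("mode_code", "dim_mode", "mode_code"),
    ("dest_state_code", "dim_state", "state_code"),
    ("dest_state_code", "dim_demographics", "state_code"),
    ("visa_code", "dim_visa", "visa_code"),
]


def star_query(select_cols, joins=STAR_JOINS):
    """Build a SQL string that LEFT JOINs fact_immigration to each dimension."""
    lines = ["SELECT " + ", ".join(select_cols), "FROM fact_immigration f"]
    for fact_col, dim, dim_col in joins:
        lines.append(f"LEFT JOIN {dim} ON f.{fact_col} = {dim}.{dim_col}")
    return "\n".join(lines)


print(star_query(["f.arrvl_dt", "dim_country.country_name", "dim_visa.visa_name"]))
```

Left joins keep every fact row even when a code has no matching label, which makes missing lookups visible as nulls instead of silently dropping arrivals.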

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

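- Load the immigration data (Parquet), the demographics CSV and the I94 label CSV files.
- Clean the immigration data: keep and rename the relevant columns, and drop records with a missing destination state code or gender.
- Transform the two SAS date columns 'arrvl_dt_sas' and 'depart_dt_sas' (days since 1960-01-01) in the immigration table into standard dates 'arrvl_dt' and 'depart_dt'.
- Create the dimension table 'dim_date' by extracting date attributes (year, month, week, day and quarter) from 'arrvl_dt'.
- Aggregate the demographics table over the state code column to get demographics data per state ('dim_demographics').
- Cast numeric code columns to integers in the fact and dimension tables.
- Write all dimension tables to Parquet files.
- Write the fact table to Parquet files.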

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [55]:
# Run the ETL pipeline which fetches data from source and writes to parquet

%run -i etl.py

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
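`quality_checks.py` itself is not shown here. As a hedged sketch of what a count check and a unique-key integrity check could look like (the function names and the list-of-rows input are illustrative, not the script's actual code), in plain Python:

```python
from collections import Counter


def count_check(name, rows):
    """Completeness: the table must contain at least one row."""
    if len(rows) == 0:
        raise ValueError(f"Count check failed for {name}: no rows")
    return True


def unique_key_check(name, rows, key):
    """Integrity: every value of `key` must be present and appear only once."""
    values = [row.get(key) for row in rows]
    if None in values:
        raise ValueError(f"Integrity check failed for {name}: null {key}")
    duplicates = [value for value, n in Counter(values).items() if n > 1]
    if duplicates:
        raise ValueError(f"Integrity check failed for {name}: duplicate {key} {duplicates}")
    return True


dim_visa_rows = [
    {"visa_code": 1, "visa_name": "Business"},
    {"visa_code": 2, "visa_name": "Pleasure"},
    {"visa_code": 3, "visa_name": "Student"},
]
count_check("dim_visa", dim_visa_rows)
unique_key_check("dim_visa", dim_visa_rows, "visa_code")
```

On Spark DataFrames, the equivalents would be `df.count() > 0` and the `groupBy(key).count().where("count > 1")` pattern already used for `dim_demographics` above.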
 
Run Quality Checks

In [57]:
# Perform Data Quality checks to validate Counts and Integrity

%run -i quality_checks.py

Count Check Successful for fact_immigration
Integrity Check Successfull for fact_immigration
Count Check Successful for dim_date
Integrity Check Successfull for dim_date
Count Check Successful for dim_demographics
Integrity Check Successfull for dim_demographics
Count Check Successful for dim_country
Integrity Check Successfull for dim_country
Count Check Successful for dim_port
Integrity Check Successfull for dim_port
Count Check Successful for dim_mode
Integrity Check Successfull for dim_mode
Count Check Successful for dim_state
Integrity Check Successfull for dim_state
Count Check Successful for dim_visa
Integrity Check Successfull for dim_visa


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

- Data Dictionary is provided in the Readme.md file

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

##### Answers

- Apache Spark was chosen because the source data is large: the immigration data alone has over 3 million records (3,096,313). Spark distributes the workload across the nodes of a cluster, so processing is much faster than with a single-machine tool like Pandas.

- The data can be updated daily. Because the immigration fact table is partitioned by year and month, new data can be added to the relevant partition frequently without rewriting the existing data.
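When a DataFrame is written with `partitionBy("year", "month")`, Spark lays the Parquet output out in Hive-style `column=value` directories, so each month's data lands in its own folder. The hypothetical `partition_path` helper below only illustrates that layout; the output path and the use of `partitionBy` in etl.py are assumptions, since the script is not shown.

```python
from pathlib import PurePosixPath


def partition_path(base, year, month):
    """Directory Spark would use for one (year, month) partition under `base`."""
    return str(PurePosixPath(base) / f"year={year}" / f"month={month}")


print(partition_path("output/fact_immigration", 2016, 4))
# output/fact_immigration/year=2016/month=4
```

A daily load could then use `df.write.mode("append").partitionBy("year", "month").parquet(path)`, which adds new files to the matching partition directories while leaving older partitions untouched.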

- If the data increased by 100x, more nodes should be added to the Spark cluster to increase processing power.

- Apache Airflow could be used to schedule the pipeline as a daily job that finishes before 7 AM, so the dashboard is updated on time.

- The data from the Parquet files could be copied into Amazon Redshift, which can handle access by 100+ people because it is a massively parallel processing (MPP) data warehouse.