# Project Title
### Data Engineering Capstone Project

#### Project Summary
Build data pipelines and an ETL process that load immigration data and other available sources into a data model.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [30]:
# Import all necessary packages and libraries for the project
import pandas as pd
import configparser
from datetime import datetime
import os
import glob

from pyspark.sql import SparkSession, SQLContext, GroupedData
from pyspark.sql.functions import *
from pyspark.sql.functions import udf, col, to_timestamp, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType, TimestampType

from us_state_abbrev import state_udf, abbrev_state, abbrev_state_udf,city_code_udf,city_codes
from immigration_codes import country_udf


### Step 1: Scope the Project and Gather Data

#### Scope
Using I94 immigration data together with several supporting sources, this project builds a data model for analysing immigration movement, e.g. which US cities receive the most arrivals and which visa types are most common.


#### Describe and Gather Data  

Data Sources:
- I94 Immigration Data: This data comes from the US National Tourism and Trade Office (https://travel.trade.gov/research/reports/i94/historical/2016.html)
- World Temperature Data: https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data
- U.S. City Demographic Data: https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/
- Airport Code Table: https://datahub.io/core/airport-codes#data

Tools: Jupyter notebook, AWS S3 Buckets, AWS Redshift, SparkSQL

In [31]:
# Create Spark session

spark = SparkSession.builder.\
        config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
        .enableHiveSupport().getOrCreate()

#Build SQL context object
sqlContext = SQLContext(spark)

In [32]:
# Read in the data here

# read immigration data
df_immigration=spark.read.format('com.github.saurfang.sas.spark').load("../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat")

demo_graphics=spark.read.format("csv").option("header", "true").option("delimiter", ";").load("us-cities-demographics.csv")

airport=spark.read.format("csv").option("header", "true").load("airport-codes_csv.csv")

temperatureData=spark.read.format("csv").option("header", "true").load("GlobalLandTemperaturesByState.csv")



In [33]:
# view first 3 rows of immigration data
df_immigration.show(3)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+-----------+-----------+----------+-----------+-------------+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|validres|delete_days|delete_mexl|delete_dup|delete_visa|delete_recdup|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+-----------+-----------+----------+-----------+-------------+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|  4.0|2016.0|   6.0| 135.0| 135.0|    XXX|20612.0|   null|   null|   null|  59.0|    2.0|  1.0|     1.0|        0.0|    

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

# Performing cleaning tasks here

- Filter out null values from the immigration data set df_immigration, and derive the origin country by converting i94res codes to country names with country_udf.
- Drop duplicates from the temperature data.

- Filter the average temperature data to the United States and year = 2013, create new year, month and Fahrenheit fields, and run the state abbreviation function.

- Sort the city demographic data, calculate percentage fields, and aggregate them to one row per state.
- Filter airport data for "small_airport" and use substring to return the state code.
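
The code-to-name conversion used in the cleaning steps relies on the helper modules `us_state_abbrev` and `immigration_codes`, which are not shown in this notebook. Below is a minimal sketch of how such a lookup might be written, assuming a plain dictionary built from the I94 SAS labels file. The dictionary excerpt and the function name `lookup_state` are illustrative assumptions, not the project's actual code.

```python
# Hypothetical excerpt of a code -> name mapping; the real abbrev_state
# dictionary in us_state_abbrev.py is assumed to cover every state.
abbrev_state_sample = {
    "CA": "California",
    "TX": "Texas",
    "MA": "Massachusetts",
    "PA": "Pennsylvania",
}


def lookup_state(code):
    """Return the state name for a two-letter code, or None if unknown."""
    if code is None:
        return None
    return abbrev_state_sample.get(code.strip().upper())


# In a Spark job a function like this would typically be wrapped as a UDF:
#   from pyspark.sql.functions import udf
#   from pyspark.sql.types import StringType
#   lookup_state_udf = udf(lookup_state, StringType())

print(lookup_state("ca"))  # California
print(lookup_state("ZZ"))  # None
```

Returning `None` for unknown codes keeps the lookup safe on dirty input; the notebook additionally filters on `abbrev_state.keys()` before applying the UDF, so unknown codes never reach it.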




## Immigration Data (number of entries by state and origin country)

In [34]:
# filter nulls from data set df_immigration
# keep only rows whose state and port codes appear in the mapping tables
# run country_udf, abbrev_state_udf and city_code_udf to convert codes to country, state and port city names
# country_udf, abbrev_state_udf and city_code_udf were created with data from i94 SAS labels Descriptions file.

i94_data=df_immigration.filter(df_immigration.i94addr.isNotNull())\
.filter(df_immigration.i94res.isNotNull())\
.filter(col("i94addr").isin(list(abbrev_state.keys())))\
.filter(col("i94port").isin(list(city_codes.keys())))\
.withColumn("origin_country",country_udf(df_immigration["i94res"]))\
.withColumn("dest_state_name",abbrev_state_udf(df_immigration["i94addr"]))\
.withColumn("i94yr",col("i94yr").cast("integer"))\
.withColumn("i94mon",col("i94mon").cast("integer"))\
.withColumn("city_port_name",city_code_udf(df_immigration["i94port"]))

# Create new dataset to view immigration data by state and origin country

I94_Data_by_state_origin_country = i94_data.select("cicid",col("i94yr").alias("year"),col("i94mon").alias("month"),\
                             "origin_country","i94port","city_port_name",col("i94addr").alias("state_code"),"dest_state_name",\
                                                  col("count").alias("number of entry"))

In [35]:
# view first 5 records of the new data frame of immigration data
I94_Data_by_state_origin_country.show(5)

+-----+----+-----+--------------+-------+------------------+----------+---------------+---------------+
|cicid|year|month|origin_country|i94port|    city_port_name|state_code|dest_state_name|number of entry|
+-----+----+-----+--------------+-------+------------------+----------+---------------+---------------+
| 41.0|2016|    6|   SOUTH KOREA|    SFR|SAN FRANCISCO     |        CA|     California|            1.0|
| 42.0|2016|    6|   SOUTH KOREA|    SFR|SAN FRANCISCO     |        CA|     California|            1.0|
| 45.0|2016|    6|       ROMANIA|    HOU|HOUSTON           |        TX|          Texas|            1.0|
| 52.0|2016|    6|       ALBANIA|    BOS|BOSTON            |        MA|  Massachusetts|            1.0|
| 53.0|2016|    6|       ALBANIA|    NEW|NEWARK/TETERBORO  |        PA|   Pennsylvania|            1.0|
+-----+----+-----+--------------+-------+------------------+----------+---------------+---------------+
only showing top 5 rows



## Temperature Data

In [36]:
# From the GlobalLandTemperaturesByState data, keep 2013 records for the US only.
# drop duplicates
# add column avg_temp_fahrenheit by converting AverageTemperature (Celsius) to Fahrenheit
# the sample data set contains data up to 2013 only, so we filter for 2013

Temperatures=temperatureData.filter(temperatureData["country"]=="United States")\
.filter(year(temperatureData["dt"])==2013)\
.withColumn("year",year(temperatureData["dt"]))\
.withColumn("month",month(temperatureData["dt"]))\
.withColumn("avg_temp_fahrenheit",temperatureData["AverageTemperature"]*9/5+32)\
.withColumn("state_abbrev",state_udf(temperatureData["State"]))

Temperatures=Temperatures.select("year","month",round(col("AverageTemperature"),1).alias("avg_temp_celcius"),\
                                       round(col("avg_temp_fahrenheit"),1).alias("avg_temp_fahrenheit"),
                                       "state_abbrev","State","Country").dropDuplicates()

In [37]:
Temperatures.show(5)

+----+-----+----------------+-------------------+------------+-------------+-------------+
|year|month|avg_temp_celcius|avg_temp_fahrenheit|state_abbrev|        State|      Country|
+----+-----+----------------+-------------------+------------+-------------+-------------+
|2013|    7|            23.4|               74.1|          MA|Massachusetts|United States|
|2013|    3|            -1.9|               28.5|          SD| South Dakota|United States|
|2013|    9|            14.1|               57.4|          ME|        Maine|United States|
|2013|    1|            -1.3|               29.7|          PA| Pennsylvania|United States|
|2013|    9|            25.1|               77.2|          AL|      Alabama|United States|
+----+-----+----------------+-------------------+------------+-------------+-------------+
only showing top 5 rows
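
The Fahrenheit column is computed from the unrounded Celsius value and only then rounded, which is why a displayed pair such as -1.9 / 28.5 can look slightly off if you convert the rounded Celsius figure by hand. A minimal sketch in plain Python, assuming a hypothetical raw reading of -1.94 (the unrounded source values are not shown in this notebook):

```python
def celsius_to_fahrenheit(celsius):
    """Convert degrees Celsius to degrees Fahrenheit."""
    return celsius * 9 / 5 + 32


# Hypothetical raw reading; the actual unrounded value is an assumption.
raw_celsius = -1.94

# Order used in the notebook: convert the raw value, then round each column.
shown_celsius = round(raw_celsius, 1)
shown_fahrenheit = round(celsius_to_fahrenheit(raw_celsius), 1)

# Converting the already-rounded Celsius value gives a different last digit.
from_rounded = round(celsius_to_fahrenheit(shown_celsius), 1)

print(shown_celsius, shown_fahrenheit, from_rounded)  # -1.9 28.5 28.6
```

Rounding last, as the notebook does, avoids compounding the rounding error across the two columns.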



## U.S. Demographic Data by State

In [38]:

# Create new columns expressing other measures (for example: Male Population, Number of Veterans etc.)
# as a percentage of total population

demo_graphics = demo_graphics\
.withColumn("Median Age",col("Median Age").cast("float"))\
.withColumn("pct_male_pop",demo_graphics["Male Population"]/demo_graphics["Total Population"]*100)\
.withColumn("pct_female_pop",demo_graphics["Female Population"]/demo_graphics["Total Population"]*100)\
.withColumn("pct_veterans",demo_graphics["Number of Veterans"]/demo_graphics["Total Population"]*100)\
.withColumn("pct_foreign_born",demo_graphics["Foreign-born"]/demo_graphics["Total Population"]*100)\
.withColumn("pct_race",demo_graphics["Count"]/demo_graphics["Total Population"]*100)\
.orderBy("State")

In [39]:
# Select states and calculated columns from above to view demo graphics data by states

demo_graphics_by_state=demo_graphics.select("State",col("State Code").alias("state_code"),\
                                 col("Median Age").alias("median_age"),\
                                 "pct_male_pop",\
                                 "pct_female_pop",\
                                 "pct_veterans",\
                                 "pct_foreign_born",\
                                 "Race",\
                                 "pct_race")

In [40]:
# show demo graphics data by state
demo_graphics_by_state.show(5)

+-------+----------+----------+------------------+------------------+-----------------+------------------+--------------------+------------------+
|  State|state_code|median_age|      pct_male_pop|    pct_female_pop|     pct_veterans|  pct_foreign_born|                Race|          pct_race|
+-------+----------+----------+------------------+------------------+-----------------+------------------+--------------------+------------------+
|Alabama|        AL|      38.1| 48.52311304292649| 51.47688695707351|8.797339171081992| 6.710767050562094|Black or African-...|32.552322937487446|
|Alabama|        AL|      29.1| 48.09229392503407| 51.90770607496593|3.708637556183774| 4.785535601700258|  Hispanic or Latino| 2.516829709776485|
|Alabama|        AL|      38.9|47.636815920398014|52.363184079601986|9.378701729447998| 2.515695332859512|               Asian|1.7398128405591091|
|Alabama|        AL|      35.4| 47.15284217243477| 52.84715782756524|7.455654931052018|4.6548612565184015|            

In [41]:
# create pivot table for avg percentage of race by state, median_age, male population etc.
pivot_demo_graphics_by_state = demo_graphics_by_state.groupBy("State","state_code","median_age","pct_male_pop",\
                                    "pct_female_pop","pct_veterans",\
                                    "pct_foreign_born").pivot("Race").avg("pct_race")

# rename header for race
pivot_demo_graphics_by_state=pivot_demo_graphics_by_state.select("State","state_code","median_age","pct_male_pop","pct_female_pop","pct_veterans","pct_foreign_born",\
                                         col("American Indian and Alaska Native").alias("native_american"),\
                                         col("Asian"),col("Black or African-American").alias("Black"),\
                                         col("Hispanic or Latino").alias("hispanic_or_latino"),"White")

In [42]:
# Find the average of each column per state. 

pivot = pivot_demo_graphics_by_state.groupBy("State","state_code").avg("median_age","pct_male_pop","pct_female_pop",\
                                                       "pct_veterans","pct_foreign_born","native_american",\
                                                       "Asian","Black","hispanic_or_latino","White").orderBy("State")

In [43]:
# Round the percentages and rename columns
results =pivot.select("State","state_code",round(col("avg(median_age)"),1).alias("median_age"),\
                  round(col("avg(pct_male_pop)"),1).alias("pct_male_pop"),\
                   round(col("avg(pct_female_pop)"),1).alias("pct_female_pop"),\
                   round(col("avg(pct_veterans)"),1).alias("pct_veterans"),\
                   round(col("avg(pct_foreign_born)"),1).alias("pct_foreign_born"),\
                   round(col("avg(native_american)"),1).alias("native_american"),\
                   round(col("avg(Asian)"),1).alias("Asian"),\
                   round(col("avg(hispanic_or_latino)"),1).alias("hispanic_or_latino"),\
                   round(col("avg(Black)"),1).alias("Black"),\
                   round(col('avg(White)'),1).alias('White')
                  )

In [44]:
# show the results
results.show(5)

+----------+----------+----------+------------+--------------+------------+----------------+---------------+-----+------------------+-----+-----+
|     State|state_code|median_age|pct_male_pop|pct_female_pop|pct_veterans|pct_foreign_born|native_american|Asian|hispanic_or_latino|Black|White|
+----------+----------+----------+------------+--------------+------------+----------------+---------------+-----+------------------+-----+-----+
|   Alabama|        AL|      36.2|        47.2|          52.8|         6.8|             5.1|            0.8|  2.9|               3.6| 45.0| 52.0|
|    Alaska|        AK|      32.2|        51.2|          48.8|         9.2|            11.1|           12.2| 12.3|               9.1|  7.7| 71.2|
|   Arizona|        AZ|      35.0|        48.8|          51.2|         6.6|            12.6|            2.8|  5.1|              28.8|  6.0| 82.7|
|  Arkansas|        AR|      32.8|        48.4|          51.6|         5.2|            10.7|            1.8|  4.1|          
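
The pivot and averaging in cells 41 to 43 can be illustrated on a handful of rows. The sketch below collapses the two steps into one in plain Python and uses made-up city-level percentages (the city names and numbers are illustrative assumptions, not values from the dataset). It shows that the per-state figure is, in effect, an unweighted mean over cities, so the race shares of a state need not add up to 100.

```python
from collections import defaultdict

# Illustrative rows: (state_code, city, race, pct_race). Values are made up.
rows = [
    ("AL", "City A", "White", 60.0),
    ("AL", "City A", "Black or African-American", 35.0),
    ("AL", "City B", "White", 40.0),
    ("AL", "City B", "Black or African-American", 55.0),
]

# Pivot step: collect the pct_race values per (state, race).
values = defaultdict(list)
for state_code, _city, race, pct_race in rows:
    values[(state_code, race)].append(pct_race)

# Aggregation step: unweighted mean per (state, race), rounded to one decimal.
state_race_avg = {
    key: round(sum(vals) / len(vals), 1) for key, vals in values.items()
}

for (state_code, race), avg in sorted(state_race_avg.items()):
    print(state_code, race, avg)
```

A population-weighted mean would be an alternative design if the per-state figures need to reflect city size; the notebook as written does not weight.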

## U.S. Airport Data by State

In [45]:
# From the airport-codes data, keep only small airports in the US and extract the state code from iso_region

airport_data=airport.filter(airport["type"]=="small_airport")\
.filter(airport["iso_country"]=="US")\
.withColumn("iso_region",substring(airport["iso_region"],4,2))\
.withColumn("elevation_ft",col("elevation_ft").cast("float"))

# Calculate average elevation_ft per state by grouping the airport data frame above.
airport_by_elevation_ft=airport_data.groupBy("iso_country","iso_region").avg("elevation_ft")

# Select relevant columns and sort by state
airport_avg_elevation_ft=airport_by_elevation_ft.select(col("iso_country").alias("country"),\
                                               col("iso_region").alias("state"),\
                                               round(col("avg(elevation_ft)"),1).alias("avg_elevation_ft")).orderBy("iso_region")

In [46]:
airport_avg_elevation_ft.show(5)

+-------+-----+----------------+
|country|state|avg_elevation_ft|
+-------+-----+----------------+
|     US|   AK|           545.1|
|     US|   AL|           414.6|
|     US|   AR|           488.4|
|     US|   AZ|          3098.0|
|     US|   CA|          1261.4|
+-------+-----+----------------+
only showing top 5 rows
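
The state code comes from `substring(airport["iso_region"], 4, 2)`. Spark's `substring` counts positions from 1, unlike Python slices, which count from 0. A minimal sketch of the equivalent slice, assuming `iso_region` values of the form `US-CA`; `spark_substring` is a hypothetical helper written only for this illustration:

```python
def spark_substring(text, pos, length):
    """Mimic Spark's substring for positive, 1-based start positions."""
    start = pos - 1  # Spark counts from 1, Python slices from 0
    return text[start:start + length]


print(spark_substring("US-CA", 4, 2))  # CA
```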



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model




### Star Schema
#### 1/ Dimension Tables
##### airport_table
- country, state, avg_elevation_ft

##### city_stats
- State,state_code,median_age,pct_male_pop,pct_female_pop,pct_veterans,pct_foreign_born,native_american,Asian,hispanic_or_latino,Black,White

##### immigration_table
- cicid,year,month,origin_country,i94port,city_port_name,state_code,dest_state_name,number of entry

##### avg_state_temps
- year,month,avg_temp_celcius,avg_temp_fahrenheit,state_abbrev,State,Country

#### 2/ Fact Table
##### immigration_fact_table
- year, immig_month, immig_origin, to_immig_state, to_immig_state_count, avg_temp_fahrenheit, avg_elevation_ft, pct_foreign_born, native_american, Asian, hispanic_or_latino, Black, White

A star schema was chosen because it is simple and well suited to later data analysis.

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1- From the clean datasets above, create the four dimension tables (as in the schema).

2- The fact table is created by a SQL query that joins the dimension tables.

3- The query result is returned as a Spark dataframe.

4- The fact table is written out as the final parquet file.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [47]:
# Write code here

# Dimension tables are created from clean data set above.

I94_Data_by_state_origin_country.createOrReplaceTempView("immigration")
results.createOrReplaceTempView("demographics")
airport_avg_elevation_ft.createOrReplaceTempView("airport")
Temperatures.createOrReplaceTempView("temperature")


In [48]:
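# spark.sql.autoBroadcastJoinThreshold is a table-size limit in bytes for broadcast joins, not a time limit.
# Setting it to 0 is intended to turn automatic broadcast joins off for the joins below
# (the Spark documentation describes -1 as the value that disables broadcasting).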

sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "0")

In [50]:
# Run this query to count the number of immigrants to the USA by year, month, state and other dimensions.
# This is one example of a result we can derive from the tables above.
# The result of this query becomes our fact table.

immigration_result=spark.sql("""SELECT 
                                    m.year,
                                    m.month AS immig_month,
                                    m.origin_country AS immig_origin,
                                    m.dest_state_name AS to_immig_state,
                                    COUNT(m.state_code) AS to_immig_state_count,
                                    t.avg_temp_fahrenheit,
                                    a.avg_elevation_ft,
                                    d.pct_foreign_born,
                                    d.native_american,
                                    d.Asian,
                                    d.hispanic_or_latino,
                                    d.Black,
                                    d.White
                                    
                                    FROM immigration m JOIN temperature t ON m.state_code=t.state_abbrev AND m.month=t.month
                                    JOIN demographics d ON d.state_code=t.state_abbrev
                                    JOIN airport a ON a.state=t.state_abbrev
                                    
                                    GROUP BY m.year,m.month, m.origin_country,
                                    m.dest_state_name,m.state_code,t.avg_temp_fahrenheit,a.avg_elevation_ft,
                                    d.pct_foreign_born,d.native_american,
                                    d.Asian,d.hispanic_or_latino,
                                    d.White,d.Black
                                    
                                    ORDER BY m.origin_country,m.state_code
                                    
""")

In [51]:
immigration_result.toDF('year', 'immig_month', 'immig_origin', 'to_immig_state', \
          'to_immig_state_count', 'avg_temp_fahrenheit', 'avg_elevation_ft',\
          'pct_foreign_born', 'native_american', 'Asian', 'hispanic_or_latino', 'Black', 'White').show(5)

+----+-----------+------------+--------------+--------------------+-------------------+----------------+----------------+---------------+-----+------------------+-----+-----+
|year|immig_month|immig_origin|to_immig_state|to_immig_state_count|avg_temp_fahrenheit|avg_elevation_ft|pct_foreign_born|native_american|Asian|hispanic_or_latino|Black|White|
+----+-----------+------------+--------------+--------------------+-------------------+----------------+----------------+---------------+-----+------------------+-----+-----+
|2016|          6| AFGHANISTAN|      Arkansas|                   1|               77.7|           488.4|            10.7|            1.8|  4.1|              14.2| 21.8| 68.0|
|2016|          6| AFGHANISTAN|       Arizona|                   1|               79.9|          3098.0|            12.6|            2.8|  5.1|              28.8|  6.0| 82.7|
|2016|          6| AFGHANISTAN|    California|                  31|               72.5|          1261.4|            27.6|    
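
The shape of the fact query can be tried on a few rows without a Spark cluster. The sketch below uses Python's built-in `sqlite3` rather than Spark SQL, with a reduced set of columns. The California values mirror figures shown in the outputs above; Texas is deliberately left out of the toy dimension tables (an assumption made purely for illustration) to show that the inner joins drop immigration rows whose state has no match in every dimension.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

cur.executescript("""
CREATE TABLE immigration (origin_country TEXT, state_code TEXT, month INTEGER);
CREATE TABLE temperature (state_abbrev TEXT, month INTEGER, avg_temp_fahrenheit REAL);
CREATE TABLE demographics (state_code TEXT, pct_foreign_born REAL);
CREATE TABLE airport (state TEXT, avg_elevation_ft REAL);
""")

# Toy rows: two arrivals in California, one in Texas.
cur.executemany("INSERT INTO immigration VALUES (?, ?, ?)", [
    ("SOUTH KOREA", "CA", 6),
    ("SOUTH KOREA", "CA", 6),
    ("ROMANIA", "TX", 6),
])
# Only California has dimension rows in this toy setup.
cur.execute("INSERT INTO temperature VALUES ('CA', 6, 72.5)")
cur.execute("INSERT INTO demographics VALUES ('CA', 27.6)")
cur.execute("INSERT INTO airport VALUES ('CA', 1261.4)")

fact_rows = cur.execute("""
SELECT m.origin_country, m.state_code,
       COUNT(m.state_code) AS to_immig_state_count,
       t.avg_temp_fahrenheit, a.avg_elevation_ft, d.pct_foreign_born
FROM immigration m
JOIN temperature t ON m.state_code = t.state_abbrev AND m.month = t.month
JOIN demographics d ON d.state_code = t.state_abbrev
JOIN airport a ON a.state = t.state_abbrev
GROUP BY m.origin_country, m.state_code, t.avg_temp_fahrenheit,
         a.avg_elevation_ft, d.pct_foreign_born
ORDER BY m.origin_country, m.state_code
""").fetchall()

print(fact_rows)  # [('SOUTH KOREA', 'CA', 2, 72.5, 1261.4, 27.6)]
conn.close()
```

The Texas arrival disappears from the result. If such rows should be kept with empty dimension values instead, a `LEFT JOIN` from `immigration` would be the usual alternative.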

In [52]:
# Final step is to write the fact table to parquet 

immigration_result.write.parquet("immigration_result")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [57]:
# Perform quality checks here
# Check numbers of records of 4 dimension tables

spark.sql('SELECT COUNT(*) as No_of_records_Immigration FROM immigration').show()
spark.sql('SELECT COUNT(*) as No_of_records_demographics FROM demographics').show()
spark.sql('SELECT COUNT(*) as No_of_records_airport FROM airport').show()
spark.sql('SELECT COUNT(*) as No_of_records_temperature FROM temperature').show()


+-------------------------+
|No_of_records_Immigration|
+-------------------------+
|                  3214208|
+-------------------------+

+--------------------------+
|No_of_records_demographics|
+--------------------------+
|                        49|
+--------------------------+

+---------------------+
|No_of_records_airport|
+---------------------+
|                   51|
+---------------------+

+-------------------------+
|No_of_records_temperature|
+-------------------------+
|                      459|
+-------------------------+



In [60]:

# Check whether the fact table has any null values in year, month, origin or state.
# A single row with every column false means no nulls were found.
immigration_result.select(isnull('year').alias('year'),\
                             isnull('immig_month').alias('month'),\
                             isnull('immig_origin').alias('country'),\
                             isnull('to_immig_state').alias('state')).dropDuplicates().show()

+-----+-----+-------+-----+
| year|month|country|state|
+-----+-----+-------+-----+
|false|false|  false|false|
+-----+-----+-------+-----+



In [61]:
# Another example check: compute the total number of immigrants in the fact table

immigration_result.select(sum('to_immig_state_count').alias('Total_No_of_Immigrants')).show()

+----------------------+
|Total_No_of_Immigrants|
+----------------------+
|               3207230|
+----------------------+
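
The two counts above can be reconciled as a source/count completeness check. The sketch below uses the figures printed by the checks; the 1% tolerance is a hypothetical threshold chosen only for illustration. A plausible cause of the gap, not verified in this notebook, is that the inner joins drop arrivals for states missing from one of the dimension tables (demographics has 49 rows while airport has 51).

```python
# Figures copied from the check outputs above.
source_rows = 3214208  # rows in the immigration view
fact_total = 3207230   # sum of to_immig_state_count in the fact table

dropped = source_rows - fact_total
dropped_pct = dropped / source_rows * 100

print(dropped, round(dropped_pct, 2))  # 6978 0.22

# Hypothetical tolerance; choose a value that fits the project's requirements.
MAX_DROPPED_PCT = 1.0
assert dropped >= 0, "fact table counts more rows than the source"
assert dropped_pct <= MAX_DROPPED_PCT, "too many rows lost in the joins"
```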



#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

DataDictionary is in a separate file (DataDictionary.md)

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

1/ Tools and technologies for the project
- Apache Spark for ETL and analysis, because the dataset is relatively small (about 3.2 million rows).
- Jupyter notebook for running the main scripts, because table joins for analysis and data quality checks can be demonstrated quickly within the notebook.

2/ Data should be updated monthly or quarterly, so that the results can feed monthly and quarterly reports for the government.
- Some visualisations can be created quickly in the Jupyter notebook.

3/ Under the following scenarios, I would approach the problem differently:
- The data was increased by 100x: we could still use Apache Spark for ETL. Data would be stored in AWS. Incremental loads could be used to shorten the ETL time.

- The data must be updated daily by 7am: Airflow can be used to schedule the ETL as a daily run. If a dashboard is required, the result table can be stored in AWS Redshift; visualisations of the facts can also be done quickly in a Jupyter notebook (using matplotlib, pandas, seaborn etc.).

- The data needs to be accessed by 100+ people: all we need to do is give users their own credentials to AWS Redshift so they can run their own analyses. If standardised or centralised reporting is required, we could use Tableau, Qlik or Looker for analysis and visualisation. Those BI tools can connect to AWS Redshift. As soon as the data is updated in AWS Redshif