# Project Title
### Data Engineering Capstone Project

#### Project Summary
Aggregate and analyze US immigration data by combining it with airport codes and city demographics.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here

# Dataframe manipulation
import pandas as pd 
# File system interaction
import os
import glob
# Lookup state/city/immigration codes
from codesAndAbbreviations import us_state_codes, state_udf, state_abrvtn, state_abrvtn_udf, city_codes, city_udf, immigration_codes, immigration_udf
# For our ETL/ELT
from pyspark.sql import SparkSession, SQLContext, GroupedData
from pyspark.sql.functions import *  # note: this shadows Python built-ins such as round() and sum()

In [2]:
# Create Spark session
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [3]:
# Create SQLContext
sqlContext = SQLContext(spark.sparkContext)

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project, we combine three types of data into a data model that enables analysis of US immigration data:
1. US Immigration Data (SAS)
2. US City Demographics Data (csv)
3. World Airport Data (csv)




#### Describe and Gather Data 
We used the following Udacity-provided datasets:
1. I94 Immigration Data: This data comes from the US National Tourism and Trade Office (https://travel.trade.gov/research/reports/i94/historical/2016.html). A data dictionary is included in the workspace. There is a sample file in csv format to get a sense of the data before reading it all in. Only a subset of the full dataset is used: this project loads the October 2016 file.
2. U.S. City Demographic Data: This data comes from OpenDataSoft (https://public.opendatasoft.com).
3. Airport Code Table: This is a simple table of airport codes and corresponding cities (https://datahub.io/core/airport-codes#data).

In [4]:
# Read in the data here

# Creating dataframes
df_demographics = spark.read.format("csv").option("header", "true").option("delimiter", ";").load("us-cities-demographics.csv")
df_airport = spark.read.format("csv").option("header", "true").load("airport-codes_csv.csv")
df_immigration = spark.read.format('com.github.saurfang.sas.spark').load("../../data/18-83510-I94-Data-2016/i94_oct16_sub.sas7bdat")


In [5]:
df_demographics.head()

Row(City='Silver Spring', State='Maryland', Median Age='33.8', Male Population='40601', Female Population='41862', Total Population='82463', Number of Veterans='1562', Foreign-born='30908', Average Household Size='2.6', State Code='MD', Race='Hispanic or Latino', Count='25924')

In [6]:
df_airport.head()

Row(ident='00A', type='heliport', name='Total Rf Heliport', elevation_ft='11', continent='NA', iso_country='US', iso_region='US-PA', municipality='Bensalem', gps_code='00A', iata_code=None, local_code='00A', coordinates='-74.93360137939453, 40.07080078125')

In [7]:
df_immigration.head()

Row(cicid=8.0, i94yr=2016.0, i94mon=10.0, i94cit=111.0, i94res=111.0, i94port='CHM', arrdate=20728.0, i94mode=1.0, i94addr='HI', depdate=None, i94bir=70.0, i94visa=2.0, count=1.0, dtadfile='20161001', visapost=None, occup=None, entdepa='A', entdepd=None, entdepu=None, matflag=None, biryear=1946.0, dtaddto='12292016', gender='F', insnum='5245', airline='AF', admnum=67011135827.0, fltno='00342', visatype='WT')
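The `arrdate` field is a SAS numeric date, i.e. the number of days since 1960-01-01. The model below does not use it, but decoding it is a useful sanity check during exploration: the first row decodes to 2016-10-01, consistent with its `dtadfile='20161001'`. A minimal standard-library sketch; the helper name `sas_to_date` is hypothetical. In Spark the same offset could be applied with the SQL `date_add` function.

```python
from datetime import date, timedelta
from typing import Optional

# SAS stores dates as the number of days since 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days: Optional[float]) -> Optional[date]:
    """Convert a SAS day count (read from the file as a float, e.g. 20728.0) to a date.

    Missing values (None) are passed through unchanged.
    """
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


# arrdate of the first immigration row shown above
print(sas_to_date(20728.0))  # 2016-10-01
```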

In [8]:
df_immigration.count()

3649136

### Step 2: Explore and Assess the Data
#### Explore the Data 
Preparing the data involved the following issues and tasks:
* Handling NULL values
* Mapping codes to readable names with the right lookup tables
* Filtering out data we don't wish to report on
* Aggregating the data to the level we want to report on
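To quantify the NULL-value issue before choosing filters, it helps to count missing values per column. The sketch below does this in plain Python over a few records shaped like the I94 rows above; only the first record mirrors the sample row, the others are made up. In PySpark the same profile is commonly written as a single `select` with `count(when(col(c).isNull(), c))` per column.

```python
from typing import Any, Dict, List


def null_counts(rows: List[Dict[str, Any]]) -> Dict[str, int]:
    """Return the number of None values per column across all rows."""
    counts: Dict[str, int] = {}
    for row in rows:
        for column, value in row.items():
            counts[column] = counts.get(column, 0) + int(value is None)
    return counts


# Illustrative records: the first mirrors the sample row above, the rest are made up.
sample = [
    {"i94res": 111.0, "i94addr": "HI", "depdate": None},
    {"i94res": 582.0, "i94addr": None, "depdate": 20735.0},
    {"i94res": None, "i94addr": "NY", "depdate": None},
]

print(null_counts(sample))  # {'i94res': 1, 'i94addr': 1, 'depdate': 2}
```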

#### Cleaning Steps
The data quality issues and the steps to clean up the data are discussed as we perform the cleanup in the following cells.


In [9]:
# Performing cleaning tasks here

# 1st task: Clean up I94/immigration data
# - remove nulls
# - extract country of origin using immigration codes
# - extract State names

# We will use these udfs that we imported above
# - state_abrvtn_udf, city_udf, immigration_udf
# immigration_udf, state_abrvtn_udf and city_udf were created with data from the I94 SAS Labels Descriptions file.

immigration_data = df_immigration.filter(df_immigration.i94addr.isNotNull())\
.filter(df_immigration.i94res.isNotNull())\
.filter(col("i94addr").isin(list(state_abrvtn.keys())))\
.filter(col("i94port").isin(list(city_codes.keys())))\
.withColumn("origin_country",immigration_udf(df_immigration["i94res"]))\
.withColumn("dest_state_name",state_abrvtn_udf(df_immigration["i94addr"]))\
.withColumn("i94yr",col("i94yr").cast("integer"))\
.withColumn("i94mon",col("i94mon").cast("integer"))\
.withColumn("city_port_name",city_udf(df_immigration["i94port"]))

imgrtn_data = immigration_data.select("cicid",col("i94yr").alias("year"),col("i94mon").alias("month"),\
                             "origin_country","i94port","city_port_name",\
                             col("i94addr").alias("state_code"),"dest_state_name")
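The filter chain above keeps only rows whose state and port codes appear in the lookup dictionaries, then maps codes to names. A plain-Python sketch of the same row-level logic, assuming the lookups are simple dicts: the real `codesAndAbbreviations` module is not shown in this notebook, so the tiny excerpts below are hypothetical and use only code/name pairs visible in the output above. Because SAS numerics are read as floats (`i94res=111.0`), the sketch also assumes integer dict keys and converts before the lookup.

```python
from typing import Any, Dict, List, Optional

# Hypothetical excerpts of the lookup tables; pairs taken from the output above.
immigration_codes = {111: "FRANCE"}
state_abrvtn = {"HI": "Hawaii", "NY": "New York", "MA": "Massachusetts"}
city_codes = {"CHM": "CHAMPLAIN", "AGA": "AGANA"}


def clean_record(row: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Apply the notebook's filters and lookups to one I94 record.

    Returns None when the record would be filtered out.
    """
    if row["i94addr"] is None or row["i94res"] is None:
        return None  # drop NULL state or residence codes
    if row["i94addr"] not in state_abrvtn or row["i94port"] not in city_codes:
        return None  # drop codes missing from the lookup tables
    return {
        "cicid": row["cicid"],
        "year": int(row["i94yr"]),
        "month": int(row["i94mon"]),
        # assumed integer keys; an unknown residence code maps to None
        "origin_country": immigration_codes.get(int(row["i94res"])),
        "i94port": row["i94port"],
        "city_port_name": city_codes[row["i94port"]],
        "state_code": row["i94addr"],
        "dest_state_name": state_abrvtn[row["i94addr"]],
    }


raw: List[Dict[str, Any]] = [
    # first row of df_immigration (subset of columns)
    {"cicid": 8.0, "i94yr": 2016.0, "i94mon": 10.0, "i94res": 111.0,
     "i94port": "CHM", "i94addr": "HI"},
    # made-up rows that the filters should drop
    {"cicid": 9.0, "i94yr": 2016.0, "i94mon": 10.0, "i94res": 111.0,
     "i94port": "CHM", "i94addr": None},
    {"cicid": 10.0, "i94yr": 2016.0, "i94mon": 10.0, "i94res": 111.0,
     "i94port": "XXX", "i94addr": "NY"},
]

cleaned = [r for r in (clean_record(row) for row in raw) if r is not None]
print(cleaned)
```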








In [10]:
imgrtn_data.show(5)

+-----+----+-----+--------------+-------+------------------+----------+---------------+
|cicid|year|month|origin_country|i94port|    city_port_name|state_code|dest_state_name|
+-----+----+-----+--------------+-------+------------------+----------+---------------+
|  8.0|2016|   10|        FRANCE|    CHM|CHAMPLAIN         |        HI|         Hawaii|
|100.0|2016|   10|        TAIWAN|    AGA|AGANA             |        NY|       New York|
|101.0|2016|   10|        TAIWAN|    AGA|AGANA             |        NY|       New York|
|102.0|2016|   10|        TAIWAN|    AGA|AGANA             |        NY|       New York|
|103.0|2016|   10|        TAIWAN|    AGA|AGANA             |        MA|  Massachusetts|
+-----+----+-----+--------------+-------+------------------+----------+---------------+
only showing top 5 rows



In [11]:
# 2nd task: Clean up demographic data
# - express each population segment as a percentage of the total population

demographics_data = df_demographics\
.withColumn("Median Age",col("Median Age").cast("float"))\
.withColumn("pct_male",df_demographics["Male Population"]/df_demographics["Total Population"]*100)\
.withColumn("pct_female",df_demographics["Female Population"]/df_demographics["Total Population"]*100)\
.withColumn("pct_veterans",df_demographics["Number of Veterans"]/df_demographics["Total Population"]*100)\
.withColumn("pct_foreign_born",df_demographics["Foreign-born"]/df_demographics["Total Population"]*100)\
.withColumn("pct_race",df_demographics["Count"]/df_demographics["Total Population"]*100)\
.orderBy("State")

dmgrphcs_data = demographics_data.select("State",col("State Code").alias("state_code"),\
                                 col("Median Age").alias("median_age"),\
                                 "pct_male","pct_female","pct_veterans",\
                                 "pct_foreign_born","Race","pct_race")
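Each row of the demographics file describes one city and one race, so every percentage above is a city-level ratio to `Total Population`. The CSV is read without a schema, so all fields are strings; Spark casts them implicitly in the division, whereas plain Python needs an explicit `float()`. A small sketch of the arithmetic using the Silver Spring row shown earlier; the helper name is hypothetical.

```python
from typing import Dict


def population_percentages(row: Dict[str, str]) -> Dict[str, float]:
    """Express population segments as percentages of Total Population."""
    total = float(row["Total Population"])

    def pct(column: str) -> float:
        return float(row[column]) / total * 100

    return {
        "pct_male": pct("Male Population"),
        "pct_female": pct("Female Population"),
        "pct_veterans": pct("Number of Veterans"),
        "pct_foreign_born": pct("Foreign-born"),
        "pct_race": pct("Count"),  # share of the race named in this row's Race column
    }


# Silver Spring, MD (Race = 'Hispanic or Latino'), values as read from the CSV
silver_spring = {
    "Male Population": "40601",
    "Female Population": "41862",
    "Total Population": "82463",
    "Number of Veterans": "1562",
    "Foreign-born": "30908",
    "Count": "25924",
}

for name, value in population_percentages(silver_spring).items():
    print(f"{name}: {value:.2f}")
```

Since the male and female counts add up to the total for this city, `pct_male` and `pct_female` sum to 100.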


In [12]:
dmgrphcs_data.show(5)

+-------+----------+----------+------------------+------------------+-----------------+------------------+--------------------+------------------+
|  State|state_code|median_age|          pct_male|        pct_female|     pct_veterans|  pct_foreign_born|                Race|          pct_race|
+-------+----------+----------+------------------+------------------+-----------------+------------------+--------------------+------------------+
|Alabama|        AL|      38.1| 48.52311304292649| 51.47688695707351|8.797339171081992| 6.710767050562094|Black or African-...|32.552322937487446|
|Alabama|        AL|      29.1| 48.09229392503407| 51.90770607496593|3.708637556183774| 4.785535601700258|  Hispanic or Latino| 2.516829709776485|
|Alabama|        AL|      38.9|47.636815920398014|52.363184079601986|9.378701729447998| 2.515695332859512|               Asian|1.7398128405591091|
|Alabama|        AL|      35.4| 47.15284217243477| 52.84715782756524|7.455654931052018|4.6548612565184015|            

In [13]:
# 3rd task: Aggregate demographic data
# Based on 
# - Race
# - State

agg_dmgrphcs_data = dmgrphcs_data.groupBy("State","state_code","median_age","pct_male",\
                                    "pct_female","pct_veterans",\
                                    "pct_foreign_born").pivot("Race").avg("pct_race")

# Rename column headers
agg_dmgrphcs_data = agg_dmgrphcs_data.select("State","state_code","median_age","pct_male","pct_female","pct_veterans","pct_foreign_born",\
                                         col("American Indian and Alaska Native").alias("Native_American"),\
                                         col("Asian"),col("Black or African-American").alias("African"),\
                                         col("Hispanic or Latino").alias("South_American"),"White")


# Find average numbers of different population segments by state
avg_dmgrphcs_data = agg_dmgrphcs_data.groupBy("State","state_code").avg("median_age","pct_male","pct_female",\
                                                       "pct_veterans","pct_foreign_born","Native_American",\
                                                       "Asian","African","South_American","White").orderBy("State")

# Data selection and rounding all the numbers to make them readable
avg_dmgrphcs_data = avg_dmgrphcs_data.select("State","state_code",round(col("avg(median_age)"),2).alias("median_age"),\
                  round(col("avg(pct_male)"),2).alias("pct_male"),\
                  round(col("avg(pct_female)"),2).alias("pct_female"),\
                  round(col("avg(pct_veterans)"),2).alias("pct_veterans"),\
                  round(col("avg(pct_foreign_born)"),2).alias("pct_foreign_born"),\
                  round(col("avg(Native_American)"),2).alias("Native_American"),\
                  round(col("avg(Asian)"),2).alias("Asian"),\
                  round(col("avg(South_American)"),2).alias("South_American"),\
                  round(col("avg(African)"),2).alias("African"),\
                  round(col('avg(White)'),2).alias('White')
                  )
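The pivot turns the one-row-per-(city, race) layout into one row per city with a column per race, and the second `groupBy` averages those city-level percentages per state. Two consequences worth knowing: a city without a row for some race gets NULL in that column, and since Spark's `avg` ignores NULLs, each state figure is an unweighted mean over the cities that report that race. A plain-Python sketch of both steps on made-up rows; city names serve as the grouping key here for readability, whereas the notebook groups by the city's numeric attributes.

```python
from collections import defaultdict
from statistics import mean
from typing import Dict, List, Optional, Tuple

# Made-up long-format rows, one per (city, race): (state, city, race, pct_race)
rows: List[Tuple[str, str, str, float]] = [
    ("AL", "City A", "Asian", 2.0),
    ("AL", "City A", "White", 60.0),
    ("AL", "City B", "White", 40.0),  # City B has no Asian row
]

Wide = Dict[Tuple[str, str], Dict[str, float]]


def pivot(long_rows: List[Tuple[str, str, str, float]]) -> Wide:
    """One entry per (state, city) holding a value per race, like groupBy().pivot("Race")."""
    wide: Wide = defaultdict(dict)
    for state, city, race, pct in long_rows:
        wide[(state, city)][race] = pct
    return wide


def state_averages(wide: Wide, races: List[str]) -> Dict[str, Dict[str, Optional[float]]]:
    """Unweighted mean of each race column per state; missing values are skipped like NULLs."""
    by_state: Dict[str, Dict[str, List[float]]] = defaultdict(lambda: defaultdict(list))
    for (state, _city), values in wide.items():
        per_state = by_state[state]  # register the state even if no race value is present
        for race in races:
            if race in values:  # a race absent for this city is NULL after the pivot
                per_state[race].append(values[race])
    return {
        state: {race: (mean(vals[race]) if vals[race] else None) for race in races}
        for state, vals in by_state.items()
    }


print(state_averages(pivot(rows), ["Asian", "White"]))
```

Here the Alabama `Asian` average comes from City A alone, while `White` averages both cities.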

In [14]:
avg_dmgrphcs_data.show(5)

+----------+----------+----------+--------+----------+------------+----------------+---------------+-----+--------------+-------+-----+
|     State|state_code|median_age|pct_male|pct_female|pct_veterans|pct_foreign_born|Native_American|Asian|South_American|African|White|
+----------+----------+----------+--------+----------+------------+----------------+---------------+-----+--------------+-------+-----+
|   Alabama|        AL|     36.23|   47.25|     52.75|        6.76|            5.13|           0.81| 2.91|          3.57|  45.01|52.04|
|    Alaska|        AK|      32.2|    51.2|      48.8|         9.2|           11.13|          12.17|12.33|          9.13|   7.74|71.21|
|   Arizona|        AZ|     35.04|   48.81|     51.19|        6.61|           12.64|           2.82| 5.13|         28.77|   6.01|82.68|
|  Arkansas|        AR|     32.77|   48.41|     51.59|        5.21|           10.72|           1.83|  4.1|         14.17|  21.85|68.03|
|California|        CA|     36.18|   49.36|     

In [15]:
# 4th task: Cleanup Airport data
# Cleanup involves
# - Adding State codes
# - Filtering small airports in the US

airport_data = df_airport.filter(df_airport["type"]=="small_airport")\
.filter(df_airport["iso_country"]=="US")\
.withColumn("iso_region",substring(df_airport["iso_region"],4,2))\
.withColumn("elevation_ft",col("elevation_ft").cast("float"))

# Calculate average elevations per state
avg_elevation = airport_data.groupBy("iso_country","iso_region").avg("elevation_ft")

# Select and rename the columns, rounding the average elevation to 2 decimals
arprt_data = avg_elevation.select(col("iso_country").alias("country"),\
                                               col("iso_region").alias("state"),\
                                               round(col("avg(elevation_ft)"),2).alias("avg_elevation")).orderBy("iso_region")
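Spark's `substring(str, pos, len)` uses 1-based positions, so `substring(iso_region, 4, 2)` turns `'US-PA'` into `'PA'`; the equivalent Python slice is `[3:5]`. A plain-Python sketch of the filter, state extraction, and per-state average on made-up rows (only the `'US-XX'` region format comes from the sample row above). It skips missing elevations, as Spark's `avg` does, and leaves out the final rounding.

```python
from collections import defaultdict
from statistics import mean
from typing import Dict, List, Optional

Airport = Dict[str, Optional[str]]


def avg_elevation_by_state(airports: List[Airport]) -> Dict[str, float]:
    """Mean elevation (ft) of small US airports, keyed by two-letter state code.

    Airports without an elevation are skipped, and states with no known
    elevation are left out of the result.
    """
    elevations: Dict[str, List[float]] = defaultdict(list)
    for a in airports:
        if a["type"] != "small_airport" or a["iso_country"] != "US":
            continue
        region = a["iso_region"] or ""
        state = region[3:5]  # 'US-PA' -> 'PA'; Spark: substring(iso_region, 4, 2)
        if a["elevation_ft"] is not None:
            elevations[state].append(float(a["elevation_ft"]))
    return {state: mean(values) for state, values in sorted(elevations.items())}


# Made-up rows; only the 'US-XX' region format is taken from the sample above
airports: List[Airport] = [
    {"type": "small_airport", "iso_country": "US", "iso_region": "US-PA", "elevation_ft": "100"},
    {"type": "small_airport", "iso_country": "US", "iso_region": "US-PA", "elevation_ft": "301"},
    {"type": "small_airport", "iso_country": "US", "iso_region": "US-PA", "elevation_ft": None},
    {"type": "heliport", "iso_country": "US", "iso_region": "US-PA", "elevation_ft": "11"},
    {"type": "small_airport", "iso_country": "CA", "iso_region": "CA-ON", "elevation_ft": "500"},
]

print(avg_elevation_by_state(airports))
```

If this is pasted into the notebook itself, keep in mind that `from pyspark.sql.functions import *` shadows Python's built-in `sum` and `round`; `statistics.mean` avoids that clash.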

In [16]:
arprt_data.show(5)

+-------+-----+-------------+
|country|state|avg_elevation|
+-------+-----+-------------+
|     US|   AK|       545.07|
|     US|   AL|        414.6|
|     US|   AR|       488.45|
|     US|   AZ|      3098.01|
|     US|   CA|      1261.37|
+-------+-----+-------------+
only showing top 5 rows



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

We implement a star schema, the standard data model for analytical workloads:
1. Dimension Tables
   - airport
     Columns: country, state, avg_elevation
   - demographics
     Columns: State, state_code, median_age, pct_male, pct_female, pct_veterans, pct_foreign_born, Native_American, Asian, South_American, African, White
   - i94_immigration
     Columns: cicid, year, month, origin_country, i94port, city_port_name, state_code, dest_state_name
2. Fact Table
   - immigration_fact
     Columns: year, imm_month, imm_origin, to_imm_state, to_imm_state_count, avg_elevation, pct_foreign_born, Native_American, Asian, South_American, African, White

#### 3.2 Mapping Out Data Pipelines

Steps to create data pipeline:
1. Dimension tables are loaded first from the cleaned data prepared above.
2. Create the fact table:
   a. use a SQL query that joins the immigration data to the dimension tables;
   b. keep the query result as a Spark DataFrame (spark.sql returns one);
   c. write the DataFrame to a parquet file.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [17]:
# Register the cleaned dataframes as temporary views so they can be queried with Spark SQL

# Dimension tables

imgrtn_data.createOrReplaceTempView("immigration")
avg_dmgrphcs_data.createOrReplaceTempView("demographics")
arprt_data.createOrReplaceTempView("airport")

In [18]:
# The fact table is large: turn off automatic broadcast joins (-1 disables them)
# so the ETL does not time out while broadcasting a table for the join.

sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")

In [19]:
# Fact table: Immigration data by State

immigration_by_states = spark.sql("""SELECT 
                                    i.year,
                                    i.month AS imm_month,
                                    i.origin_country AS imm_origin,
                                    i.dest_state_name AS to_imm_state,
                                    COUNT(i.state_code) AS to_imm_state_count,
                                    a.avg_elevation,
                                    d.pct_foreign_born,
                                    d.Native_American,
                                    d.Asian,
                                    d.South_American,
                                    d.African,
                                    d.White
                                    FROM immigration i JOIN demographics d ON i.state_code=d.state_code
                                    JOIN airport a ON a.state=d.state_code
                                    GROUP BY i.year, i.month, i.origin_country,
                                    i.dest_state_name, i.state_code, a.avg_elevation,
                                    d.pct_foreign_born, d.Native_American,
                                    d.Asian, d.South_American,
                                    d.White, d.African
                                    ORDER BY i.origin_country,i.state_code
                                    
""")

In [20]:
# spark.sql() already returns a DataFrame; toDF() only (re)assigns the column names before previewing the first rows

immigration_by_states.toDF('year', 'imm_month', 'imm_origin', 'to_imm_state', \
          'to_imm_state_count', 'avg_elevation',\
          'pct_foreign_born', 'Native_American', 'Asian', 'South_American', 'African', 'White').show(5)

+----+---------+-----------+------------+------------------+-------------+----------------+---------------+-----+--------------+-------+-----+
|year|imm_month| imm_origin|to_imm_state|to_imm_state_count|avg_elevation|pct_foreign_born|Native_American|Asian|South_American|African|White|
+----+---------+-----------+------------+------------------+-------------+----------------+---------------+-----+--------------+-------+-----+
|2016|       10|AFGHANISTAN|     Arizona|                 3|      3098.01|           12.64|           2.82| 5.13|         28.77|   6.01|82.68|
|2016|       10|AFGHANISTAN|  California|                35|      1261.37|           27.57|           1.67|17.93|         37.81|   7.45|62.67|
|2016|       10|AFGHANISTAN|    Colorado|                 1|      5912.82|            9.58|           2.04| 4.93|          22.2|   4.21|87.95|
|2016|       10|AFGHANISTAN| Connecticut|                 3|       490.29|           25.21|           1.26| 5.34|         34.81|  24.28|59.61|

In [22]:
# Write Fact table to parquet

immigration_by_states.write.mode("overwrite").parquet("immigration_by_states")

#### 4.2 Data Quality Checks
The quality checks verify that the key columns contain no NULL values and that the fact table is actually populated.
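The two checks can be packaged as reusable functions that raise instead of printing, which makes them easy to run as the final step of a scheduled pipeline. A plain-Python sketch over a list of fact rows; the function names are hypothetical, and in the notebook the rows would come from `immigration_by_states`. Note that `count()` on the fact table returns the number of groups, while summing `to_imm_state_count` returns the number of immigration records represented, which is what `check_total` compares with an independently computed expected count.

```python
from typing import Any, Dict, Iterable, List


def check_no_nulls(rows: List[Dict[str, Any]], columns: Iterable[str]) -> None:
    """Raise ValueError if any of the given columns holds None (missing keys count as NULL)."""
    for column in columns:
        nulls = [r for r in rows if r.get(column) is None]
        if nulls:
            raise ValueError(f"{len(nulls)} NULL value(s) in column {column!r}")


def check_total(rows: List[Dict[str, Any]], expected_records: int) -> None:
    """Raise ValueError if the table is empty or its per-group counts don't add up."""
    if not rows:
        raise ValueError("fact table is empty")
    total = 0
    for r in rows:
        total += r["to_imm_state_count"]
    if total != expected_records:
        raise ValueError(f"expected {expected_records} records, found {total}")


# Made-up fact rows
fact_rows = [
    {"year": 2016, "imm_month": 10, "to_imm_state": "New York", "to_imm_state_count": 2},
    {"year": 2016, "imm_month": 10, "to_imm_state": "Massachusetts", "to_imm_state_count": 1},
]
check_no_nulls(fact_rows, ["year", "imm_month", "to_imm_state"])
check_total(fact_rows, expected_records=3)
print("quality checks passed")
```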

In [23]:
# Perform quality checks here

# Check for NULL values in year, imm_month and to_imm_state

immigration_by_states.select(isnull('year'),\
                             isnull('imm_month'),\
                             isnull('to_imm_state')).dropDuplicates().show()

+--------------+-------------------+----------------------+
|(year IS NULL)|(imm_month IS NULL)|(to_imm_state IS NULL)|
+--------------+-------------------+----------------------+
|         false|              false|                 false|
+--------------+-------------------+----------------------+



In [24]:
# Check that the fact table is populated: the per-group counts should add up to the number of immigration records that made it through the joins

immigration_by_states.select(sum('to_imm_state_count')).show()

+-----------------------+
|sum(to_imm_state_count)|
+-----------------------+
|                3330304|
+-----------------------+



#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.


##### The data dictionary is documented in README.md.


### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
##### Answer: Apache Spark (through the PySpark module in Python) was used to perform the ETL for this project. I first tried using plain (pandas) DataFrames: they make it easy and quick to process the dimension tables and other small datasets, but processing slows down considerably once we reach the millions of records in the fact table. Spark parallelizes that work and, with the spark-sas7bdat package, reads the SAS immigration files directly.
* Propose how often the data should be updated and why.
##### Answer: Monthly. The I94 data is organized in monthly files (this project loads i94_oct16_sub.sas7bdat), and immigration services typically make their decisions on a monthly basis, with strategies that stay the same for the whole month.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
##### Answer: I would run the Spark jobs on a distributed Hadoop cluster (data stored in HDFS, resources managed by YARN) so the processing is parallelized across many machines.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
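##### Answer: I would use Apache Airflow to schedule and manage the data pipelines: a daily DAG set to complete before 7am, with retries and failure alerts so a missed run is noticed before the dashboard is used.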
 * The database needed to be accessed by 100+ people.
##### Answer: This is sensitive data and should not be accessible to everyone, so I would expose it through a web application that only authorized government employees can access.
 
 
 

