## Data Engineering for Getting Insight into Immigrants' Destination Choice
### Capstone Project

#### Project Summary
The goal of this project is to create an ETL pipeline that combines I94 immigration data, city temperature data and U.S. city demographics data into a database optimized for queries on immigration events. The database can be used to relate immigration behavior to the demographics and/or temperature of the destination. For example: do people tend to immigrate to warmer places? How does a state's diversity play a role in immigrants' choice of destination?

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Wrangling the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here

import pandas as pd
from pyspark.sql import SparkSession, SQLContext
# Import only the Spark functions used below; note that this `round` shadows Python's built-in
from pyspark.sql.functions import udf, col, year, month, round
from i94_label import immigration_codes, state_abbrev, abbrev_state, i94_port, country_udf,\
state_udf, abbrev_state_udf, city_code_udf, state_code_udf, i94_model, i94_visa, arr_udf, visa_udf

### Step 1: Scope the Project and Gather Data

#### Scope
This project uses three datasets: I94 immigration data, temperature data and U.S. city demographics data. Data is pulled from all three sources and organized into fact and dimension tables that describe immigration movements, with the cleaned datasets serving as the dimension tables. Spark is used to process the data.

#### Describe and Gather Data 
The I94 immigration [data](https://travel.trade.gov/research/reports/i94/historical/2016.html) comes from the U.S. National Travel and Tourism Office. It is provided in SAS7BDAT [format](https://cran.r-project.org/web/packages/sas7bdat/vignettes/sas7bdat.pdf), a binary database storage format. Some relevant attributes include:
* i94yr = 4 digit year
* i94mon = numeric month
* i94cit = 3 digit code of the country of origin (citizenship)
* i94port = 3 character code of the U.S. port of entry
* i94mode = mode of arrival (Air, Sea, Land)
* i94addr = state code of the destination address in the U.S.
* depdate = departure date from the USA
* i94visa = visa category (Business, Pleasure, Student)

The temperature [data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data) comes from Kaggle. It is provided in csv format. Some relevant attributes include:
* AverageTemperature = average temperature (Celsius)
* City
* Country
* Latitude
* Longitude 

U.S. City Demographic Data comes from [OpenDataSoft](https://public.opendatasoft.com/) and includes data by city, state, age, population, veteran status and race. Some relevant attributes include:
* City
* State
* Total population
* Race and the population count for each race
* Number of veterans
* Number of foreign-born residents

### Step 2: Wrangling the Data

After loading and exploring each dataset, a set of cleaning steps was applied to it, and the last step for each dataset selects only the columns needed for the data model.

#### Accessing Data

In [2]:
# Create Spark session with SAS7BDAT jar
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [3]:
#Build SQL context object
sqlContext = SQLContext(spark)

In [4]:
# Read in the data presented in workspace
demog=spark.read.format("csv").option("header", "true").option("delimiter", ";").load("us-cities-demographics.csv")
airport=spark.read.format("csv").option("header", "true").load("airport-codes_csv.csv")

In [5]:
# Reading I94 and Temperature dataset
df_i94 =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat')
temp =spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")

#### Cleaning Data
##### 1. Temperature Dataset:
* Removing rows with a null state (cities with no matching I94 port)
* Filtering on country: United States
* Filtering on 2013, the last year in the dataset
* Adding the state name based on the city name
* Selecting the needed columns and renaming them
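The state lookup used by `get_state` in the next cell can be tried outside Spark. The sketch below is plain Python; `SAMPLE_I94_PORT` is a hypothetical stand-in for `i94_label.i94_port`, assuming its values look like `"ATLANTA, GA"` (which the `city_port_name`/`state_port_name` output further down suggests). It reproduces the matching rule and shows one of its limits: because only the first word of the port label is compared, multi-word cities such as New York do not match.

```python
# Hypothetical stand-in for i94_label.i94_port: port code -> "CITY, ST" label.
# The real dictionary lives in i94_label.py and is not shown in this notebook.
SAMPLE_I94_PORT = {
    "ATL": "ATLANTA, GA",
    "CHI": "CHICAGO, IL",
    "AKR": "AKRON, OH",
    "NYC": "NEW YORK, NY",
}


def get_state_plain(city, ports=SAMPLE_I94_PORT):
    """Return the state code of the first port whose first word contains `city`.

    Mirrors the rule in get_state: lower-case the city, look for it inside the
    first whitespace-separated token of the port label, and return the last
    token (the state code) on a match.
    """
    if city is None:
        return None
    for label in ports.values():
        tokens = label.rsplit()
        if city.lower() in tokens[0].lower():
            return tokens[-1]
    return None


print(get_state_plain("Akron"))     # OH
print(get_state_plain("Abilene"))   # None: no matching port
print(get_state_plain("New York"))  # None: "new york" is not inside "new"
```

Because the test is a substring check, a short city name can also match a longer port name, so the matched states are worth spot-checking.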

In [6]:
# Clean temperature data

# Map each city to its state using the valid I94 port list
@udf
def get_state(city):
    '''
    Take a city name from the temperature dataset, look it up in the valid
    I94 port list (i94_port) and return the corresponding state code.

    Input: City name
    Output: State code of the matching i94port, or None if there is no match
    '''
    if city is None:
        return None
    for key in i94_port:
        # Port labels appear to look like "CITY, ST": match on the city part, return the state part
        if city.lower() in i94_port[key].rsplit()[0].lower():
            return i94_port[key].rsplit()[-1]
    return None


# Keep U.S. rows for 2013 first, then add the state code, month and year columns
temp = temp.filter(temp.Country == "United States")\
.filter(year(temp["dt"]) == 2013)\
.withColumn("state_code", get_state(temp["City"]))\
.withColumn("month", month(temp["dt"]))\
.withColumn("year", year(temp["dt"]))

In [7]:
# Show Result
temp.show(1)

+----------+------------------+-----------------------------+-------+-------------+--------+---------+----------+-----+----+
|        dt|AverageTemperature|AverageTemperatureUncertainty|   City|      Country|Latitude|Longitude|state_code|month|year|
+----------+------------------+-----------------------------+-------+-------------+--------+---------+----------+-----+----+
|2013-01-01|              6.32|                        0.267|Abilene|United States|  32.95N|  100.53W|      null|    1|2013|
+----------+------------------+-----------------------------+-------+-------------+--------+---------+----------+-----+----+
only showing top 1 row



In [8]:
# Drop cities with no matching I94 port (null state), then add the full state name
temp = temp.filter(temp['state_code'].isNotNull()).withColumn("state", abbrev_state_udf(temp['state_code']))

In [9]:
# Show Result
temp.show(1)

+----------+------------------+-----------------------------+-----+-------------+--------+---------+----------+-----+----+-----+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|      Country|Latitude|Longitude|state_code|month|year|state|
+----------+------------------+-----------------------------+-----+-------------+--------+---------+----------+-----+----+-----+
|2013-01-01|            -1.086|                         0.22|Akron|United States|  40.99N|   80.95W|        OH|    1|2013| Ohio|
+----------+------------------+-----------------------------+-----+-------------+--------+---------+----------+-----+----+-----+
only showing top 1 row



In [10]:
# Selecting needed columns
new_temp= temp.select("year","month",round(col("AverageTemperature"),1).alias("avg_temp_celsius"),"state_code", "state")

In [11]:
# Show Result
new_temp.show(10)

+----+-----+----------------+----------+----------+
|year|month|avg_temp_celsius|state_code|     state|
+----+-----+----------------+----------+----------+
|2013|    1|            -1.1|        OH|      Ohio|
|2013|    2|            -2.2|        OH|      Ohio|
|2013|    3|             1.3|        OH|      Ohio|
|2013|    4|             9.7|        OH|      Ohio|
|2013|    5|            16.8|        OH|      Ohio|
|2013|    6|            20.6|        OH|      Ohio|
|2013|    7|            23.0|        OH|      Ohio|
|2013|    8|            21.0|        OH|      Ohio|
|2013|    9|            17.8|        OH|      Ohio|
|2013|    1|            -2.0|        NM|New Mexico|
+----+-----+----------------+----------+----------+
only showing top 10 rows



##### 2. I94 Immigration Data
* Removing null rows
* Validating destination state, port, visa type and mode of arrival
* Converting the arrival port and destination codes to full city and state names
* Selecting the needed columns for the data model and renaming them

> **Note**: In some rows, the state of the arrival port differs from the state of the destination address. These rows are kept, on the assumption that the passenger entered the U.S. at a different point than their final destination.
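The validation and decoding in the next cell can be sketched in plain Python. The lookup tables here are assumptions modelled on `i94_model` and `i94_visa` from `i94_label`, using the code values of the I94 label descriptions as we understand them; `VALID_STATES` is a toy subset of `abbrev_state`. Rows with an unknown destination, arrival mode or visa code are dropped, and the remaining codes are replaced by labels.

```python
# Hypothetical lookup tables modelled on i94_label.i94_model and i94_label.i94_visa;
# check the codes against i94_label.py before relying on them.
I94_MODE = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}
I94_VISA = {1: "Business", 2: "Pleasure", 3: "Student"}
VALID_STATES = {"GA", "IL", "MI", "AZ"}  # toy subset of abbrev_state keys


def clean_rows(rows):
    """Keep rows with valid codes and replace the numeric codes with labels."""
    cleaned = []
    for row in rows:
        # Destination state must be present and known (None is never in the set)
        if row.get("i94addr") not in VALID_STATES:
            continue
        mode, visa = row.get("i94mode"), row.get("i94visa")
        # The notebook casts these columns to integer, so accept values like 1.0
        if mode is None or int(mode) not in I94_MODE:
            continue
        if visa is None or int(visa) not in I94_VISA:
            continue
        cleaned.append({
            "state_code": row["i94addr"],
            "arriving_model": I94_MODE[int(mode)],
            "visa_type": I94_VISA[int(visa)],
        })
    return cleaned


sample = [
    {"i94addr": "MI", "i94mode": 1.0, "i94visa": 3.0},  # kept: Air, Student
    {"i94addr": "IL", "i94mode": 1.0, "i94visa": 2.0},  # kept: Air, Pleasure
    {"i94addr": None, "i94mode": 1.0, "i94visa": 2.0},  # dropped: no destination
    {"i94addr": "IL", "i94mode": 5.0, "i94visa": 2.0},  # dropped: unknown mode
]
print(clean_rows(sample))
```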


In [12]:
# Clean I94 immigration data
i94_data=df_i94.filter(df_i94.i94addr.isNotNull())\
.filter(df_i94.i94res.isNotNull())\
.filter(col("i94addr").isin(list(abbrev_state.keys())))\
.filter(col("i94port").isin(list(i94_port.keys())))\
.filter(col("i94visa").cast("integer").isin(list(i94_visa.keys())))\
.filter(col("i94mode").cast("integer").isin(list(i94_model.keys())))\
.withColumn("origin_country",country_udf(df_i94["i94cit"]))\
.withColumn("dest_state_name",abbrev_state_udf(df_i94["i94addr"]))\
.withColumn("i94yr",col("i94yr").cast("integer"))\
.withColumn("i94mon",col("i94mon").cast("integer"))\
.withColumn("city_port_name",city_code_udf(df_i94["i94port"]))\
.withColumn("state_port_name",state_code_udf(df_i94["i94port"]))\
.withColumn("i94mode",arr_udf(col("i94mode").cast("integer")))\
.withColumn("i94visa",visa_udf(col("i94visa").cast("integer")))

new_i94=i94_data.select(col("i94yr").alias("year"),col("i94mon").alias("month"),\
                        "origin_country",col("i94mode").alias("arriving_model"),\
                        col("i94visa").alias("visa_type"),
                        "city_port_name","state_port_name",\
                        col("i94addr").alias("state_code"),"dest_state_name")

In [13]:
new_i94.show(10)

+----+-----+--------------+--------------+---------+--------------+---------------+----------+---------------+
|year|month|origin_country|arriving_model|visa_type|city_port_name|state_port_name|state_code|dest_state_name|
+----+-----+--------------+--------------+---------+--------------+---------------+----------+---------------+
|2016|    2|       ALBANIA|           Air|  Student|      ATLANTA,|             GA|        MI|       Michigan|
|2016|    2|       ALBANIA|           Air| Pleasure|      CHICAGO,|             IL|        IL|       Illinois|
|2016|    2|       ALBANIA|           Air| Pleasure|      CHICAGO,|             IL|        IL|       Illinois|
|2016|    2|       ALBANIA|           Air| Pleasure|      CHICAGO,|             IL|        AZ|        Arizona|
|2016|    2|       ALBANIA|           Air| Pleasure|      CHICAGO,|             IL|        IL|       Illinois|
|2016|    2|       ALBANIA|           Air| Pleasure|      CHICAGO,|             IL|        IL|       Illinois|
...

##### 3. US Cities Demographic Data:
* Removing nulls
* Calculating population percentages (veterans, foreign-born and each race)
* Pivoting on `Race`
* Grouping by state and averaging the numeric columns
* Selecting the needed columns for the data model to build the final dataset
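To make the aggregation concrete, the sketch below repeats the demographic steps on a few made-up rows in plain Python (the field names are hypothetical). One difference from the notebook: it groups by city name explicitly, whereas the Spark pivot uses `pct_veterans` and `pct_foreign_born` as a stand-in for the city. A race missing for a city is skipped in the average, similar to how Spark's `avg` ignores the nulls left by the pivot.

```python
from collections import defaultdict
from statistics import mean

# Made-up rows in the shape of us-cities-demographics.csv: one row per
# (city, race), with the city-level totals repeated on every row.
rows = [
    {"city": "A", "state": "AL", "total": 1000, "veterans": 50, "foreign": 40, "race": "White", "count": 600},
    {"city": "A", "state": "AL", "total": 1000, "veterans": 50, "foreign": 40, "race": "Asian", "count": 30},
    {"city": "B", "state": "AL", "total": 2000, "veterans": 200, "foreign": 100, "race": "White", "count": 1000},
    {"city": "B", "state": "AL", "total": 2000, "veterans": 200, "foreign": 100, "race": "Asian", "count": 100},
]

# 1. Percentages per city, pivoted so that each race becomes a column
cities = {}
for r in rows:
    city = cities.setdefault(r["city"], {
        "state": r["state"],
        "pct_veterans": r["veterans"] / r["total"] * 100,
        "pct_foreign_born": r["foreign"] / r["total"] * 100,
    })
    city[r["race"]] = r["count"] / r["total"] * 100

# 2. Group the city records by state
by_state = defaultdict(list)
for city in cities.values():
    by_state[city["state"]].append(city)

# 3. Average each column per state (skipping cities where a race is missing)
columns = ["pct_veterans", "pct_foreign_born", "White", "Asian"]
state_avg = {
    state: {c: round(mean(rec[c] for rec in recs if c in rec), 1) for c in columns}
    for state, recs in by_state.items()
}
print(state_avg)
# {'AL': {'pct_veterans': 7.5, 'pct_foreign_born': 4.5, 'White': 55.0, 'Asian': 4.0}}
```

Note that the state value is an unweighted mean of city percentages, as in the notebook, so a small city counts as much as a large one.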

In [14]:
demog.show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
...

In [15]:
demog_1= demog\
.withColumn("pct_veterans",demog["Number of Veterans"]/demog["Total Population"]*100)\
.withColumn("pct_foreign_born",demog["Foreign-born"]/demog["Total Population"]*100)\
.withColumn("pct_race",demog["Count"]/demog["Total Population"]*100)\
.orderBy("State")

In [16]:
#Select columns with new calculated percentages and state names.
demog_2= demog_1.select("State",\
                         col("State Code").alias("state_code"),\
                         "pct_veterans",\
                         "pct_foreign_born",\
                         "Race",\
                         "pct_race"
                        )

In [17]:
demog_2.show()

+-------+----------+------------------+------------------+--------------------+------------------+
|  State|state_code|      pct_veterans|  pct_foreign_born|                Race|          pct_race|
+-------+----------+------------------+------------------+--------------------+------------------+
|Alabama|        AL| 7.455654931052018|4.6548612565184015|American Indian a...|0.6366346604448964|
|Alabama|        AL| 6.144463601039603|3.7230127891716633|American Indian a...|1.4492679035536913|
|Alabama|        AL| 3.708637556183774| 4.785535601700258|               Asian| 2.779190140128943|
|Alabama|        AL| 7.455654931052018|4.6548612565184015|               White|36.665071340970954|
|Alabama|        AL| 3.708637556183774| 4.785535601700258|  Hispanic or Latino| 2.516829709776485|
|Alabama|        AL| 8.797339171081992| 6.710767050562094|  Hispanic or Latino| 5.756845077572257|
|Alabama|        AL| 7.455654931052018|4.6548612565184015|  Hispanic or Latino|3.3142891328407766|
...

In [18]:
# Pivot the Race column
pivot_demog= demog_2.groupBy("State","state_code","pct_veterans",\
                               "pct_foreign_born").pivot("Race").avg("pct_race")

# Change the header name of the race fields
pivot_demog= pivot_demog.select("State","state_code","pct_veterans","pct_foreign_born",\
                               col("American Indian and Alaska Native").alias("native_american"),\
                               col("Asian"),col("Black or African-American").alias("Black"),\
                               col("Hispanic or Latino").alias("hispanic_latino"),"White")

In [19]:
pivot_demog.show(5)

+-------+----------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|  State|state_code|      pct_veterans|  pct_foreign_born|   native_american|             Asian|             Black|   hispanic_latino|             White|
+-------+----------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|Alabama|        AL|  5.68017067622202| 9.699548556677943|              null| 5.609448484777048|21.441789742924833| 4.042951944270913| 72.92518770848314|
|Alabama|        AL| 7.455654931052018|4.6548612565184015|0.6366346604448964| 3.249479026452494|60.502727009861104|3.3142891328407766|36.665071340970954|
|Alabama|        AL| 8.797339171081992| 6.710767050562094|0.9280116754973191|3.4719798639973773|32.552322937487446| 5.756845077572257|  64.4605899087323|
...

In [20]:
# Calculating average of each column per state.
demog_avg= pivot_demog.groupBy("State","state_code").avg("pct_veterans","pct_foreign_born",\
                                                        "native_american","Asian","Black","hispanic_latino",\
                                                        "White").orderBy("State")

In [21]:
# Round the percentages and rename column names
new_demog= demog_avg.select(col("State").alias("state"),\
                             "state_code",\
                             round(col("avg(pct_veterans)"),1).alias("pct_veterans"),\
                             round(col("avg(pct_foreign_born)"),1).alias("pct_foreign_born"),\
                             round(col("avg(native_american)"),1).alias("native_american"),\
                             round(col("avg(Asian)"),1).alias("asian"),\
                             round(col("avg(hispanic_latino)"),1).alias("hispanic_latino"),\
                             round(col("avg(Black)"),1).alias("black"),\
                             round(col("avg(White)"),1).alias('white')
                            )

In [22]:
new_demog.show()

+--------------------+----------+------------+----------------+---------------+-----+---------------+-----+-----+
|               state|state_code|pct_veterans|pct_foreign_born|native_american|asian|hispanic_latino|black|white|
+--------------------+----------+------------+----------------+---------------+-----+---------------+-----+-----+
|             Alabama|        AL|         6.8|             5.1|            0.8|  2.9|            3.6| 45.0| 52.0|
|              Alaska|        AK|         9.2|            11.1|           12.2| 12.3|            9.1|  7.7| 71.2|
|             Arizona|        AZ|         6.6|            12.6|            2.8|  5.1|           28.8|  6.0| 82.7|
|            Arkansas|        AR|         5.2|            10.7|            1.8|  4.1|           14.2| 21.8| 68.0|
|          California|        CA|         4.1|            27.6|            1.7| 17.9|           37.8|  7.5| 62.7|
...

### Step 3: Define the Data Model

#### Conceptual Data Model
The wrangling steps were designed with the target data model in mind, so each dataset keeps only the columns the model needs.
A `Star Schema` connects the fact table to the dimension tables.

#### Mapping Out Data Pipelines
The data is mapped into the model in two steps:
* Dimension tables are created from the cleaned data.
* The fact table is created with a SQL query that joins the immigration data to the dimension tables.
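A small, self-contained version of this star-schema join can be run with Python's built-in `sqlite3` module. The table names mirror the notebook's temporary views, but the schema is trimmed and the rows are invented for illustration. The fact query joins immigration to temperature on state and month and to demographics on state, then counts arrivals per group.

```python
import sqlite3

# Toy star schema: table names mirror the notebook's temp views, rows are made up.
con = sqlite3.connect(":memory:")
con.executescript("""
CREATE TABLE immigration (year INT, month INT, origin_country TEXT, state_code TEXT, dest_state_name TEXT);
CREATE TABLE temperature (month INT, state_code TEXT, avg_temp_celsius REAL);
CREATE TABLE demographics (state_code TEXT, pct_foreign_born REAL);

INSERT INTO immigration VALUES
  (2016, 2, 'ALBANIA', 'IL', 'Illinois'),
  (2016, 2, 'ALBANIA', 'IL', 'Illinois'),
  (2016, 2, 'ALBANIA', 'AZ', 'Arizona');
INSERT INTO temperature VALUES (2, 'IL', -2.0), (2, 'AZ', 12.0);
INSERT INTO demographics VALUES ('IL', 20.0), ('AZ', 12.6);
""")

# One fact row per (year, month, origin, destination), enriched with the
# destination's temperature and demographics, plus the number of arrivals.
fact = con.execute("""
SELECT m.year, m.month AS immig_month, m.origin_country AS immig_from,
       m.dest_state_name AS immig_state, t.avg_temp_celsius, d.pct_foreign_born,
       COUNT(*) AS immig_state_count
FROM immigration m
JOIN temperature t ON m.state_code = t.state_code AND m.month = t.month
JOIN demographics d ON d.state_code = m.state_code
GROUP BY m.year, m.month, m.origin_country, m.dest_state_name,
         t.avg_temp_celsius, d.pct_foreign_born
ORDER BY immig_state
""").fetchall()

for row in fact:
    print(row)
# (2016, 2, 'ALBANIA', 'Arizona', 12.0, 12.6, 1)
# (2016, 2, 'ALBANIA', 'Illinois', -2.0, 20.0, 2)
```

The temperature table here has exactly one row per state and month; if it had several (one per city), each arrival would be joined once per city, so the temperature would need to be aggregated per state first.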

### Step 4: Run Pipelines to Model the Data 
#### Data Model and Data Dictionary
The data model is built from the following final datasets:

1. **U.S. Demographics by State**
    * `state`: string (nullable = true) - Full state name
    * `state_code`: string (nullable = true) - Abbreviated state code
    * `pct_veterans`: double (nullable = true) - Avg % of veterans in the population, per state
    * `pct_foreign_born`: double (nullable = true) - Avg % of foreign-born population, per state
    * `native_american`: double (nullable = true) - Avg % of Native American population, per state
    * `asian`: double (nullable = true) - Avg % of Asian population, per state
    * `hispanic_latino`: double (nullable = true) - Avg % of Hispanic or Latino population, per state
    * `black`: double (nullable = true) - Avg % of Black population, per state
    * `white`: double (nullable = true) - Avg % of White population, per state

2. **Immigration Data by State with Origin**
    * `year`: integer (nullable = true) - Year of immigration
    * `month`: integer (nullable = true) - Month of immigration
    * `origin_country`: string (nullable = true) - Country of origin
    * `arriving_model`: string (nullable = true) - How the immigrant entered (Air, Land, Sea)
    * `visa_type`: string (nullable = true) - Visa category
    * `city_port_name`: string (nullable = true) - City of the port of entry
    * `state_port_name`: string (nullable = true) - State code of the port of entry
    * `state_code`: string (nullable = true) - Abbreviated destination state code
    * `dest_state_name`: string (nullable = true) - Destination state name
3. **Temperature Data by State**
    * `year`: integer (nullable = true) - Temperature year
    * `month`: integer (nullable = true) - Temperature month
    * `avg_temp_celsius`: double (nullable = true) - Monthly avg temperature in Celsius of a city in the state (a state can have several rows per month)
    * `state_code`: string (nullable = true) - Abbreviated state code
    * `state`: string (nullable = true) - State name
4. **Fact Table**
    * `year`: integer (nullable = true) - Year from immigration table
    * `immig_month`: integer (nullable = true) - Month from immigration table
    * `immig_from`: string (nullable = true) - Country of origin from immigration table
    * `immig_state`: string (nullable = true) - Destination state from immigration table
    * `arriving_model`: string (nullable = true) - Mode of arrival from immigration table
    * `visa_type`: string (nullable = true) - Visa category from immigration table
    * `avg_temp_celsius`: double (nullable = true) - Avg temperature in Celsius of the destination state for that month, from temperature table
    * `pct_foreign_born`: double (nullable = true) - Avg % foreign-born population per state from demographic table
    * `native_american`: double (nullable = true) - Avg % Native American population per state from demographic table
    * `asian`: double (nullable = true) - Avg % Asian population per state from demographic table
    * `hispanic_latino`: double (nullable = true) - Avg % Hispanic or Latino population per state from demographic table
    * `black`: double (nullable = true) - Avg % Black population per state from demographic table
    * `white`: double (nullable = true) - Avg % White population per state from demographic table
    * `immig_state_count`: long (nullable = false) - Number of arrivals for each combination of the columns above

In [23]:
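# Register the cleaned DataFrames as temporary views for the SQL below
new_i94.createOrReplaceTempView("immigration")
new_demog.createOrReplaceTempView("demographics")
new_temp.createOrReplaceTempView("temperature")

# Disable automatic broadcast joins (-1 is the documented "off" value) so the
# joins below use shuffle joins instead of broadcasting a table to every executor.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")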

In [24]:
# Build the fact table by joining the immigration view to the temperature and demographics views.
# Temperature is first averaged per state and month, because a state can have several cities;
# without this, each arrival would be repeated once per city of its destination state.
# COUNT(*) gives the number of arrivals for each group, i.e. how many people went to each state.
immigration_to_state = spark.sql("""
SELECT
    m.year,
    m.month AS immig_month,
    m.origin_country AS immig_from,
    m.dest_state_name AS immig_state,
    m.arriving_model,
    m.visa_type,
    t.avg_temp_celsius,
    d.pct_foreign_born,
    d.native_american,
    d.asian,
    d.hispanic_latino,
    d.black,
    d.white,
    COUNT(*) AS immig_state_count
FROM immigration m
JOIN (
    SELECT state_code, month, ROUND(AVG(avg_temp_celsius), 1) AS avg_temp_celsius
    FROM temperature
    GROUP BY state_code, month
) t ON m.state_code = t.state_code AND m.month = t.month
JOIN demographics d ON d.state_code = t.state_code
GROUP BY m.year, m.month, m.origin_country, m.dest_state_name,
         m.arriving_model, m.visa_type, t.avg_temp_celsius,
         d.pct_foreign_born, d.native_american, d.asian,
         d.hispanic_latino, d.black, d.white
ORDER BY m.origin_country
""")

In [25]:
immigration_to_state.printSchema()

root
 |-- year: integer (nullable = true)
 |-- immig_month: integer (nullable = true)
 |-- immig_from: string (nullable = true)
 |-- immig_state: string (nullable = true)
 |-- arriving_model: string (nullable = true)
 |-- visa_type: string (nullable = true)
 |-- avg_temp_celsius: double (nullable = true)
 |-- pct_foreign_born: double (nullable = true)
 |-- native_american: double (nullable = true)
 |-- asian: double (nullable = true)
 |-- hispanic_latino: double (nullable = true)
 |-- black: double (nullable = true)
 |-- white: double (nullable = true)
 |-- immig_state_count: long (nullable = false)



#### Data Quality Checks
Part of the data quality work was already done during wrangling by removing null rows. The check below ensures that each table contains at least one record.

In [26]:
def quality_check(df, description):
    '''
    Input: Spark DataFrame, description of the DataFrame

    Output: Prints the outcome of the data quality check and returns 0
    '''
    result = df.count()
    if result == 0:
        print("Data quality check failed for {} with zero records".format(description))
    else:
        print("Data quality check passed for {} with {} records".format(description, result))
    return 0


In [27]:
quality_check(new_i94, "immigration table")

Data quality check passed for immigration table with 2241338 records


0

In [28]:
quality_check(new_temp, "temperature table")

Data quality check passed for temperature table with 729 records


0

In [29]:
quality_check(new_demog, "demographic table")

Data quality check passed for demographic table with 49 records


0
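`quality_check` only prints its result and always returns 0, so an empty table would not stop the pipeline. A stricter variant, sketched below, raises an exception instead, which would let a scheduler mark the run as failed. `strict_quality_check` and `FakeFrame` are hypothetical names; `FakeFrame` only exists to exercise the function here, and in the notebook you would pass `new_i94`, `new_temp` or `new_demog`.

```python
class FakeFrame:
    """Hypothetical stand-in that exposes count() like a Spark DataFrame."""

    def __init__(self, n):
        self._n = n

    def count(self):
        return self._n


def strict_quality_check(df, description):
    """Return the row count of `df`, or raise ValueError if it is empty.

    Works with any object that has a count() method, such as a Spark DataFrame.
    """
    result = df.count()
    if result == 0:
        raise ValueError(
            "Data quality check failed for {} with zero records".format(description))
    print("Data quality check passed for {} with {} records".format(description, result))
    return result


strict_quality_check(FakeFrame(49), "demographic table")
```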

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

Spark was chosen because it handles multiple file formats (including SAS, through the spark-sas7bdat package) and large volumes of data. Spark SQL was used to load the large input files into DataFrames and to combine them with standard SQL joins into additional tables.

* Propose how often the data should be updated and why.

The data should be updated monthly, matching the monthly cadence of the raw I94 files.
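A monthly update needs the path of each month's extract. The sketch below assumes that every monthly file follows the naming pattern of `i94_feb16_sub.sas7bdat` used in this notebook; that is an assumption to check against the actual directory listing, and `i94_monthly_file` is a hypothetical helper.

```python
MONTH_ABBR = ("jan", "feb", "mar", "apr", "may", "jun",
              "jul", "aug", "sep", "oct", "nov", "dec")


def i94_monthly_file(base_dir, year, month):
    """Hypothetical helper: path of one monthly I94 extract.

    Assumes the i94_<mon><yy>_sub.sas7bdat pattern of i94_feb16_sub.sas7bdat
    holds for every month.
    """
    if not 1 <= month <= 12:
        raise ValueError("month must be between 1 and 12")
    return "{}/i94_{}{:02d}_sub.sas7bdat".format(base_dir, MONTH_ABBR[month - 1], year % 100)


print(i94_monthly_file("../../data/18-83510-I94-Data-2016", 2016, 2))
# ../../data/18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat
```

Each path can then be read with `spark.read.format('com.github.saurfang.sas.spark').load(...)`, as in `In [5]`.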

* Write a description of how you would approach the problem differently under the following scenarios:

>The data was increased by 100x.
 
If the data increased by 100x, we would no longer process it as a single batch job. Instead, we could load it incrementally, for example one month at a time, and run Spark in cluster mode under a cluster manager such as YARN.
 
>The data populates a dashboard that must be updated on a daily basis by 7am every day.
 
If the data needs to populate a dashboard daily to meet an SLA, we could use a scheduling tool such as [Airflow](https://airflow.apache.org) to run the ETL pipeline overnight.
 
>The database needed to be accessed by 100+ people.
 
If the database needed to be accessed by 100+ people, we could publish the fact and dimension tables as Parquet files on HDFS and grant read access to the users who need it. Cloud services such as AWS or Azure are another option.