# Project Title
### Data Engineering Capstone Project

#### Project Summary
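This project uses Spark and Spark SQL to clean four data sources (I94 immigration records, U.S. city demographics, airport codes and state-level temperatures) and combine them into a star-schema fact table that summarizes immigration into each U.S. state.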

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [37]:
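# importing packages and files
import pandas as pd
import os
import glob
# Local helper modules (not shown): lookups and UDFs that translate state names/abbreviations,
# I94 port codes and I94 country codes into readable names
from state_abb import state_udf, abbrev_state, abbrev_state_udf, city_code_udf, city_codes
from immigration_codes import country_udf
from pyspark.sql import SparkSession, SQLContext
# The wildcard import shadows Python built-ins such as round() and sum() with
# their Spark column versions, which the cells below rely on
from pyspark.sql.functions import *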

In [38]:
#Building Spark Session with the spark-sas7bdat package, which lets Spark read SAS (.sas7bdat) files
spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()

In [39]:
#Building SQL context object (legacy API; SQLContext expects the SparkContext, not the SparkSession)
sqlContext = SQLContext(spark.sparkContext)

### Step 1: Scope the Project and Gather Data

#### Scope 
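The project shows where immigrants to the United States come from and which states they move to. Data from four sources is combined into dimension tables and a fact table that relates monthly immigration counts by country of origin and destination state to each state's average temperature, average airport elevation and demographics.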

#### Describe and Gather Data 
1. **U.S. City Demographic Data:** comes from OpenDataSoft and includes data by city, state, age, population, veteran status and race.
2. **I94 Immigration Data:** comes from the US National Tourism and Trade Office and includes details on incoming immigrants and their ports of entry.
3. **Airport Code Table:** comes from datahub.io and includes airport codes and corresponding cities.
4. **World Temperature Data:** comes from Kaggle and includes data on temperature changes in the U.S. since 1850.

In [40]:
# Read in the data
# The demographics file is semicolon-delimited, so the delimiter option is required
demog=spark.read.format("csv").option("header", "true").option("delimiter", ";").load("us-cities-demographics.csv")
airport=spark.read.format("csv").option("header", "true").load("airport-codes_csv.csv")
temperatureData=spark.read.format("csv").option("header", "true").load("GlobalLandTemperaturesByState.csv")
# The I94 data is stored in SAS format and is read through the spark-sas7bdat package
df_spark=spark.read.format('com.github.saurfang.sas.spark').load("../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat")
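The demographics file is loaded with `delimiter` set to `;` because its fields are separated by semicolons. The minimal sketch below uses Python's standard `csv` module and a made-up sample in the same layout to show what that option changes. The sample contains only a subset of the real columns, and its values are illustrative.

```python
import csv
import io

# Made-up sample in the same semicolon-separated layout as
# us-cities-demographics.csv (subset of columns, illustrative values).
SAMPLE = (
    "City;State;Median Age;Total Population;Race;Count\n"
    "Example City;Example State;35.0;1000;White;520\n"
)


def read_rows(text, delimiter):
    """Parse CSV text with a header row into a list of dicts."""
    return list(csv.DictReader(io.StringIO(text), delimiter=delimiter))


# With the correct delimiter, every column is a separate field.
rows = read_rows(SAMPLE, ";")
print(rows[0]["State"], rows[0]["Count"])

# With the default comma delimiter, the whole line lands in a single column.
wrong = read_rows(SAMPLE, ",")
print(len(wrong[0]))
```

Spark's CSV reader behaves the same way. Without the `delimiter` option, each line would be read into one column whose name is the entire header line.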

### Step 2: Explore and Assess the Data
#### Explore the Data 
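Identify data quality issues. The I94 data has missing state and residence codes and stores countries, ports and states as codes. The temperature data covers many countries and years. The demographic data has one row per city and race.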

#### Cleaning Steps
1. **Temperature data:** filter to the United States and the year 2012; create year, month and Fahrenheit fields; add state abbreviations with the abbreviation function; round and drop duplicates.
2. **Immigration data:** remove rows with a null destination state (i94addr) or country of residence (i94res); keep only valid state and port codes; convert i94res codes to the country of origin and the state and port codes to names; select the relevant columns.
3. **Demographic data:** calculate each group's percentage of the total population, pivot the race rows into columns and average each column per state.
4. **Airport data:** filter to small airports in the U.S.; use a substring of iso_region to get the state code; average the elevation per state.

##### Temperature data

In [41]:
#temperature data by state: U.S. rows for 2012 with year, month, Fahrenheit and state abbreviation columns
usTemperatures=temperatureData.filter(temperatureData["country"]=="United States")\
.filter(year(temperatureData["dt"])==2012)\
.withColumn("year",year(temperatureData["dt"]))\
.withColumn("month",month(temperatureData["dt"]))\
.withColumn("avg_temp_fahrenheit",temperatureData["AverageTemperature"]*9/5+32)\
.withColumn("state_abbrev",state_udf(temperatureData["State"]))

#Rounding to one decimal, keeping the output columns and dropping duplicate rows
new_Temperatures=usTemperatures.select("year","month",round(col("AverageTemperature"),1).alias("avg_temp_celcius"),\
                                       round(col("avg_temp_fahrenheit"),1).alias("avg_temp_fahrenheit"),
                                       "state_abbrev","State","Country").dropDuplicates()

In [42]:
new_Temperatures.show(3)

+----+-----+----------------+-------------------+------------+-------------+-------------+
|year|month|avg_temp_celcius|avg_temp_fahrenheit|state_abbrev|        State|      Country|
+----+-----+----------------+-------------------+------------+-------------+-------------+
|2012|    2|             0.2|               32.4|          MA|Massachusetts|United States|
|2012|    9|            24.2|               75.6|          MS|  Mississippi|United States|
|2012|    6|            21.7|               71.1|          VA|     Virginia|United States|
+----+-----+----------------+-------------------+------------+-------------+-------------+
only showing top 3 rows
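The temperature cell can be read as a row-by-row transformation. The plain-Python sketch below mirrors it for illustration only. The abbreviation lookup `STATE_ABBREV` is a hypothetical stand-in for `state_udf`, and the input rows are made up, although the Massachusetts value echoes the first row of the output above.

```python
from datetime import date

# Hypothetical state-name -> abbreviation lookup; the notebook's real
# mapping lives in the local state_abb module (state_udf).
STATE_ABBREV = {"Massachusetts": "MA", "Mississippi": "MS", "Virginia": "VA"}


def celsius_to_fahrenheit(celsius):
    """Same formula as the Spark cell: F = C * 9/5 + 32."""
    return celsius * 9 / 5 + 32


def clean_temperatures(rows, country="United States", year=2012):
    """Keep one country and year, derive year, month, Fahrenheit and the
    state abbreviation, round to one decimal and drop duplicate rows."""
    seen = set()
    cleaned = []
    for row in rows:
        dt = date.fromisoformat(row["dt"])  # dt is an ISO date string
        if row["Country"] != country or dt.year != year:
            continue
        celsius = float(row["AverageTemperature"])
        record = (
            dt.year,
            dt.month,
            round(celsius, 1),
            round(celsius_to_fahrenheit(celsius), 1),
            STATE_ABBREV.get(row["State"]),  # None if the state is unknown
            row["State"],
            row["Country"],
        )
        if record not in seen:  # equivalent of dropDuplicates()
            seen.add(record)
            cleaned.append(record)
    return cleaned


# Made-up input: a duplicate row, a wrong year and a non-U.S. row.
sample = [
    {"dt": "2012-02-01", "AverageTemperature": "0.2", "State": "Massachusetts", "Country": "United States"},
    {"dt": "2012-02-01", "AverageTemperature": "0.2", "State": "Massachusetts", "Country": "United States"},
    {"dt": "2011-02-01", "AverageTemperature": "1.5", "State": "Massachusetts", "Country": "United States"},
    {"dt": "2012-02-01", "AverageTemperature": "-5.0", "State": "Ontario", "Country": "Canada"},
]
print(clean_temperatures(sample))
```

Python's `round()` rounds exact ties to even, while Spark's `round()` uses HALF_UP. The two can therefore disagree on values that sit exactly on a .x5 boundary.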



##### Immigration data

In [43]:
#immigration data by state with origin: drop null state/residence codes, keep valid state and port codes, translate codes to names
i94_data=df_spark.filter(df_spark.i94addr.isNotNull())\
.filter(df_spark.i94res.isNotNull())\
.filter(col("i94addr").isin(list(abbrev_state.keys())))\
.filter(col("i94port").isin(list(city_codes.keys())))\
.withColumn("origin_country",country_udf(df_spark["i94res"]))\
.withColumn("dest_state_name",abbrev_state_udf(df_spark["i94addr"]))\
.withColumn("i94yr",col("i94yr").cast("integer"))\
.withColumn("i94mon",col("i94mon").cast("integer"))\
.withColumn("city_port_name",city_code_udf(df_spark["i94port"]))

new_I94_Data=i94_data.select("cicid",col("i94yr").alias("year"),col("i94mon").alias("month"),\
                             "origin_country","i94port","city_port_name",col("i94addr").alias("state_code"),"dest_state_name")

In [44]:
new_I94_Data.show(3)

+-----+----+-----+--------------+-------+------------------+----------+---------------+
|cicid|year|month|origin_country|i94port|    city_port_name|state_code|dest_state_name|
+-----+----+-----+--------------+-------+------------------+----------+---------------+
| 41.0|2016|    6|   SOUTH KOREA|    SFR|SAN FRANCISCO     |        CA|     California|
| 42.0|2016|    6|   SOUTH KOREA|    SFR|SAN FRANCISCO     |        CA|     California|
| 45.0|2016|    6|       ROMANIA|    HOU|HOUSTON           |        TX|          Texas|
+-----+----+-----+--------------+-------+------------------+----------+---------------+
only showing top 3 rows
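The immigration cell combines null filters, membership filters and code lookups. A plain-Python sketch of the same steps follows. `ABBREV_STATE`, `CITY_CODES` and `COUNTRY_CODES` are hypothetical stand-ins for the lookups in the local `state_abb` and `immigration_codes` modules, and the country codes are placeholders, not real I94 codes.

```python
# Hypothetical stand-ins for abbrev_state / city_codes (state_abb) and the
# mapping behind country_udf (immigration_codes). Country codes are placeholders.
ABBREV_STATE = {"CA": "California", "TX": "Texas"}
CITY_CODES = {"SFR": "SAN FRANCISCO", "HOU": "HOUSTON"}
COUNTRY_CODES = {100.0: "COUNTRY_A", 200.0: "COUNTRY_B"}


def clean_i94(records):
    """Drop records with null state/residence codes or unknown state/port
    codes, translate codes to names and cast year/month to int."""
    cleaned = []
    for r in records:
        addr, res, port = r.get("i94addr"), r.get("i94res"), r.get("i94port")
        if addr is None or res is None:  # isNotNull() filters
            continue
        if addr not in ABBREV_STATE or port not in CITY_CODES:  # isin() filters
            continue
        cleaned.append({
            "cicid": r["cicid"],
            "year": int(r["i94yr"]),
            "month": int(r["i94mon"]),
            # Assumption: unknown residence codes map to None
            "origin_country": COUNTRY_CODES.get(res),
            "i94port": port,
            "city_port_name": CITY_CODES[port],
            "state_code": addr,
            "dest_state_name": ABBREV_STATE[addr],
        })
    return cleaned


sample = [  # made-up records
    {"cicid": 1.0, "i94yr": 2016.0, "i94mon": 6.0, "i94res": 100.0, "i94addr": "CA", "i94port": "SFR"},
    {"cicid": 2.0, "i94yr": 2016.0, "i94mon": 6.0, "i94res": 100.0, "i94addr": None, "i94port": "SFR"},
    {"cicid": 3.0, "i94yr": 2016.0, "i94mon": 6.0, "i94res": 200.0, "i94addr": "TX", "i94port": "XXX"},
    {"cicid": 4.0, "i94yr": 2016.0, "i94mon": 6.0, "i94res": 200.0, "i94addr": "ZZ", "i94port": "HOU"},
]
print(clean_i94(sample))
```

In the output above, `city_port_name` is left-aligned while the other string columns are right-aligned. This suggests the port names carry trailing spaces. If so, wrapping `city_code_udf(...)` in Spark's `trim()` would remove them.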



##### Demographic data

In [45]:
#U.S. demographic data by state
#Calculating percentages of each numeric column and creating new columns.
demog_data=demog\
.withColumn("Median Age",col("Median Age").cast("float"))\
.withColumn("pct_male_pop",demog["Male Population"]/demog["Total Population"]*100)\
.withColumn("pct_female_pop",demog["Female Population"]/demog["Total Population"]*100)\
.withColumn("pct_veterans",demog["Number of Veterans"]/demog["Total Population"]*100)\
.withColumn("pct_foreign_born",demog["Foreign-born"]/demog["Total Population"]*100)\
.withColumn("pct_race",demog["Count"]/demog["Total Population"]*100)\
.orderBy("State")

In [46]:
#Selecting columns with new calculated percentages and state names.
new_demog_data=demog_data.select("State",col("State Code").alias("state_code"),\
                                 col("Median Age").alias("median_age"),\
                                 "pct_male_pop",\
                                 "pct_female_pop",\
                                 "pct_veterans",\
                                 "pct_foreign_born",\
                                 "Race",\
                                 "pct_race")

In [48]:
#pivoting the Race column
pivot_demog_data=new_demog_data.groupBy("State","state_code","median_age","pct_male_pop",\
                                    "pct_female_pop","pct_veterans",\
                                    "pct_foreign_born").pivot("Race").avg("pct_race")

#Renaming the race columns to names without spaces or hyphens, which are easier to use in Spark SQL.
pivot_demog_data=pivot_demog_data.select("State","state_code","median_age","pct_male_pop","pct_female_pop","pct_veterans","pct_foreign_born",\
                                         col("American Indian and Alaska Native").alias("native_american"),\
                                         col("Asian"),col("Black or African-American").alias("Black"),\
                                         col("Hispanic or Latino").alias("hispanic_or_latino"),"White")

In [49]:
#Finding the avg of each column per state. groupBy().avg() names the results "avg(<column>)"; the next cell renames them.
pivot=pivot_demog_data.groupBy("State","state_code").avg("median_age","pct_male_pop","pct_female_pop",\
                                                       "pct_veterans","pct_foreign_born","native_american",\
                                                       "Asian","Black","hispanic_or_latino","White").orderBy("State")


In [50]:
#Rounding the percentages and fixing column names
pivot=pivot.select("State","state_code",round(col("avg(median_age)"),1).alias("median_age"),\
                  round(col("avg(pct_male_pop)"),1).alias("pct_male_pop"),\
                   round(col("avg(pct_female_pop)"),1).alias("pct_female_pop"),\
                   round(col("avg(pct_veterans)"),1).alias("pct_veterans"),\
                   round(col("avg(pct_foreign_born)"),1).alias("pct_foreign_born"),\
                   round(col("avg(native_american)"),1).alias("native_american"),\
                   round(col("avg(Asian)"),1).alias("Asian"),\
                   round(col("avg(hispanic_or_latino)"),1).alias("hispanic_or_latino"),\
                   round(col("avg(Black)"),1).alias("Black"),\
                   round(col('avg(White)'),1).alias('White')
                  )

In [51]:
pivot.show(3)

+-------+----------+----------+------------+--------------+------------+----------------+---------------+-----+------------------+-----+-----+
|  State|state_code|median_age|pct_male_pop|pct_female_pop|pct_veterans|pct_foreign_born|native_american|Asian|hispanic_or_latino|Black|White|
+-------+----------+----------+------------+--------------+------------+----------------+---------------+-----+------------------+-----+-----+
|Alabama|        AL|      36.2|        47.2|          52.8|         6.8|             5.1|            0.8|  2.9|               3.6| 45.0| 52.0|
| Alaska|        AK|      32.2|        51.2|          48.8|         9.2|            11.1|           12.2| 12.3|               9.1|  7.7| 71.2|
|Arizona|        AZ|      35.0|        48.8|          51.2|         6.6|            12.6|            2.8|  5.1|              28.8|  6.0| 82.7|
+-------+----------+----------+------------+--------------+------------+----------------+---------------+-----+------------------+-----+-----+
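The demographic cells turn one row per city and race into one row per state. The sketch below reproduces that pivot-then-average logic in plain Python on made-up rows. The function names are hypothetical. Unlike the Spark cell, which groups by State and the city-level values rather than by City, this sketch keys by city. Two cities in the same state with identical values therefore stay separate here.

```python
from collections import defaultdict


def pivot_race(rows):
    """One row per (state, city, race) -> one dict of race columns per city,
    like groupBy(...).pivot("Race").avg("pct_race")."""
    wide = defaultdict(dict)
    for r in rows:
        wide[(r["state"], r["city"])][r["race"]] = r["pct_race"]
    return wide


def state_means(wide, races):
    """Unweighted mean of each race column over the cities of each state.
    Cities without a value are skipped, as Spark's avg() skips nulls."""
    sums = defaultdict(lambda: defaultdict(float))
    counts = defaultdict(lambda: defaultdict(int))
    for (state, _city), columns in wide.items():
        for race in races:
            if race in columns:
                sums[state][race] += columns[race]
                counts[state][race] += 1
    return {
        state: {race: round(sums[state][race] / counts[state][race], 1)
                for race in races if counts[state][race]}
        for state in sums
    }


# Made-up rows: City B has no Asian row, so its Asian column is missing.
rows = [
    {"state": "X", "city": "City A", "race": "White", "pct_race": 60.0},
    {"state": "X", "city": "City A", "race": "Asian", "pct_race": 10.0},
    {"state": "X", "city": "City B", "race": "White", "pct_race": 80.0},
]
print(state_means(pivot_race(rows), ["Asian", "White"]))
```

Each city counts equally here, so the state figures are unweighted means across cities. If population-weighted state shares are the goal, each city's value could instead be weighted by its `Total Population`.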

##### Airport data

In [52]:
#U.S. airport data by state
airport_data=airport.filter(airport["type"]=="small_airport")\
.filter(airport["iso_country"]=="US")\
.withColumn("iso_region",substring(airport["iso_region"],4,2))\
.withColumn("elevation_ft",col("elevation_ft").cast("float"))

#Find average elevation per state
airport_data_elevation=airport_data.groupBy("iso_country","iso_region").avg("elevation_ft")

#Select relevant columns, rename them and sort by state
new_airport_data=airport_data_elevation.select(col("iso_country").alias("country"),\
                                               col("iso_region").alias("state"),\
                                               round(col("avg(elevation_ft)"),1).alias("avg_elevation_ft")).orderBy("state")

In [53]:
new_airport_data.show(3)

+-------+-----+----------------+
|country|state|avg_elevation_ft|
+-------+-----+----------------+
|     US|   AK|           545.1|
|     US|   AL|           414.6|
|     US|   AR|           488.4|
+-------+-----+----------------+
only showing top 3 rows
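The airport cell relies on `iso_region` values of the form `US-XX`, where the last two characters are the state code. The plain-Python sketch below applies the same filter, substring and average to made-up rows, using a hypothetical function name. Spark's `substring(col, 4, 2)` counts from 1, which corresponds to the Python slice `[3:5]`.

```python
from collections import defaultdict


def avg_elevation_by_state(airports):
    """Keep U.S. small airports, take the state code from iso_region and
    average elevation_ft per state, rounded to one decimal."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    for a in airports:
        if a["type"] != "small_airport" or a["iso_country"] != "US":
            continue
        # avg() ignores nulls. States with only null elevations are omitted
        # here, whereas Spark would return a null average for them.
        if a["elevation_ft"] is None:
            continue
        # substring(iso_region, 4, 2) is 1-based; the Python slice is 0-based
        state = a["iso_region"][3:5]
        totals[state] += float(a["elevation_ft"])
        counts[state] += 1
    return {s: round(totals[s] / counts[s], 1) for s in sorted(totals)}


airports = [  # made-up rows
    {"type": "small_airport", "iso_country": "US", "iso_region": "US-AK", "elevation_ft": "100"},
    {"type": "small_airport", "iso_country": "US", "iso_region": "US-AK", "elevation_ft": "300"},
    {"type": "heliport", "iso_country": "US", "iso_region": "US-AK", "elevation_ft": "9999"},
    {"type": "small_airport", "iso_country": "CA", "iso_region": "CA-BC", "elevation_ft": "50"},
    {"type": "small_airport", "iso_country": "US", "iso_region": "US-TX", "elevation_ft": None},
]
print(avg_elevation_by_state(airports))
```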



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
I have used a star schema to construct this data model. In a star schema, the dimension tables are linked only through the central fact table. When a query uses two dimension tables, the only join path between them passes through the fact table. Having a single join path keeps query results consistent.

##### **Dimension Tables**
###### Airport Data by State
 * country
 * state
 * avg_elevation_ft 
 
###### U.S. Demographic by State
 * State
 * state_code
 * median_age
 * pct_male_pop
 * pct_female_pop
 * pct_veterans
 * pct_foreign_born
 * native_american
 * Asian
 * hispanic_or_latino
 * Black
 * White
 
###### Immigration Data by State with Origin
 * cicid
 * year
 * month
 * origin_country
 * i94port
 * city_port_name
 * state_code
 * dest_state_name

###### Temperature Data by State
 * year
 * month
 * avg_temp_celcius
 * avg_temp_fahrenheit
 * state_abbrev
 * State
 * Country

##### **Fact Table**
 * year
 * immig_month
 * immig_origin
 * to_immig_state
 * to_immig_state_count
 * avg_temp_fahrenheit
 * avg_elevation_ft
 * pct_foreign_born
 * native_american
 * Asian
 * hispanic_or_latino
 * Black
 * White

#### 3.2 Mapping Out Data Pipelines
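1. The cleaned DataFrames are registered as temporary views, which serve as the dimension tables.
2. The fact table is built with a Spark SQL query that joins the immigration view to the temperature, demographics and airport views and counts immigrants per year, month, origin country and destination state.
3. The query returns the fact table as a Spark DataFrame, which is then used for the quality checks.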
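The steps above can be tried out without Spark. The sketch below swaps in Python's built-in `sqlite3` module only so that it runs anywhere. Table and column names follow the notebook's temporary views, the rows are made up, and the demographics join is left out for brevity. The sketch also shows that an inner join drops immigration rows whose state has no match in a dimension table.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Minimal stand-ins for the "immigration", "temperature" and "airport" views.
cur.execute("CREATE TABLE immigration (year INTEGER, month INTEGER, origin_country TEXT, "
            "state_code TEXT, dest_state_name TEXT)")
cur.execute("CREATE TABLE temperature (month INTEGER, state_abbrev TEXT, avg_temp_fahrenheit REAL)")
cur.execute("CREATE TABLE airport (state TEXT, avg_elevation_ft REAL)")

cur.executemany("INSERT INTO immigration VALUES (?, ?, ?, ?, ?)", [
    (2016, 6, "COUNTRY_A", "CA", "California"),
    (2016, 6, "COUNTRY_A", "CA", "California"),
    (2016, 6, "COUNTRY_B", "TX", "Texas"),
    (2016, 6, "COUNTRY_B", "ZZ", "Unmatched"),  # no dimension rows for ZZ
])
cur.executemany("INSERT INTO temperature VALUES (?, ?, ?)", [(6, "CA", 70.0), (6, "TX", 80.0)])
cur.executemany("INSERT INTO airport VALUES (?, ?)", [("CA", 1000.0), ("TX", 500.0)])

# Fact query: count immigrants per year, month, origin and destination state,
# carrying state-level attributes from the dimension tables.
fact = cur.execute("""
    SELECT m.year, m.month AS immig_month, m.origin_country AS immig_origin,
           m.dest_state_name AS to_immig_state,
           COUNT(m.state_code) AS to_immig_state_count,
           t.avg_temp_fahrenheit, a.avg_elevation_ft
    FROM immigration m
    JOIN temperature t ON m.state_code = t.state_abbrev AND m.month = t.month
    JOIN airport a ON a.state = t.state_abbrev
    GROUP BY m.year, m.month, m.origin_country, m.dest_state_name,
             t.avg_temp_fahrenheit, a.avg_elevation_ft
    ORDER BY immig_origin, to_immig_state
""").fetchall()
conn.close()

for row in fact:
    print(row)
print("source rows: 4, fact total:", sum(r[4] for r in fact))
```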

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [54]:
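# Dimension tables: register the cleaned DataFrames as temporary views for Spark SQL
new_I94_Data.createOrReplaceTempView("immigration")
pivot.createOrReplaceTempView("demographics")
new_airport_data.createOrReplaceTempView("airport")
new_Temperatures.createOrReplaceTempView("temperature")

# Disable automatic broadcast joins (-1 is the documented value for turning them off),
# so Spark plans the fact-table joins without broadcasting tables, which can avoid broadcast timeouts
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")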

In [55]:
#Fact table
immigration_to_states=spark.sql("""SELECT 
                                    m.year,
                                    m.month AS immig_month,
                                    m.origin_country AS immig_origin,
                                    m.dest_state_name AS to_immig_state,
                                    COUNT(m.state_code) AS to_immig_state_count,
                                    t.avg_temp_fahrenheit,
                                    a.avg_elevation_ft,
                                    d.pct_foreign_born,
                                    d.native_american,
                                    d.Asian,
                                    d.hispanic_or_latino,
                                    d.Black,
                                    d.White
                                    
                                    FROM immigration m JOIN temperature t ON m.state_code=t.state_abbrev AND m.month=t.month
                                    JOIN demographics d ON d.state_code=t.state_abbrev
                                    JOIN airport a ON a.state=t.state_abbrev
                                    
                                    GROUP BY m.year, m.month, m.origin_country,
                                    m.dest_state_name, m.state_code, t.avg_temp_fahrenheit, a.avg_elevation_ft,
                                    d.pct_foreign_born, d.native_american,
                                    d.Asian, d.hispanic_or_latino,
                                    d.Black, d.White
                                    
                                    ORDER BY m.origin_country,m.state_code
                                    
""")

In [56]:
immigration_to_states.toDF('year', 'immig_month', 'immig_origin', 'to_immig_state', \
          'to_immig_state_count', 'avg_temp_fahrenheit', 'avg_elevation_ft',\
          'pct_foreign_born', 'native_american', 'Asian', 'hispanic_or_latino', 'Black', 'White').show(3)

+----+-----------+------------+--------------+--------------------+-------------------+----------------+----------------+---------------+-----+------------------+-----+-----+
|year|immig_month|immig_origin|to_immig_state|to_immig_state_count|avg_temp_fahrenheit|avg_elevation_ft|pct_foreign_born|native_american|Asian|hispanic_or_latino|Black|White|
+----+-----------+------------+--------------+--------------------+-------------------+----------------+----------------+---------------+-----+------------------+-----+-----+
|2016|          6| AFGHANISTAN|      Arkansas|                   1|               78.6|           488.4|            10.7|            1.8|  4.1|              14.2| 21.8| 68.0|
|2016|          6| AFGHANISTAN|       Arizona|                   1|               79.2|          3098.0|            12.6|            2.8|  5.1|              28.8|  6.0| 82.7|
|2016|          6| AFGHANISTAN|    California|                  31|               69.4|          1261.4|            27.6|    

#### 4.2 Data Quality Checks
The following checks confirm that the pipeline ran as expected:
 * Null check: the key columns of the fact table (year, month, origin country and destination state) contain no null values.
 * Source/count check: the total of to_immig_state_count in the fact table is compared with the number of rows in the cleaned immigration view.
 
Run Quality Checks

In [59]:
# Quality check 1: null values in the key columns of the fact table
# A single row of all false means none of these columns contains a null
immigration_to_states.select(isnull('year').alias('year'),\
                             isnull('immig_month').alias('month'),\
                             isnull('immig_origin').alias('country'),\
                             isnull('to_immig_state').alias('state')).dropDuplicates().show()

+-----+-----+-------+-----+
| year|month|country|state|
+-----+-----+-------+-----+
|false|false|  false|false|
+-----+-----+-------+-----+
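The null check selects one boolean per column and keeps the distinct combinations, so a single row of `false` values means no nulls were found. The same idea in plain Python, with a hypothetical helper and made-up rows:

```python
def null_flag_patterns(rows, columns):
    """Distinct tuples of is-null flags for the given columns, like
    select(isnull(c), ...).dropDuplicates() in the cell above."""
    return {tuple(row.get(c) is None for c in columns) for row in rows}


columns = ["year", "immig_month", "immig_origin", "to_immig_state"]
clean = [
    {"year": 2016, "immig_month": 6, "immig_origin": "COUNTRY_A", "to_immig_state": "California"},
    {"year": 2016, "immig_month": 6, "immig_origin": "COUNTRY_B", "to_immig_state": "Texas"},
]
dirty = clean + [{"year": 2016, "immig_month": 6, "immig_origin": None, "to_immig_state": "Texas"}]

# Passing check: exactly one pattern, and it is all False.
print(null_flag_patterns(clean, columns) == {(False,) * len(columns)})
print(null_flag_patterns(dirty, columns))
```

The distinct-pattern check only shows whether any null exists. Counting nulls per column, for example with `sum(when(col(c).isNull(), 1).otherwise(0))` in Spark, would also report how many rows fail.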



In [66]:
# Quality check 2: count the rows in the cleaned immigration view (the source of the fact table)
spark.sql('SELECT COUNT(*) FROM immigration').show()

+--------+
|count(1)|
+--------+
| 3214208|
+--------+



In [63]:
# Sum the immigrant counts in the fact table for comparison with the source count
immigration_to_states.select(sum('to_immig_state_count').alias('fact_table_count')).show()

+----------------+
|fact_table_count|
+----------------+
|         3207230|
+----------------+
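The two counts differ by 3,214,208 - 3,207,230 = 6,978 rows, about 0.22% of the immigration view. A likely cause is the inner joins in the fact query. These joins drop immigration rows whose destination state has no matching row in the temperature, demographics or airport tables. A small helper like the hypothetical one below turns the comparison into a pass/fail check. The 1% tolerance is an assumed threshold.

```python
def reconcile_counts(source_count, fact_count, tolerance=0.01):
    """Compare the source row count with the summed fact-table count.
    Returns (missing_rows, missing_fraction, passed)."""
    missing = source_count - fact_count
    fraction = missing / source_count if source_count else 0.0
    # A negative difference would mean rows were duplicated by a join fan-out.
    passed = 0 <= missing and fraction <= tolerance
    return missing, fraction, passed


missing, fraction, passed = reconcile_counts(3214208, 3207230)
print(missing, f"{fraction:.2%}", passed)
```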



#### 4.3 Data dictionary 
* See the file titled "tables_definition" for a description of each table and column.

### Step 5: Complete Project Write Up
1. Since the data used in this project is small, I used Apache Spark to read and transform it and to create outputs for further analysis.
2. The data should be updated monthly, because the I94 data comes in monthly files (this project uses June 2016) and the fact table is aggregated by month. Monthly updates give government agencies and other organizations the latest data.
3. How the approach would change under the following scenarios:
 * The data was increased by 100x: we can run Spark on a Hadoop cluster (or a managed service such as Amazon EMR) so that processing is distributed across many machines.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day: we can use Apache Airflow to schedule a daily run that updates all the tables and finishes before 7am.
 * The database needed to be accessed by 100+ people: we can move the tables to AWS, for example into an Amazon Redshift data warehouse, which is built to serve many concurrent users.