# Project Title
### Data Engineering Capstone Project

#### Project Summary
The data sources will be aggregated using Spark SQL and matplotlib will be used to display graphs of the data.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

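```python
# Do all imports and installs here
import pandas as pd
import os
import glob
# us_state_abbrev and immigration_codes are local helper modules for this project
from us_state_abbrev import state_udf, abbrev_state, abbrev_state_udf, city_code_udf, city_codes
from immigration_codes import country_udf
from pyspark.sql import SparkSession, SQLContext, GroupedData
# The star import shadows Python built-ins such as round() and sum();
# the cells below rely on the Spark versions of those functions.
from pyspark.sql.functions import *
```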

```python
# Build the Spark session with the spark-sas7bdat package, which reads SAS files
spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()
```

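```python
# Build an SQLContext object (used below to set Spark SQL options);
# SQLContext expects the SparkContext, not the SparkSession
sqlContext = SQLContext(spark.sparkContext)
```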

### Step 1: Scope the Project and Gather Data

#### Scope 
This project pulls data from the four sources below and builds fact and dimension tables that show immigration into each U.S. state.

#### Describe and Gather Data 
Describe the data sets you're using. Where did they come from? What type of information is included? 
1. **U.S. City Demographic Data:** comes from [OpenDataSoft](https://public.opendatasoft.com) and includes data by city and state on age, population, veteran status, and race.
2. **I94 Immigration Data:** comes from the [U.S. National Travel and Tourism Office](https://travel.trade.gov/research/reports/i94/historical/2016.html) and includes details on incoming immigrants and their ports of entry.
3. **Airport Code Table:** comes from [datahub.io](https://datahub.io/core/airport-codes#data) and includes airport codes and corresponding cities.
4. **World Temperature Data:** comes from [Kaggle](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data) and includes data on temperature changes in the U.S. since 1850.

```python
# Read in the data here
# The demographics file is semicolon-delimited
demog = spark.read.format("csv").option("header", "true").option("delimiter", ";").load("us-cities-demographics.csv")
airport = spark.read.format("csv").option("header", "true").load("airport-codes_csv.csv")
temperatureData = spark.read.format("csv").option("header", "true").load("GlobalLandTemperaturesByState.csv")
# The I94 data is a SAS file, read with the spark-sas7bdat package configured above
df_spark = spark.read.format('com.github.saurfang.sas.spark').load("../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat")
```
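The demographics file is semicolon-delimited, so its reader sets `option("delimiter", ";")`. The minimal sketch below shows why that option matters. It parses a small, made-up sample with Python's standard `csv` module. Only the column names come from this notebook, and `read_semicolon_csv` is a hypothetical helper.

```python
import csv
import io

# Made-up sample in the same semicolon-delimited layout as us-cities-demographics.csv
SAMPLE = (
    "State;State Code;Median Age;Total Population\n"
    "Example State;EX;35.5;100000\n"
)

def read_semicolon_csv(text):
    """Parse semicolon-delimited text into a list of dicts keyed by the header row."""
    return list(csv.DictReader(io.StringIO(text), delimiter=";"))

rows = read_semicolon_csv(SAMPLE)
print(rows[0]["State Code"], rows[0]["Median Age"])  # EX 35.5

# With the default comma delimiter, each line collapses into a single field
wrong = list(csv.DictReader(io.StringIO(SAMPLE)))
print(len(wrong[0]))
```

Spark's CSV reader behaves the same way. Without the delimiter option, each line is read as a single column.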

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
1. Filter the temperature data to the United States and the year 2013. Add year, month, and Fahrenheit columns, map state names to abbreviations, and drop duplicates.
2. Remove rows with a null `i94addr` or `i94res` and keep only valid state and port codes. Convert the `i94res` codes to country of origin, then select the relevant columns from the immigration data.
3. Calculate percentage columns from the city demographic data, pivot the `Race` column, and average each column per state.
4. Filter the airport data to U.S. airports of type `small_airport`, use `substring` to extract the state code, and average the elevation per state.

#### Temperature Data by State

```python
# Keep only U.S. rows from 2013, add year and month columns, convert the
# Celsius average to Fahrenheit, and map state names to abbreviations
usTemperatures = temperatureData.filter(temperatureData["country"] == "United States") \
    .filter(year(temperatureData["dt"]) == 2013) \
    .withColumn("year", year(temperatureData["dt"])) \
    .withColumn("month", month(temperatureData["dt"])) \
    .withColumn("avg_temp_fahrenheit", temperatureData["AverageTemperature"] * 9 / 5 + 32) \
    .withColumn("state_abbrev", state_udf(temperatureData["State"]))

# Round both temperatures to one decimal place and drop duplicate rows
new_Temperatures = usTemperatures.select("year", "month",
                                         round(col("AverageTemperature"), 1).alias("avg_temp_celcius"),
                                         round(col("avg_temp_fahrenheit"), 1).alias("avg_temp_fahrenheit"),
                                         "state_abbrev", "State", "Country").dropDuplicates()
```

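The row-level logic of the temperature cell can be sketched in plain Python. This makes the conversion F = C * 9/5 + 32 and the rounding order easy to check. It illustrates the logic and is not the notebook's Spark code. The sample rows are made up, and `STATE_ABBREV` is a hypothetical two-entry stand-in for `state_udf`.

```python
from datetime import date

def to_fahrenheit(celsius):
    """Convert degrees Celsius to degrees Fahrenheit: F = C * 9/5 + 32."""
    return celsius * 9 / 5 + 32

# Hypothetical two-entry stand-in for state_udf, which lives in the
# project's us_state_abbrev module (not shown here).
STATE_ABBREV = {"Massachusetts": "MA", "Maine": "ME"}

def clean_temperature_rows(rows, target_year=2013):
    """Keep U.S. rows for target_year, add year and month, round Celsius and
    Fahrenheit separately from the raw value, and drop duplicate records."""
    seen = set()
    cleaned = []
    for r in rows:
        if not r["AverageTemperature"]:
            continue  # the Spark cell keeps such rows, with NULL temperatures
        dt = date.fromisoformat(r["dt"])
        if r["Country"] != "United States" or dt.year != target_year:
            continue
        c = float(r["AverageTemperature"])
        # Python's round() rounds exact halves to even; Spark's round() rounds half up.
        record = (dt.year, dt.month, round(c, 1), round(to_fahrenheit(c), 1),
                  STATE_ABBREV.get(r["State"]), r["State"], r["Country"])
        if record not in seen:
            seen.add(record)
            cleaned.append(record)
    return cleaned

# Made-up sample rows using the columns the notebook reads
SAMPLE_ROWS = [
    {"dt": "2013-07-01", "AverageTemperature": "20.0", "State": "Massachusetts", "Country": "United States"},
    {"dt": "2013-07-01", "AverageTemperature": "20.0", "State": "Massachusetts", "Country": "United States"},
    {"dt": "2012-07-01", "AverageTemperature": "19.0", "State": "Maine", "Country": "United States"},
    {"dt": "2013-07-01", "AverageTemperature": "15.0", "State": "Ontario", "Country": "Canada"},
]
print(clean_temperature_rows(SAMPLE_ROWS))
```

As in the Spark cell, both temperatures are rounded from the raw Celsius value. A rounded Celsius value and its rounded Fahrenheit value therefore need not convert exactly into each other. The notebook output shows this: -1.9 °C sits next to 28.5 °F.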
```python
new_Temperatures.show(5)
```

```
+----+-----+----------------+-------------------+------------+-------------+-------------+
|year|month|avg_temp_celcius|avg_temp_fahrenheit|state_abbrev|        State|      Country|
+----+-----+----------------+-------------------+------------+-------------+-------------+
|2013|    7|            23.4|               74.1|          MA|Massachusetts|United States|
|2013|    3|            -1.9|               28.5|          SD| South Dakota|United States|
|2013|    9|            14.1|               57.4|          ME|        Maine|United States|
|2013|    1|            -1.3|               29.7|          PA| Pennsylvania|United States|
|2013|    9|            25.1|               77.2|          AL|      Alabama|United States|
+----+-----+----------------+-------------------+------------+-------------+-------------+
only showing top 5 rows
```



#### Immigration Data by State with Origin

```python
# Drop rows with a null U.S. address state (i94addr) or residence code (i94res) and
# keep only valid state and port codes. Then map i94res codes to country names
# (country_udf), state codes to state names (abbrev_state_udf), and port codes
# to city names (city_code_udf).
# country_udf, abbrev_state_udf and city_code_udf were created with data from the I94 SAS Labels Descriptions file.
i94_data = df_spark.filter(df_spark.i94addr.isNotNull()) \
    .filter(df_spark.i94res.isNotNull()) \
    .filter(col("i94addr").isin(list(abbrev_state.keys()))) \
    .filter(col("i94port").isin(list(city_codes.keys()))) \
    .withColumn("origin_country", country_udf(df_spark["i94res"])) \
    .withColumn("dest_state_name", abbrev_state_udf(df_spark["i94addr"])) \
    .withColumn("i94yr", col("i94yr").cast("integer")) \
    .withColumn("i94mon", col("i94mon").cast("integer")) \
    .withColumn("city_port_name", city_code_udf(df_spark["i94port"]))

new_I94_Data = i94_data.select("cicid", col("i94yr").alias("year"), col("i94mon").alias("month"),
                               "origin_country", "i94port", "city_port_name",
                               col("i94addr").alias("state_code"), "dest_state_name")
```

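The filtering and lookups in the I94 cell can be sketched with plain dictionaries. The port, city, and state entries below come from values in this notebook's output. The residence codes in `COUNTRY_CODES` are made up for illustration and are not real I94 codes. `clean_i94_rows` is a hypothetical helper.

```python
# Port/city and state entries taken from this notebook's output
CITY_CODES = {"SFR": "SAN FRANCISCO", "HOU": "HOUSTON", "BOS": "BOSTON"}
ABBREV_STATE = {"CA": "California", "TX": "Texas", "MA": "Massachusetts"}
# Hypothetical residence codes: made up for this sketch, not real I94 codes
COUNTRY_CODES = {1.0: "COUNTRY_A", 2.0: "COUNTRY_B"}

def clean_i94_rows(rows):
    """Drop rows with missing or unknown codes, then look up readable names."""
    out = []
    for r in rows:
        if r.get("i94addr") is None or r.get("i94res") is None:
            continue  # like the isNotNull() filters
        if r["i94addr"] not in ABBREV_STATE or r["i94port"] not in CITY_CODES:
            continue  # like the isin() filters
        out.append({
            "cicid": r["cicid"],
            "year": int(r["i94yr"]),     # SAS numerics arrive as doubles, e.g. 2016.0
            "month": int(r["i94mon"]),
            "origin_country": COUNTRY_CODES.get(r["i94res"]),
            "i94port": r["i94port"],
            "city_port_name": CITY_CODES[r["i94port"]],
            "state_code": r["i94addr"],
            "dest_state_name": ABBREV_STATE[r["i94addr"]],
        })
    return out

# Made-up sample records
SAMPLE = [
    {"cicid": 1.0, "i94yr": 2016.0, "i94mon": 6.0, "i94res": 1.0, "i94port": "SFR", "i94addr": "CA"},
    {"cicid": 2.0, "i94yr": 2016.0, "i94mon": 6.0, "i94res": 2.0, "i94port": "HOU", "i94addr": None},
    {"cicid": 3.0, "i94yr": 2016.0, "i94mon": 6.0, "i94res": 2.0, "i94port": "ZZZ", "i94addr": "MA"},  # port not in CITY_CODES
]
result = clean_i94_rows(SAMPLE)
print(len(result))  # 1
```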
```python
new_I94_Data.show(5)
```

```
+-----+----+-----+--------------+-------+------------------+----------+---------------+
|cicid|year|month|origin_country|i94port|    city_port_name|state_code|dest_state_name|
+-----+----+-----+--------------+-------+------------------+----------+---------------+
| 41.0|2016|    6|   SOUTH KOREA|    SFR|SAN FRANCISCO     |        CA|     California|
| 42.0|2016|    6|   SOUTH KOREA|    SFR|SAN FRANCISCO     |        CA|     California|
| 45.0|2016|    6|       ROMANIA|    HOU|HOUSTON           |        TX|          Texas|
| 52.0|2016|    6|       ALBANIA|    BOS|BOSTON            |        MA|  Massachusetts|
| 53.0|2016|    6|       ALBANIA|    NEW|NEWARK/TETERBORO  |        PA|   Pennsylvania|
+-----+----+-----+--------------+-------+------------------+----------+---------------+
only showing top 5 rows
```



#### U.S. Demographic Data by State

```python
# Express each population count as a percentage of the city's total population
# (pct_race uses Count, the city's population for the row's Race)
demog_data = demog \
    .withColumn("Median Age", col("Median Age").cast("float")) \
    .withColumn("pct_male_pop", demog["Male Population"] / demog["Total Population"] * 100) \
    .withColumn("pct_female_pop", demog["Female Population"] / demog["Total Population"] * 100) \
    .withColumn("pct_veterans", demog["Number of Veterans"] / demog["Total Population"] * 100) \
    .withColumn("pct_foreign_born", demog["Foreign-born"] / demog["Total Population"] * 100) \
    .withColumn("pct_race", demog["Count"] / demog["Total Population"] * 100) \
    .orderBy("State")
```

```python
# Select the state names and the new calculated percentage columns
new_demog_data = demog_data.select("State", col("State Code").alias("state_code"),
                                   col("Median Age").alias("median_age"),
                                   "pct_male_pop", "pct_female_pop", "pct_veterans",
                                   "pct_foreign_born", "Race", "pct_race")
```

```python
# Pivot the Race column: each race becomes its own column holding pct_race
pivot_demog_data = new_demog_data.groupBy("State", "state_code", "median_age", "pct_male_pop",
                                          "pct_female_pop", "pct_veterans",
                                          "pct_foreign_born").pivot("Race").avg("pct_race")

# Rename the race columns to names without spaces so they can be referenced in
# Spark SQL without backticks and written to Parquet
pivot_demog_data = pivot_demog_data.select("State", "state_code", "median_age", "pct_male_pop",
                                           "pct_female_pop", "pct_veterans", "pct_foreign_born",
                                           col("American Indian and Alaska Native").alias("native_american"),
                                           col("Asian"), col("Black or African-American").alias("Black"),
                                           col("Hispanic or Latino").alias("hispanic_or_latino"), "White")
```

```python
# Average each column per state across the state's cities. avg() renames each
# column to avg(<name>); the next cell restores the original names.
pivot = pivot_demog_data.groupBy("State", "state_code").avg("median_age", "pct_male_pop", "pct_female_pop",
                                                          "pct_veterans", "pct_foreign_born", "native_american",
                                                          "Asian", "Black", "hispanic_or_latino", "White").orderBy("State")
```


```python
# Round the averages to one decimal place and restore the column names
pivot = pivot.select("State", "state_code",
                     round(col("avg(median_age)"), 1).alias("median_age"),
                     round(col("avg(pct_male_pop)"), 1).alias("pct_male_pop"),
                     round(col("avg(pct_female_pop)"), 1).alias("pct_female_pop"),
                     round(col("avg(pct_veterans)"), 1).alias("pct_veterans"),
                     round(col("avg(pct_foreign_born)"), 1).alias("pct_foreign_born"),
                     round(col("avg(native_american)"), 1).alias("native_american"),
                     round(col("avg(Asian)"), 1).alias("Asian"),
                     round(col("avg(hispanic_or_latino)"), 1).alias("hispanic_or_latino"),
                     round(col("avg(Black)"), 1).alias("Black"),
                     round(col("avg(White)"), 1).alias("White"))
```
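The demographics file has one row per city and race, and each row repeats the city-level totals. The cells above compute percentages, pivot `Race` into columns, and then average each column per state. The plain-Python sketch below shows that sequence on made-up data. For clarity it keys cities by name. The notebook instead groups by the city-level statistic columns, which serve the same purpose. `pivot_race_pct` and `state_averages` are hypothetical helpers.

```python
from collections import defaultdict
from statistics import mean

def pivot_race_pct(rows):
    """Return {(state, city): {race: pct_race}}, with pct_race = Count / Total Population * 100."""
    pivoted = defaultdict(dict)
    for r in rows:
        pivoted[(r["State"], r["City"])][r["Race"]] = r["Count"] / r["Total Population"] * 100
    return pivoted

def state_averages(pivoted):
    """Average each race percentage over a state's cities (an unweighted mean).
    A city with no row for a race contributes nothing, like avg() skipping NULLs."""
    by_state = defaultdict(lambda: defaultdict(list))
    for (state, _city), races in pivoted.items():
        for race, pct in races.items():
            by_state[state][race].append(pct)
    return {state: {race: round(mean(vals), 1) for race, vals in races.items()}
            for state, races in by_state.items()}

# Made-up sample: two cities of very different size in one state
ROWS = [
    {"State": "Example", "City": "A", "Total Population": 1000, "Race": "White", "Count": 600},
    {"State": "Example", "City": "A", "Total Population": 1000, "Race": "Asian", "Count": 100},
    {"State": "Example", "City": "B", "Total Population": 3000, "Race": "White", "Count": 1500},
    {"State": "Example", "City": "B", "Total Population": 3000, "Race": "Asian", "Count": 900},
]
print(state_averages(pivot_race_pct(ROWS)))  # {'Example': {'White': 55.0, 'Asian': 20.0}}
```

The state value is an unweighted mean of city percentages, so a small city counts as much as a large one. Weighting by population, the same sample gives 52.5% White (2,100 of 4,000 residents) rather than 55.0%.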


```python
pivot.show(5)
```

```
+----------+----------+----------+------------+--------------+------------+----------------+---------------+-----+------------------+-----+-----+
|     State|state_code|median_age|pct_male_pop|pct_female_pop|pct_veterans|pct_foreign_born|native_american|Asian|hispanic_or_latino|Black|White|
+----------+----------+----------+------------+--------------+------------+----------------+---------------+-----+------------------+-----+-----+
|   Alabama|        AL|      36.2|        47.2|          52.8|         6.8|             5.1|            0.8|  2.9|               3.6| 45.0| 52.0|
|    Alaska|        AK|      32.2|        51.2|          48.8|         9.2|            11.1|           12.2| 12.3|               9.1|  7.7| 71.2|
|   Arizona|        AZ|      35.0|        48.8|          51.2|         6.6|            12.6|            2.8|  5.1|              28.8|  6.0| 82.7|
|  Arkansas|        AR|      32.8|        48.4|          51.6|         5.2|            10.7|            1.8|  4.1|          
```

#### U.S. Airport Data by State

```python
# Keep U.S. airports of type 'small_airport' and use substring to extract the state
# code from iso_region (e.g. "US-AK" -> "AK"; Spark string positions are 1-based)
airport_data = airport.filter(airport["type"] == "small_airport") \
    .filter(airport["iso_country"] == "US") \
    .withColumn("iso_region", substring(airport["iso_region"], 4, 2)) \
    .withColumn("elevation_ft", col("elevation_ft").cast("float"))

# Find the average elevation per state
airport_data_elevation = airport_data.groupBy("iso_country", "iso_region").avg("elevation_ft")

# Select and rename the relevant columns, rounding the average elevation
new_airport_data = airport_data_elevation.select(col("iso_country").alias("country"),
                                                 col("iso_region").alias("state"),
                                                 round(col("avg(elevation_ft)"), 1).alias("avg_elevation_ft")).orderBy("iso_region")
```
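A plain-Python sketch of the airport cell, run on made-up rows. It covers two details that are easy to get wrong. First, Spark's `substring(col, 4, 2)` counts from 1, so it matches the Python slice `[3:5]`. Second, `avg()` skips NULL elevations instead of treating them as zero. `state_from_iso_region` and `avg_elevation_by_state` are hypothetical helpers.

```python
from collections import defaultdict

def state_from_iso_region(iso_region):
    """Mirror Spark's substring(iso_region, 4, 2): positions are 1-based there,
    so characters 4-5 of "US-AK" are "AK" (Python slice [3:5])."""
    return iso_region[3:5]

def avg_elevation_by_state(rows):
    """Average elevation_ft per state for U.S. small airports, skipping missing
    elevations the way Spark's avg() skips NULLs, rounded to one decimal place."""
    elevations = defaultdict(list)
    for r in rows:
        if r["type"] != "small_airport" or r["iso_country"] != "US":
            continue
        if r["elevation_ft"] in (None, ""):
            continue  # casting "" to float gives NULL in Spark, which avg() ignores
        elevations[state_from_iso_region(r["iso_region"])].append(float(r["elevation_ft"]))
    return {state: round(sum(v) / len(v), 1) for state, v in sorted(elevations.items())}

# Made-up sample rows
AIRPORTS = [
    {"type": "small_airport", "iso_country": "US", "iso_region": "US-AK", "elevation_ft": "100"},
    {"type": "small_airport", "iso_country": "US", "iso_region": "US-AK", "elevation_ft": "300"},
    {"type": "small_airport", "iso_country": "US", "iso_region": "US-AK", "elevation_ft": ""},
    {"type": "large_airport", "iso_country": "US", "iso_region": "US-CA", "elevation_ft": "50"},
    {"type": "small_airport", "iso_country": "CA", "iso_region": "CA-ON", "elevation_ft": "600"},
]
print(avg_elevation_by_state(AIRPORTS))  # {'AK': 200.0}
```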


```python
new_airport_data.show(5)
```

```
+-------+-----+----------------+
|country|state|avg_elevation_ft|
+-------+-----+----------------+
|     US|   AK|           545.1|
|     US|   AL|           414.6|
|     US|   AR|           488.4|
|     US|   AZ|          3098.0|
|     US|   CA|          1261.4|
+-------+-----+----------------+
only showing top 5 rows
```



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model
##### Star Schema
1. Dimension Tables
    * airport_table
        * country, state, avg_elevation_ft
    * city_stats
        * State, state_code, median_age, pct_male_pop, pct_female_pop, pct_veterans, pct_foreign_born, native_american, Asian, hispanic_or_latino, Black, White
    * immigration_table
        * cicid, year, month, origin_country, i94port, city_port_name, state_code, dest_state_name
    * avg_state_temps
        * year, month, avg_temp_celcius, avg_temp_fahrenheit, state_abbrev, State, Country
2. Fact Table
    * immigration_fact_table
        * year, immig_month, immig_origin, to_immig_state, to_immig_state_count, avg_temp_fahrenheit, avg_elevation_ft, pct_foreign_born, native_american, Asian, hispanic_or_latino, Black, White

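A star schema was chosen for its simplicity and its wide use in data analytics. Every dimension joins to the immigration data on the state code, and temperature also joins on month, which keeps analytical queries short and easy to follow.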
#### 3.2 Mapping Out Data Pipelines

1. Dimension tables are created as temporary views over the cleaned DataFrames.
2. The fact table is built by a Spark SQL query that joins the immigration data to the other dimension tables.
3. The query returns the fact table as a Spark DataFrame.
4. The fact table is written out as a Parquet file.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

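```python
# Register the cleaned DataFrames as temporary views that serve as the dimension tables
new_I94_Data.createOrReplaceTempView("immigration")
pivot.createOrReplaceTempView("demographics")
new_airport_data.createOrReplaceTempView("airport")
new_Temperatures.createOrReplaceTempView("temperature")

# Disable automatic broadcast joins (-1 is the documented "off" value) so the joins
# below use shuffle joins instead of broadcasting tables, which sidesteps broadcast timeouts.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")
```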

```python
# Build the fact table by joining the immigration view to the other dimension tables.
# It counts how many people immigrated to each U.S. state, by origin country and month.
# Temperature is matched on state and month only: the temperature data covers 2013,
# the immigration data 2016. These are inner joins, so immigration rows whose state
# is missing from any dimension table are dropped.
immigration_to_states = spark.sql("""
    SELECT
        m.year,
        m.month AS immig_month,
        m.origin_country AS immig_origin,
        m.dest_state_name AS to_immig_state,
        COUNT(m.state_code) AS to_immig_state_count,
        t.avg_temp_fahrenheit,
        a.avg_elevation_ft,
        d.pct_foreign_born,
        d.native_american,
        d.Asian,
        d.hispanic_or_latino,
        d.Black,
        d.White
    FROM immigration m
    JOIN temperature t ON m.state_code = t.state_abbrev AND m.month = t.month
    JOIN demographics d ON d.state_code = t.state_abbrev
    JOIN airport a ON a.state = t.state_abbrev
    GROUP BY m.year, m.month, m.origin_country, m.dest_state_name, m.state_code,
             t.avg_temp_fahrenheit, a.avg_elevation_ft,
             d.pct_foreign_born, d.native_american, d.Asian,
             d.hispanic_or_latino, d.White, d.Black
    ORDER BY m.origin_country, m.state_code
""")
```

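The fact query can be sketched in plain Python on made-up data. The sketch groups the immigration rows and counts them. It then keeps only the groups whose state (and month, for temperature) matches every dimension, as the inner `JOIN`s do. The dictionaries are hypothetical stand-ins for the `temperature`, `demographics` and `airport` views, and `build_fact_rows` is a hypothetical helper. The sketch counts before it joins, while the SQL joins and then counts. Both orders give the same result as long as each dimension has at most one row per join key.

```python
from collections import Counter

# Hypothetical dimension data, keyed the way the SQL joins them
TEMPERATURE = {("CA", 6): 70.0, ("TX", 6): 85.0}          # (state, month)
DEMOGRAPHICS = {"CA": {"pct_foreign_born": 30.0}, "TX": {"pct_foreign_born": 20.0}}
AIRPORT = {"CA": 1000.0}                                  # no TX row

IMMIGRATION = [
    {"year": 2016, "month": 6, "origin_country": "COUNTRY_A", "state_code": "CA"},
    {"year": 2016, "month": 6, "origin_country": "COUNTRY_A", "state_code": "CA"},
    {"year": 2016, "month": 6, "origin_country": "COUNTRY_B", "state_code": "TX"},
]

def build_fact_rows(immigration):
    """Count arrivals per (year, month, origin, state) and attach dimension values,
    dropping groups with no match in some dimension (inner-join semantics)."""
    counts = Counter((r["year"], r["month"], r["origin_country"], r["state_code"])
                     for r in immigration)
    fact = []
    for (year, month, origin, state), n in sorted(counts.items()):
        if (state, month) not in TEMPERATURE or state not in DEMOGRAPHICS or state not in AIRPORT:
            continue  # no match in a dimension, so an inner join drops the group
        fact.append({
            "year": year, "immig_month": month, "immig_origin": origin,
            "state_code": state, "to_immig_state_count": n,
            "avg_temp_fahrenheit": TEMPERATURE[(state, month)],
            "avg_elevation_ft": AIRPORT[state],
            "pct_foreign_born": DEMOGRAPHICS[state]["pct_foreign_born"],
        })
    return fact

fact = build_fact_rows(IMMIGRATION)
source_total = len(IMMIGRATION)
fact_total = sum(row["to_immig_state_count"] for row in fact)
print(source_total, fact_total)  # 3 2
```

This is one way the fact-table total can fall below the source row count that the count check in section 4.2 compares it against.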

```python
immigration_to_states.show(5)
```

```
+----+-----------+------------+--------------+--------------------+-------------------+----------------+----------------+---------------+-----+------------------+-----+-----+
|year|immig_month|immig_origin|to_immig_state|to_immig_state_count|avg_temp_fahrenheit|avg_elevation_ft|pct_foreign_born|native_american|Asian|hispanic_or_latino|Black|White|
+----+-----------+------------+--------------+--------------------+-------------------+----------------+----------------+---------------+-----+------------------+-----+-----+
|2016|          6| AFGHANISTAN|      Arkansas|                   1|               77.7|           488.4|            10.7|            1.8|  4.1|              14.2| 21.8| 68.0|
|2016|          6| AFGHANISTAN|       Arizona|                   1|               79.9|          3098.0|            12.6|            2.8|  5.1|              28.8|  6.0| 82.7|
|2016|          6| AFGHANISTAN|    California|                  31|               72.5|          1261.4|            27.6|    
```

```python
# Write the fact table to Parquet (overwrite so the notebook can be re-run)
immigration_to_states.write.mode("overwrite").parquet("immigration_to_states")
```

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

```python
# Check for NULL values in year, month, origin country and destination state.
# If the only distinct row returned is all false, none of these columns contain NULLs.
immigration_to_states.select(isnull('year').alias('year'),
                             isnull('immig_month').alias('month'),
                             isnull('immig_origin').alias('country'),
                             isnull('to_immig_state').alias('state')).dropDuplicates().show()
```

```
+-----+-----+-------+-----+
| year|month|country|state|
+-----+-----+-------+-----+
|false|false|  false|false|
+-----+-----+-------+-----+
```

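The NULL check above returns distinct boolean rows, which someone has to read by eye. An alternative is to count NULLs per column and raise when any are found, so the check can fail a pipeline run without anyone reading the output. The sketch below runs on made-up rows in plain Python. In Spark, the same counts could be computed by summing `isnull(c).cast("int")` for each column. `null_counts` and `assert_no_nulls` are hypothetical helper names.

```python
def null_counts(rows, columns):
    """Count None values per column."""
    return {c: sum(1 for r in rows if r.get(c) is None) for c in columns}

def assert_no_nulls(rows, columns):
    """Raise ValueError if any of the given columns contains a None value."""
    bad = {c: n for c, n in null_counts(rows, columns).items() if n}
    if bad:
        raise ValueError(f"NULL values found: {bad}")

# Made-up sample rows with the fact table's key columns
SAMPLE_FACT = [
    {"year": 2016, "immig_month": 6, "immig_origin": "COUNTRY_A", "to_immig_state": "California"},
    {"year": 2016, "immig_month": 6, "immig_origin": None, "to_immig_state": "Texas"},
]
COLUMNS = ["year", "immig_month", "immig_origin", "to_immig_state"]
print(null_counts(SAMPLE_FACT, COLUMNS))
```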


```python
# Count the total number of people who immigrated to the United States, according to the fact table
immigration_to_states.select(sum('to_immig_state_count').alias('fact_table_count')).show()
```

```
+----------------+
|fact_table_count|
+----------------+
|         3207230|
+----------------+
```



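```python
# Count the rows in the source view `immigration` (built from new_I94_Data)
spark.sql('SELECT COUNT(*) FROM immigration').show()
# The two totals should match. Here they differ by 6,978 (3,214,208 vs 3,207,230),
# which indicates that the inner joins in the fact query drop immigration rows
# whose state has no matching row in one of the dimension tables.
```

```
+--------+
|count(1)|
+--------+
| 3214208|
+--------+
```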


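Rather than comparing the two totals by eye, the comparison can be turned into an explicit check. The check reports how many rows were dropped and fails above a chosen tolerance. This is a minimal sketch. `reconcile_counts` and `max_dropped_fraction` are hypothetical names, and the 1% tolerance is an arbitrary example value. The two counts are the ones reported by the cells above.

```python
def reconcile_counts(source_count, fact_count, max_dropped_fraction=0.0):
    """Compare the source row count with the fact-table total.

    Returns (dropped, fraction). Raises ValueError if the fact table holds more
    rows than the source, or if more than max_dropped_fraction of the source
    rows are missing from the fact table.
    """
    dropped = source_count - fact_count
    if dropped < 0:
        raise ValueError("fact table total exceeds source count; check for duplicate join matches")
    fraction = dropped / source_count if source_count else 0.0
    if fraction > max_dropped_fraction:
        raise ValueError(f"{dropped} rows ({fraction:.2%}) were dropped between source and fact table")
    return dropped, fraction

# Totals from the two count cells above, with a hypothetical 1% tolerance
dropped, fraction = reconcile_counts(3214208, 3207230, max_dropped_fraction=0.01)
print(dropped)  # 6978
```

With the default tolerance of 0.0, the same call raises an error. That matches a strict reading of "both totals must match".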

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.
* See the DataDictionary file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
    * The data was increased by 100x.
    * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    * The database needed to be accessed by 100+ people.

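**1. For this project I used Apache Spark to read and transform the data and to write the outputs for further analysis. I chose Spark because it processes this modest amount of data quickly, reads the SAS immigration files directly through the spark-sas7bdat package, and lets the fact table be built with Spark SQL.**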

**2. The data should be updated quarterly, which keeps it current enough for the government agencies and other organizations that use it.**

**3. Under the following scenarios, I would approach the problem differently:**
  * **If the data was increased by 100x, I would run Spark on a multi-node Hadoop cluster (for example, Amazon EMR) with the data stored in HDFS or Amazon S3, so the processing is distributed across many machines.**
  * **To update the dashboard daily by 7am, I would use Apache Airflow to schedule the pipeline to run each night, refreshing all tables from the latest source data early enough to finish before 7am.**
  * **If the data needs to be accessed by 100+ people, I would serve it through a web app running on Amazon Web Services (AWS), which can scale out to handle the additional concurrent users.**