# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project explores a rich US immigration dataset in conjunction with US demographics and airports data. Users should be able to query the data to answer questions about US visitor flows in 2016, such as **"What large airport had the most air travelers as % of each airport's city population?"** (the answer is at the end of the notebook).

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
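# Imports and installs here
import os
import configparser

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F        # used as F.col(...) in the quality checks and sample query
from pyspark.sql.types import IntegerType     # used for the casts in Step 4

# Helper functions used below: print_size, clean_up_immigration_df,
# clean_up_demographics_df, clean_up_airports_df
from helper import *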

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')


os.environ['AWS_ACCESS_KEY_ID']=config.get('AWS','AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('AWS','AWS_SECRET_ACCESS_KEY')

In [3]:
# This is to handle "Exception: Java gateway process exited before sending its port number" that arises in the workspace, as per https://knowledge.udacity.com/questions/573236

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"



In [4]:
# Build (or reuse) a Spark session that includes the hadoop-aws package for S3 access
spark = SparkSession.builder.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0").getOrCreate()
#spark.sparkContext.addPyFile("./home/workspace/hadoop-aws-3.3.1.jar")

# Specify output S3 bucket
s3_path='s3a://user-capstone-bucket/parquet/'

### Step 1: Scope the Project and Gather Data

#### Scope 
##### Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.

This project uses immigration, demographics and airport-codes data to shed light on questions such as:

1. "How many visitors entered each US state in 2016, as a percentage of the existing population (based on 2015 demographics)?"

2. "What was their mean age?"

The project connects to Spark, reads the immigration, demographics and airports data, transforms them into tables and writes them into an S3 bucket.

#### Describe and Gather Data 
##### Describe the data sets you're using. Where did it come from? What type of information is included? 

**1. Immigration dataset:** Provided by the US National Travel and Tourism Office, this dataset includes information on visitors, such as their port of entry and date of arrival. <a href="https://travel.trade.gov/research/reports/i94/historical/2016.html" target="_blank">link to the dataset</a>

**2. Demographics dataset:** U.S. City Demographic Data, provided by OpenDataSoft. This dataset includes population data by age, ethnographic group, state/city, household size, etc. <a href="https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/" target="_blank">link to the dataset</a>

**3. Airports dataset:** A table of detailed airport information, such as airport type, name and state code, provided by datahub.io. <a href="https://datahub.io/core/airport-codes#data" target="_blank">link to the dataset</a>

In [5]:
# 1. Load immigration df
df_immigration = spark.read.load('./sas_data')
print_size(df_immigration, 'Immigration')

df_immigration.show(5)

Immigration (rows, columns): (3096313,28)
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0

In [6]:
# 2. Demographics df
df_demographics = spark.read.csv('us-cities-demographics.csv',sep=';',header=True)
print_size(df_demographics, 'Demographics')

df_demographics.show(5)

Demographics (rows, columns): (2891,12)
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|  

In [7]:
# 3. Airports df
df_airports = spark.read.csv('airport-codes_csv.csv',sep=',',header=True)
print_size(df_airports, 'Airports')

df_airports.show(5)

Airports (rows, columns): (55075,12)
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|   

In [8]:
# Template optionals
#df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

#write to parquet
#df_spark.write.parquet("sas_data")
#df_spark=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
##### Identify data quality issues, like missing values, duplicate data, etc.

All three datasets have columns that are not relevant for the analysis, rows with missing values, or fields that require type conversion or other manipulation. We handle these issues in the next few steps.

#### Cleaning Steps
##### Document steps necessary to clean the data

Having explored the datasets, we perform the following clean-up tasks using the clean_up_immigration_df, clean_up_demographics_df and clean_up_airports_df functions imported from the helper script:

**1. Immigration dataset:** We drop 15 of the original 28 columns, keeping only the relevant ones, convert SAS dates to date strings, and drop all rows whose mode of transport is missing or not reported (code 9).
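
The date conversion and the mode-of-transport filter can be sketched in plain Python. SAS stores dates as the number of days since 1960-01-01, so arrdate 20574.0 in the first raw row corresponds to 2016-04-30, which matches date_arrival in the cleaned output. The functions sas_to_iso and keep_row are hypothetical stand-ins, not the helper script's actual code, and the second and third rows are made up to exercise the filter.

```python
from datetime import date, timedelta

# SAS dates count days since 1960-01-01
SAS_EPOCH = date(1960, 1, 1)

def sas_to_iso(sas_days):
    """Convert a SAS date (days since 1960-01-01, possibly a float) to 'YYYY-MM-DD'."""
    if sas_days is None:
        return None
    return (SAS_EPOCH + timedelta(days=int(sas_days))).isoformat()

def keep_row(row):
    """Keep rows whose mode of transport is present and not 9 ('not reported')."""
    mode = row.get('i94mode')
    return mode is not None and int(mode) != 9

rows = [
    {'cicid': 5748517.0, 'i94mode': 1.0, 'arrdate': 20574.0, 'depdate': 20582.0},  # first raw row above
    {'cicid': 1.0, 'i94mode': 9.0, 'arrdate': 20574.0, 'depdate': None},          # hypothetical: not reported
    {'cicid': 2.0, 'i94mode': None, 'arrdate': 20574.0, 'depdate': None},         # hypothetical: missing mode
]

clean = [
    {**r, 'date_arrival': sas_to_iso(r['arrdate']), 'date_depart': sas_to_iso(r['depdate'])}
    for r in rows if keep_row(r)
]
print(clean[0]['date_arrival'], clean[0]['date_depart'])  # 2016-04-30 2016-05-08
```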


**2. Demographics dataset:** Rename the columns to snake_case and convert the numeric columns from string to int or double so they can be used in calculations.
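
A plain-Python sketch of the type conversion, assuming a hypothetical column-to-type mapping (INT_COLS, DOUBLE_COLS) based on the cleaned column names. In the cleaned output, a Median Age of 33.8 becomes 33, which suggests the integer cast simply drops the fractional part; the sketch reproduces that with int(float(value)). If fractional ages matter for the analysis, keeping median_age as a double would be the safer choice.

```python
# Hypothetical type mapping, using the column names from the cleaned demographics output
INT_COLS = ['median_age', 'male_pop', 'female_pop', 'total_pop',
            'veterans_pop', 'foreign_born_pop', 'total_count']
DOUBLE_COLS = ['mean_hhold_size']

def to_int(value):
    """Parse a numeric string as int, dropping any fractional part; '' or None -> None."""
    if value is None or value == '':
        return None
    return int(float(value))

def to_double(value):
    """Parse a numeric string as float; '' or None -> None."""
    if value is None or value == '':
        return None
    return float(value)

def cast_row(row):
    """Return a copy of a string-valued row with numeric columns converted."""
    out = dict(row)
    for col in INT_COLS:
        if col in out:
            out[col] = to_int(out[col])
    for col in DOUBLE_COLS:
        if col in out:
            out[col] = to_double(out[col])
    return out

# Values from the Silver Spring row of the raw demographics output
raw = {'us_city_name': 'Silver Spring', 'median_age': '33.8', 'male_pop': '40601',
       'total_pop': '82463', 'mean_hhold_size': '2.6', 'total_count': '25924'}
typed = cast_row(raw)
print(typed['median_age'], typed['male_pop'], typed['mean_hhold_size'])  # 33 40601 2.6
```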

**3. Airports dataset:** Filter out non-US airports and drop the "US-" prefix from "iso_region" so the column can be used as a key later on. Also drop all airports that have shut down. In addition, split the coordinates into latitude and longitude, and convert elevation (in feet) and latitude/longitude to double.
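
A plain-Python sketch of the airport clean-up for a single row. Two assumptions: shut-down airports are identified by the type value 'closed', and clean_airport is a hypothetical function, not the helper's code. In the source CSV the coordinates string lists longitude first, then latitude. In the cleaned output above, Lowell Field gets lat = -151.695999146, which is outside the valid latitude range of -90 to 90, so the helper appears to assign the two parts the wrong way round; the sketch assigns them in source order. The raw row is built from the 00A values shown above.

```python
def clean_airport(row):
    """Return a cleaned airport dict, or None if the airport should be dropped."""
    # Assumption: shut-down airports carry the type 'closed'
    if row['iso_country'] != 'US' or row['type'] == 'closed':
        return None
    region = row['iso_region']
    us_state = region[3:] if region.startswith('US-') else region  # 'US-PA' -> 'PA'
    # The coordinates string is "longitude, latitude"
    lon_str, lat_str = [part.strip() for part in row['coordinates'].split(',')]
    return {
        'ident': row['ident'],
        'airport_type': row['type'],
        'us_state': us_state,
        'us_city_name': row['municipality'],
        'lat': float(lat_str),
        'lon': float(lon_str),
    }

raw = {'ident': '00A', 'type': 'heliport', 'iso_country': 'US', 'iso_region': 'US-PA',
       'municipality': 'Bensalem', 'coordinates': '-74.93360137939453, 40.07080078125'}
cleaned = clean_airport(raw)
print(cleaned['us_state'], cleaned['lat'], cleaned['lon'])  # PA, then latitude ~40.07, then longitude ~-74.93
```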

In [9]:
# Performing cleaning tasks: immigration df

df_immigration_clean = clean_up_immigration_df(df_immigration)
print_size(df_immigration_clean, 'Immigration df post clean up')

df_immigration_clean.show()

Immigration df post clean up (rows, columns): (2937395,13)
+---------+------+-----+----------+-----------+-----------+------------+-----------+--------------+--------+----------------+---------+-------+
|       id|  year|month|country_it|country_res|entry_point|date_arrival|date_depart|mode_transport|us_state|visitor_birth_yr|visa_type|col_one|
+---------+------+-----+----------+-----------+-----------+------------+-----------+--------------+--------+----------------+---------+-------+
|5748517.0|2016.0|  4.0|     245.0|      438.0|        LOS|  2016-04-30| 2016-05-08|           1.0|      CA|            40.0|      1.0|    1.0|
|5748518.0|2016.0|  4.0|     245.0|      438.0|        LOS|  2016-04-30| 2016-05-17|           1.0|      NV|            32.0|      1.0|    1.0|
|5748519.0|2016.0|  4.0|     245.0|      438.0|        LOS|  2016-04-30| 2016-05-08|           1.0|      WA|            29.0|      1.0|    1.0|
|5748520.0|2016.0|  4.0|     245.0|      438.0|        LOS|  2016-04-30| 2016

In [10]:
# Performing cleaning tasks: demographics df

df_demographics_clean = clean_up_demographics_df(df_demographics)

print_size(df_demographics_clean, 'Demographics df post clean up')

df_demographics_clean.show()

Demographics df post clean up (rows, columns): (2891,12)
+----------------+--------------+----------+--------+----------+---------+------------+----------------+---------------+--------+--------------------+-----------+
|    us_city_name| us_state_name|median_age|male_pop|female_pop|total_pop|veterans_pop|foreign_born_pop|mean_hhold_size|us_state|         ethno_group|total_count|
+----------------+--------------+----------+--------+----------+---------+------------+----------------+---------------+--------+--------------------+-----------+
|   Silver Spring|      Maryland|        33|   40601|     41862|    82463|        1562|           30908|            2.6|      MD|  Hispanic or Latino|      25924|
|          Quincy| Massachusetts|        41|   44129|     49500|    93629|        4147|           32935|           2.39|      MA|               White|      58723|
|          Hoover|       Alabama|        38|   38040|     46799|    84839|        4819|            8229|           2.58|      AL

In [11]:
# Performing cleaning tasks: airports df

df_airports_clean = clean_up_airports_df(df_airports)

print_size(df_airports_clean, 'Airports df post clean up')

df_airports_clean.show()

Airports df post clean up (rows, columns): (21431,11)
+-----+-------------+--------------------+-----------+--------+------------+--------+---------+----------+-------------------+------------------+
|ident| airport_type|        airport_name|iso_country|us_state|us_city_name|gps_code|iata_code|local_code|                lat|               lon|
+-----+-------------+--------------------+-----------+--------+------------+--------+---------+----------+-------------------+------------------+
|  00A|     heliport|   Total Rf Heliport|         US|      PA|    Bensalem|     00A|     null|       00A| -74.93360137939453|    40.07080078125|
| 00AA|small_airport|Aero B Ranch Airport|         US|      KS|       Leoti|    00AA|     null|      00AA|        -101.473911|         38.704022|
| 00AK|small_airport|        Lowell Field|         US|      AK|Anchor Point|    00AK|     null|      00AK|     -151.695999146|       59.94919968|
| 00AL|small_airport|        Epps Airpark|         US|      AL|     Ha

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
##### Map out the conceptual data model and explain why you chose that model
We chose to model the data as a star schema, which lets end users query the data efficiently by joining the fact table with the three dimension tables. A detailed data dictionary is provided towards the end of the notebook.
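
To illustrate how a star-schema query joins the fact table to its dimensions, here is a small sketch using Python's built-in sqlite3 module instead of Spark. The tables are trimmed to a few columns, and the state 'XX', the cities, the airports and all numbers are hypothetical toy values. The fact table links to dim_immigration on (us_state, mode_transport, visa_type) and to dim_city_pop on (us_state, us_city_name), mirroring the joins used in this notebook.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
cur = conn.cursor()

# Dimension tables (trimmed to the columns needed here)
cur.execute("CREATE TABLE dim_city_pop (us_state TEXT, us_city_name TEXT, total_pop INTEGER, "
            "PRIMARY KEY (us_state, us_city_name))")
cur.execute("CREATE TABLE dim_immigration (us_state TEXT, mode_transport INTEGER, visa_type INTEGER, "
            "year INTEGER, month INTEGER, total_visitors INTEGER)")
# Fact table linking states, immigration categories, cities and airports
cur.execute("CREATE TABLE fact_table (us_state TEXT, mode_transport INTEGER, visa_type INTEGER, "
            "us_city_name TEXT, airport_type TEXT, airport_name TEXT)")

# Hypothetical toy rows, for illustration only
cur.executemany("INSERT INTO dim_city_pop VALUES (?, ?, ?)",
                [('XX', 'City A', 1000), ('XX', 'City B', 4000)])
cur.executemany("INSERT INTO dim_immigration VALUES (?, ?, ?, ?, ?, ?)",
                [('XX', 1, 2, 2016, 4, 300), ('XX', 1, 2, 2016, 5, 200)])
cur.executemany("INSERT INTO fact_table VALUES (?, ?, ?, ?, ?, ?)",
                [('XX', 1, 2, 'City A', 'large_airport', 'Airport A'),
                 ('XX', 1, 2, 'City B', 'large_airport', 'Airport B')])

# Star join: fact -> dim_immigration and fact -> dim_city_pop on their composite keys
query = """
SELECT f.airport_name, SUM(i.total_visitors) AS visitors, c.total_pop
FROM fact_table f
JOIN dim_immigration i
  ON f.us_state = i.us_state AND f.mode_transport = i.mode_transport AND f.visa_type = i.visa_type
JOIN dim_city_pop c
  ON f.us_state = c.us_state AND f.us_city_name = c.us_city_name
WHERE f.mode_transport = 1 AND f.airport_type = 'large_airport'
GROUP BY f.airport_name, c.total_pop
ORDER BY f.airport_name
"""
rows = cur.execute(query).fetchall()
print(rows)  # [('Airport A', 500, 1000), ('Airport B', 500, 4000)]
conn.close()
```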

In [12]:
# df_immigration_clean.printSchema()
# df_demographics_clean.printSchema()
# df_airports_clean.printSchema()

#### 3.2 Mapping Out Data Pipelines

##### List the steps necessary to pipeline the data into the chosen data model

Here are the steps that will create our data pipeline:

1. Drop nulls and duplicates.

2. Use the 3 staging tables (**df_demographics_clean, df_immigration_clean and df_airports_clean**) to create 3 dimension tables called **dim_city_pop, dim_airports_cities, dim_immigration** and a single **fact_table**.

3. Load the data to S3: save the processed dimension and fact tables as Parquet files for downstream queries.
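
The aggregation behind dim_immigration in step 2 can be sketched in plain Python: group the cleaned immigration records by state, mode of transport, visa type, year and month, and sum col_one. col_one appears to hold the original count column, which is 1.0 for every record shown, so the sum counts visitors. aggregate_visitors is a hypothetical illustration; the first three rows mirror the cleaned immigration output and the fourth is a made-up extra CA visitor.

```python
from collections import defaultdict

def aggregate_visitors(rows):
    """Sum col_one per (us_state, mode_transport, visa_type, year, month), with int keys and totals."""
    totals = defaultdict(int)
    for row in rows:
        key = (row['us_state'], int(row['mode_transport']), int(row['visa_type']),
               int(row['year']), int(row['month']))
        totals[key] += int(row['col_one'])
    return dict(totals)

rows = [
    {'us_state': 'CA', 'mode_transport': 1.0, 'visa_type': 1.0, 'year': 2016.0, 'month': 4.0, 'col_one': 1.0},
    {'us_state': 'NV', 'mode_transport': 1.0, 'visa_type': 1.0, 'year': 2016.0, 'month': 4.0, 'col_one': 1.0},
    {'us_state': 'WA', 'mode_transport': 1.0, 'visa_type': 1.0, 'year': 2016.0, 'month': 4.0, 'col_one': 1.0},
    # hypothetical extra record
    {'us_state': 'CA', 'mode_transport': 1.0, 'visa_type': 1.0, 'year': 2016.0, 'month': 4.0, 'col_one': 1.0},
]
dim_immigration_rows = aggregate_visitors(rows)
print(dim_immigration_rows[('CA', 1, 1, 2016, 4)])  # 2
```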

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.
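
The dim_city_pop step pivots the long demographics table (one row per city and ethnographic group) into a wide table with one column per group. The plain-Python sketch below shows the idea. It groups only by (state, city), whereas the Spark code also groups by the population columns, and pivot_ethno_groups is a hypothetical helper. The two input rows come from the cleaned demographics output.

```python
from collections import defaultdict

# Column renames applied after the pivot in the Step 4 code
RENAME = {
    'American Indian and Alaska Native': 'american_native',
    'Asian': 'asian',
    'Black or African-American': 'american_black',
    'Hispanic or Latino': 'hispanic_latino',
    'White': 'white',
}

def pivot_ethno_groups(rows):
    """Turn one row per (state, city, ethno_group) into one row per (state, city) with a column per group."""
    counts = defaultdict(lambda: defaultdict(list))
    for row in rows:
        city_key = (row['us_state'], row['us_city_name'])
        counts[city_key][RENAME[row['ethno_group']]].append(row['total_count'])
    wide = {}
    for city_key, groups in counts.items():
        # Mean of the counts (like .mean('total_count')), truncated to int (like the IntegerType cast).
        # Groups with no rows for a city stay None, just as Spark's pivot leaves them null.
        wide[city_key] = {
            col: (int(sum(groups[col]) / len(groups[col])) if groups[col] else None)
            for col in RENAME.values()
        }
    return wide

long_rows = [
    {'us_state': 'MD', 'us_city_name': 'Silver Spring', 'ethno_group': 'Hispanic or Latino', 'total_count': 25924},
    {'us_state': 'MA', 'us_city_name': 'Quincy', 'ethno_group': 'White', 'total_count': 58723},
]
wide = pivot_ethno_groups(long_rows)
print(wide[('MD', 'Silver Spring')]['hispanic_latino'])  # 25924
print(wide[('MD', 'Silver Spring')]['white'])            # None
```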

In [62]:
# dim_city_pop: one row per city; each ethnographic group's count is pivoted into its own column
dim_city_pop = (df_demographics_clean
                .groupBy('us_state', 'us_state_name', 'us_city_name', 'male_pop', 'female_pop', 'total_pop',
                         'veterans_pop', 'foreign_born_pop', 'mean_hhold_size', 'median_age')
                .pivot('ethno_group').mean('total_count')
                .sort('us_state')
                .withColumnRenamed('Asian', 'asian')
                .withColumnRenamed('American Indian and Alaska Native', 'american_native')
                .withColumnRenamed('Black or African-American', 'american_black')
                .withColumnRenamed('Hispanic or Latino', 'hispanic_latino')
                .withColumnRenamed('White', 'white')
                .dropDuplicates())

# mean() returns doubles; cast the group counts back to integers
for col_name in ['asian', 'american_native', 'american_black', 'hispanic_latino', 'white']:
    dim_city_pop = dim_city_pop.withColumn(col_name, dim_city_pop[col_name].cast(IntegerType()))

# dim_airports_cities: airports located in cities that appear in the demographics data
dim_airports_cities = (df_airports_clean
                       .join(dim_city_pop, ['us_state', 'us_city_name'], 'inner')
                       .select(['ident', 'airport_type', 'airport_name', 'us_city_name',
                                'us_state', 'local_code', 'lat', 'lon'])
                       .dropDuplicates())

# dim_immigration: visitor counts per state, mode of transport, visa type, year and month
dim_immigration = (df_immigration_clean
                   .groupby('us_state', 'mode_transport', 'visa_type', 'year', 'month')
                   .sum('col_one')
                   .sort('us_state')
                   .withColumnRenamed('sum(col_one)', 'total_visitors')
                   .dropDuplicates())

for col_name in ['mode_transport', 'visa_type', 'year', 'month', 'total_visitors']:
    dim_immigration = dim_immigration.withColumn(col_name, dim_immigration[col_name].cast(IntegerType()))

# fact_table: links each state's (mode_transport, visa_type) combinations to its cities and airports
state_city_combos = (dim_immigration
                     .join(dim_city_pop, ['us_state'])
                     .select(['us_state', 'mode_transport', 'visa_type', 'us_city_name'])
                     .dropDuplicates())
fact_table = (state_city_combos
              .join(dim_airports_cities, ['us_city_name', 'us_state'])
              .select(['us_state', 'mode_transport', 'visa_type', 'us_city_name', 'airport_type', 'airport_name'])
              .dropDuplicates())

dim_city_pop.show()
dim_airports_cities.show()
dim_immigration.show()
fact_table.show()

+--------+-------------+-----------------+--------+----------+---------+------------+----------------+---------------+----------+---------------+-----+--------------+---------------+-------+
|us_state|us_state_name|     us_city_name|male_pop|female_pop|total_pop|veterans_pop|foreign_born_pop|mean_hhold_size|median_age|american_native|asian|american_black|hispanic_latino|  white|
+--------+-------------+-----------------+--------+----------+---------+------------+----------------+---------------+----------+---------------+-----+--------------+---------------+-------+
|      AK|       Alaska|        Anchorage|  152945|    145750|   298695|       27492|           33258|           2.77|        32|          36339|36825|         23107|          27261| 212696|
|      AL|      Alabama|       Montgomery|   94582|    106004|   200586|       14955|            9337|           2.41|        35|           1277| 6518|        121360|           6648|  73545|
|      AL|      Alabama|       Huntsville|   

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
 
**Here we perform two checks:**

1. Whether the key us_state has any missing values in any of the tables.
2. Whether the fact table has any missing values in any of its columns.

**Both are important to make sure that user aggregations will be successful.**
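
The same two checks can be sketched in plain Python over lists of row dicts. The functions and toy rows are hypothetical illustrations of the logic, not the Spark code below. A failing check should raise an exception instance such as ValueError: in Python 3, `raise 'some text'` itself fails with "TypeError: exceptions must derive from BaseException".

```python
def null_counts(tables, key):
    """Count rows with a null `key` in each table; `tables` maps a table name to a list of row dicts."""
    return {name: sum(1 for row in rows if row.get(key) is None) for name, rows in tables.items()}

def check_key_not_null(tables, key='us_state'):
    """Quality check 1: raise if the key column contains nulls in any table."""
    offenders = {name: n for name, n in null_counts(tables, key).items() if n > 0}
    if offenders:
        raise ValueError(f'Null values in key column {key!r}: {offenders}')

def check_no_missing_values(rows, table_name):
    """Quality check 2: raise if any row has a null in any column."""
    incomplete = sum(1 for row in rows if any(value is None for value in row.values()))
    if incomplete:
        raise ValueError(f'{table_name} has {incomplete} row(s) with missing values')

# Hypothetical toy rows
fact_rows = [{'us_state': 'XX', 'mode_transport': 1, 'visa_type': 2, 'us_city_name': 'City A',
              'airport_type': 'large_airport', 'airport_name': 'Airport A'}]
city_rows = [{'us_state': 'XX', 'us_city_name': 'City A', 'total_pop': 1000}]

check_key_not_null({'fact_table': fact_rows, 'dim_city_pop': city_rows})  # passes silently
check_no_missing_values(fact_rows, 'fact_table')                          # passes silently

try:
    check_no_missing_values(fact_rows + [{'us_state': None, 'mode_transport': 1}], 'fact_table')
except ValueError as err:
    print(err)  # fact_table has 1 row(s) with missing values
```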

Run Quality Checks

In [80]:
# Quality Check 1

qual_1 = [fact_table.where(F.col('us_state').isNull()).count(),
          dim_airports_cities.where(F.col('us_state').isNull()).count(),
          dim_city_pop.where(F.col('us_state').isNull()).count(),
          dim_immigration.where(F.col('us_state').isNull()).count()]

if sum(qual_1) == 0:
    print('No missing data in unique key us_state')
else:
    raise ValueError('Important Error: Unique key us_state includes null values')

No missing data in unique key us_state


In [81]:
# Quality Check 2
if fact_table.na.drop().count() == fact_table.count():
    print('The fact table doesn\'t have any missing values')
else:
    raise ValueError('Important Error: fact_table has missing values')

The fact table doesn't have any missing values


#### 4.3 Add tables to S3 as parquet
##### Writing to S3 required applying https://gist.github.com/eddies/f37d696567f15b33029277ee9084c4a0 (edited with Vim) to fix this error in the workspace: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found

In [15]:
dim_city_pop.write.parquet(s3_path+'dim_city_pop.parquet', mode='overwrite')

In [17]:
dim_airports_cities.write.parquet(s3_path+'dim_airports_cities.parquet', mode='overwrite')

In [None]:
dim_immigration.write.parquet(s3_path+'dim_immigration.parquet', mode='overwrite')

In [None]:
fact_table.write.parquet(s3_path+'fact_table.parquet', mode='overwrite')

#### 4.4 Data dictionary
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### dimension tables

##### dim_city_pop

| Field name | Type| Constraint | Description |
| --- | --- | --- | --- |
| us_state | string | Composite Key | US state id
| us_state_name | string |   | US state name
| us_city_name | string | Composite Key | US city name
| male_pop | integer |   | Male population
| female_pop | integer |   | Female population
| total_pop | integer |   | Total population
| veterans_pop | integer |   | Veterans population
| foreign_born_pop | integer |   | Population born outside the US
| mean_hhold_size | double |   | Average number of persons forming a household
| median_age | integer |   | Population's median age
| american_native | integer |   | Ethnographic group population: American Indian and Alaska Native
| asian | integer |   | Ethnographic group population: Asian
| american_black | integer |   | Ethnographic group population: Black or African-American
| hispanic_latino | integer |   | Ethnographic group population: Hispanic or Latino
| white | integer |   | Ethnographic group population: White


##### dim_airports_cities

| Field name | Type| Constraint | Description |
| --- | --- | --- | --- |
| ident | string |  | Airport identifier (the ident code from the airports dataset)
| airport_type | string |  | Airport type, e.g. large airports or heliports
| airport_name | string |  Composite Key | The airport's name
| us_state | string | Composite Key | US state id
| us_city_name | string | Composite Key  | US city name
| local_code | string |   | Local airport code
| lat | double |   | Airport latitude
| lon | double |   | Airport longitude


##### dim_immigration

| Field name | Type| Constraint | Description |
| --- | --- | --- | --- |
| us_state | string | Composite Key | US state id
| mode_transport | integer |  Composite Key | Id that corresponds to a mode of transport (1 = Air; 2 = Sea; 3 = Land)
| visa_type | integer | Composite Key  | Id that corresponds to the visa type the traveler holds (1 = Business; 2 = Pleasure; 3 = Student)
| year | integer | Composite Key | Year the trip took place
| month | integer | Composite Key | Month the trip took place
| total_visitors | integer |   | Total number of travelers


##### fact_table

| Field name | Type| Constraint | Description |
| --- | --- | --- | --- |
| us_state | string | Foreign Key | US state id
| mode_transport | integer |  Foreign Key | Id that corresponds to a mode of transport (1 = Air; 2 = Sea; 3 = Land)
| visa_type | integer | Foreign Key  | Id that corresponds to the visa type the traveler holds (1 = Business; 2 = Pleasure; 3 = Student)
| us_city_name | string | Foreign Key | US city name
| airport_type | string | Foreign Key  | Airport type, e.g. large airports or heliports
| airport_name | string |  Foreign Key | The airport's name



### Step 5: Complete Project Write Up

**Clearly state the rationale for the choice of tools and technologies for the project.**

We chose Spark for data wrangling and cleaning because it handles large datasets efficiently by distributing the work across a cluster. We used S3 to store the tables because of its lower cost: we were on a tight budget and could not afford a Redshift cluster.

**Propose how often the data should be updated and why.**

The data could be updated as often as daily, assuming the immigration dataset is updated at that frequency. However, since we create monthly aggregations, it probably makes more sense to run the pipeline once a month.

Write a description of how you would approach the problem differently under the following scenarios:

**The data was increased by 100x.**

We could use larger EC2 instances in AWS EMR or add more worker nodes. Increasing capacity is usually the fastest way to improve performance and to deal with the issues a 100x larger dataset might bring about.

**The data populates a dashboard that must be updated on a daily basis by 7am every day.**

In that case, using Airflow to schedule and automate the data pipeline jobs is probably a good solution.
 
**The database needed to be accessed by 100+ people.**
 
The larger the number of people accessing a database, the more likely it is to face performance issues. Again, adding more nodes or using a larger Redshift cluster is probably the easiest way to deal with the higher workload.

### Step 6: Sample query

#### What large airport had the most air travelers as % of each airport's city population?

In [79]:
# Air arrivals (mode_transport == 1) in 2016 at large airports, compared with each airport's city population
(fact_table
    .join(dim_immigration, ['us_state', 'mode_transport', 'visa_type'], 'inner')
    .filter((F.col('mode_transport') == 1) & (F.col('year') == 2016) & (F.col('airport_type') == 'large_airport'))
    .groupBy('us_state', 'us_city_name', 'airport_name').sum('total_visitors')
    .join(dim_city_pop, ['us_state', 'us_city_name'])
    .select('us_state', 'us_city_name', 'airport_name', 'sum(total_visitors)', 'total_pop')
    .withColumn('ratio', F.col('sum(total_visitors)') / F.col('total_pop'))
    .sort(F.desc('ratio'))
    .show())

+--------+---------------+--------------------+-------------------+---------+------------------+
|us_state|   us_city_name|        airport_name|sum(total_visitors)|total_pop|             ratio|
+--------+---------------+--------------------+-------------------+---------+------------------+
|      FL|     Fort Myers|Southwest Florida...|             616842|    74015| 8.334013375667094|
|      FL|West Palm Beach|Palm Beach Intern...|             616842|   106782|5.7766477496207225|
|      CA|      Fairfield|Travis Air Force ...|             462964|   112972|4.0980419927061575|
|      NY|       Syracuse|Syracuse Hancock ...|             542397|   144152| 3.762674121760364|
|      FL|Fort Lauderdale|Fort Lauderdale H...|             616842|   178587|  3.45401400997833|
|      FL|    Tallahassee|Tallahassee Regio...|             616842|   189894|3.2483490789598406|
|      CA|        Ontario|Ontario Internati...|             462964|   171200|2.7042289719626167|
|      NY|      Rochester|Grea
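
The ratio column is sum(total_visitors) divided by total_pop, so it is a ratio rather than a percentage: 8.33 for Fort Myers means air arrivals equivalent to roughly 833% of the city's population. Also note that dim_immigration has no airport or city column, so the summed visitor count is a state-level figure. Every Florida airport in the output shows the same 616842, and the ranking therefore mostly reflects how small each airport's city is compared with its state's total air arrivals. The arithmetic can be checked against the first rows of the output:

```python
# (state, city, sum(total_visitors), total_pop) copied from the first four rows of the output above
top_rows = [
    ('FL', 'Fort Myers', 616842, 74015),
    ('FL', 'West Palm Beach', 616842, 106782),
    ('CA', 'Fairfield', 462964, 112972),
    ('NY', 'Syracuse', 542397, 144152),
]

for state, city, visitors, pop in top_rows:
    ratio = visitors / pop  # the 'ratio' column: visitors per resident, not a percentage
    print(f'{city}, {state}: {ratio:.3f} visitors per resident ({ratio:.0%} of the city population)')
# First line printed: Fort Myers, FL: 8.334 visitors per resident (833% of the city population)
```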