# Project Title
### Data Engineering Capstone Project

#### Project Summary
A United States government agency wants to better understand its immigration data. Each year it gathers a dataset that documents every immigration case in a SAS (Statistical Analysis System) file. It also maintains a CSV file with basic demographic information about US cities. These datasets are stored on premises.

As the agency's data engineer, I am tasked with building an ETL pipeline that extracts the data from on-premises storage, cleans it, and transforms it into a set of tables stored as Parquet files in S3. The analytics team can then query these tables in Athena to continue finding insights into US immigration patterns.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope 
The analytics team would like to better understand their immigration data. They are particularly interested in the demographics of immigrants and the US cities they choose to live in. They want to understand how factors such as age and visa category affect where people move. They also want to know whether people are drawn to cities in which people of a similar race make up a significant portion of the population.

#### Describe and Gather Data 
One of the datasets I am using is the Visitor Arrivals Program (I-94 Record) data.  This dataset provides international visitor arrival statistics about country of origin, type of visa, age groups, and state of residence.  More information is available on their website: https://travel.trade.gov/research/reports/i94/historical/2016.html

The country codes for country of origin in the I-94 data are defined in the resource 'I94_SAS_Labels_Descriptions.SAS'. I parsed this resource to generate the country_code.csv file.
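The parsing step is not shown in the notebook, so here is a minimal sketch of how it could work. It assumes that the country codes sit in a SAS value block that starts with a line `value i94cntyl`, that each entry looks like `   236 =  'AFGHANISTAN'`, and that the block ends at a line starting with `;`. The function names `parse_country_codes` and `to_country_csv` are hypothetical. The block boundaries matter because other blocks in the labels file (for example, the visa codes) also use numeric keys.

```python
import csv
import io
import re

# Assumed entry format inside the i94cntyl block, e.g.:   236 =  'AFGHANISTAN'
CODE_LINE = re.compile(r"^\s*(\d+)\s*=\s*'([^']*)'")


def parse_country_codes(sas_text):
    """Return (country_id, country) pairs from the i94cntyl value block only."""
    rows = []
    in_block = False
    for line in sas_text.splitlines():
        if not in_block:
            # assumption: the block opens on a line such as "value i94cntyl"
            if line.strip().lower().startswith("value i94cntyl"):
                in_block = True
            continue
        if line.strip().startswith(";"):
            break  # a semicolon closes the SAS value block
        match = CODE_LINE.match(line)
        if match:
            rows.append((match.group(1), match.group(2).strip()))
    return rows


def to_country_csv(rows):
    """Render rows as CSV text with the header used by country_code.csv."""
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    writer.writerow(["country_id", "country"])
    writer.writerows(rows)
    return out.getvalue()
```

The csv module quotes names that contain commas (several country labels do), so `spark.read.csv(..., header=True)` can read the file back without splitting those names.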

The continent of each country comes from the country_to_continent.csv file, which is based on the list available at this website:
https://datahub.io/JohnSnowLabs/country-and-continent-codes-list

The US city demographics dataset is provided by OpenDataSoft. More information is available on their website:
https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/


```python
from datetime import datetime, timedelta

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import (
    DateType, DoubleType, IntegerType, StringType, StructField, StructType,
)
```

```python
%load_ext sql
```

```python
# set up the Spark session; spark.jars.packages takes a comma-separated list,
# so both packages go in one config call (a second call would overwrite the first)
spark = SparkSession.builder \
    .config("spark.jars.packages",
            "org.apache.hadoop:hadoop-aws:2.7.0,saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()
```

```python
# Visitor Arrivals Program (I-94 Record) data
raw_immigration_df = spark.read.format('com.github.saurfang.sas.spark') \
    .load("../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat")
```

```python
raw_immigration_df.printSchema()
```

```text
root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- validres: double (nullable = true)
 |-- delete_days: double (nullable = true)
 |-- delete_mexl: double (nullable = true)
 |-- delete_dup: double (nullable = true)
 |-- delete_visa: double (nullable = true)
 |-- delete_recdup: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- mat
```

```python
# US city demographics (semicolon-delimited CSV, one row per city and race)
demo_schema = StructType([
    StructField("city", StringType()),
    StructField("state", StringType()),
    StructField("median_age", DoubleType()),
    StructField("male_population", IntegerType()),
    StructField("female_population", IntegerType()),
    StructField("total_population", IntegerType()),
    StructField("number_of_veterans", IntegerType()),
    StructField("foreign_born", IntegerType()),
    StructField("average_household_size", DoubleType()),
    StructField("state_code", StringType()),
    StructField("race", StringType()),
    StructField("count", IntegerType()),
])

us_demo_df = spark.read.csv('us-cities-demographics.csv', sep=';', schema=demo_schema, header=True)
```

```python
us_demo_df.printSchema()
```

```text
root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- male_population: integer (nullable = true)
 |-- female_population: integer (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- number_of_veterans: integer (nullable = true)
 |-- foreign_born: integer (nullable = true)
 |-- average_household_size: double (nullable = true)
 |-- state_code: string (nullable = true)
 |-- race: string (nullable = true)
 |-- count: integer (nullable = true)
```



```python
# country code data extracted from I94_SAS_Labels_Descriptions.SAS
country_code_df = spark.read.csv('country_code.csv', header=True)
```

```python
country_code_df.printSchema()
```

```text
root
 |-- country_id: string (nullable = true)
 |-- country: string (nullable = true)
```



```python
# mapping of country to continent
continent_df = spark.read.csv('country_to_continent.csv', header=True)
```

```python
continent_df.printSchema()
```

```text
root
 |-- continent: string (nullable = true)
 |-- country_name: string (nullable = true)
```



### Step 2: Explore and Assess the Data
#### Cleaning Steps
The Visitor Arrivals Program (I-94 Record) data needs some cleanup, which is applied in the immigration_df DataFrame. Most of the numeric fields in raw_immigration_df are stored as doubles even though they hold whole numbers, so they are cast to integers. The arrival and departure dates are stored in SAS date format (the number of days since January 1, 1960), which is hard to read, so they are converted to a standard date type.

```python
# select fields of interest and cast doubles to integers
immigration_df = raw_immigration_df.selectExpr(
    'cast(cicid as int) AS id',
    'cast(i94yr as int) AS year',
    'cast(i94mon as int) AS month',
    'cast(arrdate as int) AS arrdate',
    'cast(depdate as int) AS depdate',
    'cast(i94res as int) AS country_of_origin_code',
    'i94addr AS state',
    'cast(i94bir as int) AS age',
    'gender',
    'cast(i94visa as int) AS visa_category_code'
)
```

```python
# convert a SAS date (days since 1960-01-01) to a date; missing or invalid values become null
def to_datetime(sas_date):
    if sas_date is None:
        return None
    try:
        return (datetime(1960, 1, 1) + timedelta(days=int(sas_date))).date()
    except (TypeError, ValueError, OverflowError):
        return None

sas_to_datetime_udf = udf(to_datetime, DateType())

immigration_df = immigration_df \
    .withColumn("arrival_date", sas_to_datetime_udf("arrdate")) \
    .withColumn("departure_date", sas_to_datetime_udf("depdate")) \
    .drop("arrdate", "depdate")
```
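To sanity-check the conversion outside Spark, here is a plain-Python sketch of the same arithmetic. The helper name `sas_to_date` is hypothetical. It shows that SAS day 0 is 1960-01-01 and that day 20606 falls on 2016-06-01, the first day of the month covered by the June 2016 file.

```python
from datetime import date, timedelta

# SAS stores dates as the number of days since 1960-01-01
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days):
    """Convert a SAS day count to a datetime.date; return None for missing values."""
    if sas_days is None:
        return None
    try:
        return SAS_EPOCH + timedelta(days=int(sas_days))
    except (TypeError, ValueError, OverflowError):
        # e.g. NaN from an uncast double column, or a value out of date range
        return None
```

Inside Spark, the same arithmetic is also available without a Python UDF through the built-in `date_add` function (for example `expr("date_add('1960-01-01', arrdate)")`). Because it avoids shipping every row to a Python worker, it is usually faster on large inputs.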

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
I have opted to model the data with denormalized tables, because the goal of the project is to optimize for querying. To avoid having to join several tables, I keep as much relevant information as possible in the table that holds the US immigration data. I also map codes to their names for ease of querying. For example, the visa code is mapped to its category name (visa category 1 represents business-related visas), and the country of origin code is mapped to the name of the country. I also include the mapping of country to continent so analysts can see each immigrant's continent of origin.

Because of the nature of the analysis, it is also useful to add a field to the US demographic table that gives the share of each race in each city's population.

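With as much information as possible in the immigration table, analysts only need to join it with one other table: the US demographic information table. The two tables can be joined on state when analysts want to better understand where US immigrants are residing. The immigration table's `state` column holds the two-letter I-94 address code (`i94addr`), which matches the `state_code` column of the demographic table.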
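One subtlety of this join is that the demographic table has one row per city and race. Joining immigration rows straight onto it repeats each immigrant once per matching city/race row. The sketch below uses Python's built-in `sqlite3` with a few made-up rows (hypothetical data, columns trimmed) standing in for the two Athena tables. It contrasts the naive join with a join against de-duplicated states. The query uses only standard SQL, so the same shape should carry over to Athena.

```python
import sqlite3

# Hypothetical rows standing in for the two Athena tables (columns trimmed).
IMMIGRATION_ROWS = [
    (1, "CA", "student"),
    (2, "CA", "business"),
    (3, "NY", "pleasure"),
    (4, "CA", "student"),
]
# The demographic table has one row per city and race.
DEMOGRAPHIC_ROWS = [
    ("Los Angeles", "California", "CA", "Asian"),
    ("Los Angeles", "California", "CA", "White"),
    ("San Diego", "California", "CA", "Asian"),
    ("New York", "New York", "NY", "Asian"),
]


def load_tables():
    """Create both tables in an in-memory SQLite database."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE immigration (id INTEGER, state TEXT, visa_category TEXT)")
    conn.execute("CREATE TABLE demographics (city TEXT, state TEXT, state_code TEXT, race TEXT)")
    conn.executemany("INSERT INTO immigration VALUES (?, ?, ?)", IMMIGRATION_ROWS)
    conn.executemany("INSERT INTO demographics VALUES (?, ?, ?, ?)", DEMOGRAPHIC_ROWS)
    return conn


def naive_join_count(conn):
    """Joining straight onto the city/race rows repeats each immigrant."""
    return conn.execute(
        "SELECT COUNT(*) FROM immigration AS i "
        "JOIN demographics AS d ON i.state = d.state_code"
    ).fetchone()[0]


def arrivals_by_state(conn):
    """De-duplicate states first, then join on the two-letter state code."""
    return conn.execute(
        """
        SELECT d.state, i.visa_category, COUNT(*) AS arrivals
        FROM immigration AS i
        JOIN (SELECT DISTINCT state, state_code FROM demographics) AS d
          ON i.state = d.state_code
        GROUP BY d.state, i.visa_category
        ORDER BY d.state, i.visa_category
        """
    ).fetchall()
```

With these rows, the naive join returns 10 rows for only 4 immigrants, while the de-duplicated join counts each immigrant once.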

#### 3.2 Mapping Out Data Pipelines
The Spark job performs a series of transformations and then writes the data to S3 as Parquet files. Data validation checks run at the end of the job; if any check fails, the job fails.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

```python
# map the I-94 visa category code to a readable name
def visa_category(category_code):
    if category_code == 1:
        return "business"
    elif category_code == 2:
        return "pleasure"
    elif category_code == 3:
        return "student"
    else:
        return "unknown"

visa_category_udf = udf(visa_category, StringType())

immigration_df = immigration_df \
    .withColumn("visa_category", visa_category_udf("visa_category_code")) \
    .drop("visa_category_code")
```
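The if/elif chain above can also be written as a table-driven lookup, which keeps the code-to-name mapping in one place if more codes are added later. This is a minimal plain-Python sketch. The `VISA_CATEGORIES` dictionary is a hypothetical name, and its codes are the three handled in the cell above.

```python
# I-94 visa category codes (i94visa) handled by the cell above
VISA_CATEGORIES = {1: "business", 2: "pleasure", 3: "student"}


def visa_category(category_code):
    """Map an i94visa code to its category name, defaulting to 'unknown'."""
    return VISA_CATEGORIES.get(category_code, "unknown")
```

Because `1.0 == 1` and both hash equally in Python, the lookup also works on the raw double codes before they are cast to integers. Missing codes (`None`) fall through to `"unknown"`.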

```python
# left join the country code data set to map the country of origin code to its name;
# country_id is read from CSV as a string, so cast it to match the integer code
immigration_df = immigration_df.join(
    country_code_df,
    immigration_df.country_of_origin_code == country_code_df.country_id.cast("int"),
    how='left',
).drop("country_of_origin_code", "country_id")
```


```python
# left join the continent mapping on the country name
immigration_df = immigration_df.join(
    continent_df, immigration_df.country == continent_df.country_name, how='left'
).drop("country_name")
immigration_df.printSchema()
```

```text
root
 |-- id: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- state: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- arrival_date: date (nullable = true)
 |-- departure_date: date (nullable = true)
 |-- visa_category: string (nullable = true)
 |-- country: string (nullable = true)
 |-- continent: string (nullable = true)
```
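The continent join matches country names exactly. If country_code.csv and country_to_continent.csv spell a name differently (for example, one file uses upper case), the left join silently leaves `continent` null. The plain-Python sketch below shows one way to guard against this, under that assumption. It builds a join key that ignores case and extra whitespace, and it reports countries that still fail to match so they can be fixed in the mapping file. The function names and sample names are hypothetical.

```python
def normalize(name):
    """Build a join key that ignores case and repeated or surrounding whitespace."""
    return " ".join(name.split()).casefold() if name else None


def map_continents(countries, continent_rows):
    """Map countries to continents via normalized names.

    continent_rows holds (continent, country_name) pairs, the column order of
    country_to_continent.csv. Returns ({country: continent or None}, unmatched).
    """
    lookup = {normalize(name): continent for continent, name in continent_rows}
    mapped, unmatched = {}, set()
    for country in countries:
        continent = lookup.get(normalize(country))
        mapped[country] = continent
        if continent is None:
            unmatched.add(country)
    return mapped, sorted(unmatched)
```

In Spark, a similar key can be built on both sides of the join with `upper(trim(...))` from `pyspark.sql.functions`. Counting rows where `continent` is null after the join gives the same unmatched report.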



```python
# share of each city's population in the race given in the `race` column,
# as a fraction between 0 and 1; null when total_population is missing or zero
from pyspark.sql.functions import col, when

us_demo_df = us_demo_df.withColumn(
    "race_percent",
    when(col("total_population") > 0, col("count") / col("total_population")),
)
```
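A plain Python division, as in a UDF that simply returns `race_count/total_population`, raises `ZeroDivisionError` when a city's population is 0 and `TypeError` when either value is null. The sketch below (hypothetical helper name `race_fraction`) spells out the guarded behaviour in plain Python. Note that the result is a fraction between 0 and 1 rather than a 0 to 100 percentage.

```python
def race_fraction(race_count, total_population):
    """Share of a city's population in one race, as a value between 0 and 1."""
    if race_count is None or not total_population:
        return None  # missing data or zero population: no meaningful share
    return race_count / total_population
```

Using Spark column arithmetic instead of a Python UDF gives the same result while keeping the computation inside the JVM.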



```python
us_demo_df.printSchema()
```

```text
root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- male_population: integer (nullable = true)
 |-- female_population: integer (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- number_of_veterans: integer (nullable = true)
 |-- foreign_born: integer (nullable = true)
 |-- average_household_size: double (nullable = true)
 |-- state_code: string (nullable = true)
 |-- race: string (nullable = true)
 |-- count: integer (nullable = true)
 |-- race_percent: double (nullable = true)
```


```python
# write the immigration data out to Parquet files partitioned by year and month
# (s3:// paths work on EMR; a local Spark install with hadoop-aws typically needs s3a://)
immigration_df.write.partitionBy("year", "month").parquet("s3://udacity-cbohara/immigration")
```

```python
# write the US demographic data out to Parquet files partitioned by
# 2-letter state abbreviation (ex: CA for California)
us_demo_df.write.partitionBy("state_code").parquet("s3://udacity-cbohara/demo")
```
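`partitionBy` writes one Hive-style `key=value` directory per distinct combination of partition values. It stores the partition columns in those directory names rather than inside the Parquet files. When a query filters on a partition column, Athena can skip the directories that cannot match. The plain-Python sketch below models that layout and the pruning with strings only. It does not call Spark or Athena, and `partition_path` and `prune` are hypothetical names.

```python
def partition_path(base, **partitions):
    """Directory that partitionBy writes for one combination of partition values."""
    parts = [f"{key}={value}" for key, value in partitions.items()]
    return "/".join([base.rstrip("/")] + parts)


def prune(paths, **filters):
    """Keep only the partition directories a query filtering on these columns must read."""
    wanted = {f"{key}={value}" for key, value in filters.items()}
    return [path for path in paths if wanted.issubset(path.split("/"))]
```

For example, a query restricted to `month = 6` reads only the `year=2016/month=6` directory out of several monthly partitions. Because the partition columns live only in the directory names, an Athena table over this data declares them in `PARTITIONED BY` rather than in the regular column list.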

#### 4.2 Data Quality Checks
To verify that the data loaded correctly, I query the S3 output through Athena and confirm that each table's row count matches the row count of the corresponding DataFrame. These tables are also made available to the data analysts for general queries.

```python
immigration_df_count = immigration_df.count()
us_demo_df_count = us_demo_df.count()
```

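```sql
%%sql
-- Create the Athena table over the Parquet files written by Spark.
-- year and month are partition columns, so they are declared only in
-- PARTITIONED BY (Spark stores them in the directory names, not the files).
CREATE EXTERNAL TABLE public.immigration_fact_table (
id int,
state string,
age int,
gender string,
arrival_date date,
departure_date date,
visa_category string,
country string,
continent string)
PARTITIONED BY (
year int,
month int)
STORED AS PARQUET
LOCATION 's3://udacity-cbohara/immigration/';
```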

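```sql
%%sql
-- Register the year=/month= partitions written by Spark,
-- then check that the row count equals the DataFrame count
MSCK REPAIR TABLE public.immigration_fact_table;
select count(*)
from public.immigration_fact_table;
```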

```sql
%%sql
-- Create the Athena table; state_code is a partition column,
-- so it is declared only in PARTITIONED BY
CREATE EXTERNAL TABLE public.us_demographic_dimension_table (
city string,
state string,
median_age double,
male_population int,
female_population int,
total_population int,
number_of_veterans int,
foreign_born int,
average_household_size double,
race string,
count int,
race_percent double
)
PARTITIONED BY (
state_code string)
STORED AS PARQUET
LOCATION 's3://udacity-cbohara/demo/';
```

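```sql
%%sql
-- Register the state_code= partitions written by Spark,
-- then check that the row count equals the DataFrame count
MSCK REPAIR TABLE public.us_demographic_dimension_table;
select count(*)
from public.us_demographic_dimension_table;
```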
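Comparing the counts by eye does not make the job fail when they differ, as Step 3.2 intends. A minimal sketch of turning the comparison into checks that raise is below. It assumes the Athena `count(*)` results have already been collected into a dictionary; how they are fetched is left out, and all names here are hypothetical. The uniqueness check guards the denormalized joins: if a mapping file lists a country more than once, the left joins duplicate immigration rows, and the `id` column stops being unique.

```python
def check_row_counts(expected, loaded):
    """Fail if a source DataFrame is empty or a loaded table's row count differs.

    expected: {table_name: DataFrame row count}
    loaded:   {table_name: row count returned by Athena}
    """
    problems = []
    for table, want in expected.items():
        got = loaded.get(table)
        if want == 0:
            problems.append(f"{table}: source DataFrame is empty")
        elif got != want:
            problems.append(f"{table}: expected {want} rows, Athena returned {got}")
    if problems:
        raise ValueError("; ".join(problems))


def check_unique(values, column="id"):
    """Fail if a column that should be a key contains duplicate values."""
    seen, duplicates = set(), set()
    for value in values:
        if value in seen:
            duplicates.add(value)
        seen.add(value)
    if duplicates:
        raise ValueError(f"duplicate {column} values: {sorted(duplicates)}")
```

In the notebook, `expected` would be built from `immigration_df_count` and `us_demo_df_count`. At full scale, the uniqueness check is better done in Spark by comparing `immigration_df.count()` with `immigration_df.select("id").distinct().count()` rather than collecting ids to the driver.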

#### 4.3 Data dictionary 
The data dictionary below describes each field in the data model, what it contains, and where it came from.

#### Immigration Fact Table
##### Sources - i94_jun16_sub.sas7bdat, country_code.csv, country_to_continent.csv
* id - CIC ID (`cicid`), the record identifier in the I-94 data
* year - year of the I-94 record (`i94yr`)
* month - month of the I-94 record (`i94mon`)
* state - two-letter code of the US state of residence during the visa stay (`i94addr`)
* age - age of the immigrant in years (`i94bir`)
* gender - gender of the immigrant
* arrival_date - date of arrival in the United States (null if not recorded)
* departure_date - date of departure from the United States (null if not recorded)
* visa_category - type of visa (business, pleasure, student, or unknown)
* country - country of origin, from the I-94 country-of-residence code (`i94res`) mapped to its name through country_code.csv
* continent - continent of origin, from country_to_continent.csv

#### US Demographic Dimension Table
##### Source - us-cities-demographics.csv
* city - name of the city
* state - name of the state
* median_age - median age of residents in the city
* male_population - total male population count in the city
* female_population - total female population count in the city
* total_population - total population count in the city
* number_of_veterans - number of veterans in the city
* foreign_born - number of foreign-born residents in the city
* average_household_size - average household size in the city
* state_code - 2-letter abbreviation of the state (ex: CA for California)
* race - name of the racial category
* count - number of residents of the city in the racial category given in `race`
* race_percent - share of the city's total population in that racial category, as a fraction between 0 and 1 (count / total_population)

### Step 5: Complete Project Write Up
#### Clearly state the rationale for the choice of tools and technologies for the project.
The goal of the project was to create a data lake with data relevant to immigration to the United States. This would include records of immigration and demographics for the cities.  It would be valuable for data analysts to have the ability to query the data lake.  This would allow them to better understand US immigration patterns.  For example, which regions are popular to immigrate to, and for what reason?  What demographic factors influence the migration to a certain location?

I chose Spark because it makes it easy to ingest, transform, and write out datasets. Although a cluster was not required for datasets of this size, the job could easily be scaled up by running it on an Amazon EMR cluster or by moving it to AWS Glue, a serverless Spark-based ETL service.

I chose to create a denormalized fact table for immigration to the United States, as well as a denormalized table representing US demographics. I chose a denormalized approach because these datasets exist specifically to support analysis, and keeping related fields in the same table minimizes the joins analysts need to write.

I chose AWS Athena as the query tool because it is a serverless service that queries data in place in S3, so there is no Amazon Redshift cluster to pay for and maintain. It is well suited to simple, denormalized datasets that are written to S3 and require few joins.

I partitioned the immigration fact table by year and month so that queries filtering on when an I-94 record was submitted only scan the matching partitions. I partitioned the US demographic dimension table by state code because state is a natural grouping for US demographic data. The files are written in Parquet format, which suits AWS Athena well because Athena charges by the amount of data each query scans. Parquet is a compressed, columnar format, so queries that select only a few columns read far less data, and cost less, than the same queries against CSV files.

#### Propose how often the data should be updated and why.
The data is updated annually, so automating the pipeline is a low priority. The job can be run manually once a year to ingest the new I-94 dataset.

#### Write a description of how you would approach the problem differently under the following scenarios:
##### The data was increased by 100x.
If the data increased by 100x, I would move the job from a local machine to an Amazon EMR cluster so that the work, and its increased memory requirements, are spread across several nodes.

##### Pipelines need to be run at 7am
Given how infrequently the data is updated, this would not be a realistic requirement for the project.

##### The database needed to be accessed by 100+ people.
The beauty of AWS Athena is that you don't need to manage a data warehouse yourself. That said, Athena limits how many DML queries (for example, SELECT queries) can run at the same time; the standard limit is 20 concurrent queries. You can request that AWS raise this service limit so that 100+ people can access the data at the same time.

If the service limit would not be able to be increased,