# Data Engineering Nanodegree

## Capstone Project: Immigration Data in the United States
Daniel Vargas
March 14, 2020
#### Project Summary
In this project, we will be looking at the `United States` immigration data, specifically:

- Effects of temperature on the volume of travellers
- Seasonality of travel
- Connections between the volume of travel and the number of entry ports (airports)
- Connections between the volume of travel and the demographics of various cities

We will be using the following datasets:

- `I94 Immigration Data`: data from the `US National Tourism and Trade Office`, containing the contents of the `i94` form completed on entry to the `United States`
- `countries.csv`: table containing country codes used in the dataset
- `i94portCodes.csv`: table containing city codes used in the dataset
- `World Temperature Data`: dataset from Kaggle including the temperatures of various cities in the world between 1743 and 2013
- `U.S. City Demographic Data`: data from `OpenSoft`, containing information about the demographics of all US cities and census-designated places with a population greater than or equal to `65,000`. The data comes from the `US Census Bureau's 2015 American Community Survey`
- `Airport Code Table`: table of airport codes and corresponding cities

#### Project steps:
1. Scope the project and gather the data
2. Explore and assess the data
3. Define the data model
4. Run `ETL` to model the data
5. Complete project write up

In [24]:
import datetime
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.functions import udf, date_add, desc, asc
from pyspark.sql.functions import sum as Fsum

### Scope the project and gather data

#### Scope 
- Aggregate `I94 immigration data` by destination city (dimension table)
- Aggregate `city temperature` by city (dimension table)
- Join both datasets on destination city (fact table)
- Optimize the model for queries on immigration events, to determine whether temperature affects the choice of destination city
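
The scope above can be sketched with `pandas` on toy data. This is only an illustration under stated assumptions: the values are made up, `i94port` is assumed to be the join key for the destination city, and the column name `arrivals` is hypothetical.

```python
import pandas as pd

# Toy stand-ins for the two source tables (values are made up for illustration).
immigration = pd.DataFrame({
    "i94port": ["NYC", "NYC", "MIA", "XXX"],
    "cicid": [1, 2, 3, 4],
})
temperature = pd.DataFrame({
    "i94port": ["NYC", "MIA"],
    "AverageTemperature": [12.5, 24.1],
})

# Dimension: number of arrivals per destination port.
arrivals = (
    immigration.groupby("i94port", as_index=False)
    .agg(arrivals=("cicid", "count"))
)

# Fact table: an inner join keeps only the ports present in both tables,
# so the unmatched code "XXX" drops out.
fact = arrivals.merge(temperature, on="i94port", how="inner")
print(fact)
```

An inner join is one possible choice here; a left join would instead keep ports that have no temperature record.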

#### Describe and Gather Data
`I94 immigration` attributes:
- `i94yr`: 4 digit year
- `i94mon`: numeric month
- `i94cit`: 3 digit code of origin country
- `i94port`: 3 character code of destination USA city
- `arrdate`: arrival date in the USA
- `i94mode`: 1 digit travel code
- `depdate`: departure date from the USA
- `i94visa`: reason for immigration

`Temperature` data attributes:
- `AverageTemperature`: average temperature
- `City`: city name
- `Country`: country name
- `Latitude`: latitude
- `Longitude`: longitude

In [25]:
df_immig_sample = pd.read_csv('immigration_data_sample.csv')
df_countryCodes = pd.read_csv('countries.csv')
i94portCodes = pd.read_csv('i94portCodes.csv')
df_demographics = pd.read_csv('us-cities-demographics.csv', sep=';')
df_airports = pd.read_csv('airport-codes_csv.csv')
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temperature = pd.read_csv(fname)

In [26]:
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)
df_immigration = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [27]:
df_immigration.count()

3096313

In [28]:
df_immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [29]:
# Write to parquet. Commented out to avoid AnalysisException: 'path file:/home/workspace/sas_data already exists.;'
# (mode("overwrite") replaces the existing directory instead of raising)
# df_immigration.write.mode("overwrite").parquet("sas_data")
# df_immigration=spark.read.parquet("sas_data")

### Explore and assess the data
#### Explore the Data 
For the `I94 immigration` data, we drop all entries where the destination city code `i94port` is not a valid value (e.g., `XXX`, `99`), as described in `I94_SAS_Labels_Description.SAS`. For the temperature data, we drop all entries where `AverageTemperature` is `NaN`, then drop entries with duplicate locations, and finally add the `i94port` code of the location to each entry.
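
A minimal sketch of these cleaning rules on toy data follows. The `valid_ports` mapping is a hypothetical two-entry subset (the real list would come from `I94_SAS_Labels_Description.SAS`). Matching on upper-cased city names, treating a duplicate location as a repeated `City`, and dropping cities without a port code are assumptions, not steps confirmed by the notebook.

```python
import pandas as pd

# Hypothetical subset of valid port codes mapped to city names.
valid_ports = {"NYC": "NEW YORK", "MIA": "MIAMI"}

immigration = pd.DataFrame({"i94port": ["NYC", "XXX", "MIA", "99"]})
temperature = pd.DataFrame({
    "City": ["New York", "New York", "Miami", "Boston"],
    "AverageTemperature": [10.0, 11.0, None, 9.0],
})

# 1. Keep only immigration rows with a recognised port code.
immigration_clean = immigration[immigration["i94port"].isin(list(valid_ports))]

# 2. Drop missing temperatures, then keep one row per city.
temperature_clean = (
    temperature.dropna(subset=["AverageTemperature"])
    .drop_duplicates(subset=["City"])
    .copy()
)

# 3. Attach the port code by matching upper-cased city names;
#    cities without a known port code are dropped (an assumption).
city_to_port = {city: port for port, city in valid_ports.items()}
temperature_clean["i94port"] = temperature_clean["City"].str.upper().map(city_to_port)
temperature_clean = temperature_clean.dropna(subset=["i94port"])

print(immigration_clean)
print(temperature_clean)
```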

#### Cleaning Steps
Document steps necessary to clean the data

In [30]:
# Keep only data for the United States (copy to avoid a SettingWithCopyWarning below)
df_temperature = df_temperature[df_temperature['Country']=='United States'].copy()
# Convert the dt column to datetime objects
df_temperature['convertedDate'] = pd.to_datetime(df_temperature['dt'])
# Keep only records dated after 1950-01-01
df_temperature=df_temperature[df_temperature['convertedDate']>"1950-01-01"].copy()
df_temperature[['City','convertedDate']].drop_duplicates().shape

(189472, 2)

In [31]:
# Keep only US airports; rows with a missing iso_country (observed for African airports) are dropped
df_airports = df_airports[df_airports['iso_country'].fillna('').str.upper().str.contains('US')].copy()

# Exclude closed airports, heliports, seaplane bases and balloonports
excludedValues = ['closed', 'heliport', 'seaplane_base', 'balloonport']
df_airports = df_airports[~df_airports['type'].str.strip().isin(excludedValues)].copy()

# Drop airports whose municipality is not available
df_airports = df_airports[~df_airports['municipality'].isna()].copy()

# Region codes containing U-A appear to be errors; valid codes have 5 characters
df_airports['len'] = df_airports["iso_region"].apply(len)
# Remove the rows with incorrect region codes
df_airports = df_airports[df_airports['len']==5].copy()
# Finally, extract the state code (the part after the hyphen)
df_airports['state'] = df_airports['iso_region'].str.strip().str.split("-", n = 1, expand = True)[1]

In [35]:
# Convert city names to upper case and remove leading and trailing spaces
df_demographics.City = df_demographics.City.str.upper().str.strip()
df_immigration.createOrReplaceTempView("immig_table")

In [36]:
spark.sql("""
SELECT COUNT (DISTINCT cicid)
FROM immig_table
""").show()

+---------------------+
|count(DISTINCT cicid)|
+---------------------+
|              3096313|
+---------------------+



In [37]:
# Convert the arrdate field:
# arrdate counts days since 1960-01-01 (SAS date), so add it to that date to get the arrival date
df_immigration = spark.sql("SELECT *, date_add(to_date('1960-01-01'), CAST(arrdate AS INT)) AS arrival_date FROM immig_table")
df_immigration.createOrReplaceTempView("immig_table")
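
The same date conversion can be checked outside Spark with the standard library. This is a small sketch: the helper name `sas_to_date` is hypothetical, and `20545.0` is an illustrative input rather than a value read from the dataset.

```python
from datetime import date, timedelta

# SAS stores dates as the number of days since 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)

def sas_to_date(days):
    """Convert a SAS numeric date to a datetime.date; None stays None."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))

print(sas_to_date(20545.0))  # 2016-04-01
```

The result falls in April 2016, which matches the month covered by `i94_apr16_sub.sas7bdat`.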

### Define the data model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Run `ETL` to model the data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# Write code here

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
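
As an illustration of the checks listed above, the sketch below runs a row-count check and a unique, non-null key check on a `pandas` DataFrame. The helper `check_table` and its parameters are hypothetical and not part of the notebook; the same idea would apply to Spark DataFrames with different calls.

```python
import pandas as pd

def check_table(df, key, min_rows=1):
    """Return a list of problems found in df; an empty list means all checks passed."""
    problems = []
    # Completeness: the table should not be empty.
    if len(df) < min_rows:
        problems.append(f"expected at least {min_rows} rows, found {len(df)}")
    # Integrity: the key column must have no nulls and no duplicates.
    if df[key].isna().any():
        problems.append(f"column {key!r} contains nulls")
    if df[key].duplicated().any():
        problems.append(f"column {key!r} is not unique")
    return problems

good = pd.DataFrame({"cicid": [1, 2, 3]})
bad = pd.DataFrame({"cicid": [1, 1, None]})

print(check_table(good, "cicid"))  # []
print(check_table(bad, "cicid"))   # two problems: nulls and duplicates
```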
 
Run Quality Checks

In [None]:
# Perform quality checks here

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Complete project write up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.