## Data Engineering Capstone Project

#### Project Summary
USA immigration flows analysis

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import re
import json
from pathlib import Path
import pandas as pd

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

from datetime import datetime, timedelta

### Step 1: Scope the Project and Gather Data


#### Scope 
Analyze how immigration flows into the USA change during the year and how immigrant demographics are distributed <br>

Datasets used: 
- I94 Immigration Data
- U.S. City Demographic Data
- Airports

The data will be gathered into parquet files (when the source data is split across several files), then cleaned and filtered to remove invalid and unnecessary records, and finally stored in separate parquet files holding the fact and dimension tables.
Quality checks will be run on the data.<br>
The final dataset can be used to find the number of immigrants travelling to the USA each month of 2016: where they come from, how long they stay, where they arrive, in which city, and similar questions.

#### Describe and Gather Data 


We will use Spark to gather and transform the data.

In [2]:
# Create or get the Spark session
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()




These are the datasets that we will use:

##### I94 Immigration Data
This data comes from the US National Travel and Tourism Office; more info can be found [here](https://www.trade.gov/national-travel-and-tourism-office). It includes information about immigrant arrivals and departures from various countries into different US cities, along with the reason for travel.


The dataset covers the year 2016 and is stored in <br>
`../../data/18-83510-I94-Data-2016/`  <br>
split into one file per month. Let's load the data.

In [3]:
%%bash
# Run this cell to start from a clean environment
rm -rf sas_data.parquet
rm -rf pipeline_output

In [4]:
# Check how the data is split
for path in Path('../../data/18-83510-I94-Data-2016/').rglob('i94_???16_sub.sas7bdat'):
    print(path.resolve())

/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
/data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat
/data/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat
/data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat
/data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat
/data/18-83510-I94-Data-2016/i94_aug16_sub.sas7bdat
/data/18-83510-I94-Data-2016/i94_may16_sub.sas7bdat
/data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat
/data/18-83510-I94-Data-2016/i94_oct16_sub.sas7bdat
/data/18-83510-I94-Data-2016/i94_jul16_sub.sas7bdat
/data/18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat
/data/18-83510-I94-Data-2016/i94_dec16_sub.sas7bdat
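As the listing shows, `rglob` returns the files in filesystem order, not calendar order. If chronological order matters (for example to log progress month by month), a small helper could parse the month abbreviation out of each file name. This is a sketch: the helper names `month_index` and `sort_by_month` are hypothetical, and only the `i94_<mon>16_sub.sas7bdat` naming pattern is taken from the listing above.

```python
import re
from pathlib import Path

# Month abbreviations as they appear in the I94 file names (e.g. i94_apr16_sub.sas7bdat)
MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]
FILE_PATTERN = re.compile(r"i94_([a-z]{3})\d{2}_sub\.sas7bdat$")


def month_index(path):
    """Return the 1-based month number encoded in an I94 file name."""
    match = FILE_PATTERN.search(Path(path).name)
    if match is None:
        raise ValueError(f"Unexpected file name: {path}")
    # list.index raises ValueError for an unknown abbreviation
    return MONTHS.index(match.group(1)) + 1


def sort_by_month(paths):
    """Sort I94 monthly files chronologically instead of in filesystem order."""
    return sorted(paths, key=month_index)


if __name__ == "__main__":
    files = ["i94_apr16_sub.sas7bdat", "i94_jan16_sub.sas7bdat", "i94_dec16_sub.sas7bdat"]
    print(sort_by_month(files))
```

In the notebook, the generator from `rglob` could be passed to `sort_by_month` before the loop that writes the parquet files.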


In [5]:
# Save the raw immigration data into a single local parquet dataset, partitioned by month
for path in Path('../../data/18-83510-I94-Data-2016/').rglob('i94_???16_sub.sas7bdat'):
    df_spark = spark.read.format('com.github.saurfang.sas.spark').load(str(path.resolve()))
    df_spark.write.mode('append').partitionBy("i94mon").parquet("sas_data.parquet")


In [6]:
# Read the immigration data
df_immigration = spark.read.parquet("sas_data.parquet")

In [7]:
# Dataset dimensions
(df_immigration.count(), len(df_immigration.columns))

(40790529, 28)

In [8]:
df_immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)
 |-- i94mon: double (nullable = true)

##### U.S. City Demographic Data
This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. More info can be found [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/information/)

In [9]:
fname = 'us-cities-demographics.csv'
df_demographic = spark.read.options(header=True,
                        inferSchema=True,
                        delimiter=';'
                       ).csv(fname)

In [10]:
(df_demographic.count(), len(df_demographic.columns))

(2891, 12)

In [11]:
df_demographic.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



##### Airports
This dataset contains information about the main airports in the world, with city, position and timezone. More info [here](http://www.lsv.fr/~sirangel/teaching/dataset/)

In [12]:
fname = 'Airports.csv'
df_airports = spark.read.options(header=True,
                        inferSchema=True,
                        delimiter=';',
                        escape='\\',
                       ).csv(fname)


In [13]:
(df_airports.count(), len(df_airports.columns))

(4246, 10)

In [14]:
df_airports.printSchema()

root
 |-- Name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- IATA: string (nullable = true)
 |-- ICAO: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Altitude: string (nullable = true)
 |-- Timezone: integer (nullable = true)
 |-- DST: string (nullable = true)



### Step 2: Explore and Assess the Data
#### Explore the Data 
The immigration dataset contains a lot of dirty data, including duplicates and invalid values.
These are the cleaning steps:

#### Cleaning Steps


- Read the labels description file, used to identify and remove invalid codes

In [15]:
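with open("I94_SAS_Labels_Descriptions_transformed.json", 'r') as f:
    labels_data = json.load(f)

def airport_code_valid(code):
    # A port code is valid if it appears in the "i94port" section of the labels file;
    # labels_data is only read here, so no `global` statement is needed
    return str(code) in labels_data["i94port"]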




In [16]:
# UDF to check if the airport codes are valid
udf_airport_code_valid = F.udf(airport_code_valid, T.BooleanType())

In [17]:
# UDF to check that the state code is not the "99" placeholder
udf_valid_state_code = F.udf(lambda state: state != "99", T.BooleanType())

- Remove rows with null values in the columns most useful for our analysis
- Remove duplicate rows
- Remove rows with invalid airport or state codes

In [18]:
df_immigration = df_immigration.dropna(how = "any", subset=["i94port","cicid","arrdate","i94addr"]) \
                                        .dropDuplicates() \
                                        .where(udf_airport_code_valid(F.col("i94port")) \
                                               & udf_valid_state_code(F.col("i94addr")))
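To make the semantics of these three rules explicit, they can be sketched on plain Python records: drop rows with a null in any key column (`how="any"`), drop exact duplicate rows, and keep only rows whose port is a known label and whose state is not the `"99"` code that the notebook treats as invalid. This is an illustrative sketch, not the Spark pipeline: `VALID_PORTS` is a hypothetical stand-in for the entries of `labels_data["i94port"]`, and the sample rows are made up.

```python
KEY_COLUMNS = ("i94port", "cicid", "arrdate", "i94addr")
# Hypothetical stand-in for the port codes listed in labels_data["i94port"]
VALID_PORTS = {"ATL", "NYC", "CHI"}


def clean_records(records, valid_ports=VALID_PORTS):
    """Apply the notebook's three cleaning rules to dict rows (illustrative only)."""
    seen = set()
    cleaned = []
    for row in records:
        # Rule 1: drop rows with a null in any key column (like dropna(how="any"))
        if any(row.get(col) is None for col in KEY_COLUMNS):
            continue
        # Rule 2: drop exact duplicates, comparing all columns like dropDuplicates()
        signature = tuple(sorted(row.items()))
        if signature in seen:
            continue
        seen.add(signature)
        # Rule 3: keep known port codes and drop the "99" state code
        if row["i94port"] not in valid_ports or row["i94addr"] == "99":
            continue
        cleaned.append(row)
    return cleaned
```

Applied to a list of dictionaries with the four key columns, it returns only the rows that would survive the Spark cell above.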



Row count after cleaning

In [19]:
df_immigration.count()

38760226

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The data model is a star schema. Since the main goal of the analysis is to see how immigration flows change during the year, it is best to map the immigration movements to a fact table and all the information around them to dimension tables.
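A toy, in-memory version of the star schema may make the idea concrete: fact rows carry foreign keys (here `i94state`) and a measure is aggregated after looking up a descriptive attribute in a dimension table. The table contents below are invented for illustration; only the column names come from the model defined in this notebook.

```python
from collections import Counter

# Dimension table: states keyed by their code (hypothetical sample rows)
states = {"GA": {"name": "Georgia"}, "NY": {"name": "New York"}}

# Fact table: one row per admission record (hypothetical sample rows)
immigration = [
    {"id": 1, "i94month": 8, "i94state": "GA", "i94destination": "ATL"},
    {"id": 2, "i94month": 8, "i94state": "GA", "i94destination": "ATL"},
    {"id": 3, "i94month": 8, "i94state": "NY", "i94destination": "NYC"},
    {"id": 4, "i94month": 9, "i94state": "GA", "i94destination": "ATL"},
]


def arrivals_per_state(facts, state_dim, month):
    """Count fact rows for one month, grouped by the state name from the dimension."""
    counts = Counter()
    for row in facts:
        if row["i94month"] != month:
            continue
        # Join step: resolve the foreign key; unmatched rows are skipped (inner join)
        state = state_dim.get(row["i94state"])
        if state is not None:
            counts[state["name"]] += 1
    return dict(counts)


print(arrivals_per_state(immigration, states, 8))  # {'Georgia': 2, 'New York': 1}
```

In Spark the same question becomes a filter on the fact table, a join with the dimension and a `groupBy` on the dimension attribute.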


#### 3.2 Mapping Out Data Pipelines
To create this model we need to do the following:
- Remove columns that contain many invalid values, are not useful for the scope, or contain private data
- Rename columns if needed
- Create a table for each desired dimension: cities, states, airports
- Create the fact table: immigrations
- Convert SAS numeric dates to dates
- Store tables

    

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model


Clean up the immigration dataframe

In [20]:
# Drop columns not used by CIC and columns not useful for the analysis
df_immigration = df_immigration.drop("entdepa",
                                     "entdepd",
                                     "entdepu",
                                     "occup",
                                     "insnum",
                                     "admnum",
                                     "dtadfile",
                                     "visapost",
                                     "dtaddto")


In [21]:
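def convert_to_datetime(n):
    """Convert a SAS numeric date (days since 1960-01-01) into a date; return None if invalid."""
    start = datetime(1960, 1, 1).date()
    try:
        return start + timedelta(days=int(n))
    except (TypeError, ValueError, OverflowError):
        # None, NaN or out-of-range values cannot be converted
        return None

udf_convert_to_datetime = F.udf(convert_to_datetime, T.DateType())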
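SAS stores dates as the number of days since 1 January 1960, which is why the UDF adds `n` days to that epoch. The arithmetic can be checked outside Spark with a standalone version of the same idea. The name `sas_to_date` is hypothetical; it returns a `datetime.date` and catches only the exceptions that `int()` and date arithmetic raise for null, NaN, infinite or out-of-range inputs.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # day 0 of the SAS date system


def sas_to_date(n):
    """Convert a SAS numeric date (days since 1960-01-01) into a datetime.date."""
    try:
        return SAS_EPOCH + timedelta(days=int(n))
    except (TypeError, ValueError, OverflowError):
        # None -> TypeError, NaN -> ValueError, inf or out of range -> OverflowError
        return None


print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(None))     # None
```

The value 20545 corresponds to 1 April 2016: 56 years of 365 days plus 14 leap days reach 1 January 2016 (day 20454), and January, February (29 days in 2016) and March add 91 more.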

In [22]:
udf_convert_matflag = F.udf(lambda x: x=='M', T.BooleanType())

In [23]:
# Convert dates into datetime
df_immigration = df_immigration.withColumn("arrdate", udf_convert_to_datetime("arrdate")) \
            .withColumn("depdate", udf_convert_to_datetime("depdate"))\
            .withColumn("matflag",udf_convert_matflag("matflag"))

In [24]:
# Rename columns to more meaningful names
df_immigration = df_immigration.withColumnRenamed("cicid","id")\
                                .withColumnRenamed("i94yr","i94year")\
                                .withColumnRenamed("i94cit","i94origin")\
                                .withColumnRenamed("i94res","i94residence")\
                                .withColumnRenamed("i94port","i94destination")\
                                .withColumnRenamed("arrdate","i94arrivaldate")\
                                .withColumnRenamed("i94mode","i94travelcode")\
                                .withColumnRenamed("i94addr","i94state")\
                                .withColumnRenamed("depdate","i94departuredate")\
                                .withColumnRenamed("i94bir","i94age")\
                                .withColumnRenamed("matflag","matchflag")\
                                .withColumnRenamed("biryear","i94birth")\
                                .withColumnRenamed("fltno","i94flightnumber")\
                                .withColumnRenamed("i94mon","i94month")

In [25]:
df_immigration.printSchema()

root
 |-- id: double (nullable = true)
 |-- i94year: double (nullable = true)
 |-- i94origin: double (nullable = true)
 |-- i94residence: double (nullable = true)
 |-- i94destination: string (nullable = true)
 |-- i94arrivaldate: date (nullable = true)
 |-- i94travelcode: double (nullable = true)
 |-- i94state: string (nullable = true)
 |-- i94departuredate: date (nullable = true)
 |-- i94age: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- matchflag: boolean (nullable = true)
 |-- i94birth: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- i94flightnumber: string (nullable = true)
 |-- visatype: string (nullable = true)
 |-- i94month: double (nullable = true)



Create dataframes for cities and states

In [26]:
# Drop unused columns, give columns clearer names and remove duplicates
df_cities = df_demographic.drop("Count","Number of Veterans","Average Household Size")\
                    .withColumnRenamed("State Code","State_Code")\
                    .withColumnRenamed("Median Age","Median_age")\
                    .withColumnRenamed("Male Population","Male_Population")\
                    .withColumnRenamed("Female Population","Female_Population")\
                    .withColumnRenamed("Total Population","Total_Population")\
                    .withColumnRenamed("Foreign-born","Foreign_born")\
                    .dropDuplicates()

In [27]:
# Create the states table from the cities data, one row per state
df_states = df_cities.select("State_Code",
                             "State")\
                    .withColumnRenamed("State_Code","code")\
                    .withColumnRenamed("State","name")\
                    .dropDuplicates()
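Because the demographics file has one row per city and race group, projecting only `State_Code` and `State` repeats each state many times; to act as a dimension keyed by `code`, the states table needs distinct rows, which is what `.dropDuplicates()` provides in Spark. A plain-Python illustration with made-up rows:

```python
# Hypothetical rows shaped like the cleaned demographics data (one per city and race)
city_rows = [
    {"City": "Atlanta", "State": "Georgia", "State_Code": "GA", "Race": "White"},
    {"City": "Atlanta", "State": "Georgia", "State_Code": "GA", "Race": "Asian"},
    {"City": "Augusta", "State": "Georgia", "State_Code": "GA", "Race": "White"},
    {"City": "Buffalo", "State": "New York", "State_Code": "NY", "Race": "White"},
]

# Projection without de-duplication: one state row per input row
projected = [(row["State_Code"], row["State"]) for row in city_rows]

# Distinct projection; first-seen order is kept here for readability,
# whereas Spark's dropDuplicates() does not guarantee any order
states = list(dict.fromkeys(projected))

print(len(projected), len(states))  # 4 2
print(states)  # [('GA', 'Georgia'), ('NY', 'New York')]
```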

In [28]:
# Drop the State name column, which is kept in the states table
df_cities = df_cities.drop("State")

In [29]:
df_cities.printSchema()

root
 |-- City: string (nullable = true)
 |-- Median_age: double (nullable = true)
 |-- Male_Population: integer (nullable = true)
 |-- Female_Population: integer (nullable = true)
 |-- Total_Population: integer (nullable = true)
 |-- Foreign_born: integer (nullable = true)
 |-- State_Code: string (nullable = true)
 |-- Race: string (nullable = true)



In [30]:
df_states.printSchema()

root
 |-- code: string (nullable = true)
 |-- name: string (nullable = true)



Create dataframe for airports

In [31]:
# Keep only the airports that appear as destinations in the immigration data
different_usa_airports = df_immigration.select(F.col("i94destination")).distinct()
df_airports = df_airports.join(different_usa_airports, df_airports.IATA == different_usa_airports.i94destination)
df_airports = df_airports.drop(df_airports.i94destination)\
                        .dropDuplicates()
df_airports.printSchema()

root
 |-- Name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- IATA: string (nullable = true)
 |-- ICAO: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Altitude: string (nullable = true)
 |-- Timezone: integer (nullable = true)
 |-- DST: string (nullable = true)
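The join above keeps only the airports whose `IATA` code appears among the distinct `i94destination` values, so it behaves as a semi-join (PySpark's `DataFrame.join` also accepts `how="left_semi"` for this). Note that the match assumes I94 port codes coincide with IATA airport codes. A plain-Python sketch of the same filtering; the helper `semi_join` and the sample data are hypothetical.

```python
# Hypothetical airport dimension rows and destination codes from the fact table
airports = [
    {"Name": "Hartsfield Jackson Atlanta Intl", "City": "Atlanta", "IATA": "ATL"},
    {"Name": "Heathrow", "City": "London", "IATA": "LHR"},
    {"Name": "Chicago Ohare Intl", "City": "Chicago", "IATA": "ORD"},
]
destinations = ["ATL", "ATL", "ORD", "XYZ"]


def semi_join(rows, key, values):
    """Keep each row whose key appears in values; rows are never duplicated."""
    wanted = set(values)  # distinct keys, like .distinct() on i94destination
    return [row for row in rows if row[key] in wanted]


print([row["IATA"] for row in semi_join(airports, "IATA", destinations)])  # ['ATL', 'ORD']
```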



In [32]:
# Store the immigration data partitioned by month, given its high number of rows
df_immigration.write.partitionBy("i94month").parquet("pipeline_output/immigration.parquet")
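With `partitionBy("i94month")`, Spark writes one Hive-style subdirectory per distinct month value under `immigration.parquet/` (named `i94month=<value>`), and a later filter on `i94month` lets Spark read only the matching directories (partition pruning). The sketch below imitates that pruning on directory names only. The helpers and the directory listing are hypothetical, and values are compared numerically so the sketch does not depend on how the month is formatted in the name.

```python
def parse_partition(dirname):
    """Split a Hive-style partition directory name such as 'i94month=8.0' into (column, value)."""
    column, sep, value = dirname.partition("=")
    if not sep:
        raise ValueError(f"Not a partition directory: {dirname}")
    return column, value


def prune(dirnames, column, wanted):
    """Return only the partition directories whose numeric value equals `wanted`."""
    selected = []
    for name in dirnames:
        col, value = parse_partition(name)
        # Numeric comparison, so 'i94month=8' and 'i94month=8.0' both match month 8
        if col == column and float(value) == float(wanted):
            selected.append(name)
    return selected


dirs = [f"i94month={m}.0" for m in range(1, 13)]  # hypothetical directory listing
print(prune(dirs, "i94month", 8))  # ['i94month=8.0']
```

This is why the August query later in the notebook only needs to scan one of the twelve partitions.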

In [33]:
# No partition needed for the other dataframes
df_cities.write.parquet("pipeline_output/cities.parquet")

In [34]:
df_states.write.parquet("pipeline_output/states.parquet")

In [35]:
df_airports.write.parquet("pipeline_output/airports.parquet")

#### 4.2 Data Quality Checks
The following data quality checks are performed to ensure the pipeline ran as expected:
 * Completeness: each output table contains at least one row
 * Integrity: the fact table shares common keys with each dimension table (airports, states, cities)
 
Run Quality Checks

In [36]:
# Load all the dataframes
df_immigration = spark.read.parquet("pipeline_output/immigration.parquet")

In [37]:
df_cities = spark.read.parquet("pipeline_output/cities.parquet")

In [38]:
df_states = spark.read.parquet("pipeline_output/states.parquet")

In [39]:
df_airports = spark.read.parquet("pipeline_output/airports.parquet")

Check that each table contains at least one row

In [41]:
def check_not_empty(data, data_name):
    # Check the dataframe passed in; fetch at most one row instead of counting the whole table
    if not data.head(1):
        raise ValueError(f"{data_name} data is empty")
    return True

In [42]:
check_not_empty(df_airports,"Airports")

True

In [43]:
check_not_empty(df_cities,"Cities")

True

In [44]:
check_not_empty(df_states,"States")

True

In [45]:
check_not_empty(df_immigration,"Immigration")

True

Check that common keys exist between tables 

In [46]:
def check_common_keys(data_1, name_1, data_2, name_2, key_1, key_2):
    # Join on the given key columns and stop at the first matching row
    joined = data_1.join(data_2, data_1[key_1] == data_2[key_2])
    if not joined.head(1):
        raise ValueError(f"No common keys between {name_1} and {name_2}")
    return True

In [47]:
check_common_keys(df_airports,"Airports",df_immigration,"Immigration","IATA","i94destination")

True

In [48]:
check_common_keys(df_states,"States",df_immigration,"Immigration","code","i94state")


True

In [49]:
check_common_keys(df_cities,"Cities",df_immigration,"Immigration","State_Code","i94state")

True
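The checks above confirm that each table is non-empty and that tables share at least one key, but they do not verify the PRIMARY KEY constraints listed in the data dictionary, nor detect fact rows whose foreign key has no match. A sketch of both checks on plain Python rows follows; the helper names and sample rows are hypothetical. In Spark the same logic could be a `groupBy(key).count()` filtered on `count > 1`, and a `left_anti` join.

```python
from collections import Counter


def duplicate_keys(rows, key):
    """Return the key values that appear more than once (primary-key violations)."""
    counts = Counter(row[key] for row in rows)
    return sorted(k for k, n in counts.items() if n > 1)


def orphan_keys(fact_rows, fk, dim_rows, pk):
    """Return non-null foreign-key values of the fact table missing from the dimension."""
    known = {row[pk] for row in dim_rows}
    used = {row[fk] for row in fact_rows if row[fk] is not None}
    return sorted(used - known)


# Hypothetical sample rows
states = [{"code": "GA"}, {"code": "NY"}, {"code": "GA"}]
immigration = [{"id": 1, "i94state": "GA"}, {"id": 2, "i94state": "TX"}]

print(duplicate_keys(states, "code"))                         # ['GA']
print(orphan_keys(immigration, "i94state", states, "code"))  # ['TX']
```

A check like `duplicate_keys` would, for instance, reveal whether the states table holds one row per state code.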

#### 4.3 Data dictionary 
- Immigration
 
 | Field | Description | Source |
 | --- | --- | --- |
 | id: double PRIMARY KEY | Primary key of the table | I94 Immigration Data |
 | i94year: double | 4-digit year | I94 Immigration Data |
 | i94origin: double | Code of the country of citizenship | I94 Immigration Data |
 | i94residence: double | Code of the country of residence | I94 Immigration Data |
 | i94destination: string FOREIGN KEY | Code of the port of entry | I94 Immigration Data |
 | i94arrivaldate: date | Arrival date in the USA | I94 Immigration Data |
 | i94travelcode: double | Mode of arrival: 1 = Air, 2 = Sea, 3 = Land, 9 = Not reported | I94 Immigration Data |
 | i94state: string FOREIGN KEY | Code of the US state of address | I94 Immigration Data |
 | i94departuredate: date | Departure date from the USA | I94 Immigration Data |
 | i94age: double | Age of the person in years | I94 Immigration Data |
 | i94visa: double | Visa category: 1 = Business, 2 = Pleasure, 3 = Student | I94 Immigration Data |
 | count: double | Field used for summary statistics | I94 Immigration Data |
 | matchflag: boolean | Whether the arrival and departure records match | I94 Immigration Data |
 | i94birth: double | Year of birth of the person | I94 Immigration Data |
 | gender: string | Gender of the person | I94 Immigration Data |
 | airline: string | Airline used to arrive in the USA | I94 Immigration Data |
 | i94flightnumber: string | Flight number | I94 Immigration Data |
 | visatype: string | Class of admission legally admitting the non-immigrant to temporarily stay in the USA | I94 Immigration Data |
 | i94month: double | Numeric month (1-12) | I94 Immigration Data |
 
- Cities
 
 | Field | Description | Source |
 | --- | --- | --- |
 | City: string | Name of the city | U.S. City Demographic Data |
 | Median_age: double | Median age of the people living in the city | U.S. City Demographic Data |
 | Male_Population: integer | Male population of the city | U.S. City Demographic Data |
 | Female_Population: integer | Female population of the city | U.S. City Demographic Data |
 | Total_Population: integer | Total population of the city | U.S. City Demographic Data |
 | Foreign_born: integer | Number of foreign-born residents of the city | U.S. City Demographic Data |
 | State_Code: string FOREIGN KEY | Code of the state containing the city | U.S. City Demographic Data |
 | Race: string | Race group of the row (the source data has one row per city and race group) | U.S. City Demographic Data |

- States
 
 | Field | Description | Source |
 | --- | --- | --- |
 | code: string PRIMARY KEY | Code of the state | U.S. City Demographic Data | 
 | name: string | Name of the state | U.S. City Demographic Data |
 
 
- Airports
 
 | Field | Description | Source |
 | --- | --- | --- |
 | Name: string | Name of airport | Airports |
 | City: string | Main city served by airport | Airports |
 | Country: string | Country or territory where airport is located | Airports |
 | IATA: string PRIMARY KEY | 3-letter IATA code | Airports |
 | ICAO: string | 4-letter ICAO code | Airports |
 | Latitude: double | Decimal degrees, usually to six significant digits. Negative is South, positive is North | Airports |
 | Longitude: double | Decimal degrees, usually to six significant digits. Negative is West, positive is East | Airports |
 | Altitude: string | Altitude in feet | Airports |
 | Timezone: integer | Hours offset from UTC | Airports |
 | DST: string | Daylight savings time. One of E (Europe), A (US/Canada), S (South America), O (Australia), Z (New Zealand), N (None) or U (Unknown) | Airports |
 

### Step 5: Complete Project Write Up

- Query example

Let's check that the pipeline executed correctly.
Find the number of arrivals at the Atlanta airport in August 2016 and compare it with the number of foreign-born residents of Atlanta.

In [50]:
atlanta_airport_code = df_airports.select("IATA").where(F.col("City")=="Atlanta").collect()


In [51]:
number_immigrants = df_immigration.where((F.col("i94destination") == atlanta_airport_code[0]["IATA"]) & \
                     (F.col("i94month") == 8) & \
                    (F.col("i94year") == 2016)).count()

In [52]:
foreign = df_cities.select("Foreign_born").where(F.col("City") == "Atlanta").collect()


In [53]:
number_immigrants/foreign[0]["Foreign_born"]

3.4206959020489753

In August 2016, the number of arrivals through the Atlanta airport was about 3.4 times the number of foreign-born residents of Atlanta.

Now let's answer some questions about the project

- Choice of tools and technologies for the project.

The main technology used in this project is Apache Spark, which is well suited to processing and analyzing large quantities of data and scales easily. Since the data had to be merged and transformed before use, it is a good fit for the task.

- How often the data should be updated and why

The data should be updated monthly (or at least yearly) to capture trends across years and discover patterns between months.

- What if the data was increased by 100x.

If the data increased 100x, a bigger Spark cluster would be needed, with more workers and therefore more processing capacity. The easiest way to achieve this is to store the data on AWS S3 and process it with AWS EMR. A finer parquet partitioning (e.g. by day) would also help.

- What if the data populates a dashboard that must be updated on a daily basis by 7am every day.

We need to schedule the pipeline with a tool like Apache Airflow, starting it early enough to finish before 7am and configuring an SLA (Service Level Agreement) so that a run that misses the deadline is reported.

- What if the database needed to be accessed by 100+ people.

We need to configure Apache Spark to allow concurrent access by the team.
If the data is stored in a database, make sure that it supports atomic operations.
We could use Amazon Redshift, whose concurrency scaling can add cluster capacity to serve many concurrent users and queries, or a Spark Thrift (Hive) server configured for concurrent sessions.
