# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project builds an ETL pipeline from two data sources: I-94 immigration data and temperature data for various cities. The data is read, cleansed, joined, and finally stored on disk in partitions (for efficient querying).

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [3]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

from misc_utils import MiscUtils

In [4]:
mu_obj = MiscUtils()

### Step 1: Scope the Project and Gather Data

#### Scope 
Data from I-94 immigration records are aggregated by destination city. Likewise, data from city-temperature records are aggregated by city. Then, the two tables are joined (on destination city) to form a fact table.

#### Describe and Gather Data 
The I94 immigration data comes from the US National Tourism and Trade Office. It is provided in SAS7BDAT format, a binary database storage format. Some relevant attributes include:

    i94yr = 4 digit year
    i94mon = numeric month
    i94cit = 3 digit code of country of citizenship
    i94port = 3 character code of destination USA city
    arrdate = arrival date in the USA
    i94mode = 1 digit travel code
    depdate = departure date from the USA
    i94visa = reason for immigration
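
The `arrdate` and `depdate` values appear in the data as plain numbers such as `20545.0`. SAS stores dates as a count of days since 1 January 1960, so they can be turned into calendar dates as sketched below. The helper name `sas_date_to_iso` is illustrative only and is not part of this project's code.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from this day


def sas_date_to_iso(sas_days):
    """Convert a SAS numeric date (float or int, possibly missing) to 'YYYY-MM-DD'."""
    # None or NaN means that no date was recorded (e.g. no departure yet).
    if sas_days is None or sas_days != sas_days:
        return None
    return (SAS_EPOCH + timedelta(days=int(sas_days))).isoformat()


print(sas_date_to_iso(20545.0))  # 2016-04-01
```

The sample value falls in April 2016, which agrees with the `i94yr` and `i94mon` columns of the same rows.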

The temperature data comes from Kaggle. It is provided in csv format. Some relevant attributes include:

    AverageTemperature = average temperature
    City = city name
    Country = country name
    Latitude = latitude
    Longitude = longitude
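
In this dataset, `Latitude` and `Longitude` are strings with a hemisphere suffix (for example `57.05N`, `10.33E`). If signed numeric coordinates are ever needed downstream, a conversion could look like the following sketch. The function `parse_coordinate` is a hypothetical helper and is not used elsewhere in this notebook.

```python
def parse_coordinate(text):
    """Convert strings like '57.05N' or '74.56W' to signed decimal degrees."""
    value = float(text[:-1])
    hemisphere = text[-1].upper()
    if hemisphere not in ("N", "S", "E", "W"):
        raise ValueError("unexpected hemisphere suffix in {!r}".format(text))
    # South and West are negative by the usual convention.
    return -value if hemisphere in ("S", "W") else value


print(parse_coordinate("57.05N"))  # 57.05
print(parse_coordinate("74.56W"))  # -74.56
```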

In [5]:
# Read in the I94 data here
# for speed/convenience, we use only one month data: Apr 2016.
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
i94_df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")

In [6]:
i94_df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [7]:
print(i94_df.shape)

(3096313, 28)


In [8]:
# Read the temperature data
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
cityTemp_df = pd.read_csv(fname, sep=",")

In [9]:
print(cityTemp_df.shape)

(8599212, 7)


In [10]:
cityTemp_df.head(5)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


### Step 2: Clean the Data

The cleansing steps are:
- Remove observations that have an invalid port code.
- Round the average-temperature and average-temperature-uncertainty values to two decimal places (more precision is not useful).
- Remove observations where the average temperature is null.
- Drop duplicate locations.


In [11]:
# Spark session to read SAS7BDAT formatted files.
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [12]:
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
df_spark.show(n=3)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|       admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null|1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SEO| n

#### Reading and Cleaning SAS Label Descriptions
The port codes and their descriptions from the provided SAS label descriptions are copied into a separate .txt file. From that file, a dictionary is built that maps each port code to its city name. The names are stripped of leading and trailing whitespace, because the original SAS label descriptions pad them with extra spaces. This code lives in a separate file, misc_utils.py.
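
The implementation of `clean_ports` in misc_utils.py is not shown here. As a rough sketch of the idea, assuming each line of the .txt file has the shape `'CODE' = 'CITY NAME, ST   '` (an assumption about the file layout), the parsing and stripping could look like this. The name `parse_port_lines` and the sample lines are illustrative.

```python
import re

# Assumed line shape:  'ATL'   =   'ATLANTA, GA            '
PORT_LINE = re.compile(r"'\s*(?P<code>[^']*?)\s*'\s*=\s*'\s*(?P<name>[^']*?)\s*'")


def parse_port_lines(lines):
    """Build {port_code: city_name}, stripping padding from both fields."""
    ports = {}
    for line in lines:
        match = PORT_LINE.search(line)
        if match:  # skip lines that are not code/name pairs
            ports[match.group("code")] = match.group("name")
    return ports


sample_lines = [
    "   'ATL'\t=\t'ATLANTA, GA            '",
    "   'NYC'\t=\t'NEW YORK, NY           '",
    "this line is not a mapping",
]
print(parse_port_lines(sample_lines))
```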

In [None]:
# Read all valid I94 ports from SAS Label descriptions
valid_i94_ports_dict = mu_obj.clean_ports('valid_i94_ports.txt')
#valid_i94_ports_dict.keys()

In [14]:
# Remove all entries with invalid/unknown port-codes and 
# only keep those with valid port-codes.
def clean_i94_data(i94_file):
    raw_i94_df = spark.read.format('com.github.saurfang.sas.spark').load(i94_file)
    validated_i94_df = raw_i94_df.filter(raw_i94_df.i94port.isin(list(valid_i94_ports_dict.keys())))
    return validated_i94_df

In [15]:
# Consider only Apr 2016 I-94 dataset (for short run cycle)
sample_i94_file = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat' 
sample_cleansed_i94_df = clean_i94_data(sample_i94_file)

In [16]:
mu_obj.spark_df_shape(sample_cleansed_i94_df, 'cleansed i94')

Number of rows in cleansed i94 dataset: 3088576
Number of columns in cleansed i94 dataset: 28


In [17]:
sample_cleansed_i94_df.show()

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SEO| null|      G|   null|      Y|   null| 1991.0|     D/S|     M|  null|   null|  3.73679633E9|00296|      F1|
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|  1.0|20160401|    nul

In [18]:
# Clean temperature data
raw_temperature_df = spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")
mu_obj.spark_df_shape(raw_temperature_df, 'raw temperature')
raw_temperature_df.show(5)

# Filter out entries with null average temperature.
# The CSV reader loads empty fields as real nulls, so test with isNotNull().
filtered_temperature_df = raw_temperature_df.filter(raw_temperature_df.AverageTemperature.isNotNull())
print("\n")
mu_obj.spark_df_shape(filtered_temperature_df, 'filtered temperature')
filtered_temperature_df.show(5)

# Remove duplicate locations
cleansed_temperature_df = filtered_temperature_df.dropDuplicates(['City', 'Country'])
print("\n")
mu_obj.spark_df_shape(cleansed_temperature_df, 'cleansed temperature')
cleansed_temperature_df.show(5)

Number of rows in raw temperature dataset: 8599212
Number of columns in raw temperature dataset: 7
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows



Number of rows in f

In [19]:
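@udf()
def get_i94port(city):
    '''
    Input: City name
    Output: Corresponding i94port, or None if no port description contains the city name
    '''
    if city is None:
        return None
    city_name = city.strip().lower()
    for port_code, port_name in valid_i94_ports_dict.items():
        if city_name in port_name.lower():
            return port_code
    return None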
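
The UDF above matches a city to the first port whose description contains the city name as a substring. The plain-Python sketch below reproduces that rule outside Spark and contrasts it with a stricter variant that compares only the text before the first comma. The port descriptions in `sample_ports` and the function names are assumptions made for illustration; the real dictionary comes from `clean_ports`.

```python
# Hypothetical excerpt of the port dictionary (descriptions are assumed).
sample_ports = {"ATL": "ATLANTA, GA", "NYC": "NEW YORK, NY"}


def match_substring(city, ports):
    """Same rule as the UDF: first port whose description contains the city name."""
    needle = city.strip().lower()
    for code, description in ports.items():
        if needle in description.lower():
            return code
    return None


def match_city_part(city, ports):
    """Stricter variant: the city must equal the text before the first comma."""
    needle = city.strip().lower()
    for code, description in ports.items():
        if description.split(",")[0].strip().lower() == needle:
            return code
    return None


print(match_substring("York", sample_ports))      # NYC
print(match_city_part("York", sample_ports))      # None
print(match_city_part("New York", sample_ports))  # NYC
```

Substring matching is permissive: a city called York would be assigned the New York port code. Neither variant checks the country, so a foreign city that shares its name with a US port would still be matched.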

In [20]:
# Add i94_port code based on city name
# And, round the temperatures to 2 digits after decimal point.
import pyspark.sql.functions as func

temperature_df = cleansed_temperature_df.withColumn("i94port", get_i94port(cleansed_temperature_df.City))
temperature_df = temperature_df.withColumn("AverageTemperature", func.round(temperature_df["AverageTemperature"], 2))
temperature_df = temperature_df.withColumn("AverageTemperatureUncertainty", func.round(temperature_df["AverageTemperatureUncertainty"], 2))

temperature_df.show(5)

+----------+------------------+-----------------------------+-----------+-------------+--------+---------+-------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|       City|      Country|Latitude|Longitude|i94port|
+----------+------------------+-----------------------------+-----------+-------------+--------+---------+-------+
|1743-11-01|              3.26|                         1.67|  Allentown|United States|  40.99N|   74.56W|   null|
|1779-11-01|              0.01|                         2.71|     Atyrau|   Kazakhstan|  47.42N|   50.92E|   null|
|1825-01-01|             26.07|                         2.16|    Bintulu|     Malaysia|   2.41N|  113.30E|   null|
|1825-01-01|             26.52|                         2.58|Butterworth|     Malaysia|   5.63N|  100.09E|   null|
|1845-01-01|              25.0|                         1.87|     Cainta|  Philippines|  15.27N|  120.83E|   null|
+----------+------------------+-----------------------------+-----------+-------

In [21]:
# Remove entries with no i94_port code (the UDF returns null when no port matches)
temperature_df = temperature_df.filter(temperature_df.i94port.isNotNull())
temperature_df.show(5)

+----------+------------------+-----------------------------+--------+-------------+--------+---------+-------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|    City|      Country|Latitude|Longitude|i94port|
+----------+------------------+-----------------------------+--------+-------------+--------+---------+-------+
|1852-07-01|             15.49|                          1.4|   Perth|    Australia|  31.35S|  114.97E|    PER|
|1828-01-01|             -1.98|                         2.55| Seattle|United States|  47.42N|  121.97W|    SEA|
|1743-11-01|              2.77|                         1.91|Hamilton|       Canada|  42.59N|   80.73W|    HAM|
|1849-01-01|               7.4|                          2.7| Ontario|United States|  34.56N|  116.76W|    ONT|
|1821-11-01|              2.32|                         2.38| Spokane|United States|  47.42N|  117.24W|    SPO|
+----------+------------------+-----------------------------+--------+-------------+--------+---------+-

In [22]:
# This takes a long time. Do not run often.
# mu_obj.spark_df_shape(temperature_df, 'temperature_df')

### Step 3: Define the Data Model

#### 3.1 Conceptual Data Model
The first dimension table is the immigration table, built from the I-94 immigration data. Each report (entry) contains international visitor arrival statistics by world region and select countries (including the top 20), type of visa, mode of transportation, age group, states visited (first intended address only), and the top ports of entry (for select countries). The following columns are extracted from the immigration dataframe:

    i94yr = 4 digit year
    i94mon = numeric month
    i94cit = 3 digit code of country of citizenship
    i94port = 3 character code of destination city
    arrdate = arrival date
    i94mode = 1 digit travel code
    depdate = departure date
    i94visa = reason for immigration

The second dimension table is the city temperature data obtained from Kaggle. The following fields are considered:

    i94port = 3 character code of destination city (mapped from immigration data during cleanup step)
    AverageTemperature = average temperature
    City = city name
    Country = country name
    Latitude = latitude
    Longitude = longitude

The fact table will contain information from the I94 immigration data joined with the city temperature data on i94port:

    i94yr = 4 digit year
    i94mon = numeric month
    i94cit = 3 digit code of country of citizenship
    i94port = 3 character code of destination city
    arrdate = arrival date
    i94mode = 1 digit travel code
    depdate = departure date
    i94visa = reason for immigration
    AverageTemperature = average temperature of destination city

The tables will be saved to Parquet files partitioned by city (i94port).


#### 3.2 Mapping Out Data Pipelines
The pipeline takes the following steps:
- Compute a Spark dataframe from the cleansed I94 data.
- Compute a Spark dataframe from the cleansed temperature data.
- Create an immigration dimension table and store it as Parquet files (partitioned by i94port).
- Create a temperature dimension table and store it as Parquet files (partitioned by i94port).
- Compute a fact table by joining the above two tables on i94port.
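
To make the last step concrete, the sketch below performs the same kind of inner join on `i94port` using plain Python dictionaries instead of Spark, so it runs without a cluster. The rows are hypothetical stand-ins for the two cleansed tables, and `inner_join_on_port` is an illustrative name.

```python
# Hypothetical rows standing in for the cleansed immigration and temperature tables.
immigration_rows = [
    {"i94port": "SEA", "i94yr": 2016, "i94mon": 4, "i94visa": 2},
    {"i94port": "SEA", "i94yr": 2016, "i94mon": 4, "i94visa": 1},
    {"i94port": "ZZZ", "i94yr": 2016, "i94mon": 4, "i94visa": 3},  # no temperature row
]
temperature_rows = [
    {"i94port": "SEA", "AverageTemperature": -1.98},
]


def inner_join_on_port(immigration, temperature):
    """Attach the port's temperature to each immigration row; drop unmatched rows."""
    temperature_by_port = {row["i94port"]: row for row in temperature}
    fact = []
    for row in immigration:
        match = temperature_by_port.get(row["i94port"])
        if match is not None:
            fact.append({**row, "AverageTemperature": match["AverageTemperature"]})
    return fact


fact_rows = inner_join_on_port(immigration_rows, temperature_rows)
print(len(fact_rows))  # 2
```

Because the join is an inner join, immigration records whose port has no temperature entry do not appear in the fact table.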

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [24]:
# Path to I94 immigration data 
#immigration_data = '/data/18-83510-I94-Data-2016/*.sas7bdat'
immigration_data = '/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

# Clean I94 immigration data and store as Spark dataframe
df_immigration = clean_i94_data(immigration_data)

# Extract columns for immigration dimension table
immigration_table = df_immigration.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa"])

# Write immigration dimension table to parquet files partitioned by i94port
immigration_table.write.mode("append").partitionBy("i94port").parquet("/results/immigration.parquet")

In [25]:
# Extract columns for temperature dimension table
temperature_dim_tbl = temperature_df.select(["AverageTemperature", "City", "Country", "Latitude", "Longitude", "i94port"])

# Write temperature dimension table to parquet files partitioned by i94port
temperature_dim_tbl.write.mode("append").partitionBy("i94port").parquet("/results/temperature.parquet")

In [26]:
# Create temporary views of the immigration and temperature data
df_immigration.createOrReplaceTempView("immigration_view")
temperature_df.createOrReplaceTempView("temp_view")

# Create the fact table by joining the immigration and temperature views
fact_table = spark.sql('''
SELECT immigration_view.i94yr as year,
       immigration_view.i94mon as month,
       immigration_view.i94cit as city,
       immigration_view.i94port as i94port,
       immigration_view.arrdate as arrival_date,
       immigration_view.depdate as departure_date,
       immigration_view.i94visa as reason,
       temp_view.AverageTemperature as temperature,
       temp_view.Latitude as latitude,
       temp_view.Longitude as longitude
FROM immigration_view
JOIN temp_view ON (immigration_view.i94port = temp_view.i94port)
''')

# Write fact table to parquet files partitioned by i94port
fact_table.write.mode("append").partitionBy("i94port").parquet("/results/fact.parquet")

#### 4.2 Data Quality Checks
The only check performed here is to count the rows of the final dataframes and ensure they are not empty.

In [27]:
# Perform quality checks here
def quality_check(df, description):
    '''
    Input: Spark dataframe, description of Spark dataframe
    Output: Print outcome of data quality check
    '''
    
    result = df.count()
    if result == 0:
        print("Data quality check failed for {} with zero records".format(description))
    else:
        print("Data quality check passed for {} with {} records".format(description, result))
    return 0

# Perform data quality check
quality_check(df_immigration, "immigration table")
quality_check(temperature_df, "temperature table")

Data quality check passed for immigration table with 3088576 records
Data quality check passed for temperature table with 175 records


0
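
The row-count check could be extended, for example by also verifying that the join key is never missing. The sketch below shows that idea on plain Python rows rather than Spark dataframes so that it runs anywhere. It is one possible extension and not part of the pipeline above, and `check_rows` is a hypothetical name.

```python
def check_rows(rows, key, description):
    """Return a list of problems found; an empty list means all checks passed."""
    problems = []
    if not rows:
        problems.append("{}: no records".format(description))
    missing = sum(1 for row in rows if row.get(key) is None)
    if missing:
        problems.append("{}: {} records without {}".format(description, missing, key))
    return problems


sample_rows = [{"i94port": "SEA"}, {"i94port": None}]
print(check_rows(sample_rows, "i94port", "temperature table"))
```

Returning the list of problems, instead of only printing a message, lets a caller fail the pipeline run when any check does not pass.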

#### 4.3 Data dictionary 

The first dimension table is the immigration table, built from the I-94 immigration data. Each report (entry) contains international visitor arrival statistics by world region and select countries (including the top 20), type of visa, mode of transportation, age group, states visited (first intended address only), and the top ports of entry (for select countries). The following columns are extracted from the immigration dataframe:

    i94yr = 4 digit year
    i94mon = numeric month
    i94cit = 3 digit code of country of citizenship
    i94port = 3 character code of destination city
    arrdate = arrival date
    i94mode = 1 digit travel code
    depdate = departure date
    i94visa = reason for immigration

The second dimension table is the city temperature data obtained from Kaggle. The following fields are considered:

    i94port = 3 character code of destination city (mapped from immigration data during cleanup step)
    AverageTemperature = average temperature
    City = city name
    Country = country name
    Latitude = latitude
    Longitude = longitude

The fact table will contain information from the I94 immigration data joined with the city temperature data on i94port:

    i94yr = 4 digit year
    i94mon = numeric month
    i94cit = 3 digit code of country of citizenship
    i94port = 3 character code of destination city
    arrdate = arrival date
    i94mode = 1 digit travel code
    depdate = departure date
    i94visa = reason for immigration
    AverageTemperature = average temperature of destination city

The tables will be saved to Parquet files partitioned by city (i94port).


### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

The given datasets are very large: the number of observations runs into the millions. Spark is well suited to preprocessing, analysing, and running data-intensive computations on data of this size.

* Propose how often the data should be updated and why.
Both the I-94 immigration dataset and the temperature dataset contain monthly aggregates, so it would be preferable to update the datasets on a monthly basis.

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 In this case, the number of Spark nodes should be increased appropriately. 'Appropriately' means that a given SLA would determine, by trial and error, how many nodes are needed to meet it.
 
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 
 A workflow orchestration tool like Airflow can run the pipeline the previous night to ensure (with retries, email notifications, etc.) that the data is properly staged and pre-processed, and that the necessary KPIs/metrics are extracted and available to the dashboard program (such as Tableau) in a well-partitioned form.
 
 * The database needed to be accessed by 100+ people.
 In this case, the data can be stored in a scalable, high-performance database like Redshift, Snowflake or BigQuery. The data must be properly partitioned, indexed, and given the right permissions to allow glitch-free access by a large number of consumers. Furthermore, only the necessary fields should be kept; the rest can be discarded.
 
 * Purpose of this project.
 The final data model contains data from both the I-94 immigration arrival information and the temperature details of the respective cities. This information will be assimilated monthly. It will act as a rich source of information to:
     1. Understand the travel patterns during different months of the year.
     2. Understand why certain ports have higher traffic than others.
     3. Help city and airport officials with capacity planning, so that they are prepared for varying inflows.
     4. Support demographic analysis (based on the visa information of immigrants) to see what percentage of the influx to a city are students, professionals, families, etc. Some attribution could perhaps also be made to seasonality and important days of the year (Christmas, Thanksgiving, graduation, ...).


For downstream analysts to answer such questions, a clean, high-quality dataset must be prepared and ready to be accessed. Hence, the purpose of this project is to enable downstream data analysts and data scientists to access quality data (without redundancies and other errors). For faster access by data analysts and reporting staff, this data could perhaps be exported into a Redshift database so that they can visualize it using Tableau or something similar.