# Project Title
### Data Engineering Capstone Project

#### Project Summary
In this project our goal is to build an ETL pipeline over the 'I94 immigration data' and the 'city temperature data' to create a database optimized for queries on immigration events. With this database we can answer questions that relate immigration behavior to the temperature of the destination, for example: do people prefer to immigrate to warmer places?

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

```python
# Do all imports and installs here
import re

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
```

### Step 1: Scope the Project and Gather Data

#### Scope 
We are going to aggregate the I94 immigration data by destination city to form our 1st dimension table. Next, we are going to aggregate the city temperature data by city to form the 2nd dimension table. The two datasets will be joined on the destination city to form the fact table. The final database is optimized for queries on immigration events, so that we can determine whether temperature affects the choice of destination city. 'Apache Spark' will be used to process the data.

#### Describe and Gather Data 
This I94 immigration data comes from the US National Tourism and Trade Office and is provided in SAS7BDAT format. We will use the following attributes:
* i94cit = 3 digit code of the country of citizenship (origin country)
* i94port = 3 character code of the port of entry (destination city in the USA)
* arrdate = arrival date in the USA (SAS numeric date)
* i94yr = 4 digit year
* i94mon = numeric month
* i94mode = 1 digit travel mode code
* depdate = departure date from the USA (SAS numeric date)
* i94visa = reason for immigration
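The arrdate and depdate values are SAS numeric dates, i.e. the number of days since 1960-01-01, the SAS date epoch; for example, the arrdate 20545.0 in the preview below falls on 2016-04-01, which matches the April 2016 file. A minimal standard-library sketch of the conversion follows, assuming the values arrive as floats with missing values as NaN or None; `sas_to_date` is a hypothetical helper name.

```python
import math
from datetime import date, timedelta

# SAS stores dates as the number of days since 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)

def sas_to_date(sas_days):
    """Convert a SAS numeric date (e.g. 20545.0) to a datetime.date.

    Returns None for missing values (None or NaN), such as an empty depdate.
    """
    if sas_days is None or (isinstance(sas_days, float) and math.isnan(sas_days)):
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))

print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(20691.0))  # 2016-08-25
```

In Spark the same idea could likely be written with the built-in `date_add` SQL function, e.g. `F.expr("date_add('1960-01-01', cast(arrdate as int))")`, so the conversion runs inside the cluster instead of in Python.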

The temperature data comes from Kaggle and is provided in CSV format. We will use the following attributes:
* Latitude = latitude
* Longitude = longitude
* AverageTemperature = average temperature
* City = city name
* Country = country name
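Latitude and Longitude are stored as strings with a hemisphere suffix (57.05N, 10.33E, 31.35S and 121.97W all appear in the previews below), so they cannot be used as numbers directly. A minimal sketch that turns them into signed decimal degrees, with south and west negative, assuming every value is a number followed by one of N, S, E or W; `parse_coordinate` is a hypothetical helper.

```python
def parse_coordinate(value):
    """Convert a string such as '57.05N' or '121.97W' to signed decimal degrees.

    Assumes the last character is the hemisphere letter (N, S, E or W).
    """
    value = value.strip()
    number, hemisphere = float(value[:-1]), value[-1].upper()
    if hemisphere not in ('N', 'S', 'E', 'W'):
        raise ValueError('unexpected hemisphere in {!r}'.format(value))
    # Southern latitudes and western longitudes are negative by convention.
    return -number if hemisphere in ('S', 'W') else number

print(parse_coordinate('57.05N'))   # 57.05
print(parse_coordinate('121.97W'))  # -121.97
```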

```python
# Read the April 2016 I94 immigration data for exploration
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
immig_df = pd.read_sas(fname, format='sas7bdat', encoding='ISO-8859-1')
```

```python
immig_df.head()
```

| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 | | | | ... | U | | 1979.0 | 10282016 | | | | 1897628000.0 | | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL | | ... | Y | | 1991.0 | D/S | M | | | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... | | M | 1961.0 | 09302016 | M | | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 1988.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 2012.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |


```python
# Read the temperature data
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
temp_df = pd.read_csv(fname, sep=',')
```

```python
temp_df.head()
```

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 | | | Århus | Denmark | 57.05N | 10.33E |


```python
# Create a Spark session with the spark-sas7bdat package for reading SAS files
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)
```

### Step 2: Explore and Assess the Data
#### Explore the Data and Cleaning Steps
As the dataframe outputs above show, many values in the i94port column are not valid port codes (for example XXX in the first row). For the I94 immigration data we therefore drop all entries whose destination code i94port is not a valid value, such as XXX or 99 (see the I94_SAS_Labels_Description.SAS file). Similarly, for the temperature data we drop all entries where AverageTemperature is missing (NaN), drop entries with duplicate locations, and then add the i94port code matching each remaining location.
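The i94port rule can be illustrated in plain Python before it is applied in Spark. This sketch assumes the valid codes are stored as lines of the form `'CODE' = 'DESCRIPTION'`, the shape the regex in the cleaning cell expects; the sample lines and rows are illustrative only, and `parse_valid_ports` and `keep_valid` are hypothetical helpers. Unlike a greedy `(.*)` pattern, `[^']*` stops at the next quote, and stripping removes any padding inside the quotes.

```python
import re

# Matches lines such as:  'ATL'  =  'ATLANTA, GA'
PORT_LINE = re.compile(r"'([^']*)'\s*=\s*'([^']*)'")

def parse_valid_ports(lines):
    """Return {code: description} for every line that matches PORT_LINE."""
    ports = {}
    for line in lines:
        match = PORT_LINE.search(line)
        if match:  # blank or malformed lines are skipped
            ports[match.group(1).strip()] = match.group(2).strip()
    return ports

def keep_valid(rows, valid_ports):
    """Drop rows whose i94port is not a known valid code (e.g. 'XXX')."""
    return [row for row in rows if row['i94port'] in valid_ports]

# Illustrative sample only
sample_lines = ["\t'ATL'\t=\t'ATLANTA, GA'", "", "\t'NYC'\t=\t'NEW YORK, NY          '"]
ports = parse_valid_ports(sample_lines)
rows = [{'i94port': 'XXX'}, {'i94port': 'ATL'}, {'i94port': 'NYC'}]
print([row['i94port'] for row in keep_valid(rows, ports)])  # ['ATL', 'NYC']
```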

```python
# Cleaning I94 immigration data

# Create a dictionary of valid i94port codes -> port description,
# parsed from lines of the form 'CODE' = 'DESCRIPTION'
re_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')
i94port_valid = {}
with open('i94port_valid.txt') as f:
    for line in f:
        match = re_obj.search(line)
        if match is None:
            # Skip blank or malformed lines instead of failing on None
            continue
        # The description is stored in a one-element list; get_i94port reads it with [0]
        i94port_valid[match[1]] = [match[2]]

def clean_i94_data(file):
    """
    Input: I94 immigration data file path

    Output: I94 immigration data with valid i94port in Spark dataframe
    """
    # Read I94 data into a dataframe
    immigration_df = spark.read.format('com.github.saurfang.sas.spark').load(file)

    # Keep only the entries whose i94port is a valid code
    immigration_df = immigration_df.filter(immigration_df.i94port.isin(list(i94port_valid.keys())))

    return immigration_df

# Test function
#immigration_data_test = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
#immigration_df_test = clean_i94_data(immigration_data_test)
#immigration_df_test.show()
```

```text
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SEO| null|      G|   null|      Y|   null| 1991.0|     D/S|     M|  null|   null|  3.73679633E9|00296|      F1|
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|  1.0|20160401|    nul
```

```python
# Cleaning the temperature data
temp_df = spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")

# Filter out the entries with a missing average temperature
# (Spark's CSV reader loads empty fields as null by default, so test for null as well as 'NaN')
temp_df = temp_df.filter(temp_df.AverageTemperature.isNotNull() & (temp_df.AverageTemperature != 'NaN'))

# Remove duplicate locations (one row per City/Country pair; which row is kept is not guaranteed)
temp_df = temp_df.dropDuplicates(['City', 'Country'])

@udf()
def get_i94port(city):
    """
    Input: Name of the city for which we need the corresponding i94port

    Output: Corresponding i94port, or None if no valid port matches
    """
    if city is None:
        return None
    for key in i94port_valid:
        if city.lower() in i94port_valid[key][0].lower():
            return key
    return None

# Add the i94port code based on the city
temp_df = temp_df.withColumn("i94port", get_i94port(temp_df.City))

# Remove the entries with no i94port code (the UDF returns null, not the string 'null')
temp_df = temp_df.filter(temp_df.i94port.isNotNull())

# Show results
# temp_df.show()
```

```text
+----------+-------------------+-----------------------------+---------+--------------------+--------+---------+-------+
|        dt| AverageTemperature|AverageTemperatureUncertainty|     City|             Country|Latitude|Longitude|i94port|
+----------+-------------------+-----------------------------+---------+--------------------+--------+---------+-------+
|1852-07-01|             15.488|                        1.395|    Perth|           Australia|  31.35S|  114.97E|    PER|
|1828-01-01|             -1.977|                        2.551|  Seattle|       United States|  47.42N|  121.97W|    SEA|
|1743-11-01|              2.767|                        1.905| Hamilton|              Canada|  42.59N|   80.73W|    HAM|
|1849-01-01|  7.399999999999999|                        2.699|  Ontario|       United States|  34.56N|  116.76W|    ONT|
|1821-11-01|              2.322|                        2.375|  Spokane|       United States|  47.42N|  117.24W|    SPO|
|1843-01-01| 18.874000000000002|
```
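The Scope section describes aggregating the temperature data by city, whereas `dropDuplicates` keeps a single monthly reading per city (for example the 1852-07-01 value for Perth above). The sketch below shows the aggregation alternative in plain Python: averaging all non-missing readings per (City, Country). It is not what the cell above does, and the row values are illustrative. In Spark the same idea could be written as `temp_df.groupBy('City', 'Country').agg(F.avg(F.col('AverageTemperature').cast('double')))`, with `F` imported as `pyspark.sql.functions`; the cast is needed because the CSV is read without a schema, so the column is a string.

```python
from collections import defaultdict

def average_by_city(rows):
    """Average AverageTemperature per (City, Country), skipping missing readings.

    rows: iterable of dicts with 'City', 'Country' and 'AverageTemperature',
    where the temperature is a string (as read from the CSV) or None.
    """
    totals = defaultdict(float)
    counts = defaultdict(int)
    for row in rows:
        temp = row['AverageTemperature']
        if temp in (None, '', 'NaN'):
            continue  # same rule as the NaN filter: ignore missing readings
        key = (row['City'], row['Country'])
        totals[key] += float(temp)
        counts[key] += 1
    return {key: totals[key] / counts[key] for key in totals}

# Illustrative rows: the Århus rows mirror the pandas preview; the others are made up
rows = [
    {'City': 'Århus', 'Country': 'Denmark', 'AverageTemperature': '6.068'},
    {'City': 'Århus', 'Country': 'Denmark', 'AverageTemperature': None},
    {'City': 'Århus', 'Country': 'Denmark', 'AverageTemperature': None},
    {'City': 'Example City', 'Country': 'Example Country', 'AverageTemperature': '2.0'},
    {'City': 'Example City', 'Country': 'Example Country', 'AverageTemperature': '4.0'},
]
print(average_by_city(rows))
```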

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The 1st dimension table will contain events from the I94 immigration data. 

The columns below are to be extracted from the immigration dataframe:
1. i94yr = 4 digit year
2. i94port = 3 character code of destination city
3. arrdate = arrival date
4. i94mode = 1 digit travel code
5. i94mon = numeric month
6. i94cit = 3 digit code of the country of citizenship (origin country)
7. depdate = departure date
8. i94visa = reason for immigration
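i94mode and i94visa hold numeric codes rather than readable values. The lookups below are a sketch that assumes the labels listed in I94_SAS_Labels_Description.SAS (i94mode: 1 = Air, 2 = Sea, 3 = Land, 9 = Not reported; i94visa: 1 = Business, 2 = Pleasure, 3 = Student); check them against that file before relying on them. `decode` is a hypothetical helper.

```python
# ASSUMED code labels from I94_SAS_Labels_Description.SAS; verify before use.
I94MODE_LABELS = {1: 'Air', 2: 'Sea', 3: 'Land', 9: 'Not reported'}
I94VISA_LABELS = {1: 'Business', 2: 'Pleasure', 3: 'Student'}

def decode(code, labels, default='Unknown'):
    """Map a numeric I94 code (often a float such as 1.0) to its label."""
    if code is None or code != code:  # None or NaN (NaN is never equal to itself)
        return default
    return labels.get(int(code), default)

print(decode(1.0, I94MODE_LABELS))  # Air
print(decode(3.0, I94VISA_LABELS))  # Student
```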

The 2nd dimension table will contain city temperature data.

The columns below are to be extracted from the temperature dataframe:
1. i94port = 3 character code of destination city (mapped from immigration data during cleanup step)
2. Latitude = latitude
3. Longitude = longitude
4. AverageTemperature = average temperature
5. City = city name
6. Country = country name

The fact table will contain information from the I94 immigration data joined with the city temperature data on i94port:

1. i94yr = 4 digit year
2. i94port = 3 character code of destination city
3. arrdate = arrival date
4. i94mode = 1 digit travel code
5. i94mon = numeric month
6. i94cit = 3 digit code of the country of citizenship (origin country)
7. depdate = departure date
8. i94visa = reason for immigration
9. AverageTemperature = average temperature of destination city
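Joining on i94port is an inner equi-join: immigration events whose port has no temperature row are dropped, and a port matched by several temperature rows (for instance two cities whose names map to the same code) yields one fact row per match. A plain-Python sketch of that behaviour on illustrative rows; `join_on_port` is a hypothetical helper that mimics the SQL `JOIN ... ON i94port`.

```python
from collections import defaultdict

def join_on_port(immigration_rows, temperature_rows):
    """Inner join two lists of dicts on 'i94port', like the fact-table SQL JOIN."""
    temps_by_port = defaultdict(list)
    for t in temperature_rows:
        temps_by_port[t['i94port']].append(t)
    fact_rows = []
    for event in immigration_rows:
        # Events with no matching port are dropped; multiple matches fan out.
        for t in temps_by_port.get(event['i94port'], []):
            fact_rows.append({**event, 'AverageTemperature': t['AverageTemperature']})
    return fact_rows

# Illustrative rows only
events = [{'cicid': 7, 'i94port': 'ATL'}, {'cicid': 15, 'i94port': 'WAS'}]
temps = [{'i94port': 'ATL', 'AverageTemperature': 17.0},
         {'i94port': 'ATL', 'AverageTemperature': 12.0}]
print(len(join_on_port(events, temps)))  # 2: both from the ATL event, none for WAS
```

If duplicated fact rows are unwanted, the temperature dimension should hold at most one row per i94port before the join.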

The tables will be saved in Parquet format, partitioned by destination city (i94port).

#### 3.2 Mapping Out Data Pipelines

Described below are the pipeline steps to be executed:

* Clean the I94 data as described in Step 2 to create a Spark dataframe immigration_df for each month present.
* Clean the temperature data as described in Step 2 to create a Spark dataframe temp_df.
* Create the immigration dimension table by selecting the relevant columns from immigration_df and write it to Parquet files partitioned by i94port.
* Create the temperature dimension table by selecting the relevant columns from temp_df and write it to Parquet files partitioned by i94port.
* Create the fact table by joining the immigration and temperature data on i94port and write it to Parquet files partitioned by i94port.
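The first pipeline step runs once per month, while the cells in Step 4 load only the April file. A minimal sketch for building the list of monthly inputs, assuming (not confirmed here) that every month follows the same naming pattern as `i94_apr16_sub.sas7bdat`; `monthly_i94_paths`, `I94_DIR` and `MONTHS` are hypothetical names.

```python
# ASSUMPTION: every monthly file follows the pattern of i94_apr16_sub.sas7bdat.
I94_DIR = '../../data/18-83510-I94-Data-2016'
MONTHS = ['jan', 'feb', 'mar', 'apr', 'may', 'jun',
          'jul', 'aug', 'sep', 'oct', 'nov', 'dec']

def monthly_i94_paths(data_dir=I94_DIR, year_suffix='16'):
    """Return one file path per month, January to December."""
    return ['{}/i94_{}{}_sub.sas7bdat'.format(data_dir, month, year_suffix)
            for month in MONTHS]

# Usage in the notebook (requires the Spark session and clean_i94_data):
# for path in monthly_i94_paths():
#     month_df = clean_i94_data(path)
#     month_df.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate",
#                      "i94mode", "depdate", "i94visa"]) \
#         .write.mode("append").partitionBy("i94port") \
#         .parquet("/results/immigration.parquet")
```

Append mode fits this loop because each month adds new rows; rerunning the same month would write its rows twice.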

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

```python
# I94 immigration data path
immigration_data = '/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

# Clean the I94 immigration data and store it as a Spark dataframe
immigration_df = clean_i94_data(immigration_data)

# Extract the columns for the immigration dimension table
immigration_table = immigration_df.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa"])

# Write the immigration dimension table to parquet files partitioned by i94port
immigration_table.write.mode("append").partitionBy("i94port").parquet("/results/immigration.parquet")
```

```python
# Extract the columns for the temperature dimension table
temp_table = temp_df.select(["AverageTemperature", "City", "Country", "Latitude", "Longitude", "i94port"])

# Write the temperature dimension table to parquet files partitioned by i94port
temp_table.write.mode("append").partitionBy("i94port").parquet("/results/temperature.parquet")
```

```python
# Create temporary views of the immigration and temperature data
immigration_df.createOrReplaceTempView("immigration_view")
temp_df.createOrReplaceTempView("temp_view")

# Create the fact table by joining the immigration and temperature views using Spark SQL.
# i94cit is a country-of-citizenship code, and i94mode is included to match the data model.
fact_table = spark.sql('''
SELECT immigration_view.i94yr AS year,
       immigration_view.i94mon AS month,
       immigration_view.i94cit AS origin_country,
       immigration_view.i94port AS i94port,
       immigration_view.arrdate AS arrival_date,
       immigration_view.depdate AS departure_date,
       immigration_view.i94mode AS travel_mode,
       immigration_view.i94visa AS reason,
       temp_view.AverageTemperature AS temperature,
       temp_view.Latitude AS latitude,
       temp_view.Longitude AS longitude
FROM immigration_view
JOIN temp_view ON (immigration_view.i94port = temp_view.i94port)
''')

# Write the fact table to parquet files partitioned by i94port
fact_table.write.mode("append").partitionBy("i94port").parquet("/results/fact.parquet")
```

#### 4.2 Data Quality Checks
This data quality check verifies that each table contains at least one record (a non-zero row count).

```python
# Performing quality checks here
def quality_check(df, description):
    '''
    Input: Dataframe(Spark) and description of the dataframe

    Output: Prints whether the data quality check passed; returns True if it passed
    '''
    result = df.count()
    if result == 0:
        print("Data quality check failed for {} with zero records".format(description))
        return False
    print("Data quality check passed for {} with {} records".format(description, result))
    return True

# Perform data quality checks on every table
quality_check(immigration_df, "immigration table")
quality_check(temp_df, "temperature table")
quality_check(fact_table, "fact table")
```
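A row count does not catch missing values in the columns the analysis depends on. A complementary null check is sketched below in plain Python on illustrative rows; `null_check` is a hypothetical helper, and in Spark each column could be tested with something like `df.filter(df[col].isNull()).count()`.

```python
def null_check(rows, columns, description):
    """Return True if no row is missing a value in any of the given columns.

    rows: iterable of dicts, e.g. a small sample collected from a table.
    """
    rows = list(rows)
    missing = {col: sum(1 for row in rows if row.get(col) is None) for col in columns}
    bad = {col: n for col, n in missing.items() if n > 0}
    if bad:
        print('Null check failed for {}: {}'.format(description, bad))
        return False
    print('Null check passed for {} ({} rows)'.format(description, len(rows)))
    return True

# Illustrative rows using column names from the fact-table query
sample = [{'i94port': 'ATL', 'temperature': '17.0'},
          {'i94port': 'WAS', 'temperature': None}]
null_check(sample, ['i94port', 'temperature'], 'fact table sample')
```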

#### 4.3 Data dictionary 
The 1st dimension table will contain events from the I94 immigration data. 

The columns below are to be extracted from the immigration dataframe:
1. i94yr = 4 digit year
2. i94port = 3 character code of destination city
3. arrdate = arrival date
4. i94mode = 1 digit travel code
5. i94mon = numeric month
6. i94cit = 3 digit code of the country of citizenship (origin country)
7. depdate = departure date
8. i94visa = reason for immigration

The 2nd dimension table will contain city temperature data.

The columns below are to be extracted from the temperature dataframe:
1. i94port = 3 character code of destination city (mapped from immigration data during cleanup step)
2. Latitude = latitude
3. Longitude = longitude
4. AverageTemperature = average temperature
5. City = city name
6. Country = country name

The fact table will contain information from the I94 immigration data joined with the city temperature data on i94port:

1. i94yr = 4 digit year
2. i94port = 3 character code of destination city
3. arrdate = arrival date
4. i94mode = 1 digit travel code
5. i94mon = numeric month
6. i94cit = 3 digit code of the country of citizenship (origin country)
7. depdate = departure date
8. i94visa = reason for immigration
9. AverageTemperature = average temperature of destination city

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

    Ans: I chose Apache Spark because it can process large volumes of data in many file formats, including SAS7BDAT files through the spark-sas7bdat package. Spark SQL was used to process the large input files into dataframes, which were then combined with standard SQL join operations to form the fact table.

* Propose how often the data should be updated and why.

    Ans: The data should be updated monthly, because the raw I94 immigration data is provided as one file per month.

* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.

    Ans: If the data increased by 100x, processing it as a single batch job would likely no longer be practical. One option is to perform incremental updates using a tool such as Apache Hudi (originally developed at Uber). We could also move to a cloud service such as AWS EMR or Azure Databricks, where Spark runs in cluster mode under a cluster manager such as YARN.

  * The data populates a dashboard that must be updated on a daily basis by 7am every day.

    Ans: If the data must populate a dashboard daily to meet a 7am SLA, we could run the pipeline on AWS EMR and schedule it with Control-M, or use a workflow scheduler such as Apache Airflow to run the ETL pipeline.

  * The database needed to be accessed by 100+ people.

    Ans: If 100+ people need access to the database, we could publish the Parquet files to AWS S3 (object storage) or HDFS and grant read access to the users who need it. If users want to run SQL queries on the data, we could publish it to HDFS and query it with a tool such as Impala.