# ETL Pipeline for Immigration Data
### Data Engineering Capstone Project

#### Project Summary
The goal of this project is to create a database that is optimised to query immigration events. This will be used by Data Analysts to determine if temperature affects the selection of destination cities.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Import the libraries used in this notebook
import re

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

### Step 1: Scope the Project and Gather Data

#### Scope 

##### Goal:

The goal of this project is to create a database that is optimised to query immigration events. This will be used by Data Analysts to determine if temperature affects the selection of destination cities.

##### Schema:

The schema consists of one fact table and two dimension tables.

In this project, we will aggregate I94 immigration data by destination city to form our first dimension table. Next, we will aggregate city temperature data by city to form the second dimension table. The two datasets will be joined on destination city to form the fact table. The final database is optimized for querying immigration events to determine whether temperature affects the selection of destination cities. Spark will be used to process the data.

#### Data Description 

The I94 immigration data comes from the US National Tourism and Trade Office. It is provided in SAS7BDAT format, a binary database storage format. Some relevant attributes include:

- i94yr = 4 digit year
- i94mon = numeric month
- i94cit = 3 digit code of the visitor's country of citizenship
- i94port = 3 character code of destination USA city
- arrdate = arrival date in the USA (SAS numeric date: days since 1960-01-01)
- i94mode = 1 digit mode of travel code (1 = Air, 2 = Sea, 3 = Land, 9 = Not reported)
- depdate = departure date from the USA (SAS numeric date: days since 1960-01-01)
- i94visa = reason for immigration (1 = Business, 2 = Pleasure, 3 = Student)
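
Because `arrdate` and `depdate` count days from 1 January 1960, `20545.0` in the sample rows below corresponds to 2016-04-01, which matches the April 2016 file. A minimal plain-Python sketch of the conversion; the helper name `sas_to_date` is hypothetical and not part of the pipeline:

```python
from datetime import date, timedelta

# SAS stores dates as a number of days since 1 January 1960
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days):
    """Convert a SAS numeric date such as 20545.0 into a datetime.date.

    Missing values (None) are passed through unchanged.
    """
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(20691.0))  # 2016-08-25
```

Inside Spark, the same offset could likely be applied with the built-in SQL function `date_add`, e.g. `date_add(to_date('1960-01-01'), CAST(arrdate AS INT))`; this has not been run against the dataset here.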

The temperature data comes from Kaggle. It is provided in CSV format. Some relevant attributes include:

- AverageTemperature = average temperature (°C)
- City = city name
- Country = country name
- Latitude = latitude
- Longitude = longitude
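
`Latitude` and `Longitude` are stored as strings with a hemisphere suffix (for example `31.35S` and `114.97E` for Perth in the cleaned temperature output further down), so they need converting before any numeric use. A minimal sketch, assuming every value is a decimal number followed by one of `N`, `S`, `E` or `W`; the helper name `parse_coordinate` is hypothetical:

```python
def parse_coordinate(value):
    """Convert a string such as '47.42N' or '121.97W' to signed decimal degrees.

    North and East are positive; South and West are negative.
    """
    if not value:
        return None
    number, hemisphere = float(value[:-1]), value[-1].upper()
    if hemisphere in ("N", "E"):
        return number
    if hemisphere in ("S", "W"):
        return -number
    raise ValueError(f"unexpected hemisphere suffix in {value!r}")


print(parse_coordinate("31.35S"), parse_coordinate("114.97E"))  # -31.35 114.97
```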

In [2]:
# Read April 2016 I94 immigration data into Pandas for exploration
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")

In [3]:
# Display the first few rows of the immigration data
df.head()

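|   | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 |  |  |  | ... | U |  | 1979.0 | 10282016 |  |  |  | 1897628000.0 |  | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL |  | ... | Y |  | 1991.0 | D/S | M |  |  | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... |  | M | 1961.0 | 09302016 | M |  | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 1988.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 2012.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |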


In [4]:
# Create Spark session with SAS7BDAT jar
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


# Step 2: Data Cleaning

For the I94 immigration data, we want to drop all entries where the destination city code `i94port` is not a valid value (e.g., `XXX`, `99`) as described in `I94_SAS_Labels_Description.SAS`. For the temperature data, we want to drop all entries where `AverageTemperature` is missing, keep one entry per location (`City`, `Country`), look up the `i94port` code that matches each entry's city name, and drop the entries for which no code is found.
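
The I94 cleaning rule can be checked without Spark. A minimal plain-Python sketch of the same logic, assuming each line of `i94port_valid.txt` looks like the `'CODE' = 'LOCATION'` entries of the SAS labels file; the sample lines and the helper names `parse_ports` and `keep_valid` are illustrative, not copied from the project:

```python
import re

# Same pattern as the notebook: captures the quoted code and the quoted location
PORT_LINE = re.compile(r"'(.*)'.*'(.*)'")


def parse_ports(lines):
    """Return {port_code: location} for every line that matches PORT_LINE."""
    ports = {}
    for line in lines:
        match = PORT_LINE.search(line)
        if match:  # skip blank or malformed lines
            ports[match[1]] = match[2].strip()
    return ports


def keep_valid(records, ports):
    """Keep only records whose i94port is one of the valid codes."""
    return [record for record in records if record["i94port"] in ports]


# Illustrative lines in the assumed file format
sample_lines = ["\t'ATL'\t=\t'ATLANTA, GA'\n", "\n", "\t'NYC'\t=\t'NEW YORK, NY   '\n"]
ports = parse_ports(sample_lines)
records = [{"i94port": "XXX"}, {"i94port": "ATL"}, {"i94port": "NYC"}]
print(keep_valid(records, ports))  # [{'i94port': 'ATL'}, {'i94port': 'NYC'}]
```

Unlike the notebook, this sketch stores each location as a plain string rather than a one-element list.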

In [5]:
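# Clean I94 immigration data

# Create a dictionary of valid i94port codes, mapping each code to a one-element
# list that holds its location string (used by get_i94port below)
re_obj = re.compile(r"'(.*)'.*'(.*)'")
i94port_valid = {}
with open('i94port_valid.txt') as f:
    for line in f:
        match = re_obj.search(line)
        if match:  # skip blank or malformed lines
            i94port_valid[match[1]] = [match[2].strip()]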

def clean_i94_data(file):
    '''
    Input: Path to I94 immigration data file
    
    Output: Spark dataframe of I94 immigration data with valid i94port
    
    '''
    
    # Read I94 data into Spark
    df_immigration = spark.read.format('com.github.saurfang.sas.spark').load(file)

    # Filter out entries where i94port is invalid
    df_immigration = df_immigration.filter(df_immigration.i94port.isin(list(i94port_valid.keys())))

    return df_immigration


In [6]:
immigration_test_file = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat' 
df_immigration_test = clean_i94_data(immigration_test_file)
df_immigration_test.show()

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SEO| null|      G|   null|      Y|   null| 1991.0|     D/S|     M|  null|   null|  3.73679633E9|00296|      F1|
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|  1.0|20160401|    nul

In [7]:
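# Clean temperature data
df_temp = spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")

# Filter out entries with a missing average temperature. Empty CSV fields are read
# as null; the != 'NaN' test also drops any literal 'NaN' strings
df_temp = df_temp.filter(df_temp.AverageTemperature.isNotNull() & (df_temp.AverageTemperature != 'NaN'))

# Keep one entry per location (dropDuplicates keeps an arbitrary row for each City/Country pair)
df_temp = df_temp.dropDuplicates(['City', 'Country'])

@udf()
def get_i94port(city):
    '''
    Input: City name

    Output: First i94port whose location contains the city name, or None if there is no match

    '''
    if city is None:
        return None
    for key in i94port_valid:
        if city.lower() in i94port_valid[key][0].lower():
            return key
    return None

# Add i94port code based on city name
df_temp = df_temp.withColumn("i94port", get_i94port(df_temp.City))

# Remove entries with no i94port code (the UDF returns null when there is no match)
df_temp = df_temp.filter(df_temp.i94port.isNotNull())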


In [8]:
df_temp.show()

+----------+-------------------+-----------------------------+---------+--------------------+--------+---------+-------+
|        dt| AverageTemperature|AverageTemperatureUncertainty|     City|             Country|Latitude|Longitude|i94port|
+----------+-------------------+-----------------------------+---------+--------------------+--------+---------+-------+
|1852-07-01|             15.488|                        1.395|    Perth|           Australia|  31.35S|  114.97E|    PER|
|1828-01-01|             -1.977|                        2.551|  Seattle|       United States|  47.42N|  121.97W|    SEA|
|1743-11-01|              2.767|                        1.905| Hamilton|              Canada|  42.59N|   80.73W|    HAM|
|1849-01-01|  7.399999999999999|                        2.699|  Ontario|       United States|  34.56N|  116.76W|    ONT|
|1821-11-01|              2.322|                        2.375|  Spokane|       United States|  47.42N|  117.24W|    SPO|
|1843-01-01| 18.874000000000002|
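
`get_i94port` returns the first port whose location string merely contains the city name, and it ignores `Country`, so it can attach a port code to an unrelated city whose name is contained in a port's location. A plain-Python sketch contrasting that substring rule with a stricter exact match on the city part of the location; `lookup_exact` and the toy dictionary are hypothetical, not part of the pipeline:

```python
def lookup_substring(city, ports):
    """Mirror of get_i94port: first code whose location contains the city name."""
    if city is None:
        return None
    for code, location in ports.items():
        if city.lower() in location.lower():
            return code
    return None


def lookup_exact(city, ports):
    """Stricter variant: the text before the first comma must equal the city name."""
    if city is None:
        return None
    for code, location in ports.items():
        if location.split(",")[0].strip().lower() == city.lower():
            return code
    return None


# Toy dictionary in {code: location} form, with locations as plain strings
ports = {"NYC": "NEW YORK, NY"}
print(lookup_substring("York", ports))  # NYC  (false positive)
print(lookup_exact("York", ports))      # None
print(lookup_exact("New York", ports))  # NYC
```

Exact matching on the city name still cannot separate same-named cities in different countries; that would also require a condition on `Country`.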

# Step 3: Define the Data Model

### 3.1 Conceptual Data Model

The first dimension table will contain events from the I94 immigration data. The columns below will be extracted from the immigration dataframe:

- `i94yr` = 4 digit year
- `i94mon` = numeric month
- `i94cit` = 3 digit code of the visitor's country of citizenship
- `i94port` = 3 character code of destination city
- `arrdate` = arrival date
- `i94mode` = 1 digit travel code
- `depdate` = departure date
- `i94visa` = reason for immigration

The second dimension table will contain city temperature data. The columns below will be extracted from the temperature dataframe:

- `i94port` = 3 character code of destination city (mapped from immigration data during cleanup step)
- `AverageTemperature` = average temperature
- `City` = city name
- `Country` = country name
- `Latitude` = latitude
- `Longitude` = longitude
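
Because `dropDuplicates(['City', 'Country'])` keeps an arbitrary row per city, the `AverageTemperature` kept in this table is a single monthly reading; the `dt` values in the cleaned sample output above fall between 1743 and 1852. Averaging all readings per city would give a more representative figure. A plain-Python sketch of that aggregation, using toy values rather than rows from the dataset (the helper name is hypothetical):

```python
from collections import defaultdict


def mean_temperature_by_city(rows):
    """Average AverageTemperature per (City, Country), skipping missing readings."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    for row in rows:
        value = row.get("AverageTemperature")
        if value in (None, "", "NaN"):
            continue  # missing reading
        key = (row["City"], row["Country"])
        sums[key] += float(value)
        counts[key] += 1
    return {key: sums[key] / counts[key] for key in sums}


# Toy readings, not taken from the Kaggle file
readings = [
    {"City": "Seattle", "Country": "United States", "AverageTemperature": "-2.0"},
    {"City": "Seattle", "Country": "United States", "AverageTemperature": "18.0"},
    {"City": "Seattle", "Country": "United States", "AverageTemperature": ""},
]
print(mean_temperature_by_city(readings))  # {('Seattle', 'United States'): 8.0}
```

In Spark, a comparable alternative to `dropDuplicates` would be `groupBy('City', 'Country')` followed by `agg(avg('AverageTemperature'))` from `pyspark.sql.functions`; this is a suggestion, not what the notebook does.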

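The fact table will contain information from the I94 immigration data joined with the city temperature data on `i94port`. Its columns, as named in the SQL query in step 4, are:

- `year` = 4 digit year (`i94yr`)
- `month` = numeric month (`i94mon`)
- `city` = 3 digit code of the visitor's country of citizenship (`i94cit`)
- `i94port` = 3 character code of destination city
- `arrival_date` = arrival date (`arrdate`)
- `departure_date` = departure date (`depdate`)
- `reason` = reason for immigration (`i94visa`)
- `temperature` = average temperature of destination city (`AverageTemperature`)
- `latitude` = latitude of destination city (`Latitude`)
- `longitude` = longitude of destination city (`Longitude`)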

The tables will be saved to Parquet files partitioned by city (i94port).
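
`partitionBy('i94port')` writes one Hive-style sub-directory per code, named `i94port=<code>`, and the partition column is encoded in the directory name rather than stored in the data files. A plain-Python sketch of that grouping; it only computes paths and rows and writes nothing, and the helper name is hypothetical:

```python
from collections import defaultdict


def partition_paths(rows, base, column):
    """Group rows under '<base>/<column>=<value>' keys, dropping the partition column."""
    parts = defaultdict(list)
    for row in rows:
        remaining = {key: value for key, value in row.items() if key != column}
        parts[f"{base}/{column}={row[column]}"].append(remaining)
    return dict(parts)


rows = [
    {"i94port": "ATL", "i94yr": 2016.0, "i94mon": 4.0},
    {"i94port": "NYC", "i94yr": 2016.0, "i94mon": 4.0},
    {"i94port": "ATL", "i94yr": 2016.0, "i94mon": 4.0},
]
for path, part_rows in partition_paths(rows, "/results/immigration.parquet", "i94port").items():
    print(path, len(part_rows))
# /results/immigration.parquet/i94port=ATL 2
# /results/immigration.parquet/i94port=NYC 1
```

A query that filters on `i94port` can then skip the directories of the other codes.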

### 3.2 Mapping Out Data Pipelines

The pipeline steps are described below:

1. Clean I94 data as described in step 2 to create Spark dataframe `df_immigration` for each month
2. Clean temperature data as described in step 2 to create Spark dataframe `df_temp` (already performed)
3. Create immigration dimension table by selecting relevant columns from `df_immigration` and write to Parquet files partitioned by `i94port`
4. Create temperature dimension table by selecting relevant columns from `df_temp` and write to Parquet files partitioned by `i94port`
5. Create fact table by joining the cleaned immigration and temperature data on `i94port` and write to Parquet files partitioned by `i94port`
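
Step 5 is an inner join on `i94port`, so each immigration row is repeated once for every temperature row carrying the same code; if more than one city maps to a port, the fact table (and any `COUNT(*)` over the join) would count those visits more than once. A plain-Python sketch of the join and a uniqueness check, with toy rows (the second `SEA` city is hypothetical):

```python
from collections import Counter, defaultdict


def inner_join(left, right, key):
    """Hash join: pair each left row with every right row that has the same key."""
    index = defaultdict(list)
    for row in right:
        index[row[key]].append(row)
    return [{**l_row, **r_row} for l_row in left for r_row in index.get(l_row[key], [])]


def duplicate_keys(rows, key):
    """Return the key values that occur in more than one row."""
    counts = Counter(row[key] for row in rows)
    return {value for value, n in counts.items() if n > 1}


immigration = [{"cicid": 1, "i94port": "SEA"}, {"cicid": 2, "i94port": "ATL"}]
temperature = [
    {"i94port": "SEA", "City": "Seattle"},
    {"i94port": "SEA", "City": "Hypothetical City"},
    {"i94port": "ATL", "City": "Atlanta"},
]
print(len(inner_join(immigration, temperature, "i94port")))  # 3 rows from 2 visits
print(duplicate_keys(temperature, "i94port"))                # {'SEA'}
```

Running `df_temp.groupBy('i94port').count()` and looking for counts above 1 would show whether this affects the real data.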

# Step 4: Run Pipelines to Model the Data

### 4.1 Create the data model
*Build the data pipelines to create the data model.*

In [9]:
# Path to I94 immigration data 
#immigration_data = '/data/18-83510-I94-Data-2016/*.sas7bdat'
immigration_data = '/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

# Clean I94 immigration data and store as Spark dataframe
df_immigration = clean_i94_data(immigration_data)

# Extract columns for immigration dimension table
immigration_table = df_immigration.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa"])

# Write immigration dimension table to parquet files partitioned by i94port
immigration_table.write.mode("append").partitionBy("i94port").parquet("/results/immigration.parquet")

In [10]:
# Extract columns for temperature dimension table
temp_table = df_temp.select(["AverageTemperature", "City", "Country", "Latitude", "Longitude", "i94port"])

# Write temperature dimension table to parquet files partitioned by i94port
temp_table.write.mode("append").partitionBy("i94port").parquet("/results/temperature.parquet")

In [11]:
# Create temporary views of the immigration and temperature data
df_immigration.createOrReplaceTempView("immigration_view")
df_temp.createOrReplaceTempView("temp_view")

# Create the fact table by joining the immigration and temperature views
fact_table = spark.sql('''
SELECT immigration_view.i94yr as year,
       immigration_view.i94mon as month,
       immigration_view.i94cit as city,
       immigration_view.i94port as i94port,
       immigration_view.arrdate as arrival_date,
       immigration_view.depdate as departure_date,
       immigration_view.i94visa as reason,
       temp_view.AverageTemperature as temperature,
       temp_view.Latitude as latitude,
       temp_view.Longitude as longitude
FROM immigration_view
JOIN temp_view ON (immigration_view.i94port = temp_view.i94port)
''')

# Write fact table to parquet files partitioned by i94port
fact_table.write.mode("append").partitionBy("i94port").parquet("/results/fact.parquet")

In [16]:
# Sample query that lets users look at immigration events for destination cities
# whose average temperature is above 5 degrees
result_df = spark.sql('''
SELECT immigration_view.i94yr AS year,
       immigration_view.i94mon AS month,
       immigration_view.i94cit AS city,
       immigration_view.i94port AS i94port,
       immigration_view.arrdate AS arrival_date,
       immigration_view.depdate AS departure_date,
       immigration_view.i94visa AS reason,
       temp_view.AverageTemperature AS temperature,
       temp_view.Latitude AS latitude,
       temp_view.Longitude AS longitude
FROM immigration_view
JOIN temp_view ON immigration_view.i94port = temp_view.i94port
WHERE temp_view.AverageTemperature > 5
''')

# Display the result DataFrame
result_df.show()

+------+-----+-----+-------+------------+--------------+------+-----------------+--------+---------+
|  year|month| city|i94port|arrival_date|departure_date|reason|      temperature|latitude|longitude|
+------+-----+-----+-------+------------+--------------+------+-----------------+--------+---------+
|2016.0|  4.0|111.0|    SNA|     20545.0|       20547.0|   1.0|7.168999999999999|  29.74N|   97.85W|
|2016.0|  4.0|114.0|    SNA|     20545.0|       20562.0|   2.0|7.168999999999999|  29.74N|   97.85W|
|2016.0|  4.0|117.0|    SNA|     20545.0|       20559.0|   2.0|7.168999999999999|  29.74N|   97.85W|
|2016.0|  4.0|129.0|    SNA|     20545.0|       20548.0|   2.0|7.168999999999999|  29.74N|   97.85W|
|2016.0|  4.0|575.0|    SNA|     20545.0|       20547.0|   1.0|7.168999999999999|  29.74N|   97.85W|
|2016.0|  4.0|575.0|    SNA|     20545.0|       20547.0|   2.0|7.168999999999999|  29.74N|   97.85W|
|2016.0|  4.0|577.0|    SNA|     20545.0|       20550.0|   1.0|7.168999999999999|  29.74N| 

### Example Queries on Database

**Question:** Do warmer cities attract more visitors than colder cities?

In [15]:
# Query 1: visits to destination cities where the average temperature is above 5 degrees
query = spark.sql('''
SELECT immigration_view.i94port AS i94port,
       COUNT(*) AS visit_count
FROM immigration_view
JOIN temp_view ON immigration_view.i94port = temp_view.i94port
WHERE temp_view.AverageTemperature > 5
GROUP BY immigration_view.i94port
ORDER BY visit_count DESC
LIMIT 10
''')

In [16]:
query.show()

+-------+-----------+
|i94port|visit_count|
+-------+-----------+
|    LOS|     930489|
|    NYC|     485916|
|    MIA|     343941|
|    SFR|     152586|
|    ORL|     149195|
|    NEW|     136122|
|    CHI|     130564|
|    HOU|     101481|
|    FTL|      95977|
|    ATL|      92579|
+-------+-----------+



In [17]:
# Query 2: visits to destination cities where the average temperature is below 2 degrees
query2 = spark.sql('''
SELECT immigration_view.i94port AS i94port,
       COUNT(*) AS visit_count
FROM immigration_view
JOIN temp_view ON immigration_view.i94port = temp_view.i94port
WHERE temp_view.AverageTemperature < 2
GROUP BY immigration_view.i94port
ORDER BY visit_count DESC
LIMIT 10
''')

In [18]:
query2.show()

+-------+-----------+
|i94port|visit_count|
+-------+-----------+
|    BOS|      57354|
|    SEA|      47719|
|    TOR|      20886|
|    MON|       6006|
|    BUF|       1040|
|    OTT|        663|
|    VIC|        626|
|    ANC|         91|
|    SYR|         76|
|    PRO|         12|
+-------+-----------+



*Based on the queries above, cities with warmer temperatures tend to receive more visits than those with colder temperatures.*

Visits to the top 10 warmer cities range from **92,579 to 930,489**. This is substantially higher than for the top 10 colder cities, where visits range from **12 to 57,354**.

This is an example of how the schema can be used to answer the question of whether temperature affects the choice of destination city.
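
The ranges above compare only the extremes of each top-10 list. Summing each list and taking its median, with the `visit_count` values copied from the two query outputs, checks that the gap is not driven by a single port such as LOS. Both lists are top-10 cuts, not every port in each temperature band:

```python
from statistics import median

# visit_count values copied from the Query 1 (> 5 degrees) and Query 2 (< 2 degrees) outputs
warm = [930489, 485916, 343941, 152586, 149195, 136122, 130564, 101481, 95977, 92579]
cold = [57354, 47719, 20886, 6006, 1040, 663, 626, 91, 76, 12]

print(sum(warm), sum(cold))        # 2618850 134473
print(median(warm), median(cold))  # 142658.5 851.5
```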

### 4.2 Data Quality Checks
*Ensure there is an adequate number of entries in each table.*

In [19]:
def quality_check(df, description):
    '''
    Input: Spark dataframe, description of Spark dataframe
    
    Output: Print outcome of data quality check
    
    '''
    
    result = df.count()
    if result == 0:
        print("Data quality check failed for {} with zero records".format(description))
    else:
        print("Data quality check passed for {} with {} records".format(description, result))
    return 0

In [21]:
# Perform data quality check
print(quality_check(df_immigration, "immigration table"))
#print(quality_check(df_temp, "temperature table"))

Data quality check passed for immigration table with 3088544 records
0


In [22]:
def check_duplicates(df, description):
    '''
    Input: Spark DataFrame, description of the DataFrame
    
    Output: Print outcome of duplicate records check
    '''
    duplicate_count = df.count() - df.dropDuplicates().count()
    if duplicate_count == 0:
        print("Duplicate records check passed for '{}' with no duplicates".format(description))
    else:
        print("Duplicate records check failed for '{}' with {} duplicate records".format(description, duplicate_count))
    return 0

In [23]:
print(check_duplicates(df_immigration, "immigration table"))

Duplicate records check passed for 'immigration table' with no duplicates
0
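
Both checks above print a message and return `0`, so a failed check does not stop the notebook or a scheduled run. A plain-Python sketch of the same two checks that raise an exception instead, over lists of row dictionaries; the names `DataQualityError`, `check_not_empty`, `count_duplicates` and `check_no_duplicates` are hypothetical:

```python
class DataQualityError(Exception):
    """Raised when a table fails a data quality check."""


def check_not_empty(rows, description):
    """Fail when the table has no records; otherwise return the record count."""
    if not rows:
        raise DataQualityError(f"{description}: zero records")
    return len(rows)


def count_duplicates(rows):
    """Number of rows that repeat an earlier row exactly (count minus distinct count)."""
    distinct = {tuple(sorted(row.items())) for row in rows}
    return len(rows) - len(distinct)


def check_no_duplicates(rows, description):
    """Fail when any row is an exact duplicate of another."""
    duplicates = count_duplicates(rows)
    if duplicates:
        raise DataQualityError(f"{description}: {duplicates} duplicate records")


table = [{"i94port": "ATL", "cicid": 7.0}, {"i94port": "WAS", "cicid": 15.0}]
print(check_not_empty(table, "immigration table"))  # 2
check_no_duplicates(table, "immigration table")     # passes silently
```

In a scheduler such as Airflow, an uncaught exception marks the task as failed, which stops downstream steps from consuming a bad table.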


### 4.3 Data dictionary

The first dimension table will contain events from the I94 immigration data. The columns below will be extracted from the immigration dataframe:

- `i94yr` = 4 digit year
- `i94mon` = numeric month
- `i94cit` = 3 digit code of the visitor's country of citizenship
- `i94port` = 3 character code of destination city
- `arrdate` = arrival date
- `i94mode` = 1 digit travel code
- `depdate` = departure date
- `i94visa` = reason for immigration

The second dimension table will contain city temperature data. The columns below will be extracted from the temperature dataframe:

- `i94port` = 3 character code of destination city (mapped from immigration data during cleanup step)
- `AverageTemperature` = average temperature
- `City` = city name
- `Country` = country name
- `Latitude` = latitude
- `Longitude` = longitude

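The fact table will contain information from the I94 immigration data joined with the city temperature data on `i94port`. Its columns, as named in the SQL query in step 4, are:

- `year` = 4 digit year (`i94yr`)
- `month` = numeric month (`i94mon`)
- `city` = 3 digit code of the visitor's country of citizenship (`i94cit`)
- `i94port` = 3 character code of destination city
- `arrival_date` = arrival date (`arrdate`)
- `departure_date` = departure date (`depdate`)
- `reason` = reason for immigration (`i94visa`)
- `temperature` = average temperature of destination city (`AverageTemperature`)
- `latitude` = latitude of destination city (`Latitude`)
- `longitude` = longitude of destination city (`Longitude`)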


# Step 5: Project Summary

### Rationale for the choice of tools and technologies for the project.

**Tools**
- **`Spark:`** *Spark was chosen since it can easily handle multiple file formats (including SAS, via the `spark-sas7bdat` package) containing large amounts of data.*
- **`Spark SQL:`** *Spark SQL was chosen so that the dataframes built from the large input files could be manipulated with standard SQL join operations to form additional tables.*

**Update Frequency**
*Monthly.* The raw I94 data is delivered as one file per month (e.g. `i94_apr16_sub.sas7bdat`), so a monthly update matches the source.
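
A monthly run would process one new I94 file. A sketch that builds the expected file name for a given month, assuming every file follows the pattern of `i94_apr16_sub.sas7bdat` (three-letter lower-case month, two-digit year); the pattern for other months and the helper name `i94_filename` are assumptions:

```python
# Lower-case month abbreviations, indexed 1-12 (assumed naming scheme)
MONTHS = ("jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec")


def i94_filename(year, month):
    """Return the assumed I94 file name for a year and month (1-12)."""
    if not 1 <= month <= 12:
        raise ValueError(f"month must be 1-12, got {month}")
    return f"i94_{MONTHS[month - 1]}{year % 100:02d}_sub.sas7bdat"


print(i94_filename(2016, 4))  # i94_apr16_sub.sas7bdat
```

Passing the resulting path to `clean_i94_data` and writing with `mode("append")` would add only that month's rows to the existing Parquet partitions.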

**Future Design Considerations**

1. *The data was increased by 100x.*

If Spark in standalone mode cannot process a dataset 100x larger, we could consider moving the data and processing to `AWS EMR`, a managed cluster platform for processing large datasets in the cloud.

2. *The data populates a dashboard that must be updated on a daily basis by 7am every day.*

Apache Airflow could be used to build an ETL pipeline that regularly updates the data and populates the dashboard, scheduled to finish by 7am each day. Airflow pipelines are defined in Python and integrate well with AWS, and additional tasks can be combined to deliver more powerful automation.

3. *The database needed to be accessed by 100+ people.*

If the database needed to be accessed by 100+ people, we could consider publishing the Parquet files to HDFS and giving read access to the users who need it. If users want to run SQL queries directly on the data, a SQL query engine such as Impala could be used to query the files in HDFS.