# Is Temperature the Biggest Reason for Immigration, or Just a Random Event?
### Data Engineering Capstone Project

#### Project Summary

Data analysts in our company have been asked to determine whether the temperature of a place is the biggest factor driving immigration to it, or whether the pattern is just a random event. They have asked the data engineering team for clean data to support their analysis. I94 immigration data and city temperature data will be used to build a database optimized for querying and analyzing immigration events, and an ETL pipeline will be built on these two data sources to create that database.


The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Import necessary libraries
from datetime import datetime, timedelta
from pyspark.sql.functions import udf
import re
import pandas as pd
import psycopg2

### Step 1: Scope the Project and Gather Data

#### Scope 

After discussion between the data engineering team and the data analysts, and based on the requirements, we will create a star schema with two dimension tables and one fact table. We will aggregate the I94 immigration data by destination city to form the first dimension table, and aggregate the city temperature data by city to form the second dimension table. The two datasets will be joined on the destination city code (i94port) to form the fact table. The final database is optimized for querying immigration events to determine whether temperature affects the choice of destination city. Spark will be used to process the data.


#### Describe and Gather Data 

I94 immigration data comes from the [US National Tourism and Trade Office website](https://travel.trade.gov/research/reports/i94/historical/2016.html). It is provided in SAS7BDAT format, a binary database storage format used by SAS.

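- i94yr = 4 digit year
- i94mon = numeric month
- i94cit = 3 digit code of the traveller's country of citizenship (origin country)
- i94port = 3 character code of the destination US port of entry (city)
- arrdate = arrival date in the USA, stored as a SAS date (days since 1960-01-01)
- i94mode = 1 digit travel mode code (1 = Air, 2 = Sea, 3 = Land, 9 = Not reported)
- depdate = departure date from the USA, stored as a SAS date
- i94visa = reason for immigration (1 = Business, 2 = Pleasure, 3 = Student)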
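Because `arrdate` and `depdate` are SAS date numbers rather than calendar dates, they need converting before any date-based analysis. For example, 20545.0 corresponds to 2016-04-01, which matches the `dtadfile` value 20160401 in the sample rows below. A minimal sketch of the conversion; the helper name `sas_to_date` is hypothetical and not part of the original pipeline:

```python
from datetime import date, timedelta

# SAS stores dates as the number of days elapsed since 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)

def sas_to_date(sas_days):
    """Convert a SAS date number (e.g. arrdate) to a datetime.date.

    Hypothetical helper for illustration. Missing values (None or NaN,
    as seen in depdate) are returned as None.
    """
    if sas_days is None or sas_days != sas_days:  # NaN is never equal to itself
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))

print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(20573.0))  # 2016-04-29
```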

The temperature data is a Kaggle dataset of monthly average land temperatures for cities around the world. It can be found here: https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data. It is stored in CSV format.

- dt = date of the monthly reading
- AverageTemperature = average monthly temperature (°C)
- City = city name
- Country = country name
- Latitude = latitude
- Longitude = longitude

<b>Immigration Data</b>

In [2]:
# Read April 2016 I94 immigration data
immigration = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_immig = pd.read_sas(immigration, format='sas7bdat', encoding="ISO-8859-1")

In [5]:
# Display the first 5 rows of df_immig, showing all columns
pd.set_option('display.max_columns', None)
df_immig.head()

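| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | i94bir | i94visa | count | dtadfile | visapost | occup | entdepa | entdepd | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 | | | | 37.0 | 2.0 | 1.0 | | | | T | | U | | 1979.0 | 10282016 | | | | 1897628000.0 | | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL | | 25.0 | 3.0 | 1.0 | 20130811.0 | SEO | | G | | Y | | 1991.0 | D/S | M | | | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | 55.0 | 2.0 | 1.0 | 20160401.0 | | | T | O | | M | 1961.0 | 09302016 | M | | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | 28.0 | 2.0 | 1.0 | 20160401.0 | | | O | O | | M | 1988.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | 4.0 | 2.0 | 1.0 | 20160401.0 | | | O | O | | M | 2012.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |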


In [6]:
print(df_immig.shape)
df_immig.columns

(3096313, 28)


Index(['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate',
       'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count',
       'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu',
       'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline',
       'admnum', 'fltno', 'visatype'],
      dtype='object')

<b>Temperature Data</b>

In [7]:
# Read in the temperature data
temperature = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = pd.read_csv(temperature, sep=',')

In [8]:
# Display first 5 rows of df_temp
df_temp.head()

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 | | | Århus | Denmark | 57.05N | 10.33E |


In [9]:
print(df_temp.shape)
df_temp.columns

(8599212, 7)


Index(['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City',
       'Country', 'Latitude', 'Longitude'],
      dtype='object')

In [10]:
# Create Spark session with the SAS7BDAT reader package
from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
         .enableHiveSupport()
         .getOrCreate())

### Step 2: Explore and Assess the Data
#### Explore the Data 

**I94 immigration data** -> drop all records whose destination code i94port is not a valid port code (e.g. XXX, 99, NaN).

**Temperature Data** -> drop all records where AverageTemperature is null, keep one record per location (City, Country), and add the i94port code of the location to each record.

In [11]:
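# Clean immigration data: build a lookup of valid i94port codes from i94port.txt
re_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')
i94portvalid = {}
with open('i94port.txt') as f:
    for data in f:
        match = re_obj.search(data)
        if match is None:  # skip blank or malformed lines instead of crashing
            continue
        i94portvalid[match[1].strip()] = [match[2].strip()]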

def check_immig(file):
    '''    
    Input: Path to immigration data file
    Output: Spark dataframe of immigration data with valid i94port
    '''    
    # Read I94 data into Spark
    df_immigration = spark.read.format('com.github.saurfang.sas.spark').load(file)

    # Filter out entries where i94port is invalid
    df_immigration = df_immigration.filter(df_immigration.i94port.isin(list(i94portvalid.keys())))

    return df_immigration
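
The regular expression above assumes each line of `i94port.txt` is a SAS-style format entry such as `'CODE'	=	'CITY, STATE'`; the exact layout of the file is an assumption here. A minimal stdlib sketch that applies the same pattern, skips non-matching lines and strips padding, so the parsing can be checked without Spark (the function name `parse_ports` and the sample lines are hypothetical):

```python
import re

# Same pattern as the notebook: the first quoted token is the port code,
# the last quoted token is the "CITY, STATE" description.
PORT_RE = re.compile(r"'(.*)'.*'(.*)'")

def parse_ports(lines):
    """Return {port_code: description} for every line that matches PORT_RE."""
    ports = {}
    for line in lines:
        match = PORT_RE.search(line)
        if match is None:  # blank lines or comments are skipped
            continue
        ports[match[1].strip()] = match[2].strip()
    return ports

# Illustrative sample lines in the assumed format (not copied from i94port.txt).
sample = [
    "'ATL'\t=\t'ATLANTA, GA'",
    "",
    "'NYC'\t=\t'NEW YORK, NY   '",
]
print(parse_ports(sample))  # {'ATL': 'ATLANTA, GA', 'NYC': 'NEW YORK, NY'}
```

This version stores each description as a plain string rather than a one-element list, so a caller would use `ports[key]` instead of `i94portvalid[key][0]`.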

Next, we clean the temperature data: rows with a missing AverageTemperature are filtered out, duplicate locations are dropped, and the cleaned dataframe is reassigned to df_temp.

In [12]:
# Load the temperature data into Spark (without a schema, every column is read as a string)
df_temp = spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")

In [13]:
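# Filter out rows with a missing average temperature. Empty CSV fields are read
# as null, and a comparison with null is never true, so test for null explicitly.
df_temp = df_temp.filter(df_temp.AverageTemperature.isNotNull())

# Remove duplicate locations
df_temp = df_temp.dropDuplicates(['City', 'Country'])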
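
Note that `dropDuplicates` keeps one arbitrary row per (City, Country), so `AverageTemperature` ends up as a single monthly reading (the output further down includes readings from 1743 and 1825) rather than the per-city aggregate described in the scope. In Spark, an aggregate could be written as `df_temp.groupBy("City", "Country").agg(F.avg(df_temp.AverageTemperature.cast("double")))` with `from pyspark.sql import functions as F`. The plain-Python sketch below shows the same averaging logic on a few made-up rows; the function name `average_by_city` is hypothetical:

```python
from collections import defaultdict

def average_by_city(rows):
    """Average AverageTemperature per (City, Country), skipping missing readings.

    rows: iterable of dicts with keys 'City', 'Country' and 'AverageTemperature',
    where the temperature is a string as read from the CSV ('' or None when missing).
    """
    totals = defaultdict(lambda: [0.0, 0])  # [sum, count] per location
    for row in rows:
        value = row["AverageTemperature"]
        if value in ("", None):
            continue  # same rule as the notebook: drop missing temperatures
        key = (row["City"], row["Country"])
        totals[key][0] += float(value)
        totals[key][1] += 1
    return {key: s / n for key, (s, n) in totals.items()}

# Made-up rows for illustration only.
rows = [
    {"City": "Atlanta", "Country": "United States", "AverageTemperature": "10.0"},
    {"City": "Atlanta", "Country": "United States", "AverageTemperature": "20.0"},
    {"City": "Atlanta", "Country": "United States", "AverageTemperature": ""},
    {"City": "Århus", "Country": "Denmark", "AverageTemperature": "6.068"},
]
print(average_by_city(rows))
```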

In [14]:
@udf()
def get_i94port(city):
    '''
    Input: City name 
    Output: Corresponding i94port
    '''
    
    for key in i94portvalid:
        if city.lower() in i94portvalid[key][0].lower():
            return key
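
Because `get_i94port` uses a substring test and ignores `Country`, a city outside the US can be assigned a US port: if the description for NYC is `NEW YORK, NY`, then a city named York anywhere in the world matches it. A minimal sketch of a stricter lookup, assuming descriptions have the form `CITY, STATE` and that only rows with `Country == 'United States'` should be mapped; `build_city_index` and `lookup_port` are hypothetical helpers:

```python
def build_city_index(ports):
    """Map the lower-cased city name (text before the comma) to its port code.

    ports: {code: 'CITY, STATE'} (assumed format). If two codes share a city
    name, the first one seen is kept.
    """
    index = {}
    for code, description in ports.items():
        city = description.split(",")[0].strip().lower()
        index.setdefault(city, code)
    return index

def lookup_port(index, city, country):
    """Exact, US-only lookup; returns None when there is no match."""
    if city is None or country != "United States":
        return None
    return index.get(city.strip().lower())

ports = {"NYC": "NEW YORK, NY", "ATL": "ATLANTA, GA"}  # illustrative entries
index = build_city_index(ports)

print(lookup_port(index, "New York", "United States"))   # NYC
print(lookup_port(index, "York", "United Kingdom"))      # None
print(any("york" in d.lower() for d in ports.values()))  # True: the substring test would match
```

In the notebook, `lookup_port` could be wrapped with `udf` in the same way as `get_i94port`, passing both the `City` and `Country` columns.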
    

In [15]:
# Add i94port code based on city name
df_temp = df_temp.withColumn("i94port", get_i94port(df_temp.City))
df_temp.show()

+----------+--------------------+-----------------------------+------------+------------------+--------+---------+-------+
|        dt|  AverageTemperature|AverageTemperatureUncertainty|        City|           Country|Latitude|Longitude|i94port|
+----------+--------------------+-----------------------------+------------+------------------+--------+---------+-------+
|1743-11-01|               3.264|                        1.665|   Allentown|     United States|  40.99N|   74.56W|   null|
|1779-11-01|0.011999999999999985|                        2.714|      Atyrau|        Kazakhstan|  47.42N|   50.92E|   null|
|1825-01-01|  26.069000000000003|                         2.16|     Bintulu|          Malaysia|   2.41N|  113.30E|   null|
|1825-01-01|              26.517|           2.5839999999999996| Butterworth|          Malaysia|   5.63N|  100.09E|   null|
|1845-01-01|              24.995|                        1.871|      Cainta|       Philippines|  15.27N|  120.83E|   null|
...

In [16]:
# Remove data points with no i94port code (the UDF returns null when no city matches)
df_temp = df_temp.filter(df_temp.i94port.isNotNull())

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

We define two dimension tables alongside one fact table.

**The dimension tables are**
1. Temperature data
2. I94 immigration events

**The fact table is**
1. I94 immigration data joined with the city temperature data on i94port

The columns of each table are described below.

**Fact Table** - I94 immigration data joined with the city temperature data on i94port
Columns:
   - i94yr = 4 digit year
   - i94mon = numeric month
   - i94cit = 3 digit code of country of citizenship
   - i94port = 3 character code of destination USA city
   - arrdate = arrival date in the USA
   - i94mode = 1 digit travel code
   - depdate = departure date from the USA
   - i94visa = reason for immigration
   - AverageTemperature = average temperature of destination city
   - Latitude = latitude of destination city
   - Longitude = longitude of destination city

**Dimension Table** - I94 immigration events
Columns:
   - i94yr = 4 digit year
   - i94mon = numeric month
   - i94cit = 3 digit code of country of citizenship
   - i94port = 3 character code of destination USA city
   - arrdate = arrival date in the USA
   - i94mode = 1 digit travel code
   - depdate = departure date from the USA
   - i94visa = reason for immigration

**Dimension Table** - temperature data
Columns:
- i94port = 3 character code of destination city (mapped from the city name using the list of valid i94port codes)
- AverageTemperature = average monthly temperature (°C)
- City = city name
- Country = country name
- Latitude = latitude
- Longitude = longitude
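
To make the star schema concrete, the sketch below builds the two dimension tables and the fact-table join in an in-memory SQLite database, with Python's standard `sqlite3` module standing in for Spark. Table names mirror the temporary views used in Step 4; the rows, including the port code `XYZ`, are invented for illustration:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Dimension table: immigration events (subset of the I94 columns).
cur.execute("""CREATE TABLE immigration (
    i94yr INTEGER, i94mon INTEGER, i94cit INTEGER, i94port TEXT,
    arrdate INTEGER, i94mode INTEGER, depdate INTEGER, i94visa INTEGER)""")

# Dimension table: temperature per destination port.
cur.execute("""CREATE TABLE temperature (
    i94port TEXT, AverageTemperature REAL, City TEXT, Country TEXT,
    Latitude TEXT, Longitude TEXT)""")

cur.executemany("INSERT INTO immigration VALUES (?, ?, ?, ?, ?, ?, ?, ?)", [
    (2016, 4, 101, "NYC", 20545, 1, 20567, 2),
    (2016, 4, 254, "ATL", 20551, 1, None, 3),
    (2016, 4, 101, "XYZ", 20545, 1, None, 2),  # invented port with no temperature row
])
cur.executemany("INSERT INTO temperature VALUES (?, ?, ?, ?, ?, ?)", [
    ("NYC", 9.5, "New York", "United States", "40.99N", "74.56W"),
    ("ATL", 16.0, "Atlanta", "United States", "34.56N", "83.68W"),
])

# Fact table: inner join on i94port, as in the Step 4 SQL.
fact_rows = cur.execute("""
    SELECT immigration.i94port, immigration.i94visa,
           temperature.AverageTemperature
    FROM immigration
    JOIN temperature ON immigration.i94port = temperature.i94port
    ORDER BY immigration.i94port
""").fetchall()
print(fact_rows)  # [('ATL', 3, 16.0), ('NYC', 2, 9.5)]
```

Because the join is an inner join, immigration events whose port has no temperature row (`XYZ` here) do not appear in the fact table.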

#### 3.2 Mapping Out Data Pipelines

Pipeline Steps:

1. Clean the I94 data as described in Step 2 to create the Spark dataframe df_immigration, one monthly file at a time.
2. Clean the temperature data as described in Step 2 to create the Spark dataframe df_temp.
3. Create the immigration dimension table by selecting the relevant columns from df_immigration and write it to parquet files partitioned by i94port.
4. Create the temperature dimension table by selecting the relevant columns from df_temp and write it to parquet files partitioned by i94port.
5. Create the fact table by joining the immigration and temperature data on i94port and write it to parquet files partitioned by i94port.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [17]:
immigration_data = '/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

# Clean I94 immigration data and store as Spark dataframe
df_immigration = check_immig(immigration_data)

In [18]:
# 1. Extract columns for immigration dimension table
# 2. Write immigration dimension table to parquet files partitioned by i94port
immigration_table = df_immigration.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa"])

immigration_table.write.mode("append").partitionBy("i94port").parquet("/output/immigration.parquet")

In [20]:
# 1. Extract columns for temperature dimension table
# 2. Write temperature dimension table to parquet files partitioned by i94port
temp_table = df_temp.select(["AverageTemperature", "City", "Country", "Latitude", "Longitude", "i94port"])

temp_table.write.mode("append").partitionBy("i94port").parquet("/output/temperature.parquet")

In [22]:
# Create temporary views of the immigration and temperature data
df_immigration.createOrReplaceTempView("immigration")
df_temp.createOrReplaceTempView("temperature")

In [24]:
# Join the immigration and temperature views on i94port to create the fact table.
# This is an inner join: events whose port has no temperature row are dropped.
fact_table = spark.sql('''
select immigration.i94yr as year,
       immigration.i94mon as month,
       immigration.i94cit as citizenship_country,
       immigration.i94port as i94port,
       immigration.arrdate as arrival_date,
       immigration.i94mode as travel_mode,
       immigration.depdate as departure_date,
       immigration.i94visa as reason,
       temperature.AverageTemperature as temperature,
       temperature.Latitude as latitude,
       temperature.Longitude as longitude
from immigration
JOIN temperature ON (immigration.i94port = temperature.i94port)
''')

In [25]:
# Write fact table to parquet files partitioned by i94port
fact_table.write.mode("append").partitionBy("i94port").parquet("/output/fact_table.parquet")

#### 4.2 Data Quality Checks
Data quality checks verify that the pipeline ran as expected. They can include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
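
The first bullet can be expressed as SQL queries that return a count which must be zero; similar queries should also run through `spark.sql` against the `immigration` and `temperature` views. A duplicate `i94port` in the temperature table matters because the fact-table join would then repeat every matching immigration event once per temperature row. The sketch below tests two such checks in SQLite on invented rows; the check names and the function `run_checks` are hypothetical:

```python
import sqlite3

# Each check is a SQL query returning a single number that must be zero to pass.
CHECKS = {
    # Unique key: i94port should identify at most one temperature row,
    # otherwise the fact-table join duplicates immigration events.
    "temperature.i94port is unique": """
        SELECT COUNT(*) FROM (
            SELECT i94port FROM temperature
            GROUP BY i94port HAVING COUNT(*) > 1
        ) AS dup""",
    # Not null: every immigration row needs a destination port.
    "immigration.i94port is not null":
        "SELECT COUNT(*) FROM immigration WHERE i94port IS NULL",
}

def run_checks(conn):
    """Return {check name: True if passed, False otherwise}."""
    return {name: conn.execute(sql).fetchone()[0] == 0 for name, sql in CHECKS.items()}

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE immigration (i94port TEXT, i94visa INTEGER)")
conn.execute("CREATE TABLE temperature (i94port TEXT, AverageTemperature REAL)")
conn.executemany("INSERT INTO immigration VALUES (?, ?)", [("NYC", 2), ("ATL", 3)])
# Two temperature rows for NYC, e.g. two cities matched to the same port.
conn.executemany("INSERT INTO temperature VALUES (?, ?)",
                 [("NYC", 9.5), ("NYC", 10.1), ("ATL", 16.0)])

print(run_checks(conn))
```

In the notebook, `conn.execute(sql).fetchone()[0]` would become `spark.sql(sql).collect()[0][0]`.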
 
Run Quality Checks

In [26]:
def quality_check(df, description):
    '''
    Input: Spark dataframe and a short description of it
    Output: Prints whether the record-count check passed or failed
    '''
    
    result = df.count()
    if result == 0:
        print("Data quality check failed for {} with zero records".format(description))
    else:
        print("Data quality check passed for {} with {} records".format(description, result))
    return 0

In [29]:
# Perform data quality check
quality_check(df_immigration, "immigration table")

Data quality check passed for immigration table with 3088544 records


0

#### 4.3 Data dictionary 

**Fact Table** - I94 immigration data joined with the city temperature data on i94port
Columns:
   - i94yr = 4 digit year
   - i94mon = numeric month
   - i94cit = 3 digit code of country of citizenship
   - i94port = 3 character code of destination USA city
   - arrdate = arrival date in the USA
   - i94mode = 1 digit travel code
   - depdate = departure date from the USA
   - i94visa = reason for immigration
   - AverageTemperature = average temperature of destination city
   - Latitude = latitude of destination city
   - Longitude = longitude of destination city

**Dimension Table** - I94 immigration events
Columns:
   - i94yr = 4 digit year
   - i94mon = numeric month
   - i94cit = 3 digit code of country of citizenship
   - i94port = 3 character code of destination USA city
   - arrdate = arrival date in the USA
   - i94mode = 1 digit travel code
   - depdate = departure date from the USA
   - i94visa = reason for immigration

**Dimension Table** - temperature data
Columns:
- i94port = 3 character code of destination city (mapped from the city name using the list of valid i94port codes)
- AverageTemperature = average monthly temperature (°C)
- City = city name
- Country = country name
- Latitude = latitude
- Longitude = longitude

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project:
  1. I chose Spark because it handles large amounts of data efficiently and reads many formats, including txt, CSV, SAS and JSON. Its Spark SQL support made it easy to load the input files into dataframes, and temporary views let us perform the joins with plain SQL.
    
* Propose how often the data should be updated and why.
    1. Since the raw files are published monthly, the data should be pulled and processed monthly.
    
### Scenarios
* Write a description of how you would approach the problem differently under the following scenarios:
    1. The data was increased by 100x.
        - EMR: we would run Spark in cluster mode with the YARN cluster manager on Amazon EMR (Amazon Web Services) and store the data in S3 buckets.
    2. The data populates a dashboard that must be updated on a daily basis by 7am every day.
        - A scheduler such as Airflow can run the pipeline as a DAG, with retries and email alerts on failure.
        - Run daily quality checks; if they fail, email the operators and freeze the dashboards.
    3. The database needed to be accessed by 100+ people.
        - We would publish the data to an S3 bucket and grant read access.