# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import re
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, date_add
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum

import datetime

import numpy as np
import pandas as pd

### Step 1: Scope the Project and Gather Data

#### Scope 
The data model consists of two dimension tables and one fact table. First, the immigration data is aggregated by city. Second, the temperature data is aggregated by city. The results of these two operations are then merged on the city value to create the fact table. The final database is intended for analyzing whether temperature affects which destination cities immigrants choose.

#### Describe and Gather Data 
The I94 immigration data was gathered from the US National Tourism and Trade Office website. It is stored in SAS7BDAT, a binary database storage format.

The temperature data is a Kaggle data set. It contains the temperature information of cities all around the world. This data can be found in the link below.
https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data

#### Key Notes:
- arrdate = arrival date in the USA,
- depdate = departure date from the USA,
- i94yr = 4 digit year,
- i94mon = numeric month,
- i94cit = 3 digit code of origin city,
- i94port = 3 character code of destination USA city,
- i94mode = 1 digit travel code,
- i94visa = reason for immigration

The temperature data set comes from Kaggle. It is in CSV format.

#### Additional Notes:
- AverageTemperature = average temperature,
- City = city name,
- Country = country name,
- Latitude = latitude,
- Longitude = longitude

#### 1. Process of Immigration Data

In [3]:
# Read in the immigration data here
fname = './data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")

In [4]:
df.head(50)

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2
5,18.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MI,20555.0,...,,M,1959.0,09302016,,,AZ,92471040000.0,602.0,B1
6,19.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,...,,M,1953.0,09302016,,,AZ,92471400000.0,602.0,B2
7,20.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,...,,M,1959.0,09302016,,,AZ,92471610000.0,602.0,B2
8,21.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20553.0,...,,M,1970.0,09302016,,,AZ,92470800000.0,602.0,B2
9,22.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20562.0,...,,M,1968.0,09302016,,,AZ,92478490000.0,608.0,B1
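
The `arrdate` and `depdate` values in the output above are SAS numeric dates, which count days since 1960-01-01. The following is a minimal sketch of how they could be converted to calendar dates. The helper name `sas_to_date` is an assumption for illustration and is not part of the original notebook.

```python
from datetime import date, timedelta

# SAS stores dates as the number of days elapsed since 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)

def sas_to_date(sas_days):
    """Convert a SAS numeric date to datetime.date; None and NaN map to None."""
    if sas_days is None or sas_days != sas_days:  # NaN is the only value not equal to itself
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))

print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(20573.0))  # 2016-04-29
```

Both sample values fall in April 2016, which agrees with the `i94yr` and `i94mon` columns of the same rows.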


In [7]:
# Read in the temperature data here
temp_data = './data/data2/GlobalLandTemperaturesByCity.csv'
df_temp_data = pd.read_csv(temp_data, sep=',')

In [8]:
df_temp_data.head(50)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E
5,1744-04-01,5.788,3.624,Århus,Denmark,57.05N,10.33E
6,1744-05-01,10.644,1.283,Århus,Denmark,57.05N,10.33E
7,1744-06-01,14.051,1.347,Århus,Denmark,57.05N,10.33E
8,1744-07-01,16.082,1.396,Århus,Denmark,57.05N,10.33E
9,1744-08-01,,,Århus,Denmark,57.05N,10.33E
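
`Latitude` and `Longitude` are stored as strings with a hemisphere suffix (for example `57.05N`, `10.33E`). If a numeric form is needed later, one possible conversion to signed decimal degrees is sketched below. The function name `parse_coordinate` is hypothetical and does not appear in the original notebook.

```python
def parse_coordinate(text):
    """Convert strings such as '57.05N' or '1.48W' to signed decimal degrees."""
    value, hemisphere = float(text[:-1]), text[-1].upper()
    if hemisphere not in "NSEW":
        raise ValueError("unexpected hemisphere suffix: {!r}".format(text))
    # South and West are negative by convention
    return -value if hemisphere in "SW" else value

print(parse_coordinate("57.05N"))  # 57.05
print(parse_coordinate("1.48W"))   # -1.48
```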


In [10]:
set(df_temp_data["Country"].values)

{'Afghanistan',
 'Albania',
 'Algeria',
 'Angola',
 'Argentina',
 'Armenia',
 'Australia',
 'Austria',
 'Azerbaijan',
 'Bahamas',
 'Bahrain',
 'Bangladesh',
 'Belarus',
 'Belgium',
 'Benin',
 'Bolivia',
 'Bosnia And Herzegovina',
 'Botswana',
 'Brazil',
 'Bulgaria',
 'Burkina Faso',
 'Burma',
 'Burundi',
 'Cambodia',
 'Cameroon',
 'Canada',
 'Central African Republic',
 'Chad',
 'Chile',
 'China',
 'Colombia',
 'Congo',
 'Congo (Democratic Republic Of The)',
 'Costa Rica',
 'Croatia',
 'Cuba',
 'Cyprus',
 'Czech Republic',
 "Côte D'Ivoire",
 'Denmark',
 'Djibouti',
 'Dominican Republic',
 'Ecuador',
 'Egypt',
 'El Salvador',
 'Equatorial Guinea',
 'Eritrea',
 'Estonia',
 'Ethiopia',
 'Finland',
 'France',
 'Gabon',
 'Gambia',
 'Georgia',
 'Germany',
 'Ghana',
 'Greece',
 'Guatemala',
 'Guinea',
 'Guinea Bissau',
 'Guyana',
 'Haiti',
 'Honduras',
 'Hong Kong',
 'Hungary',
 'Iceland',
 'India',
 'Indonesia',
 'Iran',
 'Iraq',
 'Ireland',
 'Israel',
 'Italy',
 'Jamaica',
 'Japan',
 'Jorda

In [5]:
# Define Spark session
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)



### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data


In [6]:
# Build a dictionary of valid i94port codes (code -> [port description])
re_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')
i94port_valid = {}

with open('i94port.txt') as f:
    for data in f:
        match = re_obj.search(data)
        # Skip lines that do not contain a quoted code/description pair
        if match:
            i94port_valid[match[1]] = [match[2]]
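
To make the parsing step easier to follow, here is a small self-contained sketch that runs on in-memory lines instead of `i94port.txt`. The sample lines are hypothetical and only assume the `'CODE' = 'DESCRIPTION'` layout implied by the regular expression above. Unlike the cell above, this variant uses non-greedy groups, strips padding, and stores plain strings rather than one-element lists.

```python
import re

# Non-greedy groups capture the quoted code and the quoted description.
PORT_LINE = re.compile(r"'(.*?)'\s*=\s*'(.*?)'")

def parse_port_lines(lines):
    """Map each port code to its description, skipping lines that do not match."""
    ports = {}
    for line in lines:
        match = PORT_LINE.search(line)
        if match:
            ports[match.group(1).strip()] = match.group(2).strip()
    return ports

# Hypothetical sample lines in the assumed format of i94port.txt
sample = [
    "   'ALC'\t=\t'ALCAN, AK             '",
    "   'ATL'\t=\t'ATLANTA, GA           '",
    "not a mapping line",
]
print(parse_port_lines(sample))
# {'ALC': 'ALCAN, AK', 'ATL': 'ATLANTA, GA'}
```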

In [7]:
def clean_immig_data(file):
    '''    
    Input: immigration data file location
    Output: Spark dataframe of immigration data with valid i94port
    '''    
    # Read I94 data into Spark
    df_immig = spark.read.format('com.github.saurfang.sas.spark').load(file)

    # Filter out entries where i94port is invalid
    df_immig = df_immig.filter(df_immig.i94port.isin(list(i94port_valid.keys())))

    return df_immig

In [8]:
immig_test_file = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat' 
df_immig_test = clean_immig_data(immig_test_file)
df_immig_test.show()

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SEO| null|      G|   null|      Y|   null| 1991.0|     D/S|     M|  null|   null|  3.73679633E9|00296|      F1|
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|  1.0|20160401|    nul

In [9]:
# Clean temperature data
df_temp_data = spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")

In [10]:
# Filter out rows with a missing average temperature (null, or the literal string 'NaN')
df_temp_data = df_temp_data.filter(
    df_temp_data.AverageTemperature.isNotNull() & (df_temp_data.AverageTemperature != 'NaN')
)

In [11]:
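@udf(StringType())
def get_i94port(city):
    '''
    Input: City name
    Output: Corresponding i94port, or None when no port description contains the city name
    '''
    # Null city names cannot be matched
    if city is None:
        return None
    for key in i94port_valid:
        if city.lower() in i94port_valid[key][0].lower():
            return key
    return None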
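
The substring test in `get_i94port` can also match a city name that merely appears inside a longer port description. A stricter alternative is sketched below in plain Python, assuming port descriptions have the form `CITY, STATE`. The helper names and the sample port entries are hypothetical and are not taken from the original notebook.

```python
def build_city_index(port_names):
    """Map lower-cased city name -> port code, using the text before the first comma."""
    index = {}
    for code, name in port_names.items():
        city = name.split(",")[0].strip().lower()
        # Keep the first code seen for a city so the result is deterministic
        index.setdefault(city, code)
    return index

def lookup_port(city, index):
    """Return the port code for an exactly matching city name, or None."""
    if city is None:
        return None
    return index.get(city.strip().lower())

# Hypothetical port descriptions
ports = {"ATL": "ATLANTA, GA", "NYC": "NEW YORK, NY"}
index = build_city_index(ports)
print(lookup_port("Atlanta", index))  # ATL
print(lookup_port("York", index))     # None (a substring test would return NYC)
```

Building the index once also avoids scanning every port description for every temperature row.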

In [12]:
# New column with i94port code
df_temp_data = df_temp_data.withColumn("i94port", get_i94port(df_temp_data.City))
df_temp_data.show()

+----------+-------------------+-----------------------------+-----+-------+--------+---------+-------+
|        dt| AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|i94port|
+----------+-------------------+-----------------------------+-----+-------+--------+---------+-------+
|1743-11-01|              6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|   null|
|1744-04-01| 5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|   null|
|1744-05-01|             10.644|           1.2830000000000001|Århus|Denmark|  57.05N|   10.33E|   null|
|1744-06-01| 14.050999999999998|                        1.347|Århus|Denmark|  57.05N|   10.33E|   null|
|1744-07-01|             16.082|                        1.396|Århus|Denmark|  57.05N|   10.33E|   null|
|1744-09-01| 12.780999999999999|                        1.454|Århus|Denmark|  57.05N|   10.33E|   null|
|1744-10-01|               7.95|                         1.63|År

In [13]:
# Remove data points with no i94port code
df_temp_data = df_temp_data.filter(df_temp_data.i94port.isNotNull())

In [14]:

# Show results
df_temp_data.show()

+----------+------------------+-----------------------------+--------+--------------+--------+---------+-------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|    City|       Country|Latitude|Longitude|i94port|
+----------+------------------+-----------------------------+--------+--------------+--------+---------+-------+
|1743-11-01|             8.758|                        1.886|Aberdeen|United Kingdom|  57.05N|    1.48W|    ABE|
|1744-04-01|6.0699999999999985|           2.9339999999999997|Aberdeen|United Kingdom|  57.05N|    1.48W|    ABE|
|1744-05-01|             7.751|                        1.494|Aberdeen|United Kingdom|  57.05N|    1.48W|    ABE|
|1744-06-01|             10.62|                        1.574|Aberdeen|United Kingdom|  57.05N|    1.48W|    ABE|
|1744-07-01|             12.35|                        1.591|Aberdeen|United Kingdom|  57.05N|    1.48W|    ABE|
|1744-09-01|            11.224|           1.6059999999999999|Aberdeen|United Kingdom|  57.05N|  

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Since the I94 data is joined with the city temperature data, the fact table will contain the columns below.
- i94yr
- i94mon
- i94cit
- i94port
- arrdate
- i94mode
- depdate
- i94visa
- AverageTemperature

The first dimension table is the I94 immigration data. Its columns are shown below.
- i94yr
- i94mon
- i94cit
- i94port
- arrdate
- i94mode
- depdate
- i94visa

The second dimension table will be the temperature data.
- i94port
- AverageTemperature
- City
- Country
- Latitude
- Longitude


#### 3.2 Mapping Out Data Pipelines
As described in Step 2, the data must be cleaned up before anything else.
- Clean up and normalize the I94 data
- Clean up and normalize the temperature data
- Create the immigration dimension table
- - Select the corresponding columns from df_immig
- - Write to parquet partitioned by i94port
- Create the temperature dimension table
- - Select the corresponding columns from df_temp_data
- - Write to parquet partitioned by i94port
- Create the fact table by joining the two tables above
- - Write to parquet partitioned by i94port

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [15]:
immig_data = '/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

In [16]:
df_immig = clean_immig_data(immig_data)

In [17]:
# Extract columns for immigration dimension table
immigration_table = df_immig.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa"])

In [18]:
# Write to parquet files partitioned by i94port
immigration_table.write.mode("append").partitionBy("i94port").parquet("/results/immigration.parquet")

In [19]:
# Extract columns for temperature dimension table
temp_table = df_temp_data.select(["AverageTemperature", "City", "Country", "Latitude", "Longitude", "i94port"])

In [None]:
# Write to parquet files partitioned by i94port
temp_table.write.mode("append").partitionBy("i94port").parquet("/results/temperature.parquet")

In [None]:
# Create temporary views
df_immig.createOrReplaceTempView("immigration_view")
df_temp_data.createOrReplaceTempView("temperature_view")

In [None]:
# Create the fact table by joining the immigration and temperature views
fact_table = spark.sql('''
                            SELECT immigration_view.i94yr as year,
                                   immigration_view.i94mon as month,
                                   immigration_view.i94cit as city,
                                   immigration_view.i94port as i94port,
                                   immigration_view.arrdate as arrival_date,
                                   immigration_view.i94mode as travel_mode,
                                   immigration_view.depdate as departure_date,
                                   immigration_view.i94visa as reason,
                                   temperature_view.AverageTemperature as temperature,
                                   temperature_view.Latitude as latitude,
                                   temperature_view.Longitude as longitude
                            FROM immigration_view
                            JOIN temperature_view 
                                ON (immigration_view.i94port = temperature_view.i94port)
                        ''')

In [None]:
# Write fact table to parquet files partitioned by i94port
fact_table.write.mode("append").partitionBy("i94port").parquet("/results/fact.parquet")
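
The Scope section states that the temperature data is aggregated by city before the merge, whereas the join above uses the raw monthly readings, so each immigration record is repeated once per temperature reading of its port. The plain-Python sketch below illustrates the aggregate-then-join idea on toy data. The function names and sample rows are hypothetical. In Spark, a comparable aggregation could presumably be expressed with `groupBy("i94port")` and an average over `AverageTemperature` before joining.

```python
from collections import defaultdict

def average_by_port(temp_rows):
    """Collapse (i94port, temperature) readings to one mean value per port."""
    totals = defaultdict(lambda: [0.0, 0])
    for port, temp in temp_rows:
        totals[port][0] += temp
        totals[port][1] += 1
    return {port: total / n for port, (total, n) in totals.items()}

def join_on_port(immig_rows, avg_temp):
    """Inner join: keep immigration rows whose port has a temperature."""
    return [
        {**row, "temperature": avg_temp[row["i94port"]]}
        for row in immig_rows
        if row["i94port"] in avg_temp
    ]

# Hypothetical toy data
temps = [("ATL", 10.0), ("ATL", 20.0), ("NYC", 5.0)]
immig = [{"cicid": 7, "i94port": "ATL"}, {"cicid": 15, "i94port": "WAS"}]

fact = join_on_port(immig, average_by_port(temps))
print(fact)
# [{'cicid': 7, 'i94port': 'ATL', 'temperature': 15.0}]
```

With one temperature row per port, the fact table has at most one row per immigration record.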

#### 4.2 Data Quality Checks
Run Quality Checks

In [None]:
def quality_check(df, description):
    '''
    Input: Spark dataframe, description of Spark dataframe
    Output: Print outcome of data quality check
    '''
    
    result = df.count()
    if result == 0:
        print("Quality check failed for {} with zero records".format(description))
    else:
        print("Quality check passed for {} with {} records".format(description, result))
    return 0

In [None]:
# Perform data quality check
quality_check(df_immig, "immigration table")
quality_check(df_temp_data, "temperature table")
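
`quality_check` only prints its outcome, so an empty table would not stop a pipeline run. A possible variant that raises instead is sketched below, assuming it is acceptable for a failed check to halt the run. `require_rows` and `FakeFrame` are hypothetical names. `FakeFrame` is only a stand-in exposing a Spark-like `count()` method so that the example runs without a Spark session.

```python
def require_rows(df, description):
    """Raise ValueError when df.count() is zero; otherwise return the row count."""
    n = df.count()
    if n == 0:
        raise ValueError(
            "Quality check failed for {}: zero records".format(description)
        )
    return n

class FakeFrame:
    """Stand-in with a Spark-like count() method, for demonstration only."""
    def __init__(self, n):
        self._n = n

    def count(self):
        return self._n

print(require_rows(FakeFrame(3), "immigration table"))  # 3
```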

#### 4.3 Data dictionary 
#### Fact Table:

- i94yr: 4 digit year,
- i94mon: numeric month,
- i94cit: 3 digit code of origin city,
- i94port: 3 character code of destination USA city,
- arrdate: arrival date in the USA,
- i94mode: 1 digit travel code,
- depdate: departure date from the USA,
- i94visa: reason for immigration,
- AverageTemperature: average temperature of destination city

#### Dimension Table - I94 immigration data columns:

- i94yr: 4 digit year
- i94mon: numeric month
- i94cit: 3 digit code of origin city
- i94port: 3 character code of destination USA city
- arrdate: arrival date in the USA
- i94mode: 1 digit travel code
- depdate: departure date from the USA
- i94visa: reason for immigration

#### Dimension Table - temperature data columns:

- i94port: 3 character code of destination city
- AverageTemperature: average temperature
- City: city name
- Country: country name
- Latitude: latitude
- Longitude: longitude

### Step 5: Complete Project Write Up
Clearly state the rationale for the choice of tools and technologies for the project.
- The immigration data set is large and has to be combined with the temperature data, so Spark was chosen for its ability to process large data sets in a distributed way.

Propose how often the data should be updated and why.
- The data should be updated monthly: the I94 file used here covers a single month (April 2016), and the temperature data set has one reading per city per month.

#### Scenarios
Write a description of how you would approach the problem differently under the following scenarios.
- The data was increased by 100x.
- - Use Spark with EMR to process the data in a distributed way with high efficiency
- The data populates a dashboard that must be updated on a daily basis by 7am every day
- - Use Airflow: create a DAG that is scheduled to finish before 7am each day, and use it to monitor the process
- The database needed to be accessed by 100+ people
- - Use Redshift, which can scale with the load and be accessed by many people at the same time
