# ETL Pipeline for Temperature and Immigration Data Modelling
### Data Engineering Capstone Project

#### Project Summary


The goal of this project is to create an ETL pipeline that combines city temperature data with I94 immigration data into a database optimized for queries on immigration events. The resulting model can answer questions that relate immigration behavior to destination temperature, e.g., do people tend to move to colder or warmer places?

The project follows these steps:

- Step 1: Scope the Project and Gather Data
- Step 2: Explore and Assess the Data
- Step 3: Define the Data Model
- Step 4: Run ETL to Model the Data
- Step 5: Complete Project Write Up

In [2]:
import pandas as pd, re
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
# Install the Parquet/Snappy dependencies (IPython shell commands)
!conda install -c conda-forge python-snappy
!pip install pyarrow
!pip install python-snappy
!pip install -U fastparquet



### Step 1: Scope the Project and Gather Data
#### Scope 
In this project, we organize the I94 immigration data for April 2016 (provided in the sas_data folder and as a monthly SAS file, over 3 million records) by destination city (i94port) to form the first dimension table. Next, we clean the city temperature data and map each city to its i94port code to form the second dimension table. The two datasets are joined on the destination port code to form the fact table.
The final database is optimized for queries on immigration data that help determine whether temperature is a deciding factor in the choice of destination city. Spark is used to process the data.

#### Reasoning
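Spark suits this project better than pandas. It processes data in parallel and is not limited to what fits in a single machine's memory, so it handles millions of records, while its DataFrame API offers much of the same functionality as pandas.
It also extends naturally to streaming applications (Structured Streaming) and to machine learning (Spark MLlib).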

#### Describe and Gather Data 

The I94 immigration data comes from the US National Travel and Tourism Office. It is provided in SAS7BDAT format, which is SAS's binary dataset storage format. Some important attributes include:

- i94yr = 4-digit year
- i94mon = numeric month
- i94cit = 3-digit code of the origin country (country of citizenship)
- i94port = 3-character code of the US port of entry (destination city)
- arrdate = arrival date in the USA (SAS numeric date)
- i94mode = 1-digit mode of travel code
- depdate = departure date from the USA (SAS numeric date)
- i94visa = visa category (reason for travel)
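
Values such as arrdate = 20547.0 are SAS numeric dates, which count days from 1960-01-01. The i94mode and i94visa columns hold small integer codes. The following is a minimal plain-Python sketch of how to decode them. The code tables are recalled from I94_SAS_Labels_Description.SAS and are an assumption, so verify them against that file. In the Spark pipeline, the same logic could be wrapped in a UDF.

```python
from datetime import date, timedelta

# SAS stores dates as the number of days since 1960-01-01 (the SAS epoch).
SAS_EPOCH = date(1960, 1, 1)

# Assumed code tables, recalled from I94_SAS_Labels_Description.SAS (verify against the file).
I94MODE = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}
I94VISA = {1: "Business", 2: "Pleasure", 3: "Student"}


def sas_to_date(sas_days):
    """Convert a SAS numeric date such as 20547.0 to a datetime.date (None if missing)."""
    if sas_days is None or sas_days != sas_days:  # NaN is the only value not equal to itself
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


def decode(code, table):
    """Map a float code such as 1.0 to its label, or 'Unknown' if missing or not listed."""
    if code is None or code != code:
        return "Unknown"
    return table.get(int(code), "Unknown")


print(sas_to_date(20547.0))                       # 2016-04-03
print(decode(1.0, I94MODE), decode(2.0, I94VISA))  # Air Pleasure
```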

The temperature data comes from Kaggle. It is provided in CSV format. Some important attributes include:

- AverageTemperature = average temperature
- City = city name
- Country = country name
- Latitude = latitude (e.g., 57.05N)
- Longitude = longitude (e.g., 10.33E)
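
Latitude and Longitude are stored as strings with a hemisphere suffix, for example 57.05N and 10.33E in the sample data. They need converting before any numeric use, such as distance or climate-zone analysis. The following is a small helper, assuming every value is a number followed by exactly one of N, S, E or W.

```python
def parse_coordinate(value):
    """Convert a coordinate string such as '57.05N' or '80.73W' to signed decimal degrees.

    Assumes the format <number><hemisphere>; North and East are positive,
    South and West are negative.
    """
    value = value.strip()
    number, hemisphere = float(value[:-1]), value[-1].upper()
    if hemisphere not in ("N", "S", "E", "W"):
        raise ValueError("unexpected hemisphere in {!r}".format(value))
    return -number if hemisphere in ("S", "W") else number


print(parse_coordinate("57.05N"), parse_coordinate("10.33E"))  # 57.05 10.33
print(parse_coordinate("31.35S"), parse_coordinate("80.73W"))  # -31.35 -80.73
```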



In [2]:
# Read one Parquet part file from the sas_data folder into pandas
immigration_df=pd.read_parquet('sas_data/part-00000-b9542815-7a8d-45fc-9c67-c9c5007ad0d4-c000.snappy.parquet', engine='fastparquet')

In [3]:
immigration_df.tail()

| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 219263 | 459646.0 | 2016.0 | 4.0 | 135.0 | 135.0 | ATL | 20547.0 | 1.0 | FL | 20608.0 | ... | | M | 1946.0 | 7012016 | | | VS | 55557730000.0 | 115 | WT |
| 219264 | 459647.0 | 2016.0 | 4.0 | 135.0 | 135.0 | ATL | 20547.0 | 1.0 | FL | 20608.0 | ... | | M | 1947.0 | 7012016 | | | VS | 55557730000.0 | 115 | WT |
| 219265 | 459648.0 | 2016.0 | 4.0 | 135.0 | 135.0 | ATL | 20547.0 | 1.0 | FL | 20557.0 | ... | | M | 1970.0 | 7012016 | | | VS | 55550400000.0 | 109 | WT |
| 219266 | 459649.0 | 2016.0 | 4.0 | 135.0 | 135.0 | ATL | 20547.0 | 1.0 | FL | 20557.0 | ... | | M | 2003.0 | 7012016 | | | VS | 55550400000.0 | 109 | WT |
| 219267 | 459650.0 | 2016.0 | 4.0 | 135.0 | 135.0 | ATL | 20547.0 | 1.0 | FL | 20559.0 | ... | | M | 1962.0 | 7012016 | | | VS | 55556250000.0 | 115 | WT |


In [6]:
print(immigration_df.shape)

(219268, 28)


In [9]:
temp_demo_df = pd.read_csv('data/GlobalLandTemperaturesByCity.csv', delimiter=',')
print(temp_demo_df.shape)
temp_demo_df.head()

(3081993, 7)


| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 | | | Århus | Denmark | 57.05N | 10.33E |


### Create Spark Session

In [3]:
spark = SparkSession\
.builder \
.config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11") \
.enableHiveSupport().getOrCreate()

### Step 2: Explore and Assess the Data
#### Explore the Data 
The temperature data contains many NaN (missing) values that must be removed. The I94 immigration data must be checked for valid i94port values.

#### Cleaning Steps
##### Immigration data
- For the I94 immigration data, we want to drop all entries where the destination city code i94port is not a valid value as described in I94_SAS_Labels_Description.SAS

##### Temperature Data
- For the temperature data, we drop all entries where AverageTemperature is NaN.
- Then we drop duplicate locations, keeping one entry per (City, Country).
- Then we add the i94port code matching each entry's city and drop entries without a match.
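
dropDuplicates keeps whichever row Spark happens to retain for each (City, Country). The AverageTemperature that survives is therefore one effectively arbitrary monthly reading; the results shown later in this notebook have dates from the 1700s and 1800s. If a representative value per city is preferred, one alternative is to average all non-missing readings per location. This is not the approach taken in this notebook. The plain-Python sketch below uses illustrative rows. The Spark equivalent would be a groupBy on City and Country with an avg aggregation, after casting AverageTemperature to a number.

```python
from collections import defaultdict


def mean_temperature_by_city(rows):
    """Average AverageTemperature per (City, Country), skipping missing readings.

    rows: dicts with 'City', 'Country' and 'AverageTemperature' keys, where the
    temperature is a string as read from the CSV, or None/''/'NaN' when missing.
    """
    totals = defaultdict(lambda: [0.0, 0])  # (City, Country) -> [sum, count]
    for row in rows:
        temp = row.get("AverageTemperature")
        if temp in (None, "", "NaN"):
            continue  # drop missing temperatures
        acc = totals[(row["City"], row["Country"])]
        acc[0] += float(temp)
        acc[1] += 1
    # One value per location, instead of one arbitrary row per location
    return {loc: s / n for loc, (s, n) in totals.items()}


rows = [
    {"City": "Århus", "Country": "Denmark", "AverageTemperature": "6.068"},
    {"City": "Århus", "Country": "Denmark", "AverageTemperature": ""},
    {"City": "Århus", "Country": "Denmark", "AverageTemperature": None},
    # Hypothetical second city with two readings
    {"City": "Springfield", "Country": "United States", "AverageTemperature": "10.0"},
    {"City": "Springfield", "Country": "United States", "AverageTemperature": "20.0"},
]
print(mean_temperature_by_city(rows))
```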

In [4]:
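# Build {port_code: [port_label]} from the quoted pairs in i94port_valid.txt
re_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')
i94port_valid = {}
with open('data/i94port_valid.txt') as f:
    for line in f:
        match = re_obj.search(line)
        if match:  # skip blank or malformed lines instead of raising a TypeError
            i94port_valid[match[1]] = [match[2]]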

def clean_i94_data(file):
    '''
    Input: Path to I94 immigration data file
    
    Output: Spark dataframe of I94 immigration data with valid i94port
    
    '''
    
    # Read I94 data into Spark
    df_immigration = spark.read.format('com.github.saurfang.sas.spark').load(file)

    # Filter out entries where i94port is invalid
    df_immigration = df_immigration.filter(df_immigration.i94port.isin(list(i94port_valid.keys())))

    return df_immigration
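
The port-list regex captures the two quoted fields on each line; it assumes exactly four quote characters per line. re.search returns None for lines that do not match, such as headers, comments and blank lines, so the match must be checked before indexing it. The sketch below runs the same pattern on hypothetical sample lines written in the style of I94_SAS_Labels_Description.SAS. The exact layout of data/i94port_valid.txt is an assumption. A label containing an apostrophe would shift the captured groups, so it is worth spot-checking the parsed dictionary.

```python
import re

# Same pattern as the port-list parser: two quoted fields per line.
PORT_LINE = re.compile(r"'(.*)'.*'(.*)'")

# Hypothetical sample lines; the real file layout may differ.
sample = [
    "   'ATL'\t=\t'ATLANTA, GA'",
    "   'SEA'\t=\t'SEATTLE, WA   '",
    "/* I94PORT codes */",  # no quotes, so re.search returns None
]


def parse_ports(lines):
    """Return {port_code: label} for every line that matches PORT_LINE."""
    ports = {}
    for line in lines:
        match = PORT_LINE.search(line)
        if match:  # skip non-matching lines instead of crashing
            ports[match.group(1)] = match.group(2).strip()
    return ports


print(parse_ports(sample))  # {'ATL': 'ATLANTA, GA', 'SEA': 'SEATTLE, WA'}
```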


### Test function

In [5]:
immigration_test_file = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat' 
df_immigration_test = clean_i94_data(immigration_test_file)
df_immigration_test.show()

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SEO| null|      G|   null|      Y|   null| 1991.0|     D/S|     M|  null|   null|  3.73679633E9|00296|      F1|
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|  1.0|20160401|    nul

In [6]:
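from pyspark.sql.types import StringType

df_temp = spark.read.format("csv").option("header", "true").load("data/GlobalLandTemperaturesByCity.csv")

# Filter out entries with a missing temperature: empty CSV fields are read as null,
# and a literal 'NaN' string is excluded as well
df_temp = df_temp.filter(df_temp.AverageTemperature.isNotNull() & (df_temp.AverageTemperature != 'NaN'))

# Keep one entry per location (Spark retains an arbitrary row for each City, Country pair)
df_temp = df_temp.dropDuplicates(['City', 'Country'])

@udf(returnType=StringType())
def get_i94port(city):
    '''
    Input: City name
    
    Output: Corresponding i94port, or None if no valid port label contains the city name
    
    '''
    if city is None:
        return None
    for key in i94port_valid:
        if city.lower() in i94port_valid[key][0].lower():
            return key
    return None

# Add i94port code based on city name
df_temp = df_temp.withColumn("i94port", get_i94port(df_temp.City))

# Remove entries with no i94port code (the UDF returns null for them)
df_temp = df_temp.filter(df_temp.i94port.isNotNull())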
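
get_i94port returns the first port whose label contains the city name as a substring. A short city name can therefore match a longer, unrelated label; for example, York would match a NEW YORK label. The sketch below contrasts that behavior with an exact comparison against the city part of the label. The port dictionary is hypothetical and stores plain strings rather than the one-element lists used in the notebook. Which rule is right depends on how the temperature data spells city names, so treat the exact match as an option to evaluate, not a drop-in fix.

```python
# Hypothetical port labels, stored as plain strings for brevity.
PORTS = {
    "NYC": "NEW YORK, NY",
    "SEA": "SEATTLE, WA",
    "ATL": "ATLANTA, GA",
}


def match_substring(city, ports=PORTS):
    """Same rule as get_i94port: first port whose label contains the city name."""
    if city is None:  # a Spark UDF receives None for null cells
        return None
    for code, label in ports.items():
        if city.lower() in label.lower():
            return code
    return None


def match_exact(city, ports=PORTS):
    """Alternative rule: compare with the city part of the label (text before the comma)."""
    if city is None:
        return None
    for code, label in ports.items():
        if city.strip().lower() == label.split(",")[0].strip().lower():
            return code
    return None


print(match_substring("York"), match_exact("York"))        # NYC None
print(match_substring("Seattle"), match_exact("Seattle"))  # SEA SEA
```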



### Show Results

In [10]:
df_temp.show()

+----------+-------------------+-----------------------------+---------+--------------------+--------+---------+-------+
|        dt| AverageTemperature|AverageTemperatureUncertainty|     City|             Country|Latitude|Longitude|i94port|
+----------+-------------------+-----------------------------+---------+--------------------+--------+---------+-------+
|1852-07-01|             15.488|                        1.395|    Perth|           Australia|  31.35S|  114.97E|    PER|
|1828-01-01|             -1.977|                        2.551|  Seattle|       United States|  47.42N|  121.97W|    SEA|
|1743-11-01|              2.767|                        1.905| Hamilton|              Canada|  42.59N|   80.73W|    HAM|
|1849-01-01|  7.399999999999999|                        2.699|  Ontario|       United States|  34.56N|  116.76W|    ONT|
|1821-11-01|              2.322|                        2.375|  Spokane|       United States|  47.42N|  117.24W|    SPO|
|1843-01-01| 18.874000000000002|

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

The first dimension table will contain data points from the I94 immigration data. The following columns are extracted from the immigration dataframe:

1. i94yr = 4-digit year
2. i94mon = numeric month
3. i94cit = 3-digit code of the origin country (country of citizenship)
4. i94port = 3-character code of the destination port (city)
5. arrdate = arrival date
6. i94mode = 1-digit mode of travel code
7. depdate = departure date
8. i94visa = visa category (reason for travel)

The second dimension table will contain city temperature data. The following columns are extracted from the temperature dataframe:

1. i94port = 3-character code of the destination port (mapped from the city name using the valid port list during the cleaning step)
2. AverageTemperature = average temperature
3. City = city name
4. Country = country name
5. Latitude = latitude
6. Longitude = longitude

The fact table will contain information from the I94 immigration data joined with the city temperature data on i94port. Several columns are renamed in the fact table; for example, i94yr becomes year.

1. i94yr = 4-digit year
2. i94mon = numeric month
3. i94cit = 3-digit code of the origin country (country of citizenship)
4. i94port = 3-character code of the destination port (city)
5. arrdate = arrival date
6. depdate = departure date
7. i94visa = visa category (reason for travel)
8. AverageTemperature = average temperature of the destination city
9. Latitude = latitude of the destination city
10. Longitude = longitude of the destination city
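
The fact table is an inner join of the immigration and temperature data on i94port. The plain-Python sketch below shows the same hash-join idea on a few illustrative rows; the XXX port is hypothetical. Immigration rows without a matching temperature row are dropped. A port that maps to several temperature rows multiplies the matching immigration rows, as a SQL inner join would.

```python
def build_fact_rows(immigration_rows, temperature_rows, key="i94port"):
    """Inner-join immigration rows to temperature rows on the port code."""
    # Build phase: index temperature rows by port code
    by_port = {}
    for t in temperature_rows:
        by_port.setdefault(t[key], []).append(t)
    # Probe phase: emit one fact row per matching (immigration, temperature) pair
    facts = []
    for imm in immigration_rows:
        for t in by_port.get(imm[key], []):
            fact = dict(imm)
            fact["AverageTemperature"] = t["AverageTemperature"]
            facts.append(fact)
    return facts


immigration = [
    {"i94yr": 2016, "i94mon": 4, "i94cit": 135, "i94port": "SEA", "i94visa": 2},
    # Hypothetical port code with no temperature row: dropped by the inner join
    {"i94yr": 2016, "i94mon": 4, "i94cit": 254, "i94port": "XXX", "i94visa": 3},
]
temperature = [{"i94port": "SEA", "AverageTemperature": -1.977}]

facts = build_fact_rows(immigration, temperature)
print(len(facts), facts[0]["AverageTemperature"])  # 1 -1.977
```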

#### 3.2 Mapping Out Data Pipelines
The pipeline steps are described below:

1. Clean the I94 data as described in Step 2 to create the Spark dataframe df_immigration, one month per batch (one batch is shown).
2. Clean temperature data as described in step 2 to create Spark dataframe df_temp.
3. Create immigration dimension table by selecting relevant columns from df_immigration and write to parquet file partitioned by i94port
4. Create temperature dimension table by selecting relevant columns from df_temp and write to parquet file partitioned by i94port
5. Create fact table by joining immigration and temperature dimension tables on i94port and write to parquet file partitioned by i94port
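
Processing one month per batch only requires one input path per month. The sketch below assumes the other monthly files follow the same naming pattern as i94_apr16_sub.sas7bdat. Each path would then go through clean_i94_data and be appended to the Parquet output.

```python
MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]


def monthly_sas_paths(base_dir, year=2016):
    """Return one input path per month.

    Assumes every month is named like i94_apr16_sub.sas7bdat, i.e.
    i94_<three-letter month><two-digit year>_sub.sas7bdat.
    """
    yy = str(year)[-2:]
    base = base_dir.rstrip("/")
    return ["{}/i94_{}{}_sub.sas7bdat".format(base, mon, yy) for mon in MONTHS]


paths = monthly_sas_paths("../../data/18-83510-I94-Data-2016")
print(paths[3])  # ../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat

# Each batch would then go through the same steps as the single month shown, e.g.
# for path in paths:
#     clean_i94_data(path).select(...).write.mode("append").partitionBy("i94port").parquet(...)
```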

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

### Read the immigration data from the SAS files
The data is read in monthly batches. Results are shown for one month (April 2016) to demonstrate that the pipeline works; that month alone contains over 3 million records.

In [7]:

# To process all months at once, point the reader at every monthly file instead:
#immigration_data = '/data/18-83510-I94-Data-2016/*.sas7bdat'
immigration_data = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_immigration = clean_i94_data(immigration_data)
# Extract columns for immigration dimension table
immigration_table = df_immigration.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa"])
# Write immigration dimension table to parquet files partitioned by i94port
immigration_table.write.mode("append").partitionBy("i94port").parquet("/results/immigration.parquet")

In [8]:
temp_table = df_temp.select(["AverageTemperature", "City", "Country", "Latitude", "Longitude", "i94port"])

# Write temperature dimension table to parquet files partitioned by i94port
temp_table.write.mode("append").partitionBy("i94port").parquet("/results/temperature.parquet")

In [9]:
df_immigration.createOrReplaceTempView("immigration_view")
df_temp.createOrReplaceTempView("temp_view")

# Create the fact table by joining the immigration and temperature views
fact_table = spark.sql('''
SELECT immigration_view.i94yr as year,
       immigration_view.i94mon as month,
       immigration_view.i94cit as origin_country,
       immigration_view.i94port as i94port,
       immigration_view.arrdate as arrival_date,
       immigration_view.depdate as departure_date,
       immigration_view.i94visa as reason,
       temp_view.AverageTemperature as temperature,
       temp_view.Latitude as latitude,
       temp_view.Longitude as longitude
FROM immigration_view
JOIN temp_view ON (immigration_view.i94port = temp_view.i94port)
''')


In [10]:
# Write fact table to parquet files partitioned by i94port
fact_table.write.mode("append").partitionBy("i94port").parquet("/results/fact.parquet")
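
The fact table is built to test whether arrivals favor warmer or colder destinations. One simple query of that kind is the average destination temperature per origin country. Each arrival counts once, so busier routes carry more weight. The plain-Python sketch below works over (origin code, temperature) pairs with illustrative values. On the Parquet output, the same result would come from a Spark groupBy with an avg aggregation.

```python
from collections import defaultdict


def mean_destination_temperature(pairs):
    """Average destination temperature per origin code.

    pairs: iterable of (origin_code, temperature) tuples, one per arrival.
    """
    sums = defaultdict(float)
    counts = defaultdict(int)
    for origin, temp in pairs:
        if temp is None:
            continue  # arrivals without a destination temperature are skipped
        sums[origin] += float(temp)
        counts[origin] += 1
    return {origin: sums[origin] / counts[origin] for origin in sums}


# Illustrative values only: origin country codes paired with destination temperatures
pairs = [(135, 20.0), (135, 10.0), (254, 5.0), (254, None)]
print(mean_destination_temperature(pairs))  # {135: 15.0, 254: 5.0}
```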

#### 4.2 Data Quality Checks
##### Quality check 1
- Check that each table contains records (a count of zero means the load failed).

##### Quality check 2 (unit test)
- Check that each DataFrame has the expected number of columns.
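
A row-count check confirms that a table is not empty, but it says nothing about missing values. A per-column null/NaN count complements it. The sketch below works on plain Python dicts for illustration; in Spark, the same idea is commonly written as a count over when(col(c).isNull(), c) for each column. Some nulls are presumably legitimate. For example, depdate is null for some travelers in the output shown, so decide per column which nulls should fail the check.

```python
import math


def null_counts(rows, columns):
    """Count None/NaN values per column in a list of dict rows (missing keys count as null)."""
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            value = row.get(c)
            if value is None or (isinstance(value, float) and math.isnan(value)):
                counts[c] += 1
    return counts


def check_no_nulls(rows, columns, description):
    """Print the outcome of a null check and return True when no column has nulls."""
    bad = {c: n for c, n in null_counts(rows, columns).items() if n > 0}
    if bad:
        print("Data quality check failed for {}: null counts {}".format(description, bad))
        return False
    print("Data quality check passed for {}: no nulls in {}".format(description, columns))
    return True


rows = [{"i94port": "ATL", "depdate": 20608.0}, {"i94port": "WAS", "depdate": None}]
check_no_nulls(rows, ["i94port"], "immigration sample")             # passes
check_no_nulls(rows, ["i94port", "depdate"], "immigration sample")  # fails: 1 null in depdate
```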

In [12]:
def quality_check(df, description):
    '''
    Input: Spark dataframe, description of Spark dataframe
    
    Output: Print outcome of data quality check
    
    '''
    
    result = df.count()
    if result == 0:
        print("Data quality check failed for {} with zero records".format(description))
    else:
        print("Data quality check passed for {} with {} records".format(description, result))
    return 0

# Perform data quality check
quality_check(df_immigration, "immigration table")

Data quality check passed for immigration table with 3088544 records


0

In [11]:
def quality_check2(df, description, val):
    '''
    Input: Spark dataframe, description of Spark dataframe, expected number of columns
    
    Output: Print outcome of data quality check
    
    '''
    result = len(df.columns)

    if result == val:
        print("Data quality check passed, correct number of columns for {}".format(description))
    else:
        print("Data quality check failed, incorrect number of columns for {}".format(description))

quality_check2(df_immigration, "immigration table", 28)
quality_check2(df_temp, "temperature table", 8)

Data quality check passed, correct number of columns for immigration table
Data quality check passed, correct number of columns for temperature table


#### 4.3 Data dictionary 

The first dimension table will contain events from the I94 immigration data. The columns below will be extracted from the immigration dataframe:

- i94yr = 4-digit year
- i94mon = numeric month
- i94cit = 3-digit code of the origin country (country of citizenship)
- i94port = 3-character code of the destination port (city)
- arrdate = arrival date
- i94mode = 1-digit mode of travel code
- depdate = departure date
- i94visa = visa category (reason for travel)

The second dimension table will contain city temperature data. The columns below will be extracted from the temperature dataframe:

- i94port = 3-character code of the destination port (mapped from the city name using the valid port list during the cleaning step)
- AverageTemperature = average temperature
- City = city name
- Country = country name
- Latitude = latitude
- Longitude = longitude

The fact table will contain information from the I94 immigration data joined with the city temperature data on i94port. Several columns are renamed in the fact table; for example, i94yr becomes year.

- i94yr = 4-digit year
- i94mon = numeric month
- i94cit = 3-digit code of the origin country (country of citizenship)
- i94port = 3-character code of the destination port (city)
- arrdate = arrival date
- depdate = departure date
- i94visa = visa category (reason for travel)
- AverageTemperature = average temperature of the destination city
- Latitude = latitude of the destination city
- Longitude = longitude of the destination city

### Step 5: Complete Project Write Up
1. Clearly state the rationale for the choice of tools and technologies for the project.
    Spark was chosen over pandas because it processes data in parallel and scales to the millions of records in the I94 data, while its DataFrame API offers much of the same functionality as pandas.
    It also extends to streaming data (Structured Streaming) and to machine learning through Spark MLlib.


2. Propose how often the data should be updated and why.
 * The data should be updated monthly. The raw I94 data arrives as one file per month, so the pipeline runs in monthly batches.

3. Write a description of how you would approach the problem differently under the following scenarios:

 * **The data was increased by 100x** <br>
  If the data was increased by 100x, we would no longer process it as a single batch job. Instead, we would load only new data incrementally and use Airflow to orchestrate and monitor the jobs. We could also run Spark in cluster mode under a cluster manager such as YARN.
<br>
 * **The data populates a dashboard that must be updated on a daily basis by 7am every day.**<br>
  If the data needs to populate a dashboard daily to meet an SLA then we could use a scheduling tool such as Airflow to run the ETL pipeline overnight.
<br>
 * **The database needed to be accessed by 100+ people.**<br>
  If the database needed to be accessed by 100+ people, we could publish the Parquet files to HDFS and grant read access to the users who need it. If users want to run SQL queries on the data, we could register the Parquet files as Hive tables so they can be queried through Hive or Spark SQL.
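
The incremental-update approach for the 100x scenario comes down to processing only the batches that have not been loaded yet. The sketch below assumes the set of already-processed batch identifiers is tracked somewhere, for example in a small state table or by the scheduler. The file names are illustrative.

```python
def pending_batches(available, processed):
    """Return batches not yet processed, keeping the order of `available`.

    available: ordered batch identifiers, e.g. monthly file names
    processed: identifiers already loaded (assumed to be tracked elsewhere,
               e.g. in a state table or by the scheduler)
    """
    done = set(processed)
    return [batch for batch in available if batch not in done]


# Illustrative file names following the i94_<mon><yy>_sub pattern
available = ["i94_jan16_sub.sas7bdat", "i94_feb16_sub.sas7bdat", "i94_mar16_sub.sas7bdat"]
print(pending_batches(available, {"i94_jan16_sub.sas7bdat"}))
# ['i94_feb16_sub.sas7bdat', 'i94_mar16_sub.sas7bdat']
```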

##### References
- [shalgrim](https://github.com/shalgrim)
- [cheuklau](https://github.com/cheuklau)

