# Project Title
### Data Engineering Capstone Project

#### Project Summary
The goal of this project is to combine the I94 immigration data with the world temperature data so that analysts can query the combined data and determine whether temperature affects immigration behavior.


The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import re
import psycopg2
from collections import defaultdict
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col

### Step 1: Scope the Project and Gather Data

#### Scope 
I will use Spark to process the immigration data and the temperature data, producing one dimension table from each dataset keyed by the destination city (i94port). I will then join the two dimension tables on that key to create the fact table.


#### Describe and Gather Data 
##### I94 Immigration Data: 
* This data comes from the US National Tourism and Trade Office. You can read more about it here https://travel.trade.gov/research/reports/i94/historical/2016.html
* It is in SAS7BDAT format.

##### Notes:

* i94YR = 4 digit year.
* i94MON = numeric month.
* i94CIT = 3 digit code of origin city.
* i94PORT = 3 character code of destination USA city.
* ARRDATE = arrival date in the USA.
* i94MODE = 1 digit travel code.
* DEPDATE = departure date from the USA.
* i94VISA = immigration reason.
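
The sample output later in this notebook shows ARRDATE values such as 20498.0, which look like SAS numeric dates (days counted from 1960-01-01). A minimal sketch of converting them to calendar dates, assuming that encoding holds for both ARRDATE and DEPDATE; `sas_to_date` is a hypothetical helper, not part of the notebook:

```python
from datetime import date, timedelta

# Assumption: ARRDATE/DEPDATE are SAS dates, i.e. days since 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)

def sas_to_date(days):
    """Convert a SAS numeric date (possibly a float or None) to a datetime.date."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))

print(sas_to_date(20498.0))  # 2016-02-14
```

The `None` branch matters because DEPDATE is null for travelers who have not yet departed.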


##### World Temperature Data: 
* This dataset came from Kaggle. You can read more about it here https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data.
* It is in csv format.

##### Notes:
* AverageTemperature = average temperature.
* City = city name.
* Country = country name.
* Latitude = latitude.
* Longitude = longitude.
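
The Latitude and Longitude columns are stored as strings with a hemisphere suffix (for example `57.05N` and `10.33E` in the output shown later). A small sketch of turning them into signed numbers, should numeric coordinates be needed; `parse_coordinate` is a hypothetical helper, and the sign convention (south and west negative) is an assumption:

```python
def parse_coordinate(value):
    """Convert a string such as '57.05N' or '10.33W' to a signed float."""
    value = (value or "").strip()
    if not value:
        return None
    magnitude, hemisphere = float(value[:-1]), value[-1].upper()
    if hemisphere not in "NSEW":
        raise ValueError("unexpected hemisphere suffix: {!r}".format(value))
    # Assumption: south and west are represented as negative numbers.
    return -magnitude if hemisphere in "SW" else magnitude

print(parse_coordinate("57.05N"), parse_coordinate("10.33E"))
```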

In [2]:
	
from pyspark.sql import SparkSession

# Start a Spark session with the spark-sas7bdat package so SAS7BDAT files can be read
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)


In [3]:
# Read the February 2016 immigration data
df_immigration=spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat')
df_immigration.createOrReplaceTempView("immigration")

In [4]:
#Read the temperature data
df_temperature = spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")
df_temperature.createOrReplaceTempView("temperature")

### Step 2: Explore and Assess the Data
#### Explore the Data 
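* The immigration data contains rows with an invalid i94PORT.
* The temperature data contains rows where AverageTemperature is NaN.
* The temperature data contains duplicated locations.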

#### Cleaning Steps
##### Immigration Data:
* Remove the rows that contain an invalid i94PORT, as listed in the I94_SAS_Labels_Description file.

##### Temperature Data:
* Remove the rows that contain a NaN AverageTemperature, and drop duplicated rows (the same location can still appear once per distinct temperature reading).
* Add the i94PORT code of the location to each row.

In [5]:
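# Build a dictionary of valid ports: port code -> [port description]
re_file = re.compile(r'\'(.*)\'.*\'(.*)\'')
validPorts = {}
with open('validports.txt') as ports_file:
    for line in ports_file:
        match = re_file.search(line)
        # Skip lines that do not contain two quoted fields (e.g. blank lines)
        if match:
            validPorts[match[1]] = [match[2]]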
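
To make the regular expression concrete, here is a short sketch of what it captures from a single line. The sample lines are hypothetical and only assume that each line of `validports.txt` holds a quoted code followed by a quoted description. The `.strip()` calls are an addition that the cell above does not perform:

```python
import re

re_file = re.compile(r'\'(.*)\'.*\'(.*)\'')

# Hypothetical sample lines in the assumed format of validports.txt
sample_lines = [
    "   'XXX'\t=\t'EXAMPLE CITY, ST   '",
    "",  # a blank line has no quoted fields and is skipped
]

ports = {}
for line in sample_lines:
    match = re_file.search(line)
    if match:
        # Strip padding so later comparisons are not affected by trailing spaces
        ports[match[1].strip()] = [match[2].strip()]

print(ports)  # {'XXX': ['EXAMPLE CITY, ST']}
```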

In [6]:
# Performing cleaning tasks here

# remove the rows where i94port is invalid
df_immigration = df_immigration.filter(df_immigration.i94port.isin(list(validPorts.keys())))

df_immigration.show()


+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|      admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+------------+-----+--------+
|  2.0|2016.0|   2.0| 101.0| 101.0|    ATL|20498.0|    1.0|     MI|   null|  21.0|    3.0|  1.0|    null|    null| null|      T|   null|   null|   null| 1995.0|     D/S|     F|  null|     DL|4.91319785E8|  241|      F1|
|  5.0|2016.0|   2.0| 101.0| 101.0|    CHI|20492.0|    1.0|     IL|   null|  55.0|    2.0|  1.0|    null|    null| null|

In [7]:
# Keep distinct rows that have an AverageTemperature value
df_temperature = spark.sql("""
    select distinct temperature.AverageTemperature, temperature.City,
                    temperature.Country, temperature.Latitude, temperature.Longitude
    from temperature
    where temperature.AverageTemperature is not null
      and temperature.AverageTemperature != 'NaN'
""")

#df_temperature = df_temperature.filter(df_temperature.AverageTemperature != 'NaN')
df_temperature.show()

+-------------------+-----+-------+--------+---------+
| AverageTemperature| City|Country|Latitude|Longitude|
+-------------------+-----+-------+--------+---------+
| 16.500999999999998|Århus|Denmark|  57.05N|   10.33E|
| 12.947000000000001|Århus|Denmark|  57.05N|   10.33E|
|              6.549|Århus|Denmark|  57.05N|   10.33E|
|0.40299999999999997|Århus|Denmark|  57.05N|   10.33E|
|             -1.492|Århus|Denmark|  57.05N|   10.33E|
|              1.431|Århus|Denmark|  57.05N|   10.33E|
|             10.675|Århus|Denmark|  57.05N|   10.33E|
|             15.919|Århus|Denmark|  57.05N|   10.33E|
|               9.88|Århus|Denmark|  57.05N|   10.33E|
|             16.459|Århus|Denmark|  57.05N|   10.33E|
|              12.05|Århus|Denmark|  57.05N|   10.33E|
|             10.378|Århus|Denmark|  57.05N|   10.33E|
|                2.4|Århus|Denmark|  57.05N|   10.33E|
|              4.382|Århus|Denmark|  57.05N|   10.33E|
|             18.651|Århus|Denmark|  57.05N|   10.33E|
|         

In [8]:
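@udf()
def get_city(city):
    """Return the first i94port code whose description contains the city name, else None."""
    if city is None:
        return None
    city = city.lower()
    for key in validPorts:
        # Case-insensitive substring match against the port description
        if city in validPorts[key][0].lower():
            return key
    return None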
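
The matching rule in `get_city` is a case-insensitive substring test, and the first port whose description contains the city name wins. The plain-Python sketch below reproduces that rule outside Spark, using a hypothetical two-entry port dictionary, to show how it behaves for matching and non-matching cities:

```python
# Hypothetical port dictionary in the same shape as validPorts: code -> [description]
sample_ports = {
    "AAA": ["SPRINGFIELD, ST"],
    "BBB": ["RIVERTON, ST"],
}

def match_port(city, ports):
    """Return the first port code whose description contains the city name."""
    if city is None:
        return None
    city = city.lower()
    for code, (description,) in ports.items():
        if city in description.lower():
            return code
    return None

print(match_port("Riverton", sample_ports))  # BBB
print(match_port("Århus", sample_ports))     # None
print(match_port("Field", sample_ports))     # AAA (substring match, likely unintended)
```

Because any substring counts, short city names can match unrelated ports. Comparing against only the city part of the description would be a stricter option.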

In [9]:
# Add port code based on city name
df_temperature = df_temperature.withColumn("i94port", get_city(df_temperature.City))
df_temperature.show()

+-------------------+-----+-------+--------+---------+-------+
| AverageTemperature| City|Country|Latitude|Longitude|i94port|
+-------------------+-----+-------+--------+---------+-------+
| 16.500999999999998|Århus|Denmark|  57.05N|   10.33E|   null|
| 12.947000000000001|Århus|Denmark|  57.05N|   10.33E|   null|
|              6.549|Århus|Denmark|  57.05N|   10.33E|   null|
|0.40299999999999997|Århus|Denmark|  57.05N|   10.33E|   null|
|             -1.492|Århus|Denmark|  57.05N|   10.33E|   null|
|              1.431|Århus|Denmark|  57.05N|   10.33E|   null|
|             10.675|Århus|Denmark|  57.05N|   10.33E|   null|
|             15.919|Århus|Denmark|  57.05N|   10.33E|   null|
|               9.88|Århus|Denmark|  57.05N|   10.33E|   null|
|             16.459|Århus|Denmark|  57.05N|   10.33E|   null|
|              12.05|Århus|Denmark|  57.05N|   10.33E|   null|
|             10.378|Århus|Denmark|  57.05N|   10.33E|   null|
|                2.4|Århus|Denmark|  57.05N|   10.33E| 

In [10]:
#df_temperature.createOrReplaceTempView("temperature")
#df_temperature = spark.sql("""
#                            select temperature.AverageTemperature, temperature.City, 
#                            temperature.Country, temperature.Latitude, temperature.Longitude,temperature.i94port
#                            from temperature  
#                            where temperature.i94port is not null  """)

In [None]:

# Keep only the rows whose city matched a valid port
df_temperature = df_temperature.filter(df_temperature.i94port.isNotNull())

df_temperature.show()

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
##### Dimension Table - immigration data Columns:
* I94YR = 4 digit year.
* I94MON = numeric month.
* I94CIT = 3 digit code of origin city.
* I94PORT = 3 character code of destination USA city.
* ARRDATE = arrival date in the USA.
* I94MODE = 1 digit travel code.
* DEPDATE = departure date from the USA.
* I94VISA = immigration reason.
##### Dimension Table - temperature data Columns:
* AverageTemperature = average temperature.
* City = city name.
* Country = country name.
* Latitude = latitude.
* Longitude = longitude.
* I94PORT = 3 character code of destination USA city.

##### Fact Table - immigration data joined with the temperature Columns:
* year = 4 digit year.
* month = numeric month.
* city = 3 digit code of origin city.
* I94PORT = 3 character code of destination USA city.
* arrival_date = arrival date in the USA.
* travel_code = 1 digit travel code.
* departure_date = departure date from the USA.
* reason = immigration reason.
* temperature = average temperature of the destination city.
* latitude = latitude.
* longitude = longitude.
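
The temperature table holds many readings per city, so joining it to the immigration table on i94port yields one fact row per arrival per reading. The plain-Python sketch below, with hypothetical records, illustrates that effect and one possible alternative: averaging the temperature per port before joining. It illustrates the join semantics only and is not the notebook's Spark code:

```python
from collections import defaultdict

# Hypothetical records shaped like the two dimension tables
arrivals = [
    {"i94port": "AAA", "arrdate": 20498.0},
    {"i94port": "BBB", "arrdate": 20492.0},
]
readings = [
    {"i94port": "AAA", "AverageTemperature": 10.0},
    {"i94port": "AAA", "AverageTemperature": 14.0},
    {"i94port": "BBB", "AverageTemperature": 20.0},
]

# Plain join: every arrival is paired with every reading for its port
joined = [
    {**a, **r} for a in arrivals for r in readings if a["i94port"] == r["i94port"]
]

# Alternative: collapse readings to one average per port, then join
by_port = defaultdict(list)
for r in readings:
    by_port[r["i94port"]].append(r["AverageTemperature"])
avg_by_port = {port: sum(v) / len(v) for port, v in by_port.items()}

fact = [
    {**a, "temperature": avg_by_port[a["i94port"]]}
    for a in arrivals
    if a["i94port"] in avg_by_port
]

print(len(joined))  # 3 rows: the AAA arrival appears twice
print(len(fact))    # 2 rows: one per arrival
```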


#### 3.2 Mapping Out Data Pipelines
<img src="PIPline.png"> 


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
df_immigration=spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat')


In [None]:
#Read the temperature data
df_temperature = spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")


In [None]:
# remove the rows where i94port is invalid
df_immigration = df_immigration.filter(df_immigration.i94port.isin(list(validPorts.keys())))
df_immigration.createOrReplaceTempView("immigration")
immigration = spark.sql("""
                            select i94yr, i94mon, i94cit, i94port, arrdate, i94mode, depdate, i94visa
                            from immigration  
                              """)

In [None]:
# Remove the rows that contain a NaN AverageTemperature, and drop duplicated rows.
df_temperature.createOrReplaceTempView("temperature")
temperature = spark.sql("""
                            select distinct temperature.AverageTemperature, temperature.City, 
                            temperature.Country, temperature.Latitude, temperature.Longitude
                            from temperature  
                            where temperature.AverageTemperature !='NaN'  """)

In [None]:
# Add the port code for each city, then keep only the rows that matched a valid port
temperature = temperature.withColumn("i94port", get_city(temperature.City))
temperature = temperature.filter(temperature.i94port.isNotNull())


In [None]:
# Write immigration dimension table.
immigration.write.mode("append").partitionBy("i94port").parquet("/immigration/immigration.parquet")


In [None]:
# Write temperature dimension table.
temperature.write.mode("append").partitionBy("i94port").parquet("/temperature/temperature.parquet")


In [None]:
immigration.createOrReplaceTempView("immigration")
temperature.createOrReplaceTempView("temperature")

In [None]:
fact_table = spark.sql('''
select immigration.i94yr as year,
       immigration.i94mon as month,
       immigration.i94cit as city,
       immigration.i94port as i94port,
       immigration.arrdate as arrival_date,
       immigration.i94mode as travel_code,
       immigration.depdate as departure_date,
       immigration.i94visa as reason,
       temperature.AverageTemperature as temperature,
       temperature.Latitude as latitude,
       temperature.Longitude as longitude
from immigration
JOIN temperature ON (immigration.i94port = temperature.i94port)
''')

In [None]:
fact_table.write.mode("append").partitionBy("i94port").parquet("/fact/fact.parquet")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
def quality_checks(data, table):
    """Print whether the given table contains at least one row."""
    row_count = data.count()
    if row_count == 0:
        print("Data quality failed for {} table with 0 events".format(table))
    else:
        print("Data quality passed for {} table with {} events".format(table, row_count))
    return 0




In [None]:
quality_checks(immigration, "immigration")
quality_checks(temperature, "temperature")
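
The check above only prints a message, so a failed check does not stop the pipeline. A stricter variant could raise an exception instead. The sketch below is one way to do that; `require_rows` is a hypothetical helper that relies only on the `count()` method that Spark DataFrames provide, and the `FakeTable` class exists only so the example runs without Spark:

```python
def require_rows(data, table):
    """Raise ValueError if the table is empty, otherwise return its row count."""
    row_count = data.count()
    if row_count == 0:
        raise ValueError("Data quality failed for {} table with 0 rows".format(table))
    return row_count

class FakeTable:
    """Stand-in for a Spark DataFrame, used only for this example."""
    def __init__(self, rows):
        self.rows = rows

    def count(self):
        return len(self.rows)

print(require_rows(FakeTable([1, 2, 3]), "immigration"))  # 3

try:
    require_rows(FakeTable([]), "fact")
except ValueError as error:
    print(error)  # Data quality failed for fact table with 0 rows
```

The same helper could also be applied to the fact table, which the calls above do not cover.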

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.
##### Dimension Table - immigration data Columns:
* I94YR = 4 digit year.
* I94MON = numeric month.
* I94CIT = 3 digit code of origin city.
* I94PORT = 3 character code of destination USA city.
* ARRDATE = arrival date in the USA.
* I94MODE = 1 digit travel code.
* DEPDATE = departure date from the USA.
* I94VISA = immigration reason.
##### Dimension Table - temperature data Columns:
* AverageTemperature = average temperature.
* City = city name.
* Country = country name.
* Latitude = latitude.
* Longitude = longitude.
* I94PORT = 3 character code of destination USA city.
##### Fact Table - immigration data joined with the temperature Columns:
* year = 4 digit year.
* month = numeric month.
* city = 3 digit code of origin city.
* I94PORT = 3 character code of destination USA city.
* arrival_date = arrival date in the USA.
* travel_code = 1 digit travel code.
* departure_date = departure date from the USA.
* reason = immigration reason.
* temperature = average temperature of the destination city.
* latitude = latitude.
* longitude = longitude.

### Step 5: Complete Project Write Up
##### Clearly state the rationale for the choice of tools and technologies for the project.
* I chose Spark because it can scale out if the data volume grows in the future.
* I chose the Parquet format because it is columnar and compatible with HDFS, so the output could move to HDFS if the project needs to scale up.
##### Propose how often the data should be updated and why.
* Each month, because the data is provided in monthly files.
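
Since each run would pick up one monthly file, the file to load can be derived from the run date. A minimal sketch, assuming the other monthly files follow the same naming pattern as `i94_feb16_sub.sas7bdat` (only that one file name appears in this notebook, so the pattern is an assumption) and using a hypothetical helper `monthly_file_name`:

```python
from datetime import date

# English month abbreviations, hard-coded to avoid locale-dependent strftime output
MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]

def monthly_file_name(run_date):
    """Build the assumed I94 file name for the month of run_date."""
    return "i94_{}{:02d}_sub.sas7bdat".format(
        MONTHS[run_date.month - 1], run_date.year % 100
    )

print(monthly_file_name(date(2016, 2, 1)))  # i94_feb16_sub.sas7bdat
```
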
##### Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
     * I would add more Spark nodes to scale out.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
     * I would use an orchestration tool such as Airflow or Luigi to run the pipeline every day, or a simple cron job, depending on the situation.
 * The database needed to be accessed by 100+ people.
     * If the current setup cannot handle that number of users, I would change the node profile or move the data to a big data storage technology such as HDFS or S3.