# Analysis of Immigration Events
### Data Engineering Capstone Project

#### Project Summary
The goal of this project is to build an ETL pipeline that processes I94 immigration data and city temperature data.
The resulting tables can then be used to analyze whether a city's temperature influences immigration events.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [44]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, year, month, col, isnull
import pandas as pd
import numpy as np
from pyspark.sql.types import DateType
from datetime import datetime, timedelta
from glob import glob

In [2]:
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

Two datasets are used in this project:

    1. City temperature    --   Average temperature at the city level.
    2. I94 immigration     --   Data on immigration events.

City temperature data is aggregated at the city level and joined with the I94 immigration data on the i94port column, to find out whether immigrants choose a destination city based on its temperature.


Spark is used in this project to handle large datasets such as the I94 immigration data.


#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

#### Data Sets

**World Temperature Data :** Data comes from [Kaggle](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).
It is in CSV format and contains world temperature data.
>**Important Columns**
>- AverageTemperature = average temperature
>- City = city name
>- Country = country name

**I94 Immigration Data:** Data comes from [US National Tourism and Trade Office](https://travel.trade.gov/research/reports/i94/historical/2016.html). It is in SAS7BDAT format and contains immigration events.
>**Important Columns**
>- cicid  = This is the unique Identifier
>- i94yr  = 4 digit year of the arrival
>- i94mon = numeric month of the arrival
>- i94cit = 3 digit code of origin city
>- i94port = 3 character code of destination city
>- arrdate = arrival date in the USA
>- i94mode = 1 digit travel code
>- depdate = departure date from the USA
>- i94visa = reason for immigration


In [3]:
# Read in the data here
temparatureData = spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")
immigrationData = spark.read.format('com.github.saurfang.sas.spark').load("../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat")

In [4]:
#printing first five rows
temparatureData.show(5)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows



In [5]:
#printing first five rows
immigrationData.show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+-----------+-----------+----------+-----------+-------------+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|validres|delete_days|delete_mexl|delete_dup|delete_visa|delete_recdup|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+-----------+-----------+----------+-----------+-------------+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|  4.0|2016.0|   6.0| 135.0| 135.0|    XXX|20612.0|   null|   null|   null|  59.0|    2.0|  1.0|     1.0|        0.0|    

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

**World Temperature Data:** The sample above shows that AverageTemperature contains null values.

**I94 Immigration Data:** The i94port column contains invalid values (XXX, 99, NaN, etc.). We keep only the rows whose i94port value is a valid code listed in the I94_SAS_Labels_Descriptions file.

#### Cleaning Steps
Document steps necessary to clean the data 

In [6]:
# Store each i94port city name and its code as a key/value pair in a Python dictionary
fname = r'destination_city.txt'
city_and_code = {}
with open(fname) as f:
    for line in f:
        words = line.split('\'')
        city = words[3].split(',')[0].lower()
        code = words[1]
        city_and_code[city] = code    
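
The cell above indexes into the result of `split` directly, so a blank or malformed line in `destination_city.txt` would raise an `IndexError`. A more defensive variant is sketched below. It assumes each line looks like `'ALC' = 'ALCAN, AK'` (inferred from how the cell splits on quotes), and the names `LABEL_LINE` and `parse_port_labels` are hypothetical.

```python
import re
from typing import Dict, Iterable

# Matches lines shaped like:  'ALC' = 'ALCAN, AK   '
# This layout is an assumption based on how the cell above splits on quotes.
LABEL_LINE = re.compile(r"'(?P<code>[^']*)'\s*=\s*'(?P<label>[^']*)'")


def parse_port_labels(lines: Iterable[str]) -> Dict[str, str]:
    """Build a {lowercase city name: port code} mapping, skipping malformed lines."""
    mapping: Dict[str, str] = {}
    for line in lines:
        match = LABEL_LINE.search(line)
        if match is None:
            continue  # blank or unexpected line: skip it instead of failing
        city = match.group("label").split(",")[0].strip().lower()
        code = match.group("code").strip()
        if city and code:
            mapping[city] = code
    return mapping


# Sample lines in the assumed layout, including one blank line.
sample = [
    "   'ALC'\t=\t'ALCAN, AK             '",
    "",
    "   'ANC'\t=\t'ANCHORAGE, AK         '",
]
print(parse_port_labels(sample))  # {'alcan': 'ALC', 'anchorage': 'ANC'}
```

Unlike the original cell, this version also strips surrounding whitespace from the city name and the code.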

In [7]:
# User-defined function that takes a city name and returns its port code, or None if there is no match
@udf
def getCitybyCode(city):
    # Guard against null city names, which would otherwise fail on .lower()
    if city is None:
        return None
    return city_and_code.get(city.lower())

In [8]:
# Keep only U.S. cities, and only 2013, the most recent year of temperature data used here
# (the immigration data covers 2016 only, so the years cannot be matched exactly)
temparatureData = temparatureData.filter(year(col("dt"))==2013)\
.filter(col("Country")=="United States")\
.filter(col("AverageTemperature") != 'NaN')\
.withColumn("i94port",getCitybyCode(col("City")))

In [9]:
temparatureData.createOrReplaceTempView("temparatureTbl")

In [10]:
# Keep rows with a non-null i94port and average the temperature per city
temparatureClean = spark.sql('''
select i94port, City, Country, round(avg(AverageTemperature),2) as AVGCityTemparature from  temparatureTbl 
where i94port is not null
group by i94port, City, Country
''')

In [11]:
temparatureClean.printSchema()

root
 |-- i94port: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- AVGCityTemparature: double (nullable = true)



In [12]:
temparatureClean.show(5)

+-------+------------+-------------+------------------+
|i94port|        City|      Country|AVGCityTemparature|
+-------+------------+-------------+------------------+
|    GRP|Grand Rapids|United States|             10.76|
|    NOG|     Nogales|United States|             19.71|
|    NWH|   New Haven|United States|             12.33|
|    JAC|Jacksonville|United States|             22.49|
|    KAN| Kansas City|United States|             13.99|
+-------+------------+-------------+------------------+
only showing top 5 rows



Clean immigration data

In [13]:
def cleanImmigirationData(file):
    """
    Input: Takes file that is in SAS7BDAT format.
    Output: cleans the file and returns dataframe
    """
    df = spark.read.format('com.github.saurfang.sas.spark').load(file)
    
    df = df.filter(df.i94addr.isNotNull())\
            .filter(df.i94res.isNotNull())\
            .filter(df.i94port.isin(list(city_and_code.values())))\
            .withColumn("Year",col("i94yr").cast("integer"))\
            .withColumn("Month",col("i94mon").cast("integer"))
    df = df.select("cicid","Year","Month",col("i94cit").alias("OriginCity"),col("i94port").alias("DestiCity"),\
                   "arrdate",col("i94mode").alias("TravelMode"),"depdate",col("i94visa").alias("VisaReason"))
    
    return df

In [15]:
sas_files = glob('../../data/18-83510-I94-Data-2016/*.sas7bdat')

In [17]:
for file in sas_files:
    df = cleanImmigirationData(file)
    df.write.mode('append').parquet("immigration_files/i94immigration.parquet")

In [18]:
ImmigirationData = spark.read.parquet("immigration_files/i94immigration.parquet")

In [19]:
ImmigirationData.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- OriginCity: double (nullable = true)
 |-- DestiCity: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- TravelMode: double (nullable = true)
 |-- depdate: double (nullable = true)
 |-- VisaReason: double (nullable = true)



In [20]:
ImmigirationData.show(5)

+---------+----+-----+----------+---------+-------+----------+-------+----------+
|    cicid|Year|Month|OriginCity|DestiCity|arrdate|TravelMode|depdate|VisaReason|
+---------+----+-----+----------+---------+-------+----------+-------+----------+
|6642585.0|2016|   12|     204.0|      BOS|20817.0|       1.0|20826.0|       2.0|
|6642586.0|2016|   12|     129.0|      BOS|20817.0|       1.0|20831.0|       2.0|
|6642587.0|2016|   12|     691.0|      MIA|20817.0|       1.0|20827.0|       2.0|
|6642588.0|2016|   12|     135.0|      NYC|20817.0|       1.0|20826.0|       2.0|
|6642590.0|2016|   12|     209.0|      NEW|20817.0|       1.0|20819.0|       2.0|
+---------+----+-----+----------+---------+-------+----------+-------+----------+
only showing top 5 rows



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model.

We will follow a star schema.

**Dimension Tables**

**DimTemperature:** Contains the average temperature of each U.S. city.
* i94port = code for city
* AverageTemperature = average temperature
* City = city name
* Country = country name


**DimImmigration:**   It contains immigration events.
* cicid  = This is the unique Identifier
* Year  = 4 digit year of the arrival
* Month = numeric month of the arrival
* OriginCity = 3 digit code of origin city
* DestiCity = 3 character code of destination city
* Arrdate = arrival date in the USA
* TravelMode = 1 digit travel code
* Depdate = departure date from the USA
* VisaReason = reason for immigration

**Fact Table**

**FactImmigration Data:** Contains data from both the immigration and temperature dimension tables, joined on i94port (DestiCity).
* cicid  = This is the unique Identifier
* Year  = 4 digit year of the arrival
* Month = numeric month of the arrival
* OriginCity = 3 digit code of origin city
* DestiCity = 3 character code of destination city
* Arrdate = arrival date in the USA
* TravelMode = 1 digit travel code
* Depdate = departure date from the USA
* VisaReason = reason for immigration
* AverageTemperature = average temperature


#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

    1. DimTemperature is created by reading data from the CSV file, cleaning it, and aggregating it (completed in Step 2).

    2. DimImmigration is created from the list of SAS files and cleaned (completed in Step 2).

    3. FactImmigration is created by joining the DimTemperature and DimImmigration tables on i94port (DestiCity).
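
The third step is an inner join, so arrivals at ports that have no temperature record do not appear in the fact table. The pure-Python sketch below mirrors that behaviour on a few rows. The function name `build_fact_rows` and the immigration rows are invented for illustration; the two temperatures are taken from the `temparatureClean.show(5)` output.

```python
from typing import Dict, Iterable, List


def build_fact_rows(
    immigration_rows: Iterable[dict],
    temperature_by_port: Dict[str, float],
) -> List[dict]:
    """Inner-join immigration rows to city temperatures on the port code."""
    fact = []
    for row in immigration_rows:
        temperature = temperature_by_port.get(row["DestiCity"])
        if temperature is None:
            continue  # inner join: ports without temperature data are dropped
        fact.append({**row, "Citytemperature": temperature})
    return fact


# Temperatures from the sample output above; immigration rows are made up.
temperature_by_port = {"JAC": 22.49, "KAN": 13.99}
immigration_rows = [
    {"cicid": 1.0, "DestiCity": "JAC"},
    {"cicid": 2.0, "DestiCity": "XXX"},  # no matching temperature row
    {"cicid": 3.0, "DestiCity": "KAN"},
]

for fact_row in build_fact_rows(immigration_rows, temperature_by_port):
    print(fact_row)
```

Only the rows for `JAC` and `KAN` survive, which is why the fact table can contain fewer rows than the immigration dimension.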

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

We already cleaned the data in Step 2; now we use those dataframes and store them in Parquet files.

In [31]:
ImmigirationData.createOrReplaceTempView("dimImmig")

In [32]:
temparatureClean.createOrReplaceTempView("dimTemp")

In [33]:
# fact table created using two dim tables by joining i94port
factImmig = spark.sql('''
SELECT im.cicid,
       im.Year,
       im.Month,
       im.OriginCity,
       im.arrdate,
       im.depdate,
       im.VisaReason ,
       tmp.AVGCityTemparature as Citytemperature,
       tmp.City as DesitinationCity,
       tmp.i94port
FROM dimImmig  im
JOIN dimTemp  tmp ON (im.DestiCity = tmp.i94port)
''')

In [37]:
factImmig.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- OriginCity: double (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- depdate: double (nullable = true)
 |-- VisaReason: double (nullable = true)
 |-- Citytemperature: double (nullable = true)
 |-- DesitinationCity: string (nullable = true)
 |-- i94port: string (nullable = true)
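
With this schema, the question from the project summary could be approached by counting arrivals per port and comparing the counts with each city's temperature. The sketch below is one possible way to do that in plain Python, not an analysis the notebook performs. The helper names are hypothetical and the toy rows are invented purely to show the shape of the computation.

```python
from collections import Counter
from math import sqrt
from typing import Iterable, List


def arrivals_per_port(fact_rows: Iterable[dict]) -> Counter:
    """Count fact rows (arrivals) per destination port code."""
    return Counter(row["i94port"] for row in fact_rows)


def pearson(xs: List[float], ys: List[float]) -> float:
    """Pearson correlation coefficient of two equally long sequences."""
    n = len(xs)
    if n < 2 or n != len(ys):
        raise ValueError("need two sequences of equal length >= 2")
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        raise ValueError("correlation is undefined for constant input")
    return cov / sqrt(var_x * var_y)


# Invented toy rows: one arrival at AAA, two at BBB, three at CCC.
toy_fact = (
    [{"i94port": "AAA", "Citytemperature": 10.0}]
    + [{"i94port": "BBB", "Citytemperature": 20.0}] * 2
    + [{"i94port": "CCC", "Citytemperature": 30.0}] * 3
)

counts = arrivals_per_port(toy_fact)
temps = {row["i94port"]: row["Citytemperature"] for row in toy_fact}
ports = sorted(counts)
r = pearson([temps[p] for p in ports], [float(counts[p]) for p in ports])
print(round(r, 3))
```

A correlation across cities would only hint at a relationship; it would not show that temperature drives the choice of destination.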



In [34]:
factImmig.write.mode("append").partitionBy("i94port").parquet("/target/factImmigiration.parquet")

In [35]:
temparatureClean.write.mode("append").partitionBy("i94port").parquet("/target/dimTemparature.parquet")

In [38]:
# Write the immigration dimension (not the fact table) and partition by its port column
ImmigirationData.write.mode("append").partitionBy("DestiCity").parquet("/target/dimImmigiration.parquet")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [40]:
# Perform quality checks here
# we will check counts
factImmig.count()

32548841

In [41]:
temparatureClean.count()

90

In [42]:
factImmig.count()

32548841

In [45]:
# will check for null values in unique columns
factImmig.select(isnull('cicid').alias('imgID')).dropDuplicates().show()

+-----+
|   id|
+-----+
|false|
+-----+



In [46]:
temparatureClean.select(isnull('i94port').alias('tempID')).dropDuplicates().show()

+------+
|tempID|
+------+
| false|
+------+



In [48]:
factImmig.select(isnull('cicid').alias('factID')).dropDuplicates().show()

+------+
|factID|
+------+
| false|
+------+



#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from.


**Dimension Tables**

**DimTemperature:** Contains the average temperature of each U.S. city.
* i94port = code for city
* AverageTemperature = average temperature
* City = city name
* Country = country name


**DimImmigration:**   It contains immigration events.
* cicid  = This is the unique Identifier
* Year  = 4 digit year of the arrival
* Month = numeric month of the arrival
* OriginCity = 3 digit code of origin city
* DestiCity = 3 character code of destination city
* Arrdate = arrival date in the USA
* TravelMode = 1 digit travel code
* Depdate = departure date from the USA
* VisaReason = reason for immigration

**Fact Table**

**FactImmigration Data:** Contains data from both the immigration and temperature dimension tables, joined on i94port (DestiCity).
* cicid  = This is the unique Identifier
* Year  = 4 digit year of the arrival
* Month = numeric month of the arrival
* OriginCity = 3 digit code of origin city
* DestiCity = 3 character code of destination city
* Arrdate = arrival date in the USA
* TravelMode = 1 digit travel code
* Depdate = departure date from the USA
* VisaReason = reason for immigration
* AverageTemperature = average temperature

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
  Python is used as the programming language because it is easy to read and maintain.
  It has a large set of libraries for working with different file formats.
  
  Spark is used because it typically processes large datasets faster than Hadoop MapReduce.
  Its Python API makes it easy to implement an ETL pipeline.


* Propose how often the data should be updated and why.
    It mostly depends on the users' requirements.
    If needed, the data can be updated incrementally on a daily basis,
    so that it can be analyzed to make business decisions based on current trends.

* Write a description of how you would approach the problem differently under the following scenarios:

* The data was increased by 100x.
 
     We will use Spark with more nodes for processing and S3 as a data lake.
     Spark processes data in memory and is typically faster than Hadoop MapReduce.
     Files in S3 can be used to build a cloud data warehouse with Redshift.


* The data populates a dashboard that must be updated on a daily basis by 7am every day.
 
    We will use Airflow to schedule the jobs to run daily.
    We will configure email alerts and data quality checks.


* The database needed to be accessed by 100+ people.
 
    We can use AWS Redshift, as it scales to petabytes of data.
    We will enable auto-scaling and make copies of the data available in regions close to the users.