## Immigration and Temperature Data
### Data Engineering Capstone Project

#### Project Summary
The goal of this project is to build an ETL pipeline that uses I94 immigration data and city temperature data to answer a question about migration trends: are individuals more likely to migrate to warmer places?

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [14]:
# Do all imports and installs here
import re

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project, we will combine the I94 immigration data by destination city to form the first dimension table. We will then combine the city temperature data by city to form the second dimension table. The two datasets will be joined on the destination city (i94port) to form the fact table.


#### Describe and Gather Data 
I94 Immigration Data: This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace, along with a sample file in CSV format for a look at the data before reading it all in. The entire dataset is not needed; this project uses the April 2016 file (i94_apr16_sub.sas7bdat).

World Temperature Data: This dataset comes from Kaggle.

U.S. City Demographic Data: This data comes from OpenSoft.

The I94 columns used in this project are:

* i94yr = 4-digit year
* i94mon = numeric month
* i94cit = 3-digit code of the country of citizenship
* i94port = 3-character code of the destination US city (port of entry)
* arrdate = arrival date in the USA (SAS date)
* i94mode = 1-digit travel mode code
* depdate = departure date from the USA (SAS date)
* i94visa = reason for immigration (visa category code)

The temperature columns used are:

* AverageTemperature = average temperature
* City = city name
* Country = country name
* Latitude = latitude
* Longitude = longitude
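Several raw I94 fields are numeric codes rather than readable values. As a minimal sketch, assuming arrdate and depdate are SAS dates (days counted from 1 January 1960) and that the i94mode and i94visa code tables below match I94_SAS_Labels_Description.SAS, they could be decoded in plain Python as follows. The helper names (`sas_to_date`, `decode`, `is_missing`) are illustrative and not part of the project code.

```python
from datetime import date, timedelta

# Assumption: SAS dates count days from this epoch.
SAS_EPOCH = date(1960, 1, 1)

# Assumed code tables, as listed in I94_SAS_Labels_Description.SAS.
I94MODE = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}
I94VISA = {1: "Business", 2: "Pleasure", 3: "Student"}


def is_missing(value):
    """True for None or NaN (NaN is the only float not equal to itself)."""
    return value is None or value != value


def sas_to_date(days):
    """Convert a SAS day count such as 20573.0 to a datetime.date."""
    if is_missing(days):
        return None
    return SAS_EPOCH + timedelta(days=int(days))


def decode(table, value):
    """Map a float-coded field such as 1.0 to its label, or None if missing/unknown."""
    if is_missing(value):
        return None
    return table.get(int(value))


# Values as they appear in the df.head() output below.
print(sas_to_date(20573.0))  # 2016-04-29
print(decode(I94MODE, 1.0))  # Air
```

The same logic could be applied in Spark, but the plain-Python version makes the epoch and code tables easy to check against the data dictionary.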

In [15]:
# Read immigration data into pandas for exploration
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")


In [16]:
df.head()


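| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 | | | | ... | U | | 1979.0 | 10282016 | | | | 1897628000.0 | | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL | | ... | Y | | 1991.0 | D/S | M | | | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... | | M | 1961.0 | 09302016 | M | | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 1988.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 2012.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |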


In [17]:
# Read the temperature data into pandas
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df = pd.read_csv(fname, sep=',')
df.head()

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 | | | Århus | Denmark | 57.05N | 10.33E |
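The temperature file stores Latitude and Longitude as strings with a hemisphere suffix (for example 57.05N and 10.33E above), so they cannot be compared or sorted numerically as-is. A minimal sketch of a converter, assuming every value ends in one of N, S, E or W; the function name `parse_coordinate` is hypothetical, and in Spark it could be registered as a UDF with a `DoubleType` return type.

```python
def parse_coordinate(text):
    """Convert '57.05N' or '10.33E' style strings to signed decimal degrees.

    North and East are positive; South and West are negative.
    Returns None for empty input.
    """
    if not text:
        return None
    number, hemisphere = text[:-1], text[-1].upper()
    value = float(number)
    if hemisphere in ("N", "E"):
        return value
    if hemisphere in ("S", "W"):
        return -value
    raise ValueError(f"unknown hemisphere suffix in {text!r}")


print(parse_coordinate("57.05N"), parse_coordinate("10.33E"))  # 57.05 10.33
```

Raising on an unknown suffix, rather than returning None, keeps malformed rows from silently looking like missing data.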


In [18]:
# Create a Spark session that includes the spark-sas7bdat package for reading SAS files
spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


### Step 2: Explore and Assess the Data
#### Explore the Data 
For the immigration data, we drop all entries whose destination city code (i94port) is not a valid value as listed in I94_SAS_Labels_Description.SAS. For the temperature data, we drop all entries where AverageTemperature is missing (NaN), then drop duplicate locations, and then add the i94port code of each location.
#### Cleaning Steps


In [19]:
# Perform cleaning
# Parse valid port codes from lines such as  'ALC'	=	'ALCAN, AK'
# (non-greedy groups stop at the nearest closing quote)
re_obj = re.compile(r"'(.*?)'.*?'(.*?)'")
i94port_valid = {}
with open('i94port_valid.txt') as f:
    for line in f:
        match = re_obj.search(line)
        if match:  # skip blank or comment lines instead of raising TypeError
            i94port_valid[match[1]] = [match[2]]

def clean_i94_data(file):
    # Read I94 data into Spark
    df_immigration = spark.read.format('com.github.saurfang.sas.spark').load(file)

    # Keep only entries whose i94port is a valid port code
    df_immigration = df_immigration.filter(df_immigration.i94port.isin(list(i94port_valid.keys())))

    return df_immigration
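The regular expression in the cleaning cell assumes that each line of i94port_valid.txt looks like a port entry from I94_SAS_Labels_Description.SAS. A standalone sketch of that parsing step, run on sample lines typed in for illustration (the exact file layout is an assumption), shows how non-matching lines are skipped rather than raising an error. Unlike the notebook cell, this hypothetical `parse_port_lines` strips padding and stores each description as a plain string instead of a one-element list.

```python
import re

# Code and description are each captured up to the nearest closing quote.
PORT_LINE = re.compile(r"'([^']*)'\s*=\s*'([^']*)'")


def parse_port_lines(lines):
    """Return {port_code: description} for every line that matches PORT_LINE."""
    ports = {}
    for line in lines:
        match = PORT_LINE.search(line)
        if match is None:
            continue  # comments, headers and blank lines are ignored
        ports[match.group(1).strip()] = match.group(2).strip()
    return ports


# Sample lines for illustration only.
sample_lines = [
    "/* comment line */",
    "'ALC'\t=\t'ALCAN, AK             '",
    "'NYC'\t=\t'NEW YORK, NY          '",
    "",
]
print(parse_port_lines(sample_lines))  # {'ALC': 'ALCAN, AK', 'NYC': 'NEW YORK, NY'}
```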


In [20]:
# Clean temperature data
df_temp = spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")

# Filter out entries with a missing average temperature
# (Spark reads empty CSV fields as null, not as the string 'NaN')
df_temp = df_temp.filter(df_temp.AverageTemperature.isNotNull())

# Remove duplicate locations (keeps one arbitrary row per City/Country pair)
df_temp = df_temp.dropDuplicates(['City', 'Country'])

@udf()
def get_i94port(city):
    # Return the first valid port code whose description contains the city name
    if city is None:
        return None
    for key in i94port_valid:
        if city.lower() in i94port_valid[key][0].lower():
            return key
    return None

# Add i94port code based on city name
df_temp = df_temp.withColumn("i94port", get_i94port(df_temp.City))

# Remove entries with no i94port code (the UDF returns null when nothing matches)
df_temp = df_temp.filter(df_temp.i94port.isNotNull())
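get_i94port assigns a port when the city name appears anywhere in a port description, and the first match in dictionary order wins. The plain-Python sketch below reproduces that rule on a made-up two-entry dictionary (the code "XYZ" and its description are hypothetical) and contrasts it with a stricter variant that compares only the city part before the first comma. It illustrates the trade-off and is not the notebook's code.

```python
def match_port(city, ports, exact=False):
    """Return the first port code whose description matches the city name.

    exact=False: substring test, as in get_i94port.
    exact=True:  compare with the text before the first comma only.
    """
    if not city:
        return None
    needle = city.lower()
    for code, description in ports.items():
        place = description.lower()
        if exact:
            matched = place.split(",")[0].strip() == needle
        else:
            matched = needle in place
        if matched:
            return code
    return None


# Illustrative dictionary; "XYZ" is a hypothetical code.
ports = {"NYC": "NEW YORK, NY", "XYZ": "YORK, PA"}
print(match_port("York", ports))              # NYC (substring false positive)
print(match_port("York", ports, exact=True))  # XYZ
```

The substring rule finds more matches but can attach a city to the wrong port; the exact rule is stricter but misses descriptions whose spelling differs from the temperature file.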

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The first dimension table will contain events from the immigration data. The columns below will be extracted from the immigration dataframe:

* i94yr
* i94mon
* i94cit
* i94port
* arrdate
* i94mode
* depdate
* i94visa

The second dimension table will contain city temperature data. The columns below will be extracted from the temperature dataframe:

* i94port
* AverageTemperature
* City
* Country
* Latitude
* Longitude

The fact table will contain information from the immigration data joined with the temperature data on i94port:

* i94yr
* i94mon
* i94cit
* i94port
* arrdate
* i94mode
* depdate
* i94visa
* AverageTemperature
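Conceptually, the fact table is an inner join of the two dimension tables on i94port. The sketch below shows that join on plain Python dictionaries with made-up rows, keeping only a few of the listed columns; `build_fact_rows` is a hypothetical name. It also makes one consequence visible: if several temperature rows map to the same i94port, each immigration event appears once per matching row, which is also how a SQL inner join behaves.

```python
from collections import defaultdict


def build_fact_rows(immigration_rows, temperature_rows):
    """Inner-join immigration events to temperature rows on i94port."""
    temps_by_port = defaultdict(list)
    for temp in temperature_rows:
        temps_by_port[temp["i94port"]].append(temp)

    fact_rows = []
    for event in immigration_rows:
        # Events whose port has no temperature row are dropped (inner join).
        for temp in temps_by_port.get(event["i94port"], []):
            fact_rows.append({**event, "AverageTemperature": temp["AverageTemperature"]})
    return fact_rows


# Made-up rows for illustration only.
immigration = [
    {"i94yr": 2016, "i94mon": 4, "i94port": "NYC"},
    {"i94yr": 2016, "i94mon": 4, "i94port": "ATL"},
    {"i94yr": 2016, "i94mon": 4, "i94port": "XXX"},
]
temperature = [
    {"i94port": "NYC", "AverageTemperature": 10.0},
    {"i94port": "ATL", "AverageTemperature": 18.0},
    {"i94port": "ATL", "AverageTemperature": 17.5},
]
for row in build_fact_rows(immigration, temperature):
    print(row)
```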

#### 3.2 Mapping Out Data Pipelines
The pipeline steps are described below:

1. Clean the I94 data as described in Step 2 to build the Spark dataframe df_immigration for each month.
2. Clean the temperature data as described in Step 2 to build the Spark dataframe df_temp.
3. Build the immigration dimension table by selecting the relevant columns from df_immigration and write it to Parquet files partitioned by i94port.
4. Build the temperature dimension table by selecting the relevant columns from df_temp and write it to Parquet files partitioned by i94port.
5. Build the fact table by joining the immigration and temperature dimension tables on i94port and write it to Parquet files partitioned by i94port.
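Writing with partitionBy("i94port") produces Hive-style directories named i94port=<code> under each output path, so a query filtered on a single port only needs to read one directory. The hypothetical helper below illustrates that layout for one of the output paths used in Step 4; it does not reproduce Spark's escaping of special characters in partition values.

```python
def partition_dirs(base_path, column, values):
    """List the Hive-style partition directories Spark would create under base_path."""
    base = base_path.rstrip("/")
    return [f"{base}/{column}={value}" for value in sorted(set(values))]


for path in partition_dirs("/results/immigration.parquet", "i94port", ["NYC", "ATL", "NYC"]):
    print(path)
# /results/immigration.parquet/i94port=ATL
# /results/immigration.parquet/i94port=NYC
```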


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [21]:
# Write code here
immigration_data = '/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

# Clean immigration data and store it as a Spark dataframe
df_immigration = clean_i94_data(immigration_data)

# Extract columns for immigration dimension table
immigration_table = df_immigration.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa"])

# Write parquet files partitioned by i94port
immigration_table.write.mode("append").partitionBy("i94port").parquet("/results/immigration.parquet")

# Extract columns for temperature dimension table
temp_table = df_temp.select(["AverageTemperature", "City", "Country", "Latitude", "Longitude", "i94port"])

# Write parquet files partitioned by i94port
temp_table.write.mode("append").partitionBy("i94port").parquet("/results/temperature.parquet")

# Create temp views of the immigration and temperature data
df_immigration.createOrReplaceTempView("immigration_view")
df_temp.createOrReplaceTempView("temp_view")

# Create the fact table by joining the immigration and temperature views
fact_table = spark.sql('''
SELECT immigration_view.i94yr AS year,
       immigration_view.i94mon AS month,
       immigration_view.i94cit AS origin_code,
       immigration_view.i94port AS i94port,
       immigration_view.arrdate AS arrival_date,
       immigration_view.i94mode AS travel_mode,
       immigration_view.depdate AS departure_date,
       immigration_view.i94visa AS reason,
       temp_view.AverageTemperature AS temperature,
       temp_view.Latitude AS latitude,
       temp_view.Longitude AS longitude
FROM immigration_view
JOIN temp_view ON (immigration_view.i94port = temp_view.i94port)
''')

# Write fact table to parquet files partitioned by i94port
fact_table.write.mode("append").partitionBy("i94port").parquet("/results/fact.parquet")

#### 4.2 Data Quality Checks
Ensure that each table contains an adequate number of entries. The check below reports a failure for any table with zero records.

In [22]:
# Perform quality checks here
def quality_check(df, description):
    # Count the records and report whether the table is empty
    result = df.count()
    if result == 0:
        print("Data quality check failed  {} with zero records".format(description))
    else:
        print("Data quality check passed  {} with {} records".format(description, result))
    # Return the count so callers can act on it
    return result

# Perform data quality check
quality_check(df_immigration, "immigration table")
quality_check(df_temp, "temperature table")

Data quality check failed  immigration table with zero records
Data quality check failed  temperature table with zero records


0
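A non-zero count does not catch broken join keys. As a sketch, two further checks could be added: that the key column contains no nulls, and that every i94port in the fact table exists in the temperature dimension. They are shown here on lists of dictionaries with made-up rows, and the function names are hypothetical; in Spark, the same checks could be written with `df.filter(df.i94port.isNull()).count()` and a join using `how="left_anti"`.

```python
def count_nulls(rows, column):
    """Number of rows where column is missing or None."""
    return sum(1 for row in rows if row.get(column) is None)


def orphan_keys(fact_rows, dim_rows, key):
    """Non-null keys present in the fact rows but absent from the dimension rows."""
    dim_keys = {row[key] for row in dim_rows}
    fact_keys = {row.get(key) for row in fact_rows if row.get(key) is not None}
    return fact_keys - dim_keys


def run_extra_checks(fact_rows, dim_rows, key="i94port"):
    """Return a list of failure messages; an empty list means all checks passed."""
    failures = []
    nulls = count_nulls(fact_rows, key)
    if nulls:
        failures.append(f"{nulls} fact rows have a null {key}")
    orphans = orphan_keys(fact_rows, dim_rows, key)
    if orphans:
        failures.append(f"{key} values missing from dimension: {sorted(orphans)}")
    return failures


# Made-up rows for illustration only.
fact = [{"i94port": "NYC"}, {"i94port": "BOS"}, {"i94port": None}]
dim = [{"i94port": "NYC"}]
print(run_extra_checks(fact, dim))
```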

#### 4.3 Data dictionary 
The first dimension table contains events from the immigration data. The columns below are extracted from the immigration dataframe:

* i94yr = 4-digit year
* i94mon = numeric month
* i94cit = 3-digit code of the country of citizenship
* i94port = 3-character code of the destination city
* arrdate = arrival date
* i94mode = 1-digit travel mode code
* depdate = departure date
* i94visa = reason for immigration

The second dimension table contains city temperature data. The columns below are extracted from the temperature dataframe:

* i94port = 3-character code of the destination city
* AverageTemperature = average temperature
* City = city name
* Country = country name
* Latitude = latitude
* Longitude = longitude

The fact table contains information from the I94 immigration data joined with the city temperature data on i94port:

* i94yr = 4-digit year
* i94mon = numeric month
* i94cit = 3-digit code of the country of citizenship
* i94port = 3-character code of the destination city
* arrdate = arrival date
* i94mode = 1-digit travel mode code
* depdate = departure date
* i94visa = reason for immigration
* AverageTemperature = average temperature of the destination city

The tables are saved to Parquet files partitioned by city (i94port).


### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

  Spark was chosen because it can read multiple file formats (SAS and CSV in this project) through the same API, and Spark SQL was chosen to process the large input files.

* Propose how often the data should be updated and why.

  The data model is built on the assumption that data arrives monthly, in line with the current raw file format (the I94 data is split into monthly files), so the data should be updated once a month.

* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.

    I would move storage and analysis to the cloud using AWS services, because they provide high scalability, reliability, and availability.

  * The data populates a dashboard that must be updated on a daily basis by 7am every day.

    We could use a scheduling tool such as Airflow, which controls and automates task execution, to run the ETL pipeline overnight.

  * The database needed to be accessed by 100+ people.

    We could publish the Parquet files to HDFS and grant read access to the users who need it. If users want to run SQL queries on the data, the files could be exposed as tables through Spark SQL or Hive.
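Under the monthly update assumption, each run would clean one month's I94 file and append it to the Parquet outputs (the writes in Step 4 already use mode("append")). The sketch below shows how the monthly input paths could be generated, assuming every month follows the same naming pattern as i94_apr16_sub.sas7bdat; `monthly_i94_files` is a hypothetical name, and the file names for months other than April are not confirmed by this project.

```python
# Lower-case month abbreviations, hard-coded so the result does not depend on locale.
MONTHS = ("jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec")


def monthly_i94_files(year, directory):
    """Return the assumed path of each monthly I94 file for the given year."""
    suffix = f"{year % 100:02d}"
    return [f"{directory}/i94_{month}{suffix}_sub.sas7bdat" for month in MONTHS]


files = monthly_i94_files(2016, "../../data/18-83510-I94-Data-2016")
print(files[3])  # ../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
```

Each path could then be passed to clean_i94_data in turn, with a scheduler such as Airflow triggering one run per month.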
