# Immigration and Temperature Data
### Data Engineering Capstone Project

#### Project Summary
This project gathers and analyses immigration and temperature data by city,
and explores how the temperature of a city relates to immigration.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [9]:
# Do all imports and installs here
import pandas as pd
import re
from pyspark.sql.functions import udf
from pyspark.sql import SparkSession

### Step 1: Scope the Project and Gather Data

#### Scope
This project focuses on aggregating immigration data and temperature data by city.
The two datasets are stored in separate dimension tables and then joined into one fact table on the city code (i94port).

#### Describe and Gather Data 


In [10]:
# Read the immigration sample data before processing the full dataset
df_im = pd.read_csv("immigration_data_sample.csv", sep = ',')

# Show first 5 rows of the immigration dataframe
df_im.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [12]:
# Read temperature data by cities into Pandas dataframe from csv
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = pd.read_csv(fname)

# Show first 5 rows of the temperature dataframe
df_temp.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [13]:
fname = 'us-cities-demographics.csv'
df_demo = pd.read_csv(fname, sep = ';')
df_demo.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [14]:
# Create Spark Session
spark = SparkSession\
.builder\
.config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

### Step 2: Explore and Assess the Data
#### Cleaning the immigration data
The `filter_i94_data` function filters the I94 immigration data using the i94port city codes listed in I94_SAS_Labels_Descriptions.SAS:
1. Export the list of valid city codes to a .txt file for validation purposes (valid_city_codes.txt)
2. Extract the valid city codes with a regular expression (pattern: 'key' = 'value'), and store the key and value pairs in a dictionary
3. Filter the original I94 immigration data by the valid city codes, and return the result as a Spark dataframe
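
The parsing in step 2 can be tried on a few lines in isolation. This is a minimal sketch, not the notebook's exact code: the sample lines are made up to mimic the `'key' = 'value'` layout, and the non-greedy pattern and the `parse_city_codes` helper are assumptions for illustration.

```python
import re

# Non-greedy groups so that each quoted field is captured separately
PAIR = re.compile(r"'(.*?)'\s*=\s*'(.*?)'")


def parse_city_codes(lines):
    """Return {code: city} for every line that looks like 'key' = 'value'."""
    codes = {}
    for line in lines:
        match = PAIR.search(line)
        if match:  # skip lines that do not contain a pair
            codes[match.group(1).strip()] = match.group(2).strip()
    return codes


# Hypothetical sample lines, not copied from the real labels file
sample = [
    "   'ATL' = 'ATLANTA, GA   '",
    "/* a comment line without a pair */",
    "   'SEA' = 'SEATTLE, WA   '",
]
print(parse_city_codes(sample))
```

Skipping non-matching lines keeps one malformed or comment line in the labels file from stopping the whole load.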

In [21]:
# Pattern for valid city codes

pattern = re.compile(r"\'(.*)\'.*\'(.*)\'")
valid_city_codes = {}

with open('valid_city_codes.txt', 'r') as file:
    for line in file:
        grab = pattern.search(line)
        # Skip lines that do not contain a 'key' = 'value' pair
        if grab:
            valid_city_codes[grab[1]] = [grab[2]]
       

def filter_i94_data(origin_file):
    
    # Read I94 data into Spark
    df_im = spark.read.format('com.github.saurfang.sas.spark').load(origin_file)

    # Filter out entries where i94port is invalid
    df_im = df_im.filter(df_im.i94port.isin(list(valid_city_codes.keys())))

    return df_im


+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SEO| null|      G|   null|      Y|   null| 1991.0|     D/S|     M|  null|   null|  3.73679633E9|00296|      F1|
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|  1.0|20160401|    nul

#### Cleaning the temperature data
This step uses a function that returns the corresponding i94port (city code) for a given city name.
1. Read the .csv temperature data (the same file as in Step 1, but with Spark instead of pandas)
2. Filter out the rows where AverageTemperature is 'NaN'
3. Remove duplicated cities (a city is identified by the combination of City and Country)
4. Add an i94port code column to the filtered temperature dataframe
5. Filter out the rows where the i94port column is null
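
The lookup behind step 4 can be sketched in plain Python without Spark. The notebook matches a city name as a substring of the code description, so a name like "York" would also match "NEW YORK, NY". The sketch below swaps in an exact match on the text before the first comma. The helper names, the sample codes, and the use of plain strings as dictionary values are assumptions for illustration.

```python
def build_city_index(valid_city_codes):
    """Map the lower-cased city name (text before the first comma) to its code."""
    index = {}
    for code, label in valid_city_codes.items():
        city = label.split(",")[0].strip().lower()
        index.setdefault(city, code)  # keep the first code seen for a city
    return index


def lookup_city_code(city_name, index):
    """Return the code for city_name, or None if it is missing or unknown."""
    if not city_name:
        return None
    return index.get(city_name.strip().lower())


# Hypothetical codes for illustration only
codes = {"SEA": "SEATTLE, WA", "PER": "PERTH, AUSTRALIA", "NYC": "NEW YORK, NY"}
index = build_city_index(codes)
print(lookup_city_code("Seattle", index))  # SEA
print(lookup_city_code("York", index))     # None
```

Building the index once also avoids scanning every code for every row, which matters when the function is applied to a whole column.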

In [26]:
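@udf()
def get_city_code(city_name):
    # Return the first city code whose description contains the city name
    if city_name is None:
        return None
    for key in valid_city_codes:
        if city_name.lower() in valid_city_codes[key][0].lower():
            return key
    # No match: return None so the row can be filtered out later
    return None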

        
origin_file = '../../data2/GlobalLandTemperaturesByCity.csv'

# 1
df_temp = spark.read.format('csv').option('header', 'true').load(origin_file)

# 2
df_temp = df_temp.filter(df_temp.AverageTemperature != 'NaN')

# 3
df_temp = df_temp.dropDuplicates(['City', 'Country'])

# 4
df_temp = df_temp.withColumn("i94port", get_city_code(df_temp.City))

# 5 (the UDF returns None for unmatched cities, which Spark stores as null)
df_temp = df_temp.filter(df_temp.i94port.isNotNull())

# Show results
df_temp.show()

+----------+-------------------+-----------------------------+---------+--------------------+--------+---------+-------+
|        dt| AverageTemperature|AverageTemperatureUncertainty|     City|             Country|Latitude|Longitude|i94port|
+----------+-------------------+-----------------------------+---------+--------------------+--------+---------+-------+
|1852-07-01|             15.488|                        1.395|    Perth|           Australia|  31.35S|  114.97E|    PER|
|1828-01-01|             -1.977|                        2.551|  Seattle|       United States|  47.42N|  121.97W|    SEA|
|1743-11-01|              2.767|                        1.905| Hamilton|              Canada|  42.59N|   80.73W|    HAM|
|1849-01-01|  7.399999999999999|                        2.699|  Ontario|       United States|  34.56N|  116.76W|    ONT|
|1821-11-01|              2.322|                        2.375|  Spokane|       United States|  47.42N|  117.24W|    SPO|
|1843-01-01| 18.874000000000002|

### Step 3: Define the Data Model

#### 3.1 Conceptual Data Model

`dim_immigration_table`
Contains filtered data from the I94 immigration data.
- i94yr = 4-digit year
- i94mon = numeric month
- i94cit = 3-digit code of the traveller's country of citizenship
- i94port = 3-character code of destination city
- arrdate = arrival date
- i94mode = 1-digit travel code
- depdate = departure date
- i94visa = reason for immigration

`dim_temp_city_table`
Contains city temperature data.
- AverageTemperature = average temperature
- City = city name
- Country = country name
- Latitude = latitude
- Longitude = longitude
- i94port = 3-character code of destination city

`fact_immigration_temp_table`
Contains information from the I94 immigration data joined with the temperature by city data on i94port (city code).
- i94yr = 4-digit year
- i94mon = numeric month
- i94cit = 3-digit code of the traveller's country of citizenship
- i94port = 3-character code of destination city
- arrdate = arrival date
- i94mode = 1-digit travel code
- depdate = departure date
- i94visa = reason for immigration
- AverageTemperature = average temperature of destination city
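
The arrdate and depdate values in the sample appear as numbers such as 20566.0. Assuming these are SAS date values, which count days since 1 January 1960, they can be turned into calendar dates as sketched below. The `sas_to_date` helper is hypothetical and not part of the notebook.

```python
from datetime import date, timedelta

# Assumption: arrdate/depdate are SAS dates (days since 1960-01-01)
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days):
    """Convert a SAS day count to a date; pass None through for missing values."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20566.0))  # 2016-04-22
```

The result falls in April 2016, which agrees with the i94yr and i94mon values of the same sample row.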

#### 3.2 Mapping Out Data Pipelines
1. Clean I94 data to create Spark dataframe df_immigration for each month
2. Clean temperature data to create Spark dataframe df_temp (already performed in Step 2)
3. Create immigration dimension table by selecting relevant columns from df_immigration and write to parquet file partitioned by i94port (city code)
4. Create temperature dimension table by selecting relevant columns from df_temp and write to parquet file partitioned by i94port (city code)
5. Create fact table by joining immigration and temperature dimension tables on i94port and write to parquet file partitioned by i94port (city code)

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [24]:
immigration_data = '../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat'

# Filter I94 immigration data and store as Spark dataframe
df_im = filter_i94_data(immigration_data)

# Extract columns for immigration dimension table
dim_immigration_table = df_im.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa"])

# Write immigration dimension table to parquet, partitioned by i94port
dim_immigration_table.write.mode("append").partitionBy("i94port").parquet("./results/immigration.parquet")

In [27]:
# Extract columns for temperature dimension table
dim_temp_city_table = df_temp.select(["AverageTemperature", "City", "Country", "Latitude", "Longitude", "i94port"])

# Write temperature dimension table to parquet, partitioned by i94port
dim_temp_city_table.write.mode("append").partitionBy("i94port").parquet("./results/temperature.parquet")

In [28]:
# Create temporary views
df_im.createOrReplaceTempView("im_view")
df_temp.createOrReplaceTempView("temp_view")

# Create the fact table by joining the immigration and temperature views
fact_immigration_temp_table = spark.sql('''
SELECT im_view.i94yr as year,
       im_view.i94mon as month,
       im_view.i94cit as city,
       im_view.i94port as i94port,
       im_view.arrdate as arrival_date,
       im_view.depdate as departure_date,
       im_view.i94visa as reason,
       temp_view.AverageTemperature as temperature,
       temp_view.Latitude as latitude,
       temp_view.Longitude as longitude
FROM im_view
        JOIN temp_view ON (im_view.i94port = temp_view.i94port)
''')

# Write fact table to parquet, partitioned by i94port
fact_immigration_temp_table.write.mode("append").partitionBy("i94port").parquet("./results/fact.parquet")

#### 4.2 Data Quality Checks

The quality_check function counts the rows in the source Spark dataframe and in the resulting dimension table, compares the two counts, and prints the result to the console.
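
A complementary check is to confirm that no table came out empty. The sketch below is an illustration under stated assumptions: `check_not_empty` and `FakeTable` are hypothetical names, and `FakeTable` only stands in for a Spark dataframe so that the example runs without a Spark session.

```python
def check_not_empty(tables):
    """Raise ValueError if any table has zero rows; otherwise return the counts."""
    counts = {}
    for name, table in tables.items():
        n = table.count()
        if n == 0:
            raise ValueError("Data quality check failed: {} is empty".format(name))
        counts[name] = n
    return counts


class FakeTable:
    """Stand-in for a Spark dataframe; only count() is implemented."""

    def __init__(self, rows):
        self._rows = rows

    def count(self):
        return len(self._rows)


print(check_not_empty({"dim_immigration_table": FakeTable([1, 2, 3])}))
```

With real Spark dataframes each `count()` call triggers a Spark job, so it is worth running such checks once per table and reusing the returned counts.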


In [29]:
dim_immigration_table.count()

3508813

In [34]:
df_temp.count()

KeyboardInterrupt: 

In [32]:
def quality_check(df, table, label):
    # Compare the row count of the source dataframe with the table derived from it
    df_count = df.count()
    table_count = table.count()
    if df_count == table_count:
        print("{}: source and table counts match: {}".format(label, df_count))
    else:
        print("{}: source and table counts do not match: {} vs {}".format(label, df_count, table_count))

quality_check(df_im, dim_immigration_table, "immigration")
quality_check(df_temp, dim_temp_city_table, "temperature")

KeyboardInterrupt: 

#### 4.3 Data dictionary 
See data_dictionary.md.

### Step 5: Complete Project Write Up
Clearly state the rationale for the choice of tools and technologies for the project.
- Spark can handle big data in various formats such as JSON, CSV, and SAS. Spark SQL also lets us use familiar SQL syntax through a Python wrapper, along with dataframes similar to those in Pandas.

Propose how often the data should be updated and why.
- The immigration data should be updated monthly, since it is released on a monthly basis.
- The temperature data should be updated weekly, since we need an average temperature: a daily interval is too short and a monthly interval is too long.

Write a description of how you would approach the problem differently under the following scenarios:

- The data was increased by 100x.

Use distributed storage such as HDFS together with a distributed processing framework such as Hadoop MapReduce.

- The data populates a dashboard that must be updated on a daily basis by 7am every day.

Use Airflow to schedule the related batch jobs and run them in the correct order, so that they finish before 7am.

- The database needed to be accessed by 100+ people.

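Use distributed storage such as HDFS together with a distributed processing framework such as Hadoop MapReduce, so that the load from many concurrent users is spread across nodes.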