# ETL with Demographics, Temperatures and City Immigration Datasets
### Data Engineering Capstone Project

#### Project Summary
In this project we will analyze and perform ETL on US immigration data, such as what are the most popular cities for immigration, what is the gender distribution of the immigrants, what is the visa type distribution of the immigrants, what is the average age per immigrant and what is the average temperature per month per city, We have 3 data sources: I94 immigration dataset, temperature dataset from Kaggle and US demographics dataset from OpenSoft.

Our final tables:

4 dimension tables:
- immigrant
- city
- temp
- time

1 fact table:
- immigration

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [28]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import month, avg, round, dayofweek, weekofyear, isnull, count, col, udf, year
from pyspark.sql.types import StringType, IntegerType
import os
import glob
import re


In [29]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
We will create 3 dimension tables, and one fact table

#### Describe and Gather Data 
- I94 Immigration: US National Tourism and Trade Office 
- World Temperature: Kaggle
- US Cities Demographic: OpenSoft

In [30]:
# Read i94 data
files = glob.glob("../../data/18-83510-I94-Data-2016/*.sas7bdat")
fname = "../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat"
df_i94 = spark.read.format("com.github.saurfang.sas.spark").load(fname)

In [31]:
# Read Temperatures data
fname = "../../data2/GlobalLandTemperaturesByCity.csv"
df_temp = spark.read.format("csv").option("delimiter", ",").option("header", "true").load(fname)

In [32]:
# Read demographics data
fname = "us-cities-demographics.csv"
df_demo = spark.read.format("csv").option("delimiter", ";").option("header", "true").load(fname)

### Step 2: Explore and Assess the Data
#### Explore the Data 
- Fixing missing values
- Fixing duplicates

### View first 5 rows of i94 data

In [33]:
df_i94.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


### Create udf to convert to pyspark date 

In [34]:
@udf(StringType())
def convert_datetime(x):
    if x:
        return (datetime(1960, 1, 1).date() + timedelta(x)).isoformat()
    return None

### Clean i94 data

In [35]:
def i94_cleaning(df_i94):
    df_i94.dropna()
    df_i94.drop_duplicates()
    
    df_i94 = df_i94.withColumn("arrdate", convert_datetime(df_i94.arrdate))
    
    df_i94 = df_i94.filter(df_i94.i94addr != 'other')
    
    df_i94 = df_i94.select(col("i94port").alias("c_code"),
                       col("i94addr").alias("s_code"),
                       col("i94bir").alias("age"),
                       col("cicid").alias("id"), 
                       col("arrdate").alias("date"),
                       col("gender").alias("gender"),
                       col("i94visa").alias("v_type"),
                       "count")
    return df_i94

df_i94 = i94_cleaning(df_i94)

### Show first 5 rows to ensure it's i94 data is now right

In [36]:
df_i94.limit(5).toPandas()

Unnamed: 0,c_code,s_code,age,id,date,gender,v_type,count
0,ATL,AL,25.0,7.0,2016-04-07,M,3.0,1.0
1,WAS,MI,55.0,15.0,2016-04-01,M,2.0,1.0
2,NYC,MA,28.0,16.0,2016-04-01,,2.0,1.0
3,NYC,MA,4.0,17.0,2016-04-01,,2.0,1.0
4,NYC,MI,57.0,18.0,2016-04-01,,1.0,1.0


### Show first 5 rows of Temperatures data

In [37]:
df_temp.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


### Read and create list of valid prots

In [38]:
fname = "I94_SAS_Labels_Descriptions.SAS"
with open(fname) as fp:
    all_lines = fp.readlines()

re_after = re.compile(r"\'(.*)\'.*\'(.*)\'")
valid_ports = {}
for l in all_lines[302:961]:
    valid_ports[re_after.search(l).group(1)] = re_after.search(l).group(2)

### Create udf to validate state

In [39]:
@udf(StringType())
def convert_city_to_port(city):
    for key in valid_ports:
        if city.lower() in valid_ports[key].lower():
            return key

### Clean Temprature data

In [40]:
def temp_cleaning(df_temp):
    df_temp = df_temp.filter(df_temp["Country"] == "United States") \
    .withColumn("year", year(df_temp['dt'])) \
    .withColumn("month", month(df_temp["dt"])) \
    .withColumn("i94port", convert_city_to_port(df_temp["City"])) \
    .withColumn("AverageTemperature", col("AverageTemperature").cast("float")) \
    .dropna(how='any', subset=["i94port"])
    
    df_temp = df_temp.filter(df_temp["year"] == 2013)
    
    df_temp = df_temp.select(col("year"), 
                         col("month"), 
                         col("i94port").alias("c_code"),
                         col("AverageTemperature").alias("avg_temperature"),
                         col("Latitude").alias("lat"), 
                         col("Longitude").alias("long")).drop_duplicates()
    return df_temp

df_temp = temp_cleaning(df_temp)

### Show first 5 rows of Tempratures data

In [41]:
df_temp.limit(5).toPandas()

Unnamed: 0,year,month,c_code,avg_temperature,lat,long
0,2013,1,CHI,1.056,42.59N,87.27W
1,2013,5,RFD,16.014999,42.59N,89.45W
2,2013,4,SRQ,16.259001,32.95N,96.70W
3,2013,2,MHT,-3.314,42.59N,72.00W
4,2013,6,MIL,17.913,42.59N,87.27W


### Show first 5 rows of Demographics data

In [42]:
df_demo.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


### Clean Demographics data

In [43]:
def demo_cleaning(df_demo):
    df_demo = df_demo.withColumn("median_age", df_demo['Median Age']) \
    .withColumn("pct_male_pop", (df_demo['Male Population'] / df_demo['Total Population']) * 100) \
    .withColumn("pct_female_pop", (df_demo['Female Population'] / df_demo['Total Population']) * 100) \
    .withColumn("pct_veterans", (df_demo['Number of Veterans'] / df_demo['Total Population']) * 100) \
    .withColumn("pct_foreign_born", (df_demo['Foreign-born'] / df_demo['Total Population']) * 100) \
    .withColumn("pct_race", (df_demo['Count'] / df_demo['Total Population']) * 100) \
    .withColumn("c_code", convert_city_to_port(df_demo["City"])) \
    .dropna(how='any', subset=["c_code"])
    
    df_demo = df_demo.select(col("City").alias("c_name"), col("State Code").alias("s_code"), 
                         "median_age", "pct_male_pop", "pct_female_pop","pct_veterans", 
                         "pct_foreign_born", col("Total Population").alias("total_pop"), 
                         col("Race").alias("race"), "pct_race").drop_duplicates()
    
    df_demo = df_demo.groupBy("c_name", "s_code", "median_age", "pct_male_pop","pct_female_pop","pct_veterans", 
                          "pct_foreign_born", "total_pop").pivot("Race").avg("pct_race")

    df_demo = df_demo.withColumn("c_code", convert_city_to_port(df_demo["c_name"])) \
        .dropna(how='any', subset=["c_code"])
    
    df_demo = df_demo.select("c_code", "s_code", "c_name", "median_age",
                         col("pct_male_pop").alias("pct_male_pop"),
                         col("pct_female_pop").alias("pct_female_pop"),
                         col("pct_veterans").alias("pct_veterans"),
                         col("pct_veterans").alias("pct_foreign_born"),
                         col("American Indian and Alaska Native").alias("pct_native_american"),
                         col("Asian").alias("pct_asian"),
                         col("Black or African-American").alias("pct_black"),
                         col("Hispanic or Latino").alias("pct_hispanic_or_latino"),
                         col("White").alias("pct_white"), "total_pop")
    return df_demo

df_demo = demo_cleaning(df_demo)

### Show first 5 rows

In [44]:
df_demo.limit(5).toPandas()

Unnamed: 0,c_code,s_code,c_name,median_age,pct_male_pop,pct_female_pop,pct_veterans,pct_foreign_born,pct_native_american,pct_asian,pct_black,pct_hispanic_or_latino,pct_white,total_pop
0,TUC,AZ,Tucson,33.6,49.822448,50.177552,7.181468,7.181468,4.590971,4.643635,6.376088,43.452379,76.050738,531674
1,MCA,TX,Allen,37.2,52.297785,47.702215,3.571501,3.571501,0.231307,16.089588,13.389309,10.816401,71.165094,98138
2,CRP,TX,Corpus Christi,35.0,49.5208,50.4792,7.738165,7.738165,0.917052,2.792812,4.62167,61.940188,90.305231,324082
3,FMY,FL,Fort Myers,37.3,49.787205,50.212795,5.825846,5.825846,,4.754442,23.370938,24.13835,67.782206,74015
4,ORL,FL,Orlando,33.1,48.332146,51.667854,4.71805,4.71805,0.876283,4.106055,25.129837,32.96434,66.11656,270917


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
We chosed a star schema as our schema, because it's a simple schema, but also very efficient, we can do many joins with our tables, and analyze the result easliy

#### 3.2 Mapping Out Data Pipelines
We need several steps to ensure our data pipeline runs without any issuse
- remove nans
- remove duplicates
- create dimension tables
- create fact table
- save all these tables into parquet file

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

### Create immigrant (Dimension Table)

In [45]:
df_immigrant = df_i94.select("id", "gender", "age", "v_type").drop_duplicates()

### Create city (Dimension Table)

In [46]:
df_city = df_demo.join(df_temp, "c_code").select("c_code", "s_code", "c_name", 
                                                 "median_age", "pct_male_pop", "pct_female_pop", "pct_veterans",
                                                 "pct_foreign_born", "pct_native_american", "pct_asian", "pct_black",
                                                 "pct_hispanic_or_latino", "pct_white", "total_pop", "lat", "long").drop_duplicates()

### Create month temperature (Dimension Table)

In [47]:
month_temp_df = df_temp.select("c_code", "year", "month", "avg_temperature").drop_duplicates()

### Create Time (Dimension Table)

In [48]:
df_time = df_i94.withColumn("dayofweek", dayofweek("date"))\
                .withColumn("weekofyear", weekofyear("date"))\
                .withColumn("month", month("date"))

df_time = df_time.select("date", "dayofweek", "weekofyear", "month").drop_duplicates()

### Create immigration (Fact Table)

In [49]:
df_immigration  = df_i94.select("id", "s_code", "c_code", "date", "count").drop_duplicates()

### Save all tables as parquet files

In [23]:
df_i94.write.mode("overwrite").partitionBy("gender", "age").parquet("immigrants")
df_city.write.mode("overwrite").partitionBy("s_code").parquet("cities")
month_temp_df.write.mode("overwrite").parquet("monthly_city_temperatues")
df_time.write.mode("overwrite").parquet("time")
df_i94.write.mode("overwrite").partitionBy("s_code", "c_code").parquet("immigration")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

### Ensure all tables are exist

In [23]:
def is_table_exists(df):
    if df is not None:
        return True
    else:
        return False

if is_table_exists(df_immigrant) & is_table_exists(df_city) & is_table_exists(month_temp_df) & is_table_exists(df_time) & is_table_exists(df_immigration):
    print("quality check succeeded")
else:
    print("quality check failed")

quality check succeeded


### Ensure all tables are not empty

In [24]:
def is_table_empty(df):
    return df.count() != 0 

if is_table_empty(df_immigrant) & is_table_empty(df_city) & is_table_empty(month_temp_df) & is_table_empty(df_time) & is_table_empty(df_immigration):
    print("quality check succeeded")
else:
    print("quality check failed!")

quality check succeeded


### Create tables

In [50]:
df_city.createOrReplaceTempView("city")
df_immigration.createOrReplaceTempView("immigration")
df_immigrant.createOrReplaceTempView("immigrant")
month_temp_df.createOrReplaceTempView("temperature")
df_time.createOrReplaceTempView("time")

In [51]:
spark.sql("""
SELECT * FROM immigration
JOIN immigrant
ON immigration.id = immigrant.id
JOIN time
ON time.date = immigration.date
JOIN temperature
ON temperature.c_code = immigration.c_code
JOIN city
ON immigration.c_code = city.c_code
LIMIT 5
""").printSchema()

root
 |-- id: double (nullable = true)
 |-- s_code: string (nullable = true)
 |-- c_code: string (nullable = true)
 |-- date: string (nullable = true)
 |-- count: double (nullable = true)
 |-- id: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- v_type: double (nullable = true)
 |-- date: string (nullable = true)
 |-- dayofweek: integer (nullable = true)
 |-- weekofyear: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- c_code: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- avg_temperature: float (nullable = true)
 |-- c_code: string (nullable = true)
 |-- s_code: string (nullable = true)
 |-- c_name: string (nullable = true)
 |-- median_age: string (nullable = true)
 |-- pct_male_pop: double (nullable = true)
 |-- pct_female_pop: double (nullable = true)
 |-- pct_veterans: double (nullable = true)
 |-- pct_foreign_born: double (nullable = true)
 |-- pct_na

### Most popular city for immigration

In [67]:
spark.sql("""
SELECT c_code, COUNT(c_code) as count FROM immigration
GROUP BY c_code
ORDER BY count DESC
LIMIT 10
""").show()

+------+------+
|c_code| count|
+------+------+
|   NYC|474904|
|   MIA|329385|
|   LOS|292243|
|   SFR|148637|
|   ORL|144693|
|   HHW|137100|
|   NEW|133376|
|   CHI|126139|
|   HOU| 95736|
|   FTL| 91756|
+------+------+



### AVG age of immigrants

In [68]:
spark.sql("""
SELECT AVG(age) FROM immigrant
""").show()

+----------------+
|        avg(age)|
+----------------+
|41.7599608199143|
+----------------+



### Gender distribution

In [71]:
spark.sql("""
SELECT gender, COUNT(gender) as count FROM immigrant
GROUP BY gender
ORDER BY count DESC
""").show()

+------+-------+
|gender|  count|
+------+-------+
|     M|1308290|
|     F|1242009|
|     X|    778|
|     U|    325|
|  null|      0|
+------+-------+



#### 4.3 Data dictionary 
We have 4 dimension tables:
- immigrant
    - id (Primary key)
    - gender
    - age
    - v_type


- city
    - c_code
    - s_code (Primary key)
    - c_name
    - median_age
    - pct_male_pop
    - pct_female_pop
    - pct_veterans
    - pct_foreign_born
    - pct_native_american
    - pct_asian
    - pct_black
    - pct_hispanic_or_latino
    - pct_white
    - total_pop
    - lat
    - long


- temp
    - c_code (Primary key)
    - year
    - month
    - avg_temperature


- time
    - date (Primary key)
    - dayofweek
    - weekofyear
    - month

We have 1 fact table:
- immigration
    - id (Primary key)
    - s_code
    - c_code
    - date
    - count

In [27]:
df_city.printSchema()

root
 |-- c_code: string (nullable = true)
 |-- s_code: string (nullable = true)
 |-- c_name: string (nullable = true)
 |-- median_age: string (nullable = true)
 |-- pct_male_pop: double (nullable = true)
 |-- pct_female_pop: double (nullable = true)
 |-- pct_veterans: double (nullable = true)
 |-- pct_foreign_born: double (nullable = true)
 |-- pct_native_american: double (nullable = true)
 |-- pct_asian: double (nullable = true)
 |-- pct_black: double (nullable = true)
 |-- pct_hispanic_or_latino: double (nullable = true)
 |-- pct_white: double (nullable = true)
 |-- total_pop: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)



In [28]:
df_immigration.printSchema()

root
 |-- id: double (nullable = true)
 |-- s_code: string (nullable = true)
 |-- c_code: string (nullable = true)
 |-- date: string (nullable = true)
 |-- count: double (nullable = true)



In [29]:
month_temp_df.printSchema()

root
 |-- c_code: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- avg_temperature: float (nullable = true)



In [30]:
df_time.printSchema()

root
 |-- date: string (nullable = true)
 |-- dayofweek: integer (nullable = true)
 |-- weekofyear: integer (nullable = true)
 |-- month: integer (nullable = true)



In [31]:
df_immigrant.printSchema()

root
 |-- id: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- v_type: double (nullable = true)



#### Step 5: Complete Project Write Up
* We chosed to work with spark as it good for processing large amount of data, and can be easily scaled up by adding more nodes
* We can update our data once every month to keep updated with temperature data
* The data was increased by 100x: We can add more work node to be able to handle this large amount of data
* The data populates a dashboard that must be updated on a daily basis by 7am every day: We can use AirFlow with its DAGs and schedulers
* The database needed to be accessed by 100+ people: We can create a Data Warehouse or data lake, for example, we can use more EC2s, as a data warehouse or data lake consist of clusters, and have more processing power, and more storage capacity, which makes it accessible by +100 users