# Project Title
### Data Engineering Capstone Project

#### Project Summary
`This project aims to answer questions on US immigration, such as: what are the most popular cities for immigration, what is the gender distribution of the immigrants, what is the visa type distribution of the immigrants, what is the average age of the immigrants, and what is the average temperature per month per city. We extract data from three different sources: the 2016 I94 immigration dataset, city temperature data from Kaggle and US city demographic data from OpenSoft. We design 4 dimension tables (cities, immigrants, monthly average city temperature and time) and 1 fact table (immigration). We use Spark for the ETL jobs and store the results in parquet for downstream analysis.`

In [1]:
# Do all imports and installs here
import pandas as pd
import os
import glob
import re
from datetime import datetime, timedelta
from pprint import pprint
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col, udf, year, month, avg, round, dayofweek, weekofyear, isnull
from pyspark.sql.types import StringType, IntegerType

### Step 1: Scope the Project and Gather Data

1.1 Scope

`The goal of this project is to pull data from three different sources and create fact and dimension tables that support analysis of US immigration by city monthly average temperature, city demographics and seasonality.`

1.2 Data Sources

    * I94 Immigration Data: `comes from the US National Tourism and Trade Office and contains statistics on international visitor arrivals in the USA. The dataset contains data from 2016.`
    * World Temperature Data: `comes from Kaggle and contains average temperatures by city.`
    * U.S. City Demographic Data: `comes from OpenSoft and contains demographic information about US cities, such as median age and male and female population.`

In [2]:
# Create spark session
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

In [3]:
# Read Data
# Read i94 data
i94_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
# Read temperature data
temperature_df = spark.read.format("csv").option("delimiter", ",").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")
# Read demographics data
demo_df = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("us-cities-demographics.csv")

### Step 2: Explore and Assess the Data

## Immigration Data

In [4]:
# Preview the first five rows of the immigration data
i94_df.limit(5).toPandas()

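|   | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 |  |  |  | ... | U |  | 1979.0 | 10282016 |  |  |  | 1897628000.0 |  | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL |  | ... | Y |  | 1991.0 | D/S | M |  |  | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... |  | M | 1961.0 | 09302016 | M |  | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 1988.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 2012.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |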


In [5]:
# Drop rows with nulls in the key columns
cleaned_i94_df = i94_df.dropna(how="any", subset=["i94port", "i94addr", "gender", "i94mode"])

In [6]:
# Get the set of valid state codes from the demographics data
valid_state_codes = set(demo_df.toPandas()["State Code"].unique())

In [7]:
# Replace state codes that do not appear in the demographics data with 'other'
@udf(StringType())
def validate_state(x):
    if x in valid_state_codes:
        return x
    return 'other'

cleaned_i94_df = cleaned_i94_df.withColumn("i94addr", validate_state(cleaned_i94_df.i94addr))

# Drop non-US immigration data; this filter must run after validate_state has assigned 'other'
cleaned_i94_df = cleaned_i94_df.filter(cleaned_i94_df.i94addr != 'other')

In [8]:
# Convert SAS dates (days since 1960-01-01) to ISO-formatted date strings
@udf(StringType())
def convert_datetime(x):
    if x is not None:
        return (datetime(1960, 1, 1).date() + timedelta(days=x)).isoformat()
    return None

cleaned_i94_df = cleaned_i94_df.withColumn("arrdate", convert_datetime(cleaned_i94_df.arrdate))
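The `convert_datetime` UDF relies on `arrdate` being a SAS date, i.e. the number of days since 1960-01-01. The plain-Python sketch below mirrors that conversion so the logic can be unit-tested without a Spark session (the function name `sas_date_to_iso` is hypothetical). As an untested alternative, Spark SQL's built-in `date_add` could replace the Python UDF, e.g. `expr("date_add('1960-01-01', CAST(arrdate AS INT))")`.

```python
from datetime import date, timedelta
from typing import Optional

# SAS stores dates as a day count relative to this epoch
SAS_EPOCH = date(1960, 1, 1)


def sas_date_to_iso(sas_days: Optional[float]) -> Optional[str]:
    """Convert a SAS day count (possibly a float such as 20545.0) to 'YYYY-MM-DD'."""
    if sas_days is None:
        return None
    return (SAS_EPOCH + timedelta(days=int(sas_days))).isoformat()


print(sas_date_to_iso(20545.0))  # 2016-04-01
print(sas_date_to_iso(None))     # None
```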

In [9]:
# Convert to staging table schema

staging_i94_df = cleaned_i94_df.select(col("cicid").alias("immigrant_id"), 
                                       col("arrdate").alias("date"),
                                       col("i94port").alias("city_code"),
                                       col("i94addr").alias("state_code"),
                                       col("i94res").alias("resident_country"),
                                       col("i94bir").alias("age"),
                                       col("gender").alias("gender"),
                                       col("i94visa").alias("visa_type"),"count").drop_duplicates()

## Temperature Data

### Clean temperature data to keep only valid cities

In [10]:
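# Get country, port (city), state and visa type mappings from the SAS label descriptions file
# (the line ranges below are specific to this version of the file)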
i94_sas_label_descriptions_fname= "I94_SAS_Labels_Descriptions.SAS"
with open(i94_sas_label_descriptions_fname) as f:
    lines = f.readlines()

re_compiled = re.compile(r"\'(.*)\'.*\'(.*)\'")
valid_ports = []
valid_states = []
visa_types = []
country_map = []
valid_ports_map = {}

for line in lines[10:298]:
    country_code = line.split("=")[0].strip()
    country_name = line.split("=")[1].strip().strip("'").strip()
    country_map.append({"country_code":country_code, "country_name": country_name})
    
for line in lines[302:961]:
    results = re_compiled.search(line)
    valid_ports.append({"city_code":results.group(1), "city_name":results.group(2).strip()})
    valid_ports_map[results.group(1).strip()] = results.group(2).strip()
    
for line in lines[982:1036]:
    results = re_compiled.search(line)
    valid_states.append({"state_code":results.group(1).strip(), "state_name" : results.group(2).strip()})
    
for line in lines[1046:1049]:
    visa_type = line.split("=")[0].strip()
    visa_desc = line.split("=")[1].strip()
    visa_types.append({"visa_code":visa_type, "visa_desc": visa_desc})
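The loops above rely on hard-coded line ranges such as `lines[10:298]`, which silently break if the label file is edited. A more robust option is to parse every `code = name` line with one regular expression. The sketch below is a hypothetical helper (`parse_label_block` and `LABEL_RE` are not part of the original notebook); it assumes each mapping line has the form `code = name`, with the code and the name optionally wrapped in single quotes, and the file would still need to be split into its separate mapping blocks first.

```python
import re
from typing import Dict, Iterable

# Matches lines such as:  'ALC' = 'ALCAN, AK   '  |  236 = 'AFGHANISTAN'  |  1 = Business
# The code and the name may or may not be wrapped in single quotes.
LABEL_RE = re.compile(r"^\s*'?(?P<code>[^'=]+?)'?\s*=\s*'?(?P<name>[^';]*)'?")


def parse_label_block(lines: Iterable[str]) -> Dict[str, str]:
    """Return a {code: name} mapping for every 'code = name' line in the block."""
    mapping = {}
    for line in lines:
        match = LABEL_RE.match(line)
        if match:  # comments, headers and blank lines have no '=' and are skipped
            mapping[match.group("code").strip()] = match.group("name").strip()
    return mapping


sample = ["\t'ALC'\t=\t'ALCAN, AK             '", "   236 =  'AFGHANISTAN'", "   1 = Business"]
print(parse_label_block(sample))
```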

In [11]:
# Convert port (city) mapping to df
dim_port_map = spark.createDataFrame(valid_ports, schema=["city_code", "city_name"])

# Convert state mapping to df
dim_state_map = spark.createDataFrame(valid_states, schema=["state_code","state_name"])

# Convert visa mapping to df
dim_visa_map = spark.createDataFrame(visa_types, schema = ["visa_code", "visa_desc"])

# Convert to country df

dim_country_map = spark.createDataFrame(country_map, schema = ["country_code", "country_name"])



In [12]:
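# Map US cities in the temperature data to I94 port codes using a UDF
@udf(StringType())
def city_to_port(city):
    # Return the first port whose label contains the city name, or None when nothing matches
    if city is None:
        return None
    for key in valid_ports_map:
        if city.lower() in valid_ports_map[key].lower():
            return key
    return None
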
cleaned_temp_df = temperature_df.filter(temperature_df["Country"] == "United States") \
    .withColumn("year", year(temperature_df['dt'])) \
    .withColumn("month", month(temperature_df["dt"])) \
    .withColumn("i94port", city_to_port(temperature_df["City"])) \
    .withColumn("AverageTemperature", col("AverageTemperature").cast("float")) \
    .dropna(how='any', subset=["i94port"])
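Because `city_to_port` uses a substring test, a short city name such as "York" would match "NEW YORK, NY", and the first matching port wins. A stricter, hedged alternative is an exact match on the city part of each port label. The sketch below uses hypothetical helper names and sample ports; it trades some recall for precision, and in Spark the index could be turned into a DataFrame and joined instead of looped over inside a UDF.

```python
from typing import Dict, Optional


def build_city_index(ports: Dict[str, str]) -> Dict[str, str]:
    """Map the lower-cased city part of each port label ('ATLANTA, GA' -> 'atlanta') to its code."""
    index: Dict[str, str] = {}
    for code, label in ports.items():
        city = label.split(",")[0].strip().lower()
        index.setdefault(city, code)  # keep the first code if two ports share a city name
    return index


def city_to_port_exact(city: Optional[str], index: Dict[str, str]) -> Optional[str]:
    """Return the port code for an exact city-name match, or None."""
    if city is None:
        return None
    return index.get(city.strip().lower())


ports = {"ATL": "ATLANTA, GA", "NYC": "NEW YORK, NY", "WAS": "WASHINGTON DC"}
index = build_city_index(ports)
print(city_to_port_exact("New York", index))  # NYC
print(city_to_port_exact("York", index))      # None
```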

In [13]:
# Convert to staging table structure
staging_temp_df = cleaned_temp_df.select(col("year"), col("month"), col("i94port").alias("city_code"),
                                         round(col("AverageTemperature"), 1).alias("avg_temperature"),
                                         col("Latitude").alias("lat"), col("Longitude").alias("long")).drop_duplicates()

## Demographics data

In [14]:
# Calculate percentages of numeric columns and create new ones
cleaned_demo_df = demo_df.withColumn("median_age", demo_df['Median Age']) \
    .withColumn("pct_male_pop", (demo_df['Male Population'] / demo_df['Total Population']) * 100) \
    .withColumn("pct_female_pop", (demo_df['Female Population'] / demo_df['Total Population']) * 100) \
    .withColumn("pct_veterans", (demo_df['Number of Veterans'] / demo_df['Total Population']) * 100) \
    .withColumn("pct_foreign_born", (demo_df['Foreign-born'] / demo_df['Total Population']) * 100) \
    .withColumn("pct_race", (demo_df['Count'] / demo_df['Total Population']) * 100) \
    .withColumn("city_code", city_to_port(demo_df["City"])) \
    .dropna(how='any', subset=["city_code"])

cleaned_demo_df = cleaned_demo_df.select(col("City").alias("city_name"), col("State Code").alias("state_code"), 
                         "median_age", "pct_male_pop", "pct_female_pop","pct_veterans", 
                         "pct_foreign_born", col("Total Population").alias("total_pop"), 
                         col("Race").alias("race"), "pct_race").drop_duplicates()

In [15]:
# Pivot the race rows into one percentage column per race
pivot_demo_df = cleaned_demo_df.groupBy("city_name", "state_code", "median_age", "pct_male_pop",
                                        "pct_female_pop", "pct_veterans", "pct_foreign_born", "total_pop").pivot("race").avg("pct_race")

pivot_demo_df = pivot_demo_df.withColumn("city_code", city_to_port(pivot_demo_df["city_name"])) \
    .dropna(how='any', subset=["city_code"])
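Each row of the demographics file describes one (city, race) pair, so the pivot turns the race values into columns. The plain-Python sketch below (hypothetical `pivot_race` helper, made-up sample rows) shows what `groupBy(...).pivot("race").avg("pct_race")` computes; in Spark, a city with no row for a given race gets null in that column, whereas here the key is simply absent. Passing the known race values to `pivot("race", [...])` would also let Spark skip the extra pass it otherwise makes to find the distinct values.

```python
from collections import defaultdict
from statistics import mean
from typing import Dict, Iterable, List, Tuple


def pivot_race(rows: Iterable[Tuple[str, str, float]]) -> Dict[str, Dict[str, float]]:
    """Turn (city, race, pct) rows into {city: {race: mean pct}}, like groupBy().pivot().avg()."""
    grouped: Dict[str, Dict[str, List[float]]] = defaultdict(lambda: defaultdict(list))
    for city, race, pct in rows:
        grouped[city][race].append(pct)
    return {city: {race: mean(vals) for race, vals in races.items()} for city, races in grouped.items()}


rows = [("Austin", "Asian", 7.0), ("Austin", "White", 70.0), ("Boise", "White", 90.0)]
print(pivot_race(rows))
```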

In [16]:
#Convert to staging structure 


staging_demo_df = pivot_demo_df.select("city_code", "state_code", "median_age",
                                    round(col("pct_male_pop"), 1).alias("pct_male_pop"),
                                    round(col("pct_female_pop"), 1).alias("pct_female_pop"),
                                    round(col("pct_veterans"), 1).alias("pct_veterans"),
                                    round(col("pct_foreign_born"), 1).alias("pct_foreign_born"),
                                    round(col("American Indian and Alaska Native"), 1).alias("pct_native_american"),
                                    round(col("Asian"), 1).alias("pct_asian"),
                                    round(col("Black or African-American"), 1).alias("pct_black"),
                                    round(col("Hispanic or Latino"), 1).alias("pct_hispanic_or_latino"),
                                    round(col("White"), 1).alias("pct_white"), "total_pop")

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

The star schema is chosen as the data model because it is simple yet effective: users can answer analytical questions with straightforward queries that join the fact table to the dimension tables.

Here are the tables of the schema:

1. Staging Tables
    * staging_i94_df
    
    `immigrant_id
    date
    city_code
    state_code
    resident_country
    age
    gender
    visa_type
    count`

    * staging_temp_df
    
    `year
    month
    city_code
    avg_temperature
    lat
    long`

    * staging_demo_df
     
    `city_code
    state_code
    median_age
    pct_male_pop
    pct_female_pop
    pct_veterans
    pct_foreign_born
    pct_native_american
    pct_asian
    pct_black
    pct_hispanic_or_latino
    pct_white
    total_pop`
    
2. Dimension Tables

   * dim_demos        
     `city_code
      state_code
      median_age
      pct_male_pop
      pct_female_pop
      pct_veterans
      pct_foreign_born
      pct_native_american
      pct_asian
      pct_black
      pct_hispanic_or_latino
      pct_white
      total_pop
      lat
      long`
        
   * dim_monthly_city_temp
        
     `city_code
      year
      month
      avg_temperature`

   * dim_time
         
     `date
     dayofweek
     weekofyear
     month`
         
   * dim_country
          
      `country_code
       country_name`
      
   * dim_states
    
      `state_code
       state_name`
       
   * dim_city
   
       `city_code
       city_name`

   * dim_visa

       `visa_code
       visa_desc`
           
3. Fact Table
    * fact_immigration
        
        `immigrant_id
        state_code
        city_code
        resident_country
        date
        age
        gender
        visa_type
        count`
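To illustrate the kind of query this star schema is meant to support, here is a self-contained sketch that uses SQLite from the Python standard library as a stand-in for the parquet tables; a Spark SQL query over the parquet output would have the same shape. The rows are made-up samples, not results from the I94 data, and only a subset of the columns is modeled.

```python
import sqlite3

# Tiny in-memory stand-ins for fact_immigration and dim_city (hypothetical sample rows)
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dim_city (city_code TEXT PRIMARY KEY, city_name TEXT);
    CREATE TABLE fact_immigration (
        immigrant_id REAL PRIMARY KEY,
        city_code TEXT REFERENCES dim_city (city_code),
        gender TEXT,
        age REAL,
        count REAL
    );
    INSERT INTO dim_city VALUES ('NYC', 'NEW YORK, NY'), ('ATL', 'ATLANTA, GA');
    INSERT INTO fact_immigration VALUES
        (7, 'ATL', 'M', 25, 1), (15, 'NYC', 'M', 55, 1), (16, 'NYC', 'F', 28, 1);
    """
)

# "What are the most popular cities for immigration?": join the fact to a dimension and aggregate
top_cities = conn.execute(
    """
    SELECT c.city_name, SUM(f.count) AS arrivals, ROUND(AVG(f.age), 1) AS avg_age
    FROM fact_immigration AS f
    JOIN dim_city AS c ON f.city_code = c.city_code
    GROUP BY c.city_name
    ORDER BY arrivals DESC
    """
).fetchall()

for row in top_cities:
    print(row)
```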

#### 3.2 Mapping Out Data Pipelines

1. Clean the data: handle nulls, data types, duplicates, etc.
2. Load the staging tables staging_i94_df, staging_temp_df and staging_demo_df
3. Create the dimension tables
4. Create the fact table, ensuring referential integrity
5. Save the processed dimension and fact tables as parquet for downstream queries

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

#### Dimension tables

In [17]:
# Dimension table for immigrants
immigrant_df = staging_i94_df.select("immigrant_id", "gender", "age", "visa_type").drop_duplicates()

# The city demographics, monthly temperature and time dimensions are built in the next cell

In [18]:
# Dimension table for city demographics and coordinates

dim_demos = staging_demo_df.join(staging_temp_df, "city_code") \
    .select("city_code", "state_code", "median_age", "pct_male_pop", "pct_female_pop", "pct_veterans",
           "pct_foreign_born", "pct_native_american", "pct_asian", "pct_black",
           "pct_hispanic_or_latino", "pct_white", "total_pop", "lat", "long").drop_duplicates()

# Dimension table for monthly average temperature per city

dim_monthly_city_temp = staging_temp_df.select("city_code", "year", "month", "avg_temperature").drop_duplicates()

# Dimension table for time (one row per arrival date)

time_df = staging_i94_df.withColumn("dayofweek", dayofweek("date"))\
                .withColumn("weekofyear", weekofyear("date"))\
                .withColumn("month", month("date"))
                        
dim_time = time_df.select("date", "dayofweek", "weekofyear", "month").drop_duplicates()
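Spark's `dayofweek` numbers days from 1 (Sunday) to 7 (Saturday), while `weekofyear` follows ISO 8601 week numbering. The plain-Python sketch below (hypothetical `time_dim_row` helper) reproduces one `dim_time` row under those conventions, which can be handy for checking the output of the cell above.

```python
from datetime import date
from typing import Dict, Union


def time_dim_row(d: date) -> Dict[str, Union[str, int]]:
    """Build one dim_time row using Spark's conventions for dayofweek and weekofyear."""
    return {
        "date": d.isoformat(),
        # Spark: 1 = Sunday ... 7 = Saturday; Python isoweekday: 1 = Monday ... 7 = Sunday
        "dayofweek": d.isoweekday() % 7 + 1,
        # ISO 8601 week number, as used by Spark's weekofyear
        "weekofyear": d.isocalendar()[1],
        "month": d.month,
    }


print(time_dim_row(date(2016, 4, 1)))
# {'date': '2016-04-01', 'dayofweek': 6, 'weekofyear': 13, 'month': 4}
```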

In [None]:
# Write to dim_demos
dim_demos.coalesce(1).write.mode("overwrite").partitionBy("state_code","city_code").parquet("sparkify/dim_demos")

# Write to dim_monthly_city_temperature
dim_monthly_city_temp.coalesce(1).write.mode("overwrite").parquet("sparkify/dim_monthly_city_temperature")

# Write to dim_time
dim_time.coalesce(1).write.mode("overwrite").parquet("sparkify/dim_time")

# Write to dim_city (port code to city name mapping)
dim_port_map.coalesce(1).write.mode("overwrite").parquet("sparkify/dim_city")

# Write to dim_states
dim_state_map.coalesce(1).write.mode("overwrite").parquet("sparkify/dim_state")

# Write to dim_visa
dim_visa_map.coalesce(1).write.mode("overwrite").parquet("sparkify/dim_visa")

# Write to dim_country
dim_country_map.coalesce(1).write.mode("overwrite").parquet("sparkify/dim_country")

#### Fact table

In [20]:
# Fact table for immigration
fact_immigration = staging_i94_df.select("immigrant_id", "gender", "age", "visa_type", "resident_country",
                                         "city_code", "state_code", "date", "count").drop_duplicates()
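Step 4 of the pipeline calls for referential integrity, but `fact_immigration` is selected directly from `staging_i94_df`, so nothing guarantees that every `city_code` or `state_code` in it exists in the corresponding dimension. The plain-Python sketch below (hypothetical `enforce_referential_integrity` helper) states the rule; in Spark, the same filter could be written as one `left_semi` join per dimension, e.g. `fact.join(dim_city_keys, "city_code", "left_semi")`, assuming the dimension exposes a `city_code` column.

```python
from typing import Dict, Iterable, List, Set


def enforce_referential_integrity(
    fact_rows: Iterable[Dict[str, str]], dim_keys: Dict[str, Set[str]]
) -> List[Dict[str, str]]:
    """Keep only fact rows whose foreign-key values all exist in the matching dimension.

    dim_keys maps a fact column name (e.g. "city_code") to the set of keys in its dimension.
    """
    return [
        row for row in fact_rows
        if all(row.get(column) in keys for column, keys in dim_keys.items())
    ]


facts = [
    {"immigrant_id": "7", "city_code": "ATL", "state_code": "GA"},
    {"immigrant_id": "6", "city_code": "XXX", "state_code": "GA"},
]
print(enforce_referential_integrity(facts, {"city_code": {"ATL", "NYC"}, "state_code": {"GA", "NY"}}))
```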

In [None]:
# Write fact table
fact_immigration.coalesce(1).write.mode("overwrite").partitionBy("resident_country","state_code","city_code").parquet("sparkify/fact_immigration")
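With `partitionBy`, Spark writes Hive-style directories named `column=value`, one level per partition column, so queries that filter on `resident_country`, `state_code` or `city_code` can skip whole directories. The hypothetical helper below only shows where a row would land; it ignores the escaping Spark applies to special characters and the `__HIVE_DEFAULT_PARTITION__` directory used for null values. Note that with `coalesce(1)` a single task writes every partition, and partitioning by country, state and city can create many small directories, a trade-off worth checking against the expected query patterns.

```python
from typing import Dict, Sequence


def partition_path(base: str, row: Dict[str, object], partition_cols: Sequence[str]) -> str:
    """Return the Hive-style directory a row lands in when written with partitionBy(*partition_cols)."""
    parts = [f"{column}={row[column]}" for column in partition_cols]
    return "/".join([base, *parts])


row = {"resident_country": 276.0, "state_code": "AL", "city_code": "ATL"}
print(partition_path("sparkify/fact_immigration", row, ["resident_country", "state_code", "city_code"]))
# sparkify/fact_immigration/resident_country=276.0/state_code=AL/city_code=ATL
```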

#### 4.2 Data Quality Checks
Run Quality Checks

In [21]:
# Check that every table exists

def table_exists(df):
    return df is not None

tables = [dim_demos, dim_monthly_city_temp, dim_time, fact_immigration,
          dim_port_map, dim_state_map, dim_visa_map, dim_country_map]

if all(table_exists(df) for df in tables):
    print("data quality check passed")
    print("dimension tables and fact table exist")
    print()
else:
    print("data quality check failed")
    print("table missing...")

data quality check passed
dimension tables and fact table exist



In [None]:
# Check that every table contains at least one record

def table_not_empty(df):
    # take(1) fetches at most one row, which is cheaper than a full count()
    return len(df.take(1)) > 0

tables = [dim_demos, dim_monthly_city_temp, dim_time, fact_immigration,
          dim_port_map, dim_state_map, dim_visa_map, dim_country_map]

if all(table_not_empty(df) for df in tables):
    print("data quality check passed!")
    print("dimension tables and fact table contain records")
    print()
else:
    print("data quality check failed!")
    print("empty table(s) found...")
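The checks above confirm that the tables exist and are non-empty. A stricter check would also verify that each primary key is non-null and unique. The plain-Python sketch below (hypothetical `run_quality_checks` helper, rows as dictionaries) expresses those rules; in Spark, the equivalents would be `df.filter(col(key).isNull()).count()` and a comparison of `df.count()` with `df.select(key).distinct().count()`.

```python
from typing import Dict, List, Sequence


def run_quality_checks(
    tables: Dict[str, Sequence[Dict[str, object]]], primary_keys: Dict[str, str]
) -> List[str]:
    """Return a list of failure messages; an empty list means every check passed."""
    failures = []
    for name, rows in tables.items():
        if not rows:
            failures.append(f"{name}: table is empty")
            continue
        key = primary_keys.get(name)
        if key is None:
            continue  # no primary key declared for this table
        values = [row.get(key) for row in rows]
        if any(value is None for value in values):
            failures.append(f"{name}: null values in primary key '{key}'")
        if len(set(values)) != len(values):
            failures.append(f"{name}: duplicate values in primary key '{key}'")
    return failures


sample = {"dim_time": [{"date": "2016-04-01"}, {"date": "2016-04-01"}], "dim_states": []}
print(run_quality_checks(sample, {"dim_time": "date"}))
```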

#### 4.3 Data dictionary 

1. ER Diagram 

    <img src="img/erd.PNG">

2. Dimension Tables

    * dim_demos - Holds demographic data about a city, e.g. state and population details (median age, percentage of male and female population, etc.)

        `city_code: city port code --> primary key
        state_code: state code of the city --> partition key
        median_age: median age of the city
        pct_male_pop: city's male population in percentage
        pct_female_pop: city's female population in percentage
        pct_veterans: city's veteran population in percentage
        pct_foreign_born: city's foreign-born population in percentage
        pct_native_american: city's Native American population in percentage
        pct_asian: city's Asian population in percentage
        pct_black: city's Black population in percentage
        pct_hispanic_or_latino: city's Hispanic or Latino population in percentage
        pct_white: city's White population in percentage
        total_pop: city's total population
        lat: latitude of the city
        long: longitude of the city`

    * dim_monthly_city_temp - Holds monthly average temperature data for a city.

        `city_code: city port code --> Foreign Key REFERENCES dim_city
        year: year
        month: month
        avg_temperature: average temperature in the city for the given month`

    * dim_time - Holds details of a day.

        `date: date --> primary key
        dayofweek: day of the week
        weekofyear: week of the year
        month: month`

    * dim_country - Holds country_code to country name mapping

        `country_code: country code --> Primary Key
        country_name: country name`

    * dim_states - Holds state_code to state name mapping

        `state_code: state code --> Primary Key
        state_name: state name`

    * dim_city - Holds city_code to city_name mapping

        `city_code: city code --> Primary Key
        city_name: city name`

    * dim_visa - Holds visa type to visa description mapping

        `visa_code: visa code --> Primary Key
        visa_desc: visa description`

3. Fact Table
    * fact_immigration - Fact table whose foreign keys reference the dimension tables; it stores the count of immigrant entries into the US.

        `immigrant_id: I94 record id (cicid) --> Primary Key
        state_code: state code of the arrival city --> Foreign Key REFERENCES dim_states
        city_code: city port code of the arrival city --> Foreign Key REFERENCES dim_city
        resident_country: country of residence --> Foreign Key REFERENCES dim_country
        date: date of arrival --> Foreign Key REFERENCES dim_time
        age: age of the immigrant
        gender: gender of the immigrant
        visa_type: visa type
        count: count of the immigrant's entries into the US`

### Step 5: Complete Project Write Up

Spark is chosen as the data processing framework because it can process large, distributed datasets efficiently. In addition, Spark offers a wide variety of read, write and transformation operations for different file formats and file systems.

We also considered how the solution would scale in the following scenarios:

1. If the data were increased by 100x:

    `We could consider a large-scale EMR setup or running Spark on Kubernetes, so that we can scale both vertically and horizontally.`

    `We might also consider using Cassandra, which is highly available and offers fast writes, as temporary storage, and load our data warehouse tables from there as required.`

2. If the data populates a dashboard that must be updated by 7am every day:

    `We can use Airflow or Kubeflow to schedule and automate the pipeline jobs, including retries of failed tasks, and set up Prometheus and Grafana for automatic alerting on specific conditions.`

3. If the database needs to be accessed by 100+ people:

    `We can host our solution in a production-scale cloud data warehouse with enough capacity to serve more users, and use workload management to ensure equitable use of resources across users.`
    
4. If there is a chance the data structure will change in the future:

    `We can use a data cataloging tool such as the AWS Glue Data Catalog to manage and maintain the schema.`
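Even with a catalog in place, the pipeline can detect schema drift before loading by comparing the incoming schema with the expected one. The sketch below is a hypothetical helper operating on `{column: type}` maps; in PySpark, `dict(df.dtypes)` produces such a map from a DataFrame. The column names in the example are illustrative only.

```python
from typing import Dict, List


def diff_schema(expected: Dict[str, str], observed: Dict[str, str]) -> Dict[str, List[str]]:
    """Compare {column: type} maps and report added, removed and retyped columns."""
    return {
        "added": sorted(set(observed) - set(expected)),
        "removed": sorted(set(expected) - set(observed)),
        "retyped": sorted(c for c in set(expected) & set(observed) if expected[c] != observed[c]),
    }


expected = {"city_code": "string", "year": "int", "avg_temperature": "float"}
observed = {"city_code": "string", "year": "bigint", "avg_temperature": "float", "uncertainty": "float"}
print(diff_schema(expected, observed))
# {'added': ['uncertainty'], 'removed': [], 'retyped': ['year']}
```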
    
5. If access to raw and unstructured data is needed:

    `Designing a data lake with AWS Glue, queried via AWS Athena or Apache Drill, can be a good option.`