# Project Title: Cities demographics, Immigration and airports
### Data Engineering Capstone Project  

#### Project Summary
Capstone, a startup has grown their data warehouse and want to move their data in a data lake. Their data resides in S3, in a directory of parquet on immigration events, as well as two directories with CSV files on US cities demographics and airport codes respectively .

We are tasked with building an ETL pipeline that extracts their data from S3, processes them using Spark, and loads the data back into S3 as a set of dimensional tables and fact table. This will allow their analytics team to continue finding insights such as the date, duration of immigration in terms of number of days, weeks, months, years for example, immigrants data such as the visa type, the mode of immigration and cities demographics and the airports in the respective cities as well.

The project follows the following steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
%%spark

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1647154920989_0002,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Do all imports and installs here
import pandas as pd
import configparser
from datetime import datetime, timedelta 
import os
import pyspark.sql
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Step 1: Scope of the Project and Data Gathering

#### Scope 

The project consist in building an ETL pipeline that extracts their data from S3, processes them using Spark, and loads the data back into S3 as a set of dimensional tables and fact table.
For this purpose, We are going to execute the following tasks:
* Build a data model base on star schema for analytics processes efficiency.
* Create a spark session.
* Four functions are built to run ETL respectively for three dimension tables :
    process_cities_data()
    process_airport_data()
    process_immigrant_data()
  and one fact table:
    process_immigration_data() 
* Access S3 using AWS credentials.
* Run a pipline containing the functions mentionned above and load the data back into S3 as a set of dimensional tables and fact table
* Perform data quality check.


#### Data description and Gathering

#### Datasets

#### I94 Immigration Data:
This data comes from the US National Tourism and Trade Office.In the past all foreign visitors to the U.S. arriving via air or sea were required to complete paper Customs and Border Protection Form I-94 Arrival/Departure Record or Form I-94W Nonimmigrant Visa Waiver Arrival/Departure Record and this dataset comes from this forms. Contains SAS format data.

#### U.S. City Demographic Data: 
This data comes from OpenSoft.This dataset contains information about the demographics of all US cities and census-designated places with a population greater or equal to 65,000. Contains CSV format data.

#### Airport Code Table: 
This is a simple table of airport codes and corresponding cities. Contains CSV format data.

### Tools

#### Python: 
* For data processing

#### Pandas: 
* For data exploratory, data analysis on small data set

#### AWS S3: data storage

#### PySpark: 
* For data processing on large data set

In [4]:
# Read in the immigration data 
df_spark_immigration=spark.read.parquet("s3://raoul-bucket/sas_data") 

# create temporary table
immigrant_table = df_spark_immigration.createOrReplaceTempView("immigrant_table")

# create temporary table
immigration_table = df_spark_immigration.createOrReplaceTempView("immigration_table")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Customization of the cities data schema
df_spark_cities_schema=StructType([
                       StructField("city",StringType()),
                       StructField("state_name",StringType()),
                       StructField("median_age",DoubleType()),
                       StructField("male_population",IntegerType()),
                       StructField("female_population",IntegerType()),
                       StructField("total_population",IntegerType()),
                       StructField("number_veterans",IntegerType()),
                       StructField("foreign_born",IntegerType()),
                       StructField("avg_household_size",DoubleType()),
                       StructField("state_code",StringType()),
                       StructField("race",StringType()),
                       StructField("race_count",IntegerType())])

# Read in the cities data 
df_spark_cities=spark.read.csv("s3://raoul-bucket/us-cities-demographics.csv", schema=df_spark_cities_schema, sep=";", mode="DROPMALFORMED")

# create temporary table
cities_table=df_spark_cities.createOrReplaceTempView("cities_table")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Read in the airport data
df_spark_airport = spark.read.csv("s3://raoul-bucket/airport-codes_csv.csv", sep=",", header=True)

# put coordinates column in first normal form
df_spark_airport = df_spark_airport.withColumn("latitude", split(df_spark_airport["coordinates"], ", ").getItem(0).cast("double"))\
                                           .withColumn("longitude", split(df_spark_airport["coordinates"], ", ").getItem(1).cast("double"))\
                                           .withColumn("iso_region", substring(df_spark_airport["iso_region"], 4,2))\
                                           .drop("coordinates")  
# create temporary table
airport_table = df_spark_airport.createOrReplaceTempView("airport_table")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
 output_bucket = "s3a://raoul-bucket/"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Step 2: Explore and Assess the Data
#### Data exploration 
Identification of data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
 Cleaning tasks for cities data. The steps involved are the following:
##### 1- Select valuable columns
##### 2- Drop duplicate entries
##### 3- Drop lines containing Null value
##### 4- Put the table at the first normal form

In [8]:
# Cleaning tasks for cities data. The steps involved are the following:
# 1- Select valuable columns
# 2- Drop duplicate entries
# 3- Drop lines containing Null value
# 4- Put the table at the first normal form

def process_cities_data():
    """
       The function:
       Extracts data from us-cities-demographic.csv 
       Transforms the data and create cities_table 
       Loads cities_table back to S3 in parquet format
    """
    
       # Create cities_table 
    cities_table=spark.sql("""SELECT md5(city||state_name) city_id,
                                        city,
                                        state_name,
                                        state_code,
                                        median_age,
                                        male_population,
                                        female_population,
                                        total_population                               
                                 FROM  cities_table 
                                 ORDER BY city 
                                      """).dropDuplicates().dropna()
    
    # load cities_table to s3 in parquet format
    cities_table.write.partitionBy("state_code","city").mode("append").parquet(output_bucket+'us_cities_demographics.parquet')
    
    cities_table.show(n=3)
    return cities_table.count()
    

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
# Cleaning tasks for airport data and creation of airport dimension table
# 1- Select valuable columns
# 2- Drop duplicate entries
# 3- Drop lines containing Null value
# 4- Put the table at the first normal form

def process_airport_data():
        """
            The function:
            Extracts data from airport-codes.csv 
            Transforms the data and create airport_table 
            Loads airport_table back to S3 in parquet format.
        """
        # Create airport_table
        
        airport_table = spark.sql("""SELECT ident airport_id,
                                            type airport_type,
                                            name airport_name,
                                            int(elevation_ft) elevation_ft,
                                            iso_region state_code,
                                            municipality city,
                                            latitude,
                                            longitude
                                      
                                     FROM   airport_table
                                     WHERE type != "closed"
                                     ORDER BY airport_id
                                     
                                  """).dropDuplicates().dropna()
        
        # load the table in s3 under parquet format
        airport_table.write.partitionBy("state_code","city").mode("append").parquet(output_bucket+'airports_data.parquet')
        
        airport_table.show(n=3)
        return airport_table.count()
    

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Cleaning tasks for immigration data and creation of immigrant dimension table
# 1- Select valuable columns
# 2- Drop duplicate entries
# 3- Drop lines containing Null value
# 4- Put the table at the first normal form

def process_immigrant_data():
        """
            The function:
            Extracts data from SAS_data 
            Transforms the data and create immigrant_table 
            Loads immigrant_table back to S3 in parquet format.
        """
      # Create immigrant_table    
        immigrant_table=spark.sql(""" SELECT int(cicid) imm_id,
                                           int(i94bir) age,
                                           biryear birth_year,
                                           gender,
                                           visatype visa_type,
                                           i94addr state_code
                                    FROM immigrant_table
                                 """).dropDuplicates().dropna()
        
        # load the table in s3 under parquet format
        immigrant_table.write.mode("append").parquet(output_bucket+'immigrants_data.parquet')
        
        immigrant_table.show(n=3)
        return immigrant_table.count()
    

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# This function converts SAS date format to datetime
def sas_to_datetime(date):
    if date is not None:
        return pd.to_timedelta(date, unit='D') + pd.Timestamp('1960-1-1')
    
sas_to_datetime_udf = udf(sas_to_datetime, DateType())


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
# Performing cleaning tasks for immigration data and creation of immigration Fact table
# 1- Select valuable columns
# 2- Drop duplicate entries
# 3- Drop lines containing Null value
# 4- Put the table at the first normal form

def process_immigration_data():
        """
            The function:
            Extracts data from SAS_data 
            Transforms the data and create immigration_table 
            Loads immigration_table back to S3 in parquet format.
        """
                   
       # Create immigration_table 
        immigration_table = spark.sql("""SELECT int(immigration_table.cicid) imm_id,
                                               airport_table.ident airport_id,                                     
                                               md5(city||state_name) city_id,                                     
                                               int(immigration_table.arrdate) arrival_date,
                                               int(immigration_table.depdate) departure_date,
                                               CASE WHEN int(immigration_table.i94mode)=1 THEN "air"
                                                    WHEN int(immigration_table.i94mode)=2 THEN "sea"
                                                    WHEN int(immigration_table.i94mode)=3 THEN "Land" 
                                                    ELSE "Not reported" 
                                                    END as mode,
                                               immigration_table.i94addr state_code,
                                               int(immigration_table.i94yr) immigration_year,
                                               int(immigration_table.i94mon) immigration_month,
                                               airport_table.latitude,
                                               airport_table.longitude                                               
                                                                           
                                          FROM  immigration_table
                                          JOIN  immigrant_table
                                          ON    immigration_table.cicid=immigrant_table.cicid
                                          JOIN  airport_table
                                          ON    immigrant_table.i94addr=airport_table.iso_region
                                          JOIN  cities_table
                                          ON    airport_table.municipality=cities_table.City AND airport_table.iso_region=cities_table.state_code                           
                               
                                        """).dropDuplicates().dropna()
    
       # Convert SAS arrival_date and departure_date columns to Datetime format 
        immigration_table=immigration_table.withColumn("arrival_date", sas_to_datetime_udf("arrival_date"))
        immigration_table=immigration_table.withColumn("departure_date", sas_to_datetime_udf("departure_date"))
        
       # Create the stay_duration_in_days column 
        immigration_table=immigration_table.withColumn("stay_duration_in_days", datediff(col("departure_date"),col("arrival_date")))
        immigration_table=immigration_table.withColumn("stay_duration_in_days", round(immigration_table["stay_duration_in_days"], scale=0))
    
       # Create the stay_duration_in_weeks column
        immigration_table=immigration_table.withColumn("stay_duration_in_weeks", datediff(col("departure_date"),col("arrival_date"))*52/365)
        immigration_table=immigration_table.withColumn("stay_duration_in_weeks", round(immigration_table["stay_duration_in_weeks"], scale=1))
        
       # Create the stay_duration_in_months column
        immigration_table=immigration_table.withColumn("stay_duration_in_months", months_between(col("departure_date"),col("arrival_date")))
        immigration_table=immigration_table.withColumn("stay_duration_in_months", round(immigration_table["stay_duration_in_months"], scale=1))
        
       # Create the stay_duration_in_years column 
        immigration_table=immigration_table.withColumn("stay_duration_in_years", datediff(col("departure_date"),col("arrival_date"))/365)
        immigration_table=immigration_table.withColumn("stay_duration_in_years", round(immigration_table["stay_duration_in_years"], scale=1))
        
       # load the table in s3 under parquet format
        immigration_table.write.partitionBy("immigration_year","immigration_month").mode("append").parquet(output_bucket+'immigration_data.parquet')
        
        immigration_table.show(n=3)
        return immigration_table.count()

      

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

#### Dimension tables:
##### cities_table
           city_id    PK
           city
           state_name
           state_code
           median_age
           male_population
           female_population
           total_population
           
##### airport_table
           airport_id     PK
           airport_type
           airport_name
           elevation_ft
           state_code
           city
           latitude
           longitude 
           
##### immigrant_table
           imm_id         PK
           age
           birth_year
           gender
           visa_type
           state_code
           
#### Fact table:
##### immigration_table
           imm_id          PK
           airport_id      FK
           city_id         FK
           arrival_date
           departure_date
           mode
           state_code
           immigration_year
           immigration_month
           stay_duration_in_days
           stay_duration_in_weeks
           stay_duration_in_months
           stay_duration_in_years

                         
#### 3.2 Mapping Out Data Pipelines

The data is modelized by the following functions which take two parameters each:
  The staging dataframe and the output.
* process_cities_data(df_spark_cities, output_data)
* process_airport_data(df_spark_airport, output_data)
* process_immigrant_data(df_spark_immigration, output_data)
* process_immigration_data(df_spark_immigration, output_data)

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [13]:
# This function triggers the ETL process on the Three datasets, create the star schema data model and load the Fact and dimension tables in to S3  
def pipeline():
    
    count_cities_table=process_cities_data()
    print("cities data ETL done.")
    
    count_airport_table=process_airport_data()
    print("Airports data ETL done.")
    
    count_immigrant_table=process_immigrant_data()
    print("immigrants data ETL done.")
    
    count_immigration_table=process_immigration_data()
    print("immigration data ETL done.")
    
pipeline() 

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-----------+----------+----------+----------+---------------+-----------------+----------------+
|             city_id|       city|state_name|state_code|median_age|male_population|female_population|total_population|
+--------------------+-----------+----------+----------+----------+---------------+-----------------+----------------+
|2d0c364938ff09a61...|Bloomington|  Illinois|        IL|      35.1|          37972|            40323|           78295|
|1f2154b1c8397d311...|    Kenosha| Wisconsin|        WI|      36.3|          49349|            50507|           99856|
|501c7d03a03961d9a...|Cheektowaga|  New York|        NY|      40.7|          37476|            38599|           76075|
+--------------------+-----------+----------+----------+----------+---------------+-----------------+----------------+
only showing top 3 rows

cities data ETL done.
+----------+-------------+--------------------+------------+----------+-----------+------------------+------------------

#### 4.2 Data Quality Checks

In [14]:
def check_count_dataset_to_spark(df_spark, df_spark_name):
    
    """ This function checks the number of rows loaded from dataset files/folder to spark dataframe"""
    
    numrows = df_spark.count()
    if numrows > 0:
        print(f"{numrows} rows loaded to {df_spark_name}")
        return True
    else:
        return False

check_count_dataset_to_spark(df_spark_cities, 'df_spark_cities')
check_count_dataset_to_spark(df_spark_airport, 'df_spark_airport')
check_count_dataset_to_spark(df_spark_immigration, 'df_spark_immigration')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2892 rows loaded to df_spark_cities
55075 rows loaded to df_spark_airport
3096313 rows loaded to df_spark_immigration
True

In [14]:
def check_count_table_to_s3(df_spark, table_name):
    
    """ This function checks the number of rows loaded from tables to table         
    """
    num_rows = df_spark.count()
    if num_rows > 0:
        print(f"{num_rows} rows loaded from to {table_name}")
        return True
    else:
        return False

check_count_table_to_s3(df_spark_cities, 'cities_table')
check_count_table_to_s3(df_spark_airport, 'airport_table')
check_count_table_to_s3(df_spark_immigration, 'immigrant_table')
check_count_table_to_s3(df_spark_immigration, 'immigration_table')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2892 rows loaded from to cities_table
55075 rows loaded from to airport_table
3096313 rows loaded from to immigrant_table
3096313 rows loaded from to immigration_table
True

#### 4.3 Data dictionary 

#### Dimension tables:
##### cities_table
        city_id :    Primary key of city table obtained by md5(city||state_name)     
        city :       The name of the city
        state_name:  the full name of the state
        state_code:  The state code which is made of the two first letters of the state name 
        median_age:  The median age in the city 
        male_population:   total count of male population 
        female_population: total count of female population
        total_population:  total population of the city
           
##### airport_table
        airport_id:   Primary key of the airport which is also the local code of the airport
        airport_type: contains the size type of the airport
        airport_name: contains the name of the airport
        elevation_ft: contains the elevation foot of the airport
        state_code:   contains the state code where the airport is located
        city :        The name of the city where the airport is located
        latitude:     extracted from coordinates column, contains the latitude of the coordinates
        longitude:    extracted from coordinates column, contains the latitude of the coordinates                                                      
           
##### immigrant_table
        imm_id:     Primary key of the immigrant_table, contains the immigrant identifier
        age:        contains the age of the immigrant
        birth_year: contains the birth year of the immigrant
        gender:     contains the gender of the immigrant
        visa_type:  contains the visa type of the immigrant
        state_code: contains the state code where the immigrant is located. which is made of the two first letters of the state name       
           
#### Fact table:
##### immigration_table
        imm_id:        Primary key of the immigration_table, and also the identifier of immigrant_table contains the immigration identifier  
        airport_id:    Foreign Key, is the identifier of the airport_table which is aloso th local code of the airport
        city_id:       Foreign Key, is the identifier of cities_table obtained by md5(city||state_name)
        arrival_date:   Contains the arrival date converted from SAS format to datetime format
        departure_date: Contains the departure date converted from SAS format to datetime format
        mode:           Contains the mode of arrival. Either sea, air, Land or Not reported
        state_code:     contains the state code where the immigration is done. which is made of the two first letters of the state name
        immigration_year:    contains the immigration year
        immigration_month:    contains the immigration month
        stay_duration_in_days:  contains the immigration duration in terms of number days
        stay_duration_in_weeks:  contains the immigration duration in terms of number weeks
        stay_duration_in_months:  contains the immigration duration in terms of number months
        stay_duration_in_years:  contains the immigration duration in terms of number years

#### Step 5: Complete Project Write Up
#### Rational for the choice of tools and technologies for the project.
#### PySpark running on AWS Elastic Map Reduce (EMR) cluster: 

* To modelize structured and un/semi-structured datasets, to Extract Transform and Load data in a set of tables with customizable schemas. Suitable for large data processing. Has an API with Pandas and cutting-edge query features such as SQL.

#### AWS S3:

* Object storage service offering industry-leading scalability, data availability, security, and performance.

#### Pandas: 

* For sample dataset exploratory, data analysis is a fast, powerful, flexible and easy to use open source data analysis and manipulation tool, built on top of the Python         programming language.


#### Data update and why.
* Tables created from immigration data set should be updated monthly since the raw data set is built up monthly.
* Tables created from demography data set could be updated annually since demography data collection takes time and high frequent demography might take high cost but generate wrong conclusion.
* Tables created from airport data could be updated every five years since airport data are not much dynamic
* All tables should be update in an append-only mode.


#### Approach of the problem differently under the following scenarios:

##### The data was increased by 100x.

* If Spark with standalone server mode can not process 100x data set, the appropriate solution is to rationaly scale the capacity in terms of number of nodes, memory and network of the EMR cluster, which is a distributed data cluster for processing large data sets on cloud.
 
##### The data populates a dashboard that must be updated on a daily basis by 7am every day.

* Apache Airflow could be used for building up a ETL data pipeline to be triggered regular interval and update the date and populate a report. Airflow provides many plug-and-play operators that are ready to execute desired tasks and deliever more powerful task automation.
   
##### The database needed to be accessed by 100+ people.

* AWS Redshift database can handle hundreds of connections.Massively parallel processing,Columnar data storage, Query optimizer, Result caching, are the characteristics of redshift that make it a powerful technology to handle many connections. Moving the database to Redshift will be a rational choice to envisage.

 
#### Evidence that the ETL processes results in the data model 
Below is a SparkSQL query that includes multiple JOIN statements on all of the four tables of our data model.

In [15]:
evidence_table = spark.sql("""SELECT int(immigration_table.cicid) imm_id,
                                               airport_table.ident airport_id,                                     
                                               md5(city||state_name) city_id,                                     
                                               int(immigration_table.arrdate) arrival_date,
                                               int(immigration_table.depdate) departure_date,
                                               CASE WHEN int(immigration_table.i94mode)=1 THEN "air"
                                                    WHEN int(immigration_table.i94mode)=2 THEN "sea"
                                                    WHEN int(immigration_table.i94mode)=3 THEN "Land" 
                                                    ELSE "Not reported" 
                                                    END as mode,
                                               immigration_table.i94addr state_code,
                                               int(immigration_table.i94yr) immigration_year,
                                               int(immigration_table.i94mon) immigration_month,
                                               airport_table.latitude,
                                               airport_table.longitude,
                                               immigrant_table.visatype visa
                                               
                                                                           
                                          FROM  immigration_table
                                          JOIN  immigrant_table
                                          ON    immigration_table.cicid=immigrant_table.cicid
                                          JOIN  airport_table
                                          ON    immigrant_table.i94addr=airport_table.iso_region
                                          JOIN  cities_table
                                          ON    airport_table.municipality=cities_table.City AND airport_table.iso_region=cities_table.state_code                           
                               
                                        """).dropDuplicates().dropna()   
        
evidence_table.show(n=3)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+----------+--------------------+------------+--------------+----+----------+----------------+-----------------+------------------+------------------+----+
|imm_id|airport_id|             city_id|arrival_date|departure_date|mode|state_code|immigration_year|immigration_month|          latitude|         longitude|visa|
+------+----------+--------------------+------------+--------------+----+----------+----------------+-----------------+------------------+------------------+----+
|646322|      00FD|239a1b21f5bf8dab2...|       20548|         20571| sea|        FL|            2016|                4|-82.34539794921875|28.846599578857422|  B1|
|522855|      00FD|239a1b21f5bf8dab2...|       20547|         20553| air|        FL|            2016|                4|-82.34539794921875|28.846599578857422|  B2|
|620141|      00FD|239a1b21f5bf8dab2...|       20547|         20554| air|        FL|            2016|                4|-82.34539794921875|28.846599578857422|  B2|
+------+----------+---