# Project Title
### Data Engineering Capstone Project

#### Project Summary

This project looks into using the immigration data to form patterns that can assist airlines in planning for flight capacity. The following fields will be fields of interest in this study:
1. Origination Port
2. Landing Port in US
3. Nationality of visitor
4. Class of admission
5. Race percentage in Landing Port

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from decimal import Decimal
import datetime
import os

pd.set_option("display.max_colwidth", 200)

In [2]:
spark = SparkSession.builder \
    .master("local") \
    .appName("dend_capstone") \
    .config("spark.sql.repl.eagerEval.enabled", True) \
    .config("spark.sql.repl.eagerEval.truncate", 50) \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.1.0-s_2.11") \
    .getOrCreate()

spark

### Step 1: Scope the Project and Gather Data

#### Scope 

This project analyzes the immigration data and air travel patterns in consideration with the following parameters:
1. Origination Port
2. Landing Port in US
3. Nationality of visitor
4. Class of admission
5. Race percentage in Landing Port

#### Describe and Gather Data 

Datasets
1. I94 Immigration Data: This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace. [This](https://travel.trade.gov/research/reports/i94/historical/2016.html) is where the data comes from. There's a sample file so you can take a look at the data in csv format before reading it all in. You do not have to use the entire dataset, just use what you need to accomplish the goal you set at the beginning of the project.
2. World Temperature Data: This dataset came from Kaggle. You can read more about it (GlobalLandTemperaturesByCity.csv) [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).
3. U.S. City Demographic Data: This data comes from OpenSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).
4. Non Immigrant Class of Admission - Description of class of admission of Non Immigrants. Data obtained from [here](https://www.dhs.gov/immigration-statistics/nonimmigrant/NonimmigrantCOA). This page is manually scrapped since the excel on Data.gov is not available.
5. Port Code - Extracted from I94_SAS_Labels_Descriptions.SAS (ports.csv)
6. Country Code - Extracted from I94_SAS_Labels_Descriptions.SAS (country_code.csv)
7. Airlines Code - Airline code and information obtained from [here](https://openflights.org/data.html#airline)

In [3]:
df_schema = spark.read.csv("I94_SAS_Labels_Description.csv", header=True).toPandas()
df_schema

Unnamed: 0,field,description
0,i94yr,4 digit year
1,i94mon,Numeric month
2,i94cit,3 character code representing city of origination
3,i94res,3 character code likely to be representing city of residence
4,i94port,3 character code representing port of arrival
5,arrdate,Date of arrival in USA as a SAS numeric field
6,i94mode,1 character code representing the method of transport to USA
7,i94addr,2 character string representing state of stay in USA
8,depdate,Date of departure from USA as a SAS numeric field
9,i94bir,Age of Respondent in Years


#### Bad Data File

During data exploration `i94_jun16_sub.sas7bdat` was found to have additional columns defined in the file.

In [4]:
df_i94bad = spark.read \
    .format("com.github.saurfang.sas.spark") \
    .load("raw_sas_data/i94_jun16_sub.sas7bdat")
df_i94bad.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- validres: double (nullable = true)
 |-- delete_days: double (nullable = true)
 |-- delete_mexl: double (nullable = true)
 |-- delete_dup: double (nullable = true)
 |-- delete_visa: double (nullable = true)
 |-- delete_recdup: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- mat

In [5]:
df_i94bad.limit(5)

cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,validres,delete_days,delete_mexl,delete_dup,delete_visa,delete_recdup,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
4.0,2016.0,6.0,135.0,135.0,XXX,20612.0,,,,59.0,2.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,Z,,U,,1957.0,10032016,,,,14938462027.0,,WT
5.0,2016.0,6.0,135.0,135.0,XXX,20612.0,,,,50.0,2.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,Z,,U,,1966.0,10032016,,,,17460063727.0,,WT
6.0,2016.0,6.0,213.0,213.0,XXX,20609.0,,,,27.0,3.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,T,,U,,1989.0,D/S,,,,1679297785.0,,F1
7.0,2016.0,6.0,213.0,213.0,XXX,20611.0,,,,23.0,3.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,T,,U,,1993.0,D/S,,,,1140963185.0,,F1
16.0,2016.0,6.0,245.0,245.0,XXX,20632.0,,,,24.0,3.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,T,,U,,1992.0,D/S,,,,1934535285.0,,F1


#### Convert to parquet files for easier manipulation

Original raw data has numeric types inferred incorrectly as doubles and bigger numbers like admnum gets converted into Scientific Notation. Supplying a schema does not appear to work as dtadfile has values that cannot be mapped correctly during the load. This part reads the raw data performs the correct casting and saves them as a parquet file. It also drops the columns that do not belong to the official schema in the files.

In [6]:
def clean_parquet_file(input_path, output_path, logger=(lambda message: None)):
    schema = {
        "cicid": T.IntegerType(),
        "i94yr": T.IntegerType(),
        "i94mon": T.IntegerType(),  
        "i94cit": T.IntegerType(),  
        "i94res": T.IntegerType(),  
        "i94port": T.StringType(), 
        "arrdate": T.IntegerType(),
        "i94mode": T.IntegerType(),
        "i94addr": T.StringType(),
        "depdate": T.IntegerType(),
        "i94bir": T.IntegerType(),
        "i94visa": T.IntegerType(),
        "count": T.IntegerType(), 
        "dtadfile": T.StringType(),
        "visapost": T.StringType(),
        "occup": T.StringType(),
        "entdepa": T.StringType(),
        "entdepd": T.StringType(),
        "entdepu": T.StringType(),
        "matflag": T.StringType(),
        "biryear": T.IntegerType(),
        "dtaddto": T.StringType(), 
        "gender": T.StringType(),
        "insnum": T.StringType(),
        "airline": T.StringType(),
        "admnum": T.LongType(),
        "fltno": T.StringType(),
        "visatype": T.StringType(),
    }

    df_target = spark.read \
        .format("com.github.saurfang.sas.spark") \
        .load(input_path)
    df_target.select([
        F.col(key).cast(value).alias(key) for (key,value) in schema.items()
    ]).write.mode('overwrite').parquet(output_path)    

In [7]:
for file in os.listdir("raw_sas_data/"):
    if not file.endswith("sas7bdat"):
        continue
        
    clean_parquet_file(f"raw_sas_data/{file}", f"sas_data/{file}")

#### Utility Functions and UDFs

In [8]:
@F.udf(returnType=T.DateType())
def sas_date_converter(sas_date):
    """
    UDF that performs a SAS date numeric field to a Python date.
    
    :param sas_date: SAS date numeric field 
    """    
    if sas_date == None:
        return None
    return (datetime.datetime(1960, 1, 1) + datetime.timedelta(sas_date))



### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data. 

##### Immigration Data of Interest

###### Basic data filtering

Filtering to keep only air travel and removing unused columns

In [9]:
def immigration_etl(input_path="sas_data/*", output_path="staging_data/immigration_etl_raw", logger=(lambda message: None)):
    """
    Function that performs extraction and transformation of the immigration data returning a raw dataframe
    
    :param input_path: path to immigration data in parquet format to extract and transform
    :param logger: callback for logging
    """       
    
    logger("\n### Starting immigration extraction and transformation ###")
    df = spark.read.parquet(input_path)
    
    logger(f"\n*** Start removing non air travel: {df.count()} records***")
    df = df.select(
        F.col("admnum").alias("admission_number"), 
        F.col("i94cit").alias("immigrant_city"),
        F.col("i94res").alias("immigrant_residence"),
        F.col("i94port").alias("arrival_port"),
        F.col("arrdate"),
        F.col("i94addr").alias("us_residence"),
        F.col("depdate"),
        F.col("i94bir").alias("age"),
        F.col("biryear").alias("year_of_birth"),
        F.col("airline"),
        F.col("gender")
    ).where(df.i94mode == 1)
    logger(f"*** End removing non air travel: {df.count()} records***")
        
    logger(f"\n*** Start dropping duplicates: {df.count()} records***")
    df = df.dropDuplicates()
    logger(f"*** Start dropping duplicates: {df.count()} records***")
                   
    logger(f"\n*** Start null immigrant_city cleanup: {df.count()} records***")
    df = df.where(F.isnull("immigrant_city") == False)
    logger(f"*** End null immigrant_city cleanup: {df.count()} records***")  
    logger(f"\n*** Start adding arrival_date_source ***")
    df = df.withColumn(
        "arrival_date_source", sas_date_converter(F.col("arrdate"))
    )
    logger(f"*** End adding arrival_date_source ***")

    logger(f"\n*** Start adding departure_date_source ***")
    df = df.withColumn(
        "departure_date_source", sas_date_converter(F.col("depdate"))
    )
    logger(f"*** End adding departure_date_source ***")

    logger(f"\n*** Start writing {df.count()} records to {output_path} ***")
    df.write.mode('overwrite').parquet(output_path)   
    logger(f"*** End writing {df.count()} records to {output_path} ***") 
    
    logger("\n### End immigration extraction and transformation ###")

    return df

In [11]:
df_columns_of_interest = immigration_etl("sas_data/i94_apr16_sub.sas7bdat", "staging_data/immigration_etl_raw", print)

print("\nChecking for nulls")
display(df_columns_of_interest.select([
    F.count(F.when(F.isnull(c), c)).alias(c) for c in df_columns_of_interest.columns
]).toPandas())


### Starting immigration extraction and transformation ###

*** Start removing non air travel: 3096313 records***
*** End removing non air travel: 2994505 records***

*** Start dropping duplicates: 2994505 records***
*** Start dropping duplicates: 2994495 records***

*** Start null immigrant_city cleanup: 2994495 records***
*** End null immigrant_city cleanup: 2994495 records***

*** Start adding arrival_date_source ***
*** End adding arrival_date_source ***

*** Start adding departure_date_source ***
*** End adding departure_date_source ***

*** Start writing 2994495 records to staging_data/immigration_etl_raw ***
*** End writing 2994495 records to staging_data/immigration_etl_raw ***

### End immigration extraction and transformation ###

Checking for nulls


Unnamed: 0,admission_number,immigrant_city,immigrant_residence,arrival_port,arrdate,us_residence,depdate,age,year_of_birth,airline,gender,arrival_date_source,departure_date_source
0,0,0,0,0,0,117410,123043,594,594,390,411902,0,123043


###### Schema

In [12]:
df_columns_of_interest.printSchema()

root
 |-- admission_number: long (nullable = true)
 |-- immigrant_city: integer (nullable = true)
 |-- immigrant_residence: integer (nullable = true)
 |-- arrival_port: string (nullable = true)
 |-- arrdate: integer (nullable = true)
 |-- us_residence: string (nullable = true)
 |-- depdate: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- year_of_birth: integer (nullable = true)
 |-- airline: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- arrival_date_source: date (nullable = true)
 |-- departure_date_source: date (nullable = true)



##### Airline Code

###### Load and basic data filtering

Airline code and information obtained from [here](https://openflights.org/data.html#airline)
- Airline ID: Unique OpenFlights identifier for this airline.
- Name: Name of the airline.
- Alias: Alias of the airline. For example, All Nippon Airways is commonly known as "ANA".
- IATA: 2-letter IATA code, if available.
- ICAO: 3-letter ICAO code, if available.
- Callsign: Airline callsign.
- Country: Country or territory where airport is located. See Countries to cross-reference to ISO 3166-1 codes.
- Active: "Y" if the airline is or has until recently been operational, "N" if it is defunct. This field is not reliable: in particular, major airlines that stopped flying long ago, but have not had their IATA code reassigned (eg. Ansett/AN), will incorrectly show as "Y".

In [13]:
def airline_etl(input_path="airline.csv", output_path="staging_data/airline_etl_raw", logger=(lambda message: None)):
    """
    Function that performs extraction and transformation of the airline data returning a raw dataframe
    
    :param input_path: path containing the airline data in csv format to extract and transform
    :param logger: callback for logging
    """       
    
    logger("\n### Starting airline extraction and transformation ###")
    df = spark.read.csv(
        input_path, header=True
    ).select(
        F.col("IATA").alias("iata"),        
        F.col("Name").alias("name"), 
        F.col("Alias").alias("alias"),
        F.col("Callsign").alias("callsign"),
        F.col("Country").alias("country")        
    )
    
    logger(f"\n*** Start removing invalid IATA: {df.count()} records***")
    df=df.where((F.isnull("iata") == False) & (F.length("iata") == 2))
    logger(f"*** End removing invalid IATA: {df.count()} records***")
            
    logger(f"\n*** Start dropping duplicates: {df.count()} records***")
    df = df.dropDuplicates(["iata"])
    logger(f"*** Start dropping duplicates: {df.count()} records***")
    
    logger(f"\n*** Start writing {df.count()} records to {output_path} ***")
    df.select(
        F.col("Name").alias("name"), 
        F.col("Alias").alias("alias"),
        F.col("IATA").alias("iata"),
        F.col("Callsign").alias("callsign"),
        F.col("Country").alias("country")        
    ).write.mode('overwrite').parquet(output_path)   
    logger(f"*** End writing {df.count()} records to {output_path} ***")    
    
    logger("\n### End airline extraction and transformation ###")
    
    return df

In [14]:
df_airline=airline_etl("airline.csv", "staging_data/airline_etl_raw", print)
print("\nChecking for nulls")
display(df_airline.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df_airline.columns]).toPandas())
df_airline.limit(5)


### Starting airline extraction and transformation ###

*** Start removing invalid IATA: 6162 records***
*** End removing invalid IATA: 1534 records***

*** Start dropping duplicates: 1534 records***
*** Start dropping duplicates: 1120 records***

*** Start writing 1120 records to staging_data/airline_etl_raw ***
*** End writing 1120 records to staging_data/airline_etl_raw ***

### End airline extraction and transformation ###

Checking for nulls


Unnamed: 0,iata,name,alias,callsign,country
0,0,0,1037,228,6


iata,name,alias,callsign,country
07,Samurai Airlines,Samurai Airlines (DUMMY),Sam,Pakistan
1L,Open Skies Consultative Commission,,OPEN SKIES,United States
3P,Tiara Air,,TIARA,Aruba
DZ,Starline.kz,,ALUNK,Kazakhstan
LT,LTU International,,LTU,Germany


###### Schema

In [15]:
df_airline.printSchema()

root
 |-- iata: string (nullable = true)
 |-- name: string (nullable = true)
 |-- alias: string (nullable = true)
 |-- callsign: string (nullable = true)
 |-- country: string (nullable = true)



##### Country Code

###### Load and basic data filtering

This is a datasource transform manually using data in I94_SAS_Labels_Descriptions.SAS since there is no reference datasource available. And codifying the transform does not produce enough benefit to justify the work involved.

In [16]:
def country_etl(input_path="country_code.csv", output_path="staging_data/country_etl_raw", logger=(lambda message: None)):
    """
    Function that performs extraction and transformation of the country data returning a raw dataframe
    
    :param input_path: path containing the airline data in csv format to extract and transform
    :param logger: callback for logging
    """       
    schema = T.StructType([
        T.StructField("code", T.IntegerType(), False),
        T.StructField("country", T.StringType(), False)
    ])
    
    logger("\n### Starting country extraction and transformation ###")
    df = spark.read.csv(input_path, header=True, schema=schema)
            
    logger(f"\n*** Start dropping duplicates: {df.count()} records***")
    df = df.dropDuplicates(["code"])
    logger(f"*** End dropping duplicates: {df.count()} records***")

    logger(f"\n*** Start writing {df.count()} records to {output_path} ***")
    df.write.mode('overwrite').parquet(output_path)   
    logger(f"*** End writing {df.count()} records to {output_path} ***")     
    
    logger("\n### End country extraction and transformation ###")
    
    return df

In [17]:
df_country_code = country_etl("country_code.csv", "staging_data/country_etl_raw", print)
print("\nChecking for nulls")
display(df_country_code.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df_country_code.columns]).toPandas())
df_country_code.limit(5)


### Starting country extraction and transformation ###

*** Start dropping duplicates: 289 records***
*** End dropping duplicates: 289 records***

*** Start writing 289 records to staging_data/country_etl_raw ***
*** End writing 289 records to staging_data/country_etl_raw ***

### End country extraction and transformation ###

Checking for nulls


Unnamed: 0,code,country
0,0,0


code,country
471,"INVALID: MARIANA ISLANDS, NORTHERN"
243,BURMA
392,MALI
737,INVALID: MIDWAY ISLANDS
516,TRINIDAD AND TOBAGO


###### Schema

In [18]:
df_country_code.printSchema()

root
 |-- code: integer (nullable = true)
 |-- country: string (nullable = true)



##### Ports

###### Load and basic data filtering

This is a datasource transform manually using data in I94_SAS_Labels_Descriptions.SAS since there is no reference datasource available. And codifying the transform does not produce enough benefit to justify the work involved.

In [19]:
def port_etl(input_path="ports.csv", output_path="staging_data/port_etl_raw", logger=(lambda message: None)):
    """
    Function that performs extraction and transformation of the ports data returning a raw dataframe
    
    :param input_path: path containing the ports data in csv format to extract and transform
    :param logger: callback for logging
    """       
    
    logger("\n### Starting ports extraction and transformation ###")
    df = spark.read.csv(input_path, header=True)
            
    logger(f"\n*** Start dropping duplicates: {df.count()} records***")
    df = df.dropDuplicates(["code"])
    logger(f"*** End dropping duplicates: {df.count()} records***")

    logger(f"\n*** Start writing {df.count()} records to {output_path} ***")
    df.write.mode('overwrite').parquet(output_path)   
    logger(f"*** End writing {df.count()} records to {output_path} ***")     
    
    logger("\n### End ports extraction and transformation ###")
    
    return df

In [20]:
df_ports = port_etl("ports.csv", "staging_data/port_etl_raw", print)
print("\nChecking for nulls")
display(df_country_code.select(
    [F.count(F.when(F.isnull(c), c)).alias(c) for c in df_country_code.columns]
).toPandas())
df_ports.limit(5)


### Starting ports extraction and transformation ###

*** Start dropping duplicates: 660 records***
*** End dropping duplicates: 660 records***

*** Start writing 660 records to staging_data/port_etl_raw ***
*** End writing 660 records to staging_data/port_etl_raw ***

### End ports extraction and transformation ###

Checking for nulls


Unnamed: 0,code,country
0,0,0


code,city,state,country
BGM,BANGOR,ME,US
FMY,FORT MYERS,FL,US
LEB,LEBANON,NH,US
DNS,DUNSEITH,ND,US
EGL,EAGLE,AK,US


###### Schema

In [21]:
df_ports.printSchema()

root
 |-- code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)



##### Non immigrant class of admission

###### Load and basic data filtering

Description of class of admission of Non Immigrants. Data obtained from [here](https://www.dhs.gov/immigration-statistics/nonimmigrant/NonimmigrantCOA). This page is manually scrapped since the excel on Data.gov is not available. Codifying the extract does not produce enough benefit to justify the work involved.

In [22]:
def class_of_admission_etl(input_path="non_immigrant_class_of_admission.csv", output_path="staging_data/class_of_admission_etl_raw", logger=(lambda message: None)):
    """
    Function that performs extraction and transformation of the class of admission data returning a raw dataframe
    
    :param input_path: path containing the class of admission data in csv format to extract and transform
    :param logger: callback for logging
    """       
    
    logger("\n### Starting class of admission extraction and transformation ###")
    df = spark.read.csv(input_path, header=True)
            
    logger(f"\n*** Start dropping duplicates: {df.count()} records***")
    df = df.dropDuplicates(["code"])
    logger(f"*** End dropping duplicates: {df.count()} records***")

    logger(f"\n*** Start writing {df.count()} records to {output_path} ***")
    df.write.mode('overwrite').parquet(output_path)   
    logger(f"*** End writing {df.count()} records to {output_path} ***")     
    
    logger("\n### End class of admission extraction and transformation ###")
    
    return df

In [23]:
df_class_of_admission = class_of_admission_etl("non_immigrant_class_of_admission.csv", "staging_data/class_of_admission_etl_raw", print)
print("\nChecking for nulls")
display(df_class_of_admission.select(
    [F.count(F.when(F.isnull(c), c)).alias(c) for c in df_class_of_admission.columns]
).toPandas())
df_class_of_admission.limit(5)


### Starting class of admission extraction and transformation ###

*** Start dropping duplicates: 90 records***
*** End dropping duplicates: 90 records***

*** Start writing 90 records to staging_data/class_of_admission_etl_raw ***
*** End writing 90 records to staging_data/class_of_admission_etl_raw ***

### End class of admission extraction and transformation ###

Checking for nulls


Unnamed: 0,code,description,category
0,0,0,0


code,description,category
E3D,Spouse or Child of E3,Temporary worker visas
U2,Spouse or U1,Other Visas
F2,Spouses and children of F1,Student And Exchange visas
H1C,Registered nurses participating in the Nursing ...,Temporary worker visas
V3,"Dependents of V1 or V2, visa pending",Other Visas


###### Schema

In [24]:
df_class_of_admission.printSchema()

root
 |-- code: string (nullable = true)
 |-- description: string (nullable = true)
 |-- category: string (nullable = true)



##### US Cities Demographics

###### Load and basic data filtering

In [25]:
def us_city_demographics_etl(input_path="us-cities-demographics.csv", output_path="staging_data/us_city_demographics_etl_raw", logger=(lambda message: None)):
    """
    Function that performs extraction and transformation of the US city demographics data returning a raw dataframe
    
    :param input_path: path containing the US city demographics data in csv format to extract and transform
    :param logger: callback for logging
    """       
    
    logger("\n### Starting US city demographics extraction and transformation ###")
    df = spark.read.csv(input_path, sep=";", header=True)
            
    logger(f"\n*** Start dropping duplicates: {df.count()} records***")
    df = df.dropDuplicates()
    logger(f"*** End dropping duplicates: {df.count()} records***")

    logger(f"\n*** Start transforming to race ratio: {df.count()} source records ***")
    df = df.groupBy(
        "State", "City"
    ).pivot("Race").agg(
        F.sum(F.col("Count")).cast("long").alias("population_count")
    ).select(
        F.col("State").alias("state"), 
        F.col("City").alias("city"),
        F.col("American Indian and Alaska Native").alias("american_natives"),
        F.col("Asian").alias("asian"),
        F.col("Black or African-American").alias("african_amerian"),
        F.col("Hispanic or Latino").alias("hispanic_latino"),
        F.col("White").alias("white")
    ).orderBy("State", "City")
    logger(f"*** End transforming to race ratio: {df.count()} result records  ***")

    logger(f"\n*** Start writing {df.count()} records to {output_path} ***")
    df.write.mode('overwrite').parquet(output_path)   
    logger(f"*** End writing {df.count()} records to {output_path} ***")     
    
    logger("\n### End US city demographics extraction and transformation ###")
    
    return df

In [26]:
df_us_city_demographics = us_city_demographics_etl(
    "us-cities-demographics.csv", "staging_data/us_city_demographics_etl_raw", print
)
print("\nChecking for nulls")
display(df_us_city_demographics.select(
    [F.count(F.when(F.isnull(c), c)).alias(c) for c in df_us_city_demographics.columns]
).toPandas())
df_us_city_demographics.limit(5)


### Starting US city demographics extraction and transformation ###

*** Start dropping duplicates: 2891 records***
*** End dropping duplicates: 2891 records***

*** Start transforming to race ratio: 2891 source records ***
*** End transforming to race ratio: 596 result records  ***

*** Start writing 596 records to staging_data/us_city_demographics_etl_raw ***
*** End writing 596 records to staging_data/us_city_demographics_etl_raw ***

### End US city demographics extraction and transformation ###

Checking for nulls


Unnamed: 0,state,city,american_natives,asian,african_amerian,hispanic_latino,white
0,0,0,57,13,12,0,7


state,city,american_natives,asian,african_amerian,hispanic_latino,white
Alabama,Birmingham,1319.0,1500,157985,8940,51728
Alabama,Dothan,656.0,1175,23243,1704,43516
Alabama,Hoover,,4759,18191,3430,61869
Alabama,Huntsville,1755.0,6566,61561,10887,121904
Alabama,Mobile,2816.0,5518,96397,5229,93755


###### Schema

In [27]:
df_us_city_demographics.printSchema()

root
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- american_natives: long (nullable = true)
 |-- asian: long (nullable = true)
 |-- african_amerian: long (nullable = true)
 |-- hispanic_latino: long (nullable = true)
 |-- white: long (nullable = true)



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

The star model is chosen in the implementation providing the following:

- Query performance. The implementation uses 1 Fact data with direct joins to the dimension tables creating clear and simple join paths providing faster query performance compared to an OLTP system
- Model is easily understood by consumer of data

![Image of Conceptual Data Model](capstone-dend.png)


#### 3.2 Mapping Out Data Pipelines

- Transform and Load class of admission
- Transform and Load airline
- Transform and Load US city demographics
- Transform and Load Time
- Transform and Load Immigration

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [28]:
def create_class_of_admission(input_path, output_path, logger):
    """
    Function that performs takes in a path to the class of admission raw data transforms and loads it into the dimension version
    
    :param input_path: path containing the class of admission data in raw format to extract and transform
    :param logger: callback for logging
    """           
    df = spark.read.parquet(input_path)
    logger(f"\n*** Start writing {df.count()} records to {output_path} ***")
    df.select(
        F.col("code").alias("class_of_admission_id"), 
        F.col("description"),
        F.col("category")   
    ).write.mode('overwrite').parquet(output_path)   
    logger(f"*** End writing {df.count()} records to {output_path} ***")     

def create_airline(input_path, output_path, logger):
    """
    Function that performs takes in a path to the airline raw data transforms and loads it into the dimension version
    
    :param input_path: path containing the airline data in raw format to extract and transform
    :param logger: callback for logging
    """           
    df = spark.read.parquet(input_path)
    logger(f"\n*** Start writing {df.count()} records to {output_path} ***")
    df.select(
        F.col("iata").alias("airline_id"), 
        F.col("name"),
        F.col("alias"),
        F.col("callsign"),
        F.col("country")
    ).write.mode('overwrite').parquet(output_path)   
    logger(f"*** End writing {df.count()} records to {output_path} ***")  

def create_us_city_demographic(input_path, output_path, logger):
    """
    Function that performs takes in a path to the US city demographics raw data transforms and loads it into the dimension version
    
    :param input_path: path containing the US city demographics data in raw format to extract and transform
    :param logger: callback for logging
    """           
    df = spark.read.parquet(input_path)
    logger(f"\n*** Start writing {df.count()} records to {output_path} ***")
    df.select(
        F.col("state"), 
        F.col("city"),
        F.col("american_natives"),
        F.col("asian"),
        F.col("african_amerian"),
        F.col("hispanic_latino"),
        F.col("white")
    ).write.mode('overwrite').parquet(output_path)   
    logger(f"*** End writing {df.count()} records to {output_path} ***")  
    
def create_time(input_path, output_path, logger):
    """
    Function that performs takes in a path to the immigration raw data transforms and loads it into the dimension version
    
    :param input_path: path containing the immigration data in raw format to extract and transform
    :param logger: callback for logging
    """           
    df = spark.read.parquet(input_path)
    df_event_dates = df.select(
        F.col("arrival_date_source").alias("event_date_source")
    ).union(
        df.select(
            F.col("departure_date_source").alias("event_date_source")
        )
    ).distinct().where(
        F.isnull("event_date_source") == False
    ).withColumn(
        "event_date", F.to_timestamp(F.col("event_date_source"), "yyyyMMdd")
    ).withColumn(
        "day", F.dayofmonth("event_date_source")
    ).withColumn(
        "week", F.weekofyear("event_date_source")
    ).withColumn(
        "month", F.month("event_date_source")
    ).withColumn(
        "year", F.year("event_date_source")
    ) .withColumn(
        "weekday", F.dayofweek("event_date_source")
    )
    df_event_dates = df_event_dates.drop("event_date_source")
    logger(f"\n*** Start writing {df_event_dates.count()} records to {output_path} ***")    
    df_event_dates.write.mode('overwrite').parquet(
        output_path, partitionBy=[ "year", "month"]
    )   
    logger(f"*** End writing {df_event_dates.count()} records to {output_path} ***")  

def create_immigration(country_input_path, port_input_path, immigration_input_path, output_path, logger):
    """
    Function that performs takes in a path to the immigration raw data transforms and loads it into the fact version
    
    :param country_input_path: path containing the I94 country code data in raw format to extract and transform
    :param port_input_path: path containing the I94 port data in raw format to extract and transform
    :param immigration_input_path: path containing the immigration data in raw format to extract and transform
    :param logger: callback for logging
    """           
    
    logger(f"\n*** Start augmenting dataframe ***")
    df_country = spark.read.parquet(country_input_path)
    df_port = spark.read.parquet(port_input_path)
    df = spark.read.parquet(immigration_input_path)

    columns = df.columns
    columns.append("country as location_of_origination")
    df = df.join(
        df_country, df.immigrant_city == df_country.code, how="left"
    ).selectExpr(columns)
    
    columns.append("country as location_of_residence")
    df = df.join(
        df_country, df.immigrant_residence == df_country.code, how="left"
    ).selectExpr(columns)
    
    columns.append("city as arrival_city")
    columns.append("state as arrival_state")
    columns.append("country as arrival_country")
    df = df.join(
        df_port, df.arrival_port == df_port.code, how='left'
    ).selectExpr(columns)
    columns.append("city as us_city")
    columns.append("state as us_state")
    columns.append("country as us_country")
    df = df.join(
        df_port, df.us_residence == df_port.code, how='left'
    ).selectExpr(columns)   
    
    df = df.withColumn(
        "arrival_date", F.to_timestamp(F.col("arrival_date_source"), "yyyyMMdd")
    ).withColumn(
        "departure_date", F.to_timestamp(F.col("departure_date_source"), "yyyyMMdd")
    )  
    logger(f"*** End augmenting dataframe ***")
    
    logger(f"\n*** Start writing {df.count()} records to {output_path} ***")
    df.select(
        F.col("admission_number").alias("immigration_id"),
        F.col("location_of_origination"),
        F.col("location_of_residence"),
        F.col("arrival_date"),
        F.col("arrival_city"),
        F.col("arrival_state"),
        F.col("arrival_country"),
        F.col("us_city"),
        F.col("us_state"),
        F.col("us_country"),
        F.col("departure_date"),
        F.col("age"),
        F.col("gender"),
        F.col("airline").alias("airline_id")
    ).write.mode('overwrite').parquet(
        output_path, partitionBy=[ "arrival_date" ]
    )   
    logger(f"*** End writing {df.count()} records to {output_path} ***") 

In [29]:
create_class_of_admission(
    "staging_data/class_of_admission_etl_raw", "model_data/class_of_admission", print
)

create_airline(
    "staging_data/airline_etl_raw", "model_data/airline", print
)

create_us_city_demographic(
    "staging_data/us_city_demographics_etl_raw", "model_data/us_city_demographics", print
)

create_time(
    "staging_data/immigration_etl_raw", "model_data/time", print
)

create_immigration(
    "staging_data/country_etl_raw",
    "staging_data/port_etl_raw",
    "staging_data/immigration_etl_raw", 
    "model_data/immigration", print
)


*** Start writing 90 records to model_data/class_of_admission ***
*** End writing 90 records to model_data/class_of_admission ***

*** Start writing 1120 records to model_data/airline ***
*** End writing 1120 records to model_data/airline ***

*** Start writing 596 records to model_data/us_city_demographics ***
*** End writing 596 records to model_data/us_city_demographics ***

*** Start writing 200 records to model_data/time ***
*** End writing 200 records to model_data/time ***

*** Start augmenting dataframe ***
*** End augmenting dataframe ***

*** Start writing 2994495 records to model_data/immigration ***
*** End writing 2994495 records to model_data/immigration ***


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [30]:
# Perform quality checks here
def validateEmptyData(df, target):
    if df.count == 0:
        raise ValueError(f"{target} fails empty data check.")
    return f"{target} passes empty data check."

def validateUniqueColumn(df, target, columns):
    result = df.groupBy(
        columns
    ).count().where(
        F.col('count') > 1
    ).select(
        F.sum('count').alias("record_count")
    ).first()
    if (result != None) & (result[0] != None):
        if result[0] > 0:
            raise ValueError(f"{target} fails unique data check.")
    return f"{target} passes unique data check."

def validateNotNullColumn(df, target, columns):
    result = df.select(
        [F.count(F.when(F.isnull(column), column)).alias(column) for column in columns]
    ).first()
    
    if (result != None):
        total_count = 0
        for count in result:
            total_count += count
         
        if total_count > 0:
            raise ValueError(f"{target} fails not null data check.")
    return f"{target} passes not null data check."

def validateTable(df, target, columns):  
    print(f"\n********************************************")
    print(f"Start {target} validation")
    
    assert validateEmptyData(
            df, target
    ) == f"{target} passes empty data check."
    
    assert validateUniqueColumn(
        df, target, columns  
    ) == f"{target} passes unique data check."  
    
    assert validateNotNullColumn(
        df, target, columns   
    ) == f"{target} passes not null data check."      
    
    print(f"End {target} validation successfully")

validateTable(
    spark.read.parquet("model_data/class_of_admission"), 
    "class_of_admission", 
    ["class_of_admission_id"]
)

validateTable(
    spark.read.parquet("model_data/airline"), 
    "airline", 
    ["airline_id"]
)

validateTable(
    spark.read.parquet("model_data/us_city_demographics"), 
    "us_city_demographics", 
    ["state", "city"]
)

validateTable(
    spark.read.parquet("model_data/time"), 
    "time", 
    ["event_date"]
)


********************************************
Start class_of_admission validation
End class_of_admission validation successfully

********************************************
Start airline validation
End airline validation successfully

********************************************
Start us_city_demographics validation
End us_city_demographics validation successfully

********************************************
Start time validation
End time validation successfully


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

1. airline (Dimension): Information about the airlines obtained from openflights.org. Can be used to compare travel patterns in relation to home country of airlines.
```
 |-- airline_id: string (nullable = true), primary key representing the 2 digit IATA code
 |-- name: string (nullable = true), name of airline
 |-- alias: string (nullable = true), alias of airline used in the industry
 |-- callsign: string (nullable = true), callsign of airline used in the industry
 |-- country: string (nullable = true), home country of airline
```
2. class_of_admission (Dimension): Type of Visa used from entry. This data is sourced from the USCIS site. Can be used to looked into the patterns of travel for different visa holders like tourist, study, work and other needs.
```
 |-- class_of_admission_id: string (nullable = true), code representing the Visa
 |-- description: string (nullable = true), user friendly description
 |-- category: string (nullable = true), cateogries the visa belongs to
```
3. time (Dimension): unique time of the events happening in the immigration Fact table and is source from the I94 Immigration Data. Can be used to look into the data broken into time periods
```
 |-- event_date: timestamp (nullable = true), event date that is obtained from arrdate or depdate
 |-- day: integer (nullable = true), day in the month of event date
 |-- week: integer (nullable = true), week in the year of event date
 |-- weekday: integer (nullable = true), day in the week of event date
 |-- year: integer (nullable = true), year of event date
 |-- month: integer (nullable = true), month of event date
```
4. demographic (Dimension): US city demographics that can be used to obtained the race percentage of the city and states and find the relation of the number of vistors with their place of origin
```
 |-- state: string (nullable = true), 2 character state code
 |-- city: string (nullable = true), name of the city
 |-- american_natives: long (nullable = true), number of Native Americans or Alaska Native
 |-- asian: long (nullable = true), number of Asians
 |-- african_amerian: long (nullable = true), number of African Americans
 |-- hispanic_latino: long (nullable = true), number of Hispanice and Latino
 |-- white: long (nullable = true), number of Whites
```
5. immigration (Fact): Solves the travel patterns of foreigner to United States, It also contains Gender, Age that can be used for differnt analysis
```
 |-- immigration_id: long (nullable = true), USCIS identifier to record the entry
 |-- location_of_origination: string (nullable = true), place where foreigner comes from
 |-- location_of_residence: string (nullable = true), place where foreigner is residing in
 |-- arrival_city: string (nullable = true), city of arrival of the immigrant
 |-- arrival_state: string (nullable = true), state of arrival of the immigrant
 |-- arrival_country: string (nullable = true), country of arrival of the immigrant
 |-- us_city: string (nullable = true), city of stay of the immigrant
 |-- us_state: string (nullable = true), state of stay of the immigrant
 |-- us_country: string (nullable = true), country of stay of the immigrant
 |-- departure_date: timestamp (nullable = true), date the foriegner left the country
 |-- age: integer (nullable = true), age of the immigrant
 |-- gender: string (nullable = true), gender of the immigrant
 |-- airline_id: string (nullable = true), 2 digit IATA code of airlines
 |-- arrival_date: timestamp (nullable = true), arrival date of the immigrant
```


#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

The scripts are broken into mini functions that do not access global state. This allows them to be moved easily to the Cloud and used in other systems. The functions are also generic in taking in an input and writing out the output as blob files. This allows data to be move around easily. The same technique allows Airflow to be added in later with lesser effort since the input and output can placed in an accessible location like S3, GCS or Azure. All scripts are written using Apache Spark which allowas easily pathway into processing with Cloud based services

* Propose how often the data should be updated and why.

airline and class of submission does not change frequently, so it can go on a Bi-weekly or monthly. 
The data is also pretty static and small, so it can be a full upload. 
This means the old data will be truncated with the new batch loading everything from scratch.

demographic the period of update is very low so this should go on a 6 motnh to a year for the frequency.
This will be an append update since there should little to none updates of the previous year or period's numbers.

immigration and time this tables come from the freqently and should be uploaded on a shorter schedule like daily of Weekly.
Since the data is big in size, the approached it to append to exisitng table instead of reloading everything every time
