# Immigration Data ETL
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

## Step 2 Explore and assess the data

### Overview
In this section we will explore the datasets chosen by **Flyby Salads** product management department using Apache Spark. We will idenfify and implement necessary transformations and finally export the data to S3 in parquet format.

Subsections of this Notebook:
* Preparations (imports, file locations, etc.)
* Reading files using Spark
* Data Exploration and description
    * Dataframe schema analysis
    * Data Quality analysis of the data
    * Data Type transformation steps
* Defining a new schema and storing it in JSON format
* Logical Data exploration examples
  (Analyzing table contents using PySpark to find more transformation actions)
    * Creating an airport dimension table
    * Linking airports to immigration data
    * 
* Exporting the resulting tables to Parquet format

**Note:**

    * We want to keep track of transformation steps
    * So if a required transformation is identified, it will be assigned a number like "TFA-x"
    * Where "TFA" stands for transformation action (plus a number)

### Preparations

#### Imports and Installs

In [38]:
# Do all imports and installs here
import pandas as pd
import numpy as np
import logging
import sys
from datetime import datetime
from os.path import getsize
from nb_helpers import summarize_data, get_sas_definitions, read_sas_in_chunks, \
                        read_csv_print, print_stat, non_iso_date_change, change_nullables
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark import SparkContext, SparkConf
from pyspark.sql.window import Window
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.functions import pandas_udf

# Logging
logging.basicConfig(
    level=logging.ERROR,
    format='%(asctime)s %(levelname)s \t %(message)s ',
    datefmt='%Y-%m-%d %H:%M:%S',
    stream=sys.stdout,
)
log = logging.getLogger('log')

# Improve view
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)


### Reading data files into Spark Dataframes
The next cell contains the file locations for each dataset, you may adjust it if required. Also note the configuration of the sample size which will improve performance while making some basic analysis.

In [2]:
# Source file locations
dem_file = 'us-cities-demographics.csv'
imm_file = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
air_file = 'airport-codes_csv.csv'

# Schema file locations
air_schema_file = 'air_schema.json'
dem_schema_file = 'dem_schema.json'
imm_schema_file = 'imm_schema.json'


# If working on a sample of the large immigration dataset, set your sample size here:
sample_size = 0.01

During my analysis I changed the schema of each dataset imported and based on the transformations done created a new schema definition. The definition is stored in a JSON file.

The data will be **imported again** using those schema **at the end** of the "Explore and Assess" section.

Now we setup the Spark Session and read all three files into a dataframe

In [3]:
# Setup Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder \
                    .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11") \
                    .enableHiveSupport() \
                    .getOrCreate()
start_time = datetime.now()

# Read data from sources
imm_df = spark.read \
                .format('com.github.saurfang.sas.spark') \
                .option("inferSchema","true") \
                .load(imm_file)
imm_rows = imm_df.count()
air_df = spark.read.csv(air_file, header=True)
air_rows = air_df.count()
dem_df = spark.read.csv(dem_file, header=True)
dem_rows = dem_df.count()

# List of our staging dataframes
df_list = {'imm_df': imm_df, 'air_df': air_df, 'dem_df': dem_df}

# Save a copy of initial total data, because we will work with a sample only in this notebook
imm_full = imm_df

# Output of results
elapse = datetime.now() - start_time
print('Reading ', imm_rows , ' lines from ', imm_file)
print('Reading ', dem_rows, ' lines from ', dem_file)
print('Reading ', air_rows, ' lines from ', air_file)
print('Operation runtime ', elapse)

Reading  3096313  lines from  ../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
Reading  2891  lines from  us-cities-demographics.csv
Reading  55075  lines from  airport-codes_csv.csv
Operation runtime  0:00:42.500266


### Explore and Describe data
This section contains the following subsections:
* Spark schema analysis
* Spark dataframe descriptions analysis

Before we start this section we are naming each dataframe and reducing immigration data to a sample of the complete dataframe, keeping the full set in variable `imm_full` (the sample size being configured at the beginning of this notebook).

In [4]:
imm_df = imm_full.sample(sample_size)
imm_df.name = 'i94 immigration data'
dem_df.name = 'city demographics'
air_df.name = 'airport codes'

#### Display schemas created automatically by Spark

In [5]:
imm_df.printSchema()
air_df.printSchema()
dem_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

**Note** (TFA-1) Several columns in the i94 dataframe were infered as double while they should be of data type integer. Those types should be changed (see `cicid, i94yr,...`).

**Note** (TFA-2) Several columns in airport and demographic data were infered as string where they should be either double or integer (see airport data, `elevation_ft` or demographic data `median_age`

**Note** (TFA-3) Demographics data file was imported as a one-column file due to ';' being used as delimiter. We read the file again:

In [6]:
dem_df = spark.read.csv(dem_file, header=True, sep=';')
dem_df.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



**Note**: (TFA-4) For easier handling we strip columns of spaces, apply lower case and replace inner spaces and '-' with underscore '_'

In [7]:
def trim_col_headers(df):
    for col in df.columns:
        changed_col = col.strip().lower().replace(' ', '_').replace('-', '_')
        df = df.withColumnRenamed(col, changed_col)
    return df

dem_df = trim_col_headers(dem_df)

### Dataset descriptions

#### Statistical summary on data
The descriptive statistics were shown in Step 1 already, so we will skip this step here

#### Missing values per dataset
First we are measuring the amount of empty cells per column

In [8]:
# Get total amount of entries per column
imm_col_count = imm_full.describe().filter(F.col('summary') == 'count')
air_col_count = air_df.describe().filter(F.col('summary') == 'count')
dem_col_count = dem_df.describe().filter(F.col('summary') == 'count')

In [9]:
# Divide number of entries per column by total dataframe rows to get a percentage value
for df, numrows in zip([imm_col_count, air_col_count, dem_col_count], \
                       [imm_rows, air_rows, dem_rows]):
    for column in df.columns:
        if column == 'summary':
            next
        else:
            df = df.withColumn(column, F.col(column).cast('int'))
            df = df.withColumn(column, F.bround((F.col(column) / numrows * 100), scale=1))
    print('{}'.format(df.show()))

+-------+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+
|summary|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|admnum|fltno|visatype|
+-------+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+
|  count|100.0|100.0| 100.0| 100.0| 100.0|  100.0|  100.0|  100.0|   95.1|   95.4| 100.0|  100.0|100.0|   100.0|    39.2|  0.3|  100.0|   95.5|    0.0|   95.5|  100.0|  100.0|  86.6|   3.7|   97.3| 100.0| 99.4|   100.0|
+-------+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------

#### Unique values per column
We can count the number of unique values per dataframe or column using `dropDuplicates()` and `count()`

In [16]:
# Count **distinct** entries per column and print
for df, numrows in zip([imm_df, air_df, dem_df], [imm_rows, air_rows, dem_rows]):
    for column in df.columns:
        distinct_count = df.select(F.col(column)).distinct().count()
        print('No. of unique entries in table "{}": {}'.format(column, distinct_count))
    # print('{}'.format(df.show()))

No. of unique entries in table "cicid": 31020
No. of unique entries in table "i94yr": 1
No. of unique entries in table "i94mon": 1
No. of unique entries in table "i94cit": 182
No. of unique entries in table "i94res": 186
No. of unique entries in table "i94port": 149
No. of unique entries in table "arrdate": 30
No. of unique entries in table "i94mode": 5
No. of unique entries in table "i94addr": 101
No. of unique entries in table "depdate": 173
No. of unique entries in table "i94bir": 99
No. of unique entries in table "i94visa": 3
No. of unique entries in table "count": 1
No. of unique entries in table "dtadfile": 65
No. of unique entries in table "visapost": 257
No. of unique entries in table "occup": 14
No. of unique entries in table "entdepa": 13
No. of unique entries in table "entdepd": 12
No. of unique entries in table "entdepu": 2
No. of unique entries in table "matflag": 2
No. of unique entries in table "biryear": 99
No. of unique entries in table "dtaddto": 265
No. of unique ent

### Data Transformations
This section will contain the steps required to transform the datasets.

From previous notes we now carry out some transformation actions.
1. Double to Integer (TFA-1)
1. String to numbers (TFA-2)
1. SAS Date Type to date (TFA-5)
1. Non-Iso Date Type to date (TFA-6)

Note that TFA-3 and TFA-4 were already carried out in the previous chapter.

#### Integer Type Transformation (TFA-1)
The fields mentioned here are IntegerType but were infered as double, we are changing them to integer.

In addition the string column `entdepa` will also be changed.

In [10]:
int_columns = ['cicid','i94yr', 'i94mon', 'i94cit', 'i94res', \
              'arrdate', 'depdate', 'i94mode', 'i94bir', \
              'i94visa', 'count', 'biryear']

def change_type(df=None, fields=None, totype=None):
    if ((totype is not None) and (fields is not None)):
        for column in fields:
            cur_type = df.schema[column]
            df = df.withColumn(column, F.col(column).cast(totype))
            new_type = df.schema[column]
            print('Done, switched column {} from {} to {}' \
                  .format(column, cur_type.dataType, new_type.dataType))
        print(df.select(fields).show(5))
        return df
    else:
        return None

imm_df = change_type(imm_df, int_fields, 'int')
imm_df = change_type(imm_df, ['admnum'], 'bigint')

Done, switched column cicid from DoubleType to IntegerType
Done, switched column i94yr from DoubleType to IntegerType
Done, switched column i94mon from DoubleType to IntegerType
Done, switched column i94cit from DoubleType to IntegerType
Done, switched column i94res from DoubleType to IntegerType
Done, switched column arrdate from DoubleType to IntegerType
Done, switched column depdate from DoubleType to IntegerType
Done, switched column i94mode from DoubleType to IntegerType
Done, switched column i94bir from DoubleType to IntegerType
Done, switched column i94visa from DoubleType to IntegerType
Done, switched column count from DoubleType to IntegerType
Done, switched column biryear from DoubleType to IntegerType
+-----+-----+------+------+------+-------+-------+-------+------+-------+-----+-------+
|cicid|i94yr|i94mon|i94cit|i94res|arrdate|depdate|i94mode|i94bir|i94visa|count|biryear|
+-----+-----+------+------+------+-------+-------+-------+------+-------+-----+-------+
|   21| 2016| 

#### String to numbers (TFA-2)
In dataframes for demographic and airport data no float types were infered. Instead we see floating point numbers imported as strings.
Those will be changed here.

In [11]:
# Airport data:
float_columns = ['elevation_ft']
int_columns = None

air_df = change_type(air_df, float_columns, 'double')

Done, switched column elevation_ft from StringType to DoubleType
+------------+
|elevation_ft|
+------------+
|        11.0|
|      3435.0|
|       450.0|
|       820.0|
|       237.0|
+------------+
only showing top 5 rows

None


In [12]:
# Demographic Data
float_columns = ['median_age', 'average_household_size', 'count' ]
int_columns = ['male_population', 'female_population', 'total_population', 'number_of_veterans', 'foreign_born']

dem_df = change_type(dem_df, float_columns, 'double')
dem_df = change_type(dem_df, int_columns, 'int')

Done, switched column median_age from StringType to DoubleType
Done, switched column average_household_size from StringType to DoubleType
Done, switched column count from StringType to DoubleType
+----------+----------------------+-------+
|median_age|average_household_size|  count|
+----------+----------------------+-------+
|      33.8|                   2.6|25924.0|
|      41.0|                  2.39|58723.0|
|      38.5|                  2.58| 4759.0|
|      34.5|                  3.18|24437.0|
|      34.6|                  2.73|76402.0|
+----------+----------------------+-------+
only showing top 5 rows

None
Done, switched column male_population from StringType to IntegerType
Done, switched column female_population from StringType to IntegerType
Done, switched column total_population from StringType to IntegerType
Done, switched column number_of_veterans from StringType to IntegerType
Done, switched column foreign_born from StringType to IntegerType
+---------------+-------------

#### SAS Date Type Transformations (TFA-5)
The SAS formatted dates (e.v. "20573.0") should be changed to proper date types.
Note we are not putting this into the nb_helpers.py because it' s a more exemplary kind of transformation.

In [13]:
sas_date_columns = ['arrdate', 'depdate']
from pyspark.sql.functions import date_add

def sasdate_to_date(df=None, column_list=None):
    # SAS has its own epoch, which we add temporarily as a column
    df = df.withColumn('epoch_start', F.lit("01-01-1960 00:00:00"))
    df = df.withColumn('epoch_start', F.to_date(F.col('epoch_start'), "dd-M-yyyy"))
    # Check each column and convert double to int, then add this int to epoch_start
    for column in column_list:
        df = df.withColumn(column, F.col(column).cast('int'))
        stm = 'date_add(epoch_start, {})'.format(column)
        df = df.withColumn(column, F.expr(stm))
    return df

imm_df = sasdate_to_date(imm_df, sas_date_columns)
imm_df.select(sas_date_columns).show(5)

+----------+----------+
|   arrdate|   depdate|
+----------+----------+
|2016-04-01|2016-04-09|
|2016-04-01|2016-08-05|
|2016-04-01|2016-07-30|
|2016-04-01|2016-04-06|
|2016-04-01|2016-04-07|
+----------+----------+
only showing top 5 rows



#### Non-Iso Date Type Transformations (TFA-6)
Columns `dtadtto` and `dtadtofile` contain non-iso dateformats and will be converted into proper date types as well.

In [14]:
# if required, update the dictionary with a new column and a new pattern or change existing
imm_non_iso_dates = {'dtaddto': 'MMddyyyy', 'dtadfile': 'yyyyMMdd'}    
imm_df = non_iso_date_change(imm_df, imm_non_iso_dates)

print('Here a view on some examples: ')
print(imm_df.select('dtaddto').show(5), imm_df.select('dtadfile').show(5))

### Changing SAS Date Type to Spark Datetype
Done, switched column dtaddto from StringType to DateType
Done, switched column dtadfile from StringType to DateType
Here a view on some examples: 
+----------+
|   dtaddto|
+----------+
|2016-09-30|
|2016-09-30|
|      null|
|2016-06-29|
|2016-06-29|
+----------+
only showing top 5 rows

+----------+
|  dtadfile|
+----------+
|2016-04-01|
|2016-04-01|
|2016-04-01|
|2016-04-01|
|2016-04-01|
+----------+
only showing top 5 rows

None None


#### Missing values and nullable setting (TFA-7)

##### Changing nullable setting
Changing columns in the i94 dataset to `nullable = true` where applicable.

In [15]:
# Define fields which should not be empty
not_null_fields = ['cicid', 'admnum']
imm_df = imm_df.dropna(subset=not_null_fields)
imm_df = change_nullables(session=spark, df=imm_df, column_list=not_null_fields)

Done, Schema changed for  2  columns.


Demographic file did not have any missing values, so we set all fields to nullable = false

In [16]:
not_null_fields = dem_df.columns
dem_df = dem_df.na.drop(subset=not_null_fields)
dem_df = change_nullables(session=spark, df=dem_df, column_list=not_null_fields)

Done, Schema changed for  12  columns.


From airport data we want the items with iata code filled only

In [17]:
not_null_fields = ['iata_code']
air_df = air_df.na.drop(subset=not_null_fields)
air_df = change_nullables(session=spark, df=air_df, column_list=not_null_fields)

Done, Schema changed for  1  columns.


#### Exporting a PySpark schema to JSON
**(optional: re-importing data with updated schema from JSON)**
Now that we have altered the dataframe schemas we export those schemas into JSON files.

In [20]:
# Write to Json file using a function
def write_schema_json(df, schema_file):
    import json
    import sys
    with open(schema_file, "w") as f:
        json.dump(df.schema.jsonValue(), f)

# Let's call that for each dataframe
write_schema_json(imm_df, imm_schema_file)
write_schema_json(dem_df, dem_schema_file)
write_schema_json(air_df, air_schema_file)

**At this point we could read the JSON schema and apply it for importing data.**

*However during my work I got a ClassCastException after importing the data on ".show()" or any other actions.
This is why the following cell is turned into markdown and exempted from execution*

#Save some lines by using a function to import a schema from JSON

`def import_schema(schema_file):`   
    `import json
    schema_df = spark.sparkContext.wholeTextFiles(schema_file)
    schema_content = schema_df.collect()[0][1]
    schema_dict = json.loads(str(schema_content))
    new_schema = T.StructType.fromJson(schema_dict)
    return new_schema`

#Now apply the schema for each
`start_time = datetime.now()
imm_df = spark.read.format('com.github.saurfang.sas.spark').load(path=imm_file, schema=import_schema(imm_schema_file))
air_df = spark.read.csv(air_file, header=True, schema=import_schema(air_schema_file))
dem_df = spark.read.csv(dem_file, header=True, sep=';', schema=import_schema(dem_schema_file))
elapse = datetime.now() - start_time
print('Reading ', imm_df.count(), ' lines from ', imm_file)
print('Reading ', dem_df.count(), ' lines from ', dem_file)
print('Reading ', air_df.count(), ' lines from ', air_file)
print('Operation runtime ', elapse)`

#### Resulting schema definitions
In the sections above we have transformed specific values 

In [18]:
imm_df.printSchema()
air_df.printSchema()
dem_df.printSchema()

root
 |-- cicid: integer (nullable = false)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: date (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: date (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- dtadfile: date (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: integer (nullable = true)
 |-- dtaddto: date (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: long (nullable =

### Logical exploration using PySpark
In this chapter we will show some more explorative analysis that we help us understanding the data.

**While doing this we will carry out more transformations. Those will be designated again as "TFA-x" to keep track of them**

#### Example 01 - Cleaning airport data to get an airport dimension table (TFA-8)
The airport codes table has over 50k entries. We can reduce it by filtering for *type airport* and for matching with immigration data we also need the *iata_code*.

The resulting dataframe from the airport dataset will be kept as a transformed version of the table (trimmed of spaces in column `iata_code` and exported to Parquet later (dimension table "airport").

The code below creates five examples of travellers and their corresponding airport information.

In [25]:
# Read all available airport codes from the airport dataframe
airports_fields = ['iata_code', 'name', 'municipality', 'iso_region', \
                   'iso_country', 'coordinates']
dim_airports = air_df.filter(air_df.iata_code.isNotNull()).select(airports_fields)

# Split the coordinates column into latitude and longitude, remove spaces
dim_airports = dim_airports.withColumn('latitude', F.split(dim_airports['coordinates'], ',').getItem(0)).withColumn('longitude', F.split(dim_airports['coordinates'], ',').getItem(1))
dim_airports = dim_airports.withColumn('iata_code', F.trim(F.col('iata_code'))).withColumn('latitude', F.trim(F.col('latitude'))).withColumn('longitude', F.trim(F.col('longitude')))
dim_airports = dim_airports.drop('coordinates')

# TEST if data can be matched
# Read some columns from immigration dataset including the airport codes, remove spaces
imm_airports = imm_df.select(['cicid', 'arrdate', 'i94port']).withColumnRenamed('i94port', 'airport').distinct()
imm_airports = imm_airports.withColumn('airport', F.trim(F.col('airport')))

# Join with immigration dataset
leftouter_set = imm_airports.join(dim_airports, dim_airports.iata_code == imm_airports.airport, 'leftouter')
matching = leftouter_set.select(['iata_code']).filter(F.col('iata_code').isNotNull()).count()
not_matching = leftouter_set.select(['iata_code']).filter(F.col('iata_code').isNull()).count()
print('Successfully joined lines: {} \t\t\tJoin failed on {} lines'.format(matching, (not_matching)))
print(leftouter_set.show(5))

Successfully joined lines: 20489 			Join failed on 10530 lines
+-------+----------+-------+---------+--------------------+------------+----------+-----------+------------------+-----------+
|  cicid|   arrdate|airport|iata_code|                name|municipality|iso_region|iso_country|          latitude|  longitude|
+-------+----------+-------+---------+--------------------+------------+----------+-----------+------------------+-----------+
| 155458|2016-04-01|    BGM|      BGM|Greater Binghamto...|  Binghamton|     US-NY|         US|      -75.97979736|42.20869827|
| 429152|2016-04-02|    FMY|      FMY|          Page Field|  Fort Myers|     US-FL|         US|-81.86329650879999|26.58659935|
|4903447|2016-04-26|    FMY|      FMY|          Page Field|  Fort Myers|     US-FL|         US|-81.86329650879999|26.58659935|
|5221677|2016-04-28|    FMY|      FMY|          Page Field|  Fort Myers|     US-FL|         US|-81.86329650879999|26.58659935|
|3149638|2016-04-16|    FMY|      FMY|          

This catches **only about 65% of the entries** in our fact table.

Now we could also try to match the city name in the SAS values list to the airport`municipality` column, but we consider this even worse.

Therefore we check the airport codes in the immigration table which are not matching to column `i94port` and **count missed joins per airport**:

In [26]:
# Get list of missing entries where airport code could not match
not_matching = leftouter_set.select(['airport', 'iata_code']).filter(F.col('iata_code').isNull())
# Count number of missing entries
not_matching.groupBy('airport').count().orderBy('count', ascending=False).show()

+-------+-----+
|airport|count|
+-------+-----+
|    NYC| 4706|
|    HHW| 1408|
|    CHI| 1305|
|    FTL|  948|
|    LVG|  906|
|    WAS|  742|
|    SAI|  241|
|    SAJ|   93|
|    NOL|   47|
|    XXX|   41|
|    YGF|   29|
|    X96|   22|
|    PHU|   21|
|    YHC|    7|
|    INP|    3|
|    JKM|    3|
|    EPI|    2|
|    MIL|    2|
|    SSM|    2|
|    PHR|    1|
+-------+-----+
only showing top 20 rows



**This list sets a priority for cleaning up our airport dataframe or immigration data.**

*Example: if we find out why "NYC" is not matching an entry of our airport CSV list we would have an additional 4.888 joins with our fact table(!)*

One way to improve data quality on airports would be updating the airport code table with those missing codes and their corresponding city names. Another way would be to check if the airport code in the immigration data is correct at all.

*At least for our example "NYC" it seems that this is not a proper iata_code. New York's airports have individual codes like "LGA" for LaGuardia Airport.*

*This indicates that we should **correct the data in the immigration table** with the correct value. This requires however to **somehow derive the correct iata_code from other columns in the table**.

I will take this data quality action **out of scope**.

In [24]:
imm_df.columns

['cicid',
 'i94yr',
 'i94mon',
 'i94cit',
 'i94res',
 'i94port',
 'arrdate',
 'i94mode',
 'i94addr',
 'depdate',
 'i94bir',
 'i94visa',
 'count',
 'dtadfile',
 'visapost',
 'occup',
 'entdepa',
 'entdepd',
 'entdepu',
 'matflag',
 'biryear',
 'dtaddto',
 'gender',
 'insnum',
 'airline',
 'admnum',
 'fltno',
 'visatype',
 'epoch_start']

#### Example 02 - Create status code dimension table (TFA-9)


In [31]:
# Extract unique entries of status flags from immigration dataframe
status_columns = ['entdepa', 'entdepd', 'entdepu', 'matflag']

dim_status = imm_df.select(status_columns) \
                .dropna() \
                .dropDuplicates() \
                .withColumn("statusFlagId", \
                            F.monotonically_increasing_id()) 
dim_status.name = 'dim_status'
dim_status.show()

+-------+-------+-------+-------+-------------+
|entdepa|entdepd|entdepu|matflag| statusFlagId|
+-------+-------+-------+-------+-------------+
|      H|      O|      U|      M| 120259084288|
|      Z|      I|      U|      M| 876173328384|
|      G|      N|      U|      M| 919123001344|
|      G|      O|      U|      M|1640677507072|
+-------+-------+-------+-------+-------------+



Now we check if we can match those to our airports dataframe using the short code in column `i94port`:

#### Example 03 - Time Data dimension table (TFA-10)
We create a dimension table from the provided immigration data

In [None]:
# Define the relevant fields where to collect date entries
dim_time_columns = ['arrdate', 'depdate']

# Collect all entries and remove duplicates
dates_in_col = imm_df \
                .select(time_fields) \
                .dropDuplicates()

# Concatenate the two columns (which needs another type conversion back to date)
dim_time = dates_in_col.withColumn('datestamp', F.explode(F.col('arrdate')))
dim_time.show()

# Create a dataframe and fill it with all missing columns
dim_time = spark.createDataFrame({'datestamp': dim_time})            
dim_time.name = 'dim_time'
dim_time = dim_time \
            .withColumn('day_of_week', F.dayofmonth('datestamp')) \
            .withColumn('week', F.weekofyear('datestamp')) \
            .withColumn('month', F.month('datestamp')) \
            .withColumn('year', F.year('datestamp')) \
            .dropDuplicates()

#Output
dim_time.show()

#### Example 04 - Demographics dimension table (TFA-11)

In [None]:
demographics_columns = dem_df.columns
dim_demographics = dem_df.select(demographics_columns).dropDuplicates()
dim_demographics.name = 'dim_demographics'
dim_demographics.show(5)

#### Example 05 - Immigration Fact Table


In [None]:
immigration_fact_fields = ['cicid']
immigration_facts = imm_df \
                    .select(immigration_fact_fields) \
                    .withColumnRenamed('arrdate','arrival_dt') \
                    .withColumnRenamed('depdate','departure_dt') \
immigration_facts.name = 'immigration_facts'

In [None]:
imm_df.columns

### Exporting the data to Parquet
We export the collected data in Parquet format after we have processed the cleaning steps above.
Note that in this notebook we are only writing a sample of the immigration data. Dataframe `imm_full` contains all 3M rows we will write in production.

In [None]:
immigration_facts.write.parquet("immigration_facts_data", mode='overwrite', compression='gzip')
dim_demographics.write.parquet('dim_demographics_data', mode='overwrite', compression='gzip')
dim_airports.write.parquet('dim_airports_data', mode='overwrite', compression='gzip')
dim_time.write.parquet('dim_time_data', mode='overwrite', compression='gzip')
dim_status.write.parquet('dim_status_data', mode='overwrite', compression='gzip')