# Project Title
### Data Engineering Capstone Project

#### Project Summary
A travel agency startup, Travellers Inc, wants to hop on the digital transformation trend and transform its business with technology (TravelTech).

As their data engineer, you are tasked with building an ETL pipeline that extracts data from the sources they have previously collected and transforms it into a set of dimensional tables, so that the analytics team can continue finding insights into where travellers go in the United States and what attracts them there.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import re
import os
from pathlib import Path
import shutil
import glob
import numpy as np
import json
import csv

In [2]:
import psycopg2
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType, StringType, IntegerType
from pyspark.sql.functions import count, desc, col, when, from_unixtime

In [185]:
DATABASE_HOST="127.0.0.1"
DATABASE_PORT="5432"
DATABASE_NAME="travellers"
DATABASE_USER="<database_user>"
DATABASE_PASS="<database_pass>"
DATABASE_URL=f"jdbc:postgresql://{DATABASE_HOST}:{DATABASE_PORT}/{DATABASE_NAME}"
DATABASE_POSTGRESQL_URL=f"postgresql://{DATABASE_USER}:{DATABASE_PASS}@{DATABASE_HOST}:{DATABASE_PORT}/{DATABASE_NAME}"

### Step 1: Scope the Project and Gather Data

#### 1.1: Scope 

These are the datasets we have on hand:
* I94 Immigration Data
* Temperature Data 
* US Demographics Data
* Airport Code Data

1. We may want to study how the weather affects the travellers coming into the country. <br /> 
E.g. during Summer, there may be fewer `pleasure` travellers; during Fall or Spring, we might see more `student` travellers. <br /> 
This would allow us to better plan and allocate air traffic resources during different periods of the year. <br /> 
In addition, this data could be provided to retailers so that they can plan events throughout the year to attract the respective types of travellers. <br /> 

2. We could also use the data to study which places are particularly popular with travellers throughout the year. <br />
E.g. Canada might be a hotspot for `pleasure` travellers during the autumn season. <br />
This would allow us to plan ahead for the increase in tourists so as to provide them with a more pleasant experience. <br />

#### 1.2: Describe and Gather Data 
Describe the datasets you're using. Where did they come from? What type of information do they include? 

In [4]:
!head -5 ./dataset/immigration_data_sample.csv

,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,07202016,F,,JL,56582674633.0,00782,WT
2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94361995930.0,XBLNG,B2
589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,1.0,20160407,,,G,O,,M,1940.0,07052016,M,,LH,55780468433.0,00464,WT
2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,1.0,20160428,DOH,,G,O,,M,1991.0,10272016,M,,QR,94789696030.0,00739,B2


In [85]:
!head -5 ../../data2/GlobalLandTemperaturesByCity.csv

dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
1743-11-01,6.068,1.7369999999999999,Århus,Denmark,57.05N,10.33E
1743-12-01,,,Århus,Denmark,57.05N,10.33E
1744-01-01,,,Århus,Denmark,57.05N,10.33E
1744-02-01,,,Århus,Denmark,57.05N,10.33E


In [86]:
!head -5 ./dataset/us-cities-demographics.csv

City;State;Median Age;Male Population;Female Population;Total Population;Number of Veterans;Foreign-born;Average Household Size;State Code;Race;Count
Silver Spring;Maryland;33.8;40601;41862;82463;1562;30908;2.6;MD;Hispanic or Latino;25924
Quincy;Massachusetts;41.0;44129;49500;93629;4147;32935;2.39;MA;White;58723
Hoover;Alabama;38.5;38040;46799;84839;4819;8229;2.58;AL;Asian;4759
Rancho Cucamonga;California;34.5;88127;87105;175232;5821;33878;3.18;CA;Black or African-American;24437


In [87]:
!head -5 ./dataset/airport-codes_csv.csv

ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
00A,heliport,Total Rf Heliport,11,NA,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
00AA,small_airport,Aero B Ranch Airport,3435,NA,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
00AK,small_airport,Lowell Field,450,NA,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
00AL,small_airport,Epps Airpark,820,NA,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"


### Step 2: Explore and Assess the Data

#### Utility Methods

In [103]:
def create_spark_session():
    # saurfang:spark-sas7bdat lets Spark read .sas7bdat files directly
    spark_session = (
        SparkSession.builder
        .config("spark.jars.repositories", "https://repos.spark-packages.org/")
        .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
        # PostgreSQL JDBC driver options that were tried (currently disabled):
        # .config("spark.jars", "./driver/postgresql-42.4.0.jar")
        # .config("spark.jars.packages", "org.postgresql:postgresql:jar:42.4.0")
        # .config("spark.driver.extraClassPath", "./driver/postgresql-42.4.0.jar")
        .enableHiveSupport()
        .getOrCreate()
    )
    return spark_session

In [180]:
def show_spark_config(spark):
    print(spark.sparkContext.getConf().getAll())

In [89]:
def load_csv(file_path, separator=',', header='infer'):
    if not os.path.exists(file_path):
        # Fail fast with a clear message instead of letting pd.read_csv raise afterwards
        raise FileNotFoundError(f"No data loaded - {file_path} does not exist!")
    return pd.read_csv(file_path, sep=separator, header=header)

In [90]:
def load_sas_files(spark_session, sas_file_pattern, sas_columns):
    # file_index (rather than count) avoids shadowing pyspark's count() inside this function
    file_index = 0
    df_spark = None

    for sas_file_path in glob.glob(sas_file_pattern, recursive=True):
        print(f"{file_index}")
        file_name_without_extension = Path(sas_file_path).stem
        sas_output_folder = f"./sas_data/sas_data_{file_name_without_extension}"
        print(f"Processing: Loading {sas_file_path} into {sas_output_folder}")
        df_sas = load_sas_file(spark_session, sas_file_path, sas_output_folder)
        # Keep only the shared columns so that every monthly dataset has the same schema
        df_sas = df_sas.select(sas_columns)
        total_records = df_sas.count()
        total_columns = len(df_sas.columns)
        print(f"Processing: Loaded {sas_file_path} with [{total_records}] records with [{total_columns}] columns")
        print(df_sas.columns)
        source_columns = 0 if df_spark is None else len(df_spark.columns)
        if df_spark is None:
            print("Processing: Assigning Dataset to Immigration")
            df_spark = df_sas
        elif total_columns == source_columns:
            # union() matches columns by position, which is safe after the select() above
            print("Processing: Union Dataset to Immigration")
            df_spark = df_spark.union(df_sas)
        else:
            print(f"Processing: Columns Mismatched to Immigration - has [{total_columns}] expected [{source_columns}]")
        print("--")
        file_index += 1
    return df_spark

In [91]:
def load_sas_file(spark_session, sas_file_path, sas_output_folder):
    sas_output_path = Path(sas_output_folder)

    # Convert the SAS file to Parquet once; later runs reuse the cached Parquet copy
    if not sas_output_path.exists():
        df_sas = spark_session.read.format('com.github.saurfang.sas.spark')\
            .load(sas_file_path)
        df_sas.write.parquet(sas_output_folder)

    return spark_session.read.parquet(sas_output_folder)

In [92]:
def describe_dataframe(df):
    total_rows = len(df)
    total_columns = len(df.columns)
    print(f"Processing: There are a total of [{total_rows}] rows")
    print("--")
    print(f"Processing: The following is the available [{total_columns}] columns")
    # df.info() prints its report itself and returns None, so it is not wrapped in print()
    df.info()

    print("--")
    print(f"Processing: The following is the available statistics of the [{total_columns}] columns")
    print(df.describe())

In [93]:
def housekeep_rows_with_empty_values(df):
    total_rows=len(df)
    print(f"Processing: Before removing empty rows - [{total_rows}] rows")
    df_non_empty=df.dropna()
    total_rows=len(df_non_empty)
    print(f"Processing: After removing empty rows - [{total_rows}] rows")
    print("--")
    return df_non_empty

In [94]:
def housekeep_rows_with_duplicated_values(df):
    total_rows=len(df)
    print(f"Processing: Before removing duplicated rows - [{total_rows}] rows")
    df_non_duplicate=df.drop_duplicates()
    total_rows=len(df_non_duplicate)
    print(f"Processing: After removing duplicated rows - [{total_rows}] rows")
    print("--")
    return df_non_duplicate

In [95]:
def housekeep_empty_and_duplicates(df):
    df=housekeep_rows_with_empty_values(df)
    df=housekeep_rows_with_duplicated_values(df)
    return df

In [96]:
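def replace_series_value(series, value_to_remove, value_to_replace=""):
    print(f"Processing: Replacing Series Value [{value_to_remove}] to [{value_to_replace}]")
    # value_to_remove is used as a regex character class: each of its characters is replaced.
    # re.escape keeps characters such as ']' or '^' from breaking the pattern.
    series = series.astype(str).str.replace("[" + re.escape(value_to_remove) + "]", value_to_replace, regex=True)
    return series.str.strip()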

In [97]:
def convert_sas_date_to_readable_format(df, sas_column_name, readable_column_name, readable_format):
    print(f"Processing: Converting SAS Date [{sas_column_name}] to [{readable_column_name}] with date format ({readable_format})")
    # SAS dates count days since 1960-01-01: convert to seconds, then shift to the Unix epoch.
    # Use the df argument (not the global df_immigration); from_unixtime already returns null for null input.
    return df.withColumn(readable_column_name,
                         from_unixtime(col(sas_column_name) * 86400 - 315619200, readable_format))

In [98]:
def get_seasonality(month):
    # April to June as Spring
    # July to September as Summer
    # October to December as Fall
    # January to March as Winter
    seasonality = {
        "january": "winter",
        "february": "winter",
        "march": "winter",
        "april": "spring",
        "may": "spring",
        "june": "spring",
        "july": "summer",
        "august": "summer",
        "september": "summer",
        "october": "fall",
        "november": "fall",
        "december": "fall"
    }
    
    return seasonality.get(month.lower(), "")

#### 2.1: Data Exploration and Cleaning (I94 Immigration Data)
Identify data quality issues, like missing values, duplicate data, etc. <br />
Document steps necessary to clean the data

In [99]:
!wc -l "./dataset/immigration_data_sample.csv"

1001 ./dataset/immigration_data_sample.csv


In [100]:
df_immigration_sample = load_csv('./dataset/immigration_data_sample.csv')

In [101]:
describe_dataframe(df_immigration_sample)

Processing: There are a total of [1000] rows
--
Processing: The following is the available [29] columns
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 29 columns):
Unnamed: 0    1000 non-null int64
cicid         1000 non-null float64
i94yr         1000 non-null float64
i94mon        1000 non-null float64
i94cit        1000 non-null float64
i94res        1000 non-null float64
i94port       1000 non-null object
arrdate       1000 non-null float64
i94mode       1000 non-null float64
i94addr       941 non-null object
depdate       951 non-null float64
i94bir        1000 non-null float64
i94visa       1000 non-null float64
count         1000 non-null float64
dtadfile      1000 non-null int64
visapost      382 non-null object
occup         4 non-null object
entdepa       1000 non-null object
entdepd       954 non-null object
entdepu       0 non-null float64
matflag       954 non-null object
biryear       1000 non-null float64
dtaddto       1000 n

As each month's immigration data is stored in a separate file, we loop through the files, extract the data from each one and union everything into a single final dataframe.

`i94_jun16_sub.sas7bdat` has more columns than the rest of the datasets, so we select only the columns we are interested in, which allows all the dataframes to be unioned.
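The column-alignment idea can be illustrated without Spark. The sketch below is a minimal illustration, assuming plain Python lists of dicts as stand-ins for the monthly DataFrames (the helper names and the `extra_col` field are hypothetical). It keeps only the wanted columns that every file provides, in a fixed order, and then stacks the rows, which is the positional matching that `DataFrame.union` relies on.

```python
def select_columns(rows, columns):
    """Project each record onto `columns`, in that exact order."""
    return [tuple(row[c] for c in columns) for row in rows]


def union_monthly_files(monthly_rows, wanted_columns):
    """Keep the wanted columns present in every file, then stack all rows.

    union() in Spark matches columns by position, so every input must be
    projected onto the same ordered column list before being combined.
    """
    # Columns that every (non-empty) monthly file actually provides
    common = set(wanted_columns)
    for rows in monthly_rows:
        if rows:
            common &= set(rows[0].keys())
    # Preserve the caller's column order
    columns = [c for c in wanted_columns if c in common]

    combined = []
    for rows in monthly_rows:
        combined.extend(select_columns(rows, columns))
    return columns, combined


# Hypothetical sample: the June file carries an extra column
april = [{"i94yr": 2016.0, "i94mon": 4.0, "i94port": "LOS"}]
june = [{"i94yr": 2016.0, "i94mon": 6.0, "i94port": "NYC", "extra_col": "x"}]

columns, rows = union_monthly_files([april, june], ["i94yr", "i94mon", "i94port"])
print(columns)
print(rows)
```

PySpark also provides `DataFrame.unionByName`, which matches columns by name rather than by position and can be a safer choice when the column order might differ between files.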

In [104]:
# sas_columns = ['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline', 'admnum', 'fltno', 'visatype']
sas_columns = ['i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94visa']
sas_file_pattern = r'../../data/**/*.sas7bdat'
spark = create_spark_session()
df_immigration = load_sas_files(spark, sas_file_pattern, sas_columns)

0
Processing: Loading ../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat into ./sas_data/sas_data_i94_apr16_sub
Processing: Loaded ../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat with [3096313] records with [10] columns
['i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94visa']
Processing: Assigning Dataset to Immigration
--
1
Processing: Loading ../../data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat into ./sas_data/sas_data_i94_sep16_sub
Processing: Loaded ../../data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat with [3733786] records with [10] columns
['i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94visa']
Processing: Union Dataset to Immigration
--
2
Processing: Loading ../../data/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat into ./sas_data/sas_data_i94_nov16_sub
Processing: Loaded ../../data/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat with [2914926] records with [10]

In [105]:
df_immigration.count() # 40790529

40790529

As `arrdate` and `depdate` are in SAS numeric date format, we need to convert them to a more readable format such as `yyyy-MM-dd`.
A SAS date value is the number of days since `1960-01-01`, so we convert that number to seconds (multiply by 86400) and subtract 315619200, the number of seconds from `1960-01-01` to `1970-01-01` (the Unix epoch), to obtain a Unix timestamp that can be formatted as a readable date.
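The arithmetic can be checked in plain Python. This is a minimal stdlib sketch (the function names are hypothetical, not part of the notebook): it derives the 315619200 offset as 3653 days times 86400 seconds (1960 to 1970 includes the leap years 1960, 1964 and 1968), and shows both the seconds-based formula and a direct date-arithmetic variant. One caveat, as far as we understand Spark's behaviour: `from_unixtime` formats timestamps in the Spark session time zone, so the seconds-based formula gives the intended calendar date only when that zone is UTC, whereas adding days to a date avoids the question entirely.

```python
from datetime import date, datetime, timedelta, timezone

SAS_EPOCH = date(1960, 1, 1)
# Seconds between the SAS epoch (1960-01-01) and the Unix epoch (1970-01-01)
SAS_TO_UNIX_OFFSET_SECONDS = (date(1970, 1, 1) - SAS_EPOCH).days * 86400


def sas_date_to_iso(sas_days):
    """Convert a SAS numeric date (days since 1960-01-01) to 'yyyy-MM-dd'."""
    if sas_days is None:
        return None
    return (SAS_EPOCH + timedelta(days=int(sas_days))).isoformat()


def sas_date_via_unix_seconds(sas_days):
    """Same conversion using the notebook's seconds arithmetic, rendered in UTC."""
    if sas_days is None:
        return None
    seconds = sas_days * 86400 - SAS_TO_UNIX_OFFSET_SECONDS
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y-%m-%d")


print(SAS_TO_UNIX_OFFSET_SECONDS)          # 315619200
print(sas_date_to_iso(20574.0))            # 2016-04-30
print(sas_date_via_unix_seconds(20582.0))  # 2016-05-08
```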

In [106]:
df_immigration_with_dates = convert_sas_date_to_readable_format(df_immigration, "arrdate", "arrival_dt", "yyyy-MM-dd")
df_immigration_with_dates = convert_sas_date_to_readable_format(df_immigration_with_dates, "arrdate", "arrival_month", "MMMMM")
df_immigration_with_dates = convert_sas_date_to_readable_format(df_immigration_with_dates, "depdate", "departure_dt", "yyyy-MM-dd")
df_immigration_with_dates.take(1)

Processing: Converting SAS Date [arrdate] to [arrival_dt] with date format (yyyy-MM-dd)
Processing: Converting SAS Date [arrdate] to [arrival_month] with date format (MMMMM)
Processing: Converting SAS Date [depdate] to [departure_dt] with date format (yyyy-MM-dd)


[Row(i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94visa=1.0, arrival_dt='2016-04-30', arrival_month='April', departure_dt='2016-05-08')]

In [107]:
# df_immigration_cleaned = df_immigration.withColumn("arrival_dt", \
#     from_unixtime(df_immigration.arrdate  * 86400 - 315619200, "yyyy-MM-dd") if df_immigration.arrdate is not None else None)
# df_immigration_cleaned = df_immigration.withColumn("arrival_month", \
#     from_unixtime(df_immigration.arrdate  * 86400 - 315619200, "MMMMM") if df_immigration.arrdate is not None else None)
# df_immigration_cleaned = df_immigration_cleaned.withColumn("departure_dt", \
#     from_unixtime(df_immigration_cleaned.depdate  * 86400 - 315619200, "yyyy-MM-dd") if df_immigration_cleaned.depdate is not None else None)

In [108]:
# convert_sas_date = udf(lambda x : pd.to_datetime(x  * 86400 - 315619200, unit = 's') if x is not None else None, DateType())
# df_immigration_cleaned = df_immigration.withColumn("arrival_dt", convert_sas_date(df_immigration.arrdate))
# df_immigration_cleaned = df_immigration_cleaned.withColumn("departure_dt", convert_sas_date(df_immigration_cleaned.depdate))

In [109]:
# convert_month_name = udf(lambda x : x.month_name() if x is not None else None, StringType())
# df_immigration_cleaned = df_immigration_cleaned.withColumn("arrival_month", convert_month_name(df_immigration_cleaned.arrival_dt))

As we want to see what kinds of travellers there are, we aggregate the number of travellers by their purpose of travel (`i94visa`) for each combination of arrival month, `i94cit`, `i94port` and `i94addr`.

In [110]:
# i94visa is stored as a double (1.0 = business, 2.0 = pleasure, 3.0 = student), so compare numerically
df_immigration_agg = df_immigration_with_dates\
    .groupBy("arrival_month", "i94cit", "i94port", "i94addr")\
    .agg(count(col("*")).alias("total_travellers"),\
            count(when(col("i94visa") == 1.0, True)).alias("business_travellers"),\
            count(when(col("i94visa") == 2.0, True)).alias("leisure_travellers"),\
            count(when(col("i94visa") == 3.0, True)).alias("student_travellers"))\
    .orderBy(desc("total_travellers"))

In [111]:
df_immigration_agg.count()

810989

In [112]:
df_immigration_agg.take(1)

[Row(arrival_month='August', i94cit=209.0, i94port='HHW', i94addr='HI', total_travellers=146523, business_travellers=911, leisure_travellers=145060, student_travellers=552)]
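The `count(when(...))` expressions in the aggregation above are conditional counts: `when` without `otherwise` yields a value only for matching rows and null for the rest, and `count` skips nulls. The plain-Python sketch below mimics that pivot to make the semantics concrete; the helper `aggregate_by_purpose` and the sample rows are hypothetical, and this is an illustration rather than a replacement for the Spark job.

```python
from collections import defaultdict

# i94visa codes as used in the aggregation above: 1 = business, 2 = pleasure/leisure, 3 = student
PURPOSE_BY_VISA_CODE = {1.0: "business_travellers", 2.0: "leisure_travellers", 3.0: "student_travellers"}


def aggregate_by_purpose(rows, keys):
    """Group rows by `keys` and count travellers per purpose (a pivot of conditional counts)."""
    totals = defaultdict(lambda: {"total_travellers": 0, "business_travellers": 0,
                                  "leisure_travellers": 0, "student_travellers": 0})
    for row in rows:
        bucket = totals[tuple(row[k] for k in keys)]
        bucket["total_travellers"] += 1
        purpose = PURPOSE_BY_VISA_CODE.get(row["i94visa"])
        if purpose is not None:  # like count(when(...)): unmatched rows add nothing
            bucket[purpose] += 1
    # Busiest group first, mirroring orderBy(desc("total_travellers"))
    return sorted(totals.items(), key=lambda item: item[1]["total_travellers"], reverse=True)


sample = [
    {"arrival_month": "August", "i94port": "HHW", "i94visa": 2.0},
    {"arrival_month": "August", "i94port": "HHW", "i94visa": 1.0},
    {"arrival_month": "August", "i94port": "HHW", "i94visa": 2.0},
    {"arrival_month": "April", "i94port": "LOS", "i94visa": 3.0},
]
for key, counts in aggregate_by_purpose(sample, ["arrival_month", "i94port"]):
    print(key, counts)
```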

In [113]:
# df_immigration_citizen.groupby('arrival_month').count().show()

In [114]:
# import pyspark.sql.functions as F
# df_immigration_citizen.groupBy(F.spark_partition_id()).count().show()

#### 2.2: Data Exploration and Cleaning (World Temperature Data)
Identify data quality issues, like missing values, duplicate data, etc. <br />
Document steps necessary to clean the data

In [115]:
!wc -l "../../data2/GlobalLandTemperaturesByCity.csv"

8599213 ../../data2/GlobalLandTemperaturesByCity.csv


In [116]:
df_world_temp = load_csv("../../data2/GlobalLandTemperaturesByCity.csv")

In [117]:
describe_dataframe(df_world_temp)

Processing: There are a total of [8599212] rows
--
Processing: The following is the available [7] columns
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8599212 entries, 0 to 8599211
Data columns (total 7 columns):
dt                               object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                             object
Country                          object
Latitude                         object
Longitude                        object
dtypes: float64(2), object(5)
memory usage: 459.2+ MB
None
--
Processing: The following is the available statistics of the [7] columns
       AverageTemperature  AverageTemperatureUncertainty
count        8.235082e+06                   8.235082e+06
mean         1.672743e+01                   1.028575e+00
std          1.035344e+01                   1.129733e+00
min         -4.270400e+01                   3.400000e-02
25%          1.029900e+01                   3.370000e-01
50%          1.883100e+

As we want to study how the season affects travellers' patterns, we need to tag the weather data with a season based on the `month`. We will use the dates in [US Seasonality 2022](https://www.calendarr.com/united-states/seasons-of-the-year-in-the-united-states/#:~:text=Fall%3A%20Starts%20on%20March%2020,until%20March%2020%20or%2021.) as a gauge for the start and end of each season, rounded to whole calendar months:
* Spring: Starts on March 21, and ends on June 20 => We will set April to June as Spring
* Summer: Starts on June 21, and ends on September 22 => We will set July to September as Summer
* Fall: Starts on September 23, and ends on December 21 => We will set October to December as Fall
* Winter: Starts on December 21, and ends on March 20 => We will set January to March as Winter
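Because the chosen seasons line up with calendar quarters, the mapping can also be computed from the month number alone. The hypothetical `season_for_month` below is a sketch of that arithmetic; for months 1 to 12 it should agree with the dictionary used in `get_seasonality`.

```python
SEASONS = ["winter", "spring", "summer", "fall"]


def season_for_month(month):
    """Map a month number (1-12) to the notebook's quarter-aligned season.

    January-March -> winter, April-June -> spring,
    July-September -> summer, October-December -> fall.
    """
    if not 1 <= month <= 12:
        raise ValueError(f"month must be in 1..12, got {month}")
    # Integer division groups months 1-3, 4-6, 7-9 and 10-12 into indices 0-3
    return SEASONS[(month - 1) // 3]


print([season_for_month(m) for m in range(1, 13)])
```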

In [118]:
df_us_temp=df_world_temp[df_world_temp['Country']=="United States"]

In [119]:
describe_dataframe(df_us_temp)

Processing: There are a total of [687289] rows
--
Processing: The following is the available [7] columns
<class 'pandas.core.frame.DataFrame'>
Int64Index: 687289 entries, 47555 to 8439246
Data columns (total 7 columns):
dt                               687289 non-null object
AverageTemperature               661524 non-null float64
AverageTemperatureUncertainty    661524 non-null float64
City                             687289 non-null object
Country                          687289 non-null object
Latitude                         687289 non-null object
Longitude                        687289 non-null object
dtypes: float64(2), object(5)
memory usage: 41.9+ MB
None
--
Processing: The following is the available statistics of the [7] columns
       AverageTemperature  AverageTemperatureUncertainty
count       661524.000000                   661524.00000
mean            13.949335                        1.08955
std              9.173337                        1.15068
min            -25.16300

In [120]:
df_us_temp_wo_na=housekeep_empty_and_duplicates(df_us_temp)

Processing: Before removing empty rows - [687289] rows
Processing: After removing empty rows - [661524] rows
--
Processing: Before removing duplicated rows - [661524] rows
Processing: After removing duplicated rows - [661524] rows
--


In [121]:
df_us_temp_wo_na['dt']=pd.to_datetime(df_us_temp_wo_na['dt'])

In [122]:
df_us_temp_wo_na['month']=df_us_temp_wo_na['dt'].dt.month
df_us_temp_wo_na['month_name']=df_us_temp_wo_na['dt'].dt.month_name()

In [123]:
df_us_temp_agg=df_us_temp_wo_na.groupby(['Country', 'City', 'month', 'month_name']).agg({'AverageTemperature': ['count','min','max','mean']})
df_us_temp_agg=df_us_temp_agg.reset_index(level=['Country','City', 'month', 'month_name']) # reset multi indexed to single indexed
df_us_temp_agg.columns = ['country', 'city', 'month', 'month_name', 'total_recordings', 'min_temperature', 'max_temperature', 'avg_temperature']

In [124]:
# df_us_temp_agg.head(5)

As we want to check whether the season affects travellers' travel patterns, we tag each `month` with its corresponding `season`.

In [125]:
df_us_temp_agg['season']=df_us_temp_agg['month_name'].apply(get_seasonality)

In [126]:
df_us_temp_agg.head(5)

,country,city,month,month_name,total_recordings,min_temperature,max_temperature,avg_temperature,season
0,United States,Abilene,1,January,193,-0.568,10.145,5.310202,winter
1,United States,Abilene,2,February,194,0.065,12.256,7.527418,winter
2,United States,Abilene,3,March,194,6.431,17.104,12.064526,winter
3,United States,Abilene,4,April,194,13.574,21.069,16.999969,spring
4,United States,Abilene,5,May,194,18.527,26.183,21.741758,spring


#### 2.3: Data Exploration and Cleaning (US City Demographic Data)
Identify data quality issues, like missing values, duplicate data, etc. <br />
Document steps necessary to clean the data

In [127]:
!wc -l "./dataset/us-cities-demographics.csv"

2892 ./dataset/us-cities-demographics.csv


In [128]:
df_demographic = load_csv("./dataset/us-cities-demographics.csv", separator=';')

In [129]:
describe_dataframe(df_demographic)

Processing: There are a total of [2891] rows
--
Processing: The following is the available [12] columns
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int64
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 271.1+ KB
None
--
Processing: The following is the available statistics of the [12] columns
        Median Age  Male Population  Female Population  Total Population  \

In [130]:
# df_demographic.head(5)

Another question we would like to answer is which destinations travellers favour. For that, we need to see whether they prefer to travel to populated areas or to less dense areas. To achieve that, we extract population information for the various cities in the US.

In [131]:
df_demographic_filtered=df_demographic[['City','State', 'State Code', 'Median Age','Male Population','Female Population', 'Total Population', 'Race', 'Count']]
df_demographic_filtered.head(5)

,City,State,State Code,Median Age,Male Population,Female Population,Total Population,Race,Count
0,Silver Spring,Maryland,MD,33.8,40601.0,41862.0,82463,Hispanic or Latino,25924
1,Quincy,Massachusetts,MA,41.0,44129.0,49500.0,93629,White,58723
2,Hoover,Alabama,AL,38.5,38040.0,46799.0,84839,Asian,4759
3,Rancho Cucamonga,California,CA,34.5,88127.0,87105.0,175232,Black or African-American,24437
4,Newark,New Jersey,NJ,34.6,138040.0,143873.0,281913,White,76402


In [132]:
# describe_dataframe(df_demographic_filtered)

In [133]:
# df_demographic_filtered[df_demographic_filtered['State Code']=='CA'].sort_values(by=['State','City'])

There is one city (The Villages) where the male and female population values are missing. However, as we are more interested in the total population and the residents' races in the area, we will not drop these rows.

In [134]:
df_demographic_filtered[df_demographic_filtered['Male Population'].isna()]

,City,State,State Code,Median Age,Male Population,Female Population,Total Population,Race,Count
333,The Villages,Florida,FL,70.5,,,72590,Hispanic or Latino,1066
449,The Villages,Florida,FL,70.5,,,72590,Black or African-American,331
1437,The Villages,Florida,FL,70.5,,,72590,White,72211


In [135]:
df_demographic_wo_duplicates=housekeep_rows_with_duplicated_values(df_demographic_filtered)

Processing: Before removing duplicated rows - [2891] rows
Processing: After removing duplicated rows - [2891] rows
--


In [136]:
df_demographic_wo_duplicates.columns

Index(['City', 'State', 'State Code', 'Median Age', 'Male Population',
       'Female Population', 'Total Population', 'Race', 'Count'],
      dtype='object')

In [137]:
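# Group on the city keys only: a NaN in a group key (e.g. The Villages' gender columns) would drop the group
df_demographic_agg = df_demographic_wo_duplicates.groupby(['City', 'State', 'State Code']).agg({
    'Median Age': 'first', 'Male Population': 'first', 'Female Population': 'first',
    'Total Population': 'first', 'Race': 'count', 'Count': 'sum'})
df_demographic_agg = df_demographic_agg.reset_index().rename(columns={
    'City': 'city', 'State': 'state', 'State Code': 'state_code', 'Median Age': 'median_age',
    'Male Population': 'total_male', 'Female Population': 'total_female',
    'Total Population': 'total_population', 'Race': 'total_race', 'Count': 'total_race_count'})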

In [138]:
df_demographic_agg.head(5)

,city,state,state_code,median_age,total_male,total_female,total_population,total_race,total_race_count
0,Abilene,Texas,TX,31.3,65212.0,60664.0,125876,5,147900
1,Akron,Ohio,OH,38.1,96886.0,100667.0,197553,5,210305
2,Alafaya,Florida,FL,33.5,39504.0,45760.0,85264,4,115476
3,Alameda,California,CA,41.4,37747.0,40867.0,78614,5,89174
4,Albany,Georgia,GA,33.3,31695.0,39414.0,71109,5,73478
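The demographics file stores one row per (city, race), repeating the city-level columns on every row. The stdlib sketch below (hypothetical helper `collapse_demographics`, with `None` standing in for missing values) shows the collapse to one record per city: it keys only on city and state, keeps the first copy of the city-level values, counts the race rows and sums `Count`. Keying on the identifiers alone matters because pandas `groupby` drops groups whose key contains NaN by default, so using the gender columns as keys would silently lose The Villages. The sample uses the three The Villages rows shown earlier.

```python
def collapse_demographics(rows):
    """Collapse one-row-per-race records into one record per (city, state)."""
    cities = {}
    for row in rows:
        key = (row["City"], row["State"])
        if key not in cities:
            # City-level attributes repeat on every race row; keep the first copy.
            # Missing values (None) are carried through rather than dropping the city.
            cities[key] = {
                "city": row["City"],
                "state": row["State"],
                "total_male": row["Male Population"],
                "total_female": row["Female Population"],
                "total_population": row["Total Population"],
                "total_race": 0,
                "total_race_count": 0,
            }
        cities[key]["total_race"] += 1
        cities[key]["total_race_count"] += row["Count"]
    return list(cities.values())


villages = [
    {"City": "The Villages", "State": "Florida", "Male Population": None,
     "Female Population": None, "Total Population": 72590, "Race": race, "Count": n}
    for race, n in [("Hispanic or Latino", 1066), ("Black or African-American", 331), ("White", 72211)]
]
print(collapse_demographics(villages))
```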


#### 2.4: Data Exploration and Cleaning (Airport Code Data)
Identify data quality issues, like missing values, duplicate data, etc. <br />
Document steps necessary to clean the data

In [139]:
!wc -l "./dataset/airport-codes_csv.csv"

55076 ./dataset/airport-codes_csv.csv


In [140]:
df_airport = load_csv("./dataset/airport-codes_csv.csv")

In [141]:
describe_dataframe(df_airport)

Processing: There are a total of [55075] rows
--
Processing: The following is the available [12] columns
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
ident           55075 non-null object
type            55075 non-null object
name            55075 non-null object
elevation_ft    48069 non-null float64
continent       27356 non-null object
iso_country     54828 non-null object
iso_region      55075 non-null object
municipality    49399 non-null object
gps_code        41030 non-null object
iata_code       9189 non-null object
local_code      28686 non-null object
coordinates     55075 non-null object
dtypes: float64(1), object(11)
memory usage: 5.0+ MB
None
--
Processing: The following is the available statistics of the [12] columns
       elevation_ft
count  48069.000000
mean    1240.789677
std     1602.363459
min    -1266.000000
25%      205.000000
50%      718.000000
75%     1497.000000
max    22000.000000


In [142]:
# df_airport.head(5)

As we are interested in travellers entering the US, we only need the airports located in the US. With the airport information, we will then be able to see which airports are the busiest at different periods of the year.

In [143]:
# Check any rows that ident is different from local_code
df_airport_ident_checks=df_airport[(df_airport['local_code'].notnull()) & (df_airport['local_code'] != df_airport['ident'])]
df_airport_ident_checks.head(10)

,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
221,03MT,small_airport,Cascade Field,3580.0,,US,US-MT,Cascade,3MT7,,3MT7,"-111.71748, 47.267327"
501,08IN,seaplane_base,Winona Lake Seaplane Base,812.0,,US,US-IN,Winona Lake,02D,,02D,"-85.830556, 41.223056"
580,09TA,small_airport,Lazy G Bar Ranch Airport,923.0,,US,US-TX,Decatur,,,09T,"-97.497002, 33.282101"
636,0C7,small_airport,Grandpas' Farm Mendota Airport,727.0,,US,US-IL,Mendota,IL22,,IL22,"-89.132599, 41.521999"
693,0D9,small_airport,Air Park North,1170.0,,US,US-MI,Alba,MI30,,MI30,"-84.9587, 44.958"
754,0ID6,heliport,Steele Memorial Heliport,4004.0,,US,US-ID,Salmon,67ID,,67ID,"-113.891423, 45.173791"
813,0L5,small_airport,Goldfield Airport,5680.0,,US,US-NV,Goldfield,NV50,,NV50,"-117.236368, 37.722751"
863,0MI1,small_airport,Sugar Springs Airpark,940.0,,US,US-MI,Gladwin,,,5M6,"-84.4375, 44.140301"
1019,0PA0,small_airport,Philadelphia Gliderport,670.0,,US,US-PA,Hilltown,3PA2,,3PA2,"-75.248125, 40.331227"
1195,0Z3,seaplane_base,Shannons Pond Seaplane Base,80.0,,US,US-AK,Dillingham,AA15,,AA15,"-158.577191, 59.058998"


In [144]:
df_airport_filtered=df_airport[['ident', 'gps_code', 'iata_code', 'local_code', 'name', 'type', 'iso_country', 'iso_region', 'municipality',  'coordinates']]

In [145]:
df_us_airport = df_airport_filtered[df_airport_filtered['iso_country']=='US']

In [146]:
describe_dataframe(df_us_airport)

Processing: There are a total of [22757] rows
--
Processing: The following is the available [10] columns
<class 'pandas.core.frame.DataFrame'>
Int64Index: 22757 entries, 0 to 54896
Data columns (total 10 columns):
ident           22757 non-null object
gps_code        20984 non-null object
iata_code       2019 non-null object
local_code      21236 non-null object
name            22757 non-null object
type            22757 non-null object
iso_country     22757 non-null object
iso_region      22757 non-null object
municipality    22655 non-null object
coordinates     22757 non-null object
dtypes: object(10)
memory usage: 1.9+ MB
None
--
Processing: The following is the available statistics of the [10] columns
        ident gps_code iata_code local_code                        name  \
count   22757    20984      2019      21236                       22757   
unique  22757    20932      2014      21170                       21245   
top      ID00     14LA       AUS        6X8  Memorial Hospi

In [147]:
df_us_airport[df_us_airport['municipality'].isnull()]

,ident,gps_code,iata_code,local_code,name,type,iso_country,iso_region,municipality,coordinates
1544,15SD,15SD,,,Watertown / Brownlee Heliport,heliport,US,US-SD,,"-97.1080899239, 44.883264878199995"
2452,21ID,21ID,,,Nordman / Phillabaum Heliport,heliport,US,US-ID,,"-116.871174574, 48.631483378700004"
4401,3ME7,3ME7,,,Peru / Destiny Cove SPB,seaplane_base,US,US-ME,,"-70.396957, 44.460597"
7653,6XA4,6XA4,,,Zadow Airstrip,small_airport,US,US-TX,,"-95.954353809, 29.991738550900003"
7887,74xa,74XA,,,Gun Barrel City Airpark,small_airport,US,US-TX,,"-96.1456650496, 32.3551499558"
8082,79ID,79ID,,,Kooskia (Clear Creek Int) Airport,small_airport,US,US-ID,,"-115.869691372, 46.0488642914"
8114,79WT,79WT,,,Ellensburg (Rotor Ranch) Airport,small_airport,US,US-WA,,"-120.589778423, 47.091426059499994"
9055,8FA4,8FA4,,,Samsula / Coe Field,small_airport,US,US-FL,,"-81.1328315735, 29.0102045831"
9855,99XA,99XA,,,Briggs / Skotz Airfield,small_airport,US,US-TX,,"-98.0037117004, 30.863976076700002"
11676,AUS,KAUS,AUS,,Austin Robert Mueller Municipal,closed,US,US-TX,,"-97.6997852325, 30.2987223546"


Remove airports whose `iso_region` is `US-U-A`: no US state has the code `U-A`, and the names of these airports do not look right.

In [148]:
df_us_airport_filtered=df_us_airport[df_us_airport['iso_region']!='US-U-A']

In [149]:
# assign() returns a new DataFrame, which avoids the SettingWithCopyWarning raised by assigning a column on the filtered slice
df_us_airport_filtered = df_us_airport_filtered.assign(
    state=df_us_airport_filtered['iso_region'].apply(lambda x : '-'.join(x.split('-')[1:])))


In [150]:
df_us_airport_filtered['state'].value_counts().sort_index()

AK     829
AL     361
AR     406
AZ     359
CA    1088
CO     505
CT     164
DC      21
DE      57
FL     967
GA     522
HI      64
IA     338
ID     315
IL     902
IN     697
KS     439
KY     257
LA     592
MA     257
MD     257
ME     208
MI     549
MN     569
MO     578
MS     281
MT     331
NC     473
ND     321
NE     309
NH     179
NJ     442
NM     198
NV     156
NY     668
OH     799
OK     537
OR     492
PA     918
RI      35
SC     217
SD     211
TN     356
TX    2277
UT     170
VA     505
VT     102
WA     578
WI     624
WV     140
WY     127
Name: state, dtype: int64
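Once `US-U-A` is dropped, the counts above contain only two-letter codes (the 50 states plus `DC`). Rather than excluding one known-bad value, the filter could keep only regions shaped like `US-` plus two letters, which would also catch other malformed regions in a future extract. A minimal sketch, assuming every valid region follows that pattern; `filter_valid_us_regions` is a hypothetical helper:

```python
import pandas as pd

# assumption: a valid US region is "US-" followed by exactly two capital letters
VALID_US_REGION = r"^US-[A-Z]{2}$"

def filter_valid_us_regions(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy restricted to well-formed US region codes, with a derived `state` column."""
    mask = df["iso_region"].str.match(VALID_US_REGION, na=False)
    valid = df.loc[mask].copy()  # explicit copy, so adding a column raises no warning
    valid["state"] = valid["iso_region"].str[3:]  # "US-PA" -> "PA"
    return valid

sample = pd.DataFrame({"ident": ["00A", "X1", "00AK"],
                       "iso_region": ["US-PA", "US-U-A", "US-AK"]})
result = filter_valid_us_regions(sample)
```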

Split the comma-separated `coordinates` value (longitude first, then latitude) into two columns, so that each value is stored in its own database column for easier querying and retrieval.

In [151]:
# split "longitude, latitude" once; assign() returns a new DataFrame, avoiding the SettingWithCopyWarning
coordinates_split = df_us_airport_filtered['coordinates'].str.split(',', expand=True)
df_us_airport_filtered = df_us_airport_filtered.assign(coordinates_x=coordinates_split[0].str.strip(),
                                                       coordinates_y=coordinates_split[1].str.strip())


In [152]:
# df_us_airport_filtered.head(5)

In [153]:
df_us_airport_filtered.columns

Index(['ident', 'gps_code', 'iata_code', 'local_code', 'name', 'type',
       'iso_country', 'iso_region', 'municipality', 'coordinates', 'state',
       'coordinates_x', 'coordinates_y'],
      dtype='object')

In [154]:
df_us_airport_agg = df_us_airport_filtered[['ident', 'gps_code', 'iata_code', 'local_code', 'name', 'type', 'iso_country', 'state', 'municipality', 'coordinates_x', 'coordinates_y']]
df_us_airport_agg.columns=['id', 'gps_code', 'iata_code', 'local_code', 'name', 'type', 'country', 'state', 'city', 'coordinates_x', 'coordinates_y']
df_us_airport_agg.head(5)

Unnamed: 0,id,gps_code,iata_code,local_code,name,type,country,state,city,coordinates_x,coordinates_y
0,00A,00A,,00A,Total Rf Heliport,heliport,US,PA,Bensalem,-74.93360137939453,40.07080078125
1,00AA,00AA,,00AA,Aero B Ranch Airport,small_airport,US,KS,Leoti,-101.473911,38.704022
2,00AK,00AK,,00AK,Lowell Field,small_airport,US,AK,Anchor Point,-151.695999146,59.94919968
3,00AL,00AL,,00AL,Epps Airpark,small_airport,US,AL,Harvest,-86.77030181884766,34.86479949951172
4,00AR,,,,Newport Hospital & Clinic Heliport,closed,US,AR,Newport,-91.254898,35.6087
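The split columns still hold text, while the data dictionary declares the coordinates as decimals. A minimal sketch of converting them to floats and flagging impossible values, assuming `coordinates_x` is longitude and `coordinates_y` is latitude as in the sample rows above; `parse_coordinates` is a hypothetical helper:

```python
import pandas as pd

def parse_coordinates(coordinates: pd.Series) -> pd.DataFrame:
    """Split "lon, lat" strings into float columns and flag rows outside valid ranges."""
    parts = coordinates.str.split(",", expand=True)
    # errors="coerce" turns unparseable text into NaN instead of raising
    lon = pd.to_numeric(parts[0].str.strip(), errors="coerce")
    lat = pd.to_numeric(parts[1].str.strip(), errors="coerce")
    # NaN compares as False in between(), so unparseable rows are flagged invalid too
    valid = lon.between(-180, 180) & lat.between(-90, 90)
    return pd.DataFrame({"coordinates_x": lon, "coordinates_y": lat, "valid": valid})

sample = pd.Series(["-75.248125, 40.331227", "-158.577191, 59.058998", "abc, 95.0"])
parsed = parse_coordinates(sample)
```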


#### 2.5: Data Exploration and Cleaning (SAS Data Label)
Identify data quality issues, like missing values, duplicate data, etc. <br />
Document steps necessary to clean the data <br />

Based on `I94_SAS_Labels_Descriptions.SAS`, additional data was extracted to complement the immigration data:
* `i94cntyl_traveller_country.map` - mapping file for the `I94CIT` and `I94RES` fields
* `i94prtl_arrival_airport_city.map` - mapping file for the `I94PORT` field
* `i94model_mode_of_transport.map` - mapping file for the `I94MODE` field
* `i94addrl_arrival_airport_state.map` - mapping file for the `I94ADDR` field
* `i94visa_travelling_purpose.map` - mapping file for the `I94VISA` field

##### 2.5.1: Data Exploration and Cleaning (Traveller Country)

In [155]:
!wc -l "./dataset/i94cntyl_traveller_country.map"

288 ./dataset/i94cntyl_traveller_country.map


In [156]:
df_traveller_country=load_csv('./dataset/i94cntyl_traveller_country.map', separator='=', header=None)
df_traveller_country.columns=['country_code','country_name']
df_traveller_country['country_name']=replace_series_value(df_traveller_country['country_name'], "'")
df_traveller_country.head(5)

Processing: Replacing Series Value ['] to []


Unnamed: 0,country_code,country_name
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA
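Each `.map` file holds one `code = value` pair per line, with single quotes and tabs around the parts (hence the `'` and `\t` cleanup in these cells). As a self-contained alternative to splitting on `=` and then stripping characters, a minimal regex-based sketch, assuming that line format; `parse_sas_map` and its pattern are hypothetical:

```python
import re
import pandas as pd

# assumption: optional quotes/whitespace around the code and the value, separated by "="
MAP_LINE = re.compile(r"^\s*'?(?P<code>[^'=]+?)'?\s*=\s*'?(?P<value>.*?)'?\s*;?\s*$")

def parse_sas_map(lines, code_col="code", value_col="value"):
    """Parse `code = 'value'` lines into a two-column DataFrame, skipping lines that do not match."""
    rows = []
    for line in lines:
        match = MAP_LINE.match(line)
        if match:  # blank or malformed lines are ignored
            rows.append((match.group("code").strip(), match.group("value").strip()))
    return pd.DataFrame(rows, columns=[code_col, value_col])

sample = parse_sas_map(["'ALC'\t=\t'ALCAN, AK'", "582 =  'MEXICO'", "", "1 = Business"],
                       "code", "name")
```

Codes come back as strings, so a numeric target such as `country_code` would need a cast (e.g. `astype(int)`) before loading.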


##### 2.5.2: Data Exploration and Cleaning (Arrival Airport City)

In [157]:
!wc -l "./dataset/i94prtl_arrival_airport_city.map"

659 ./dataset/i94prtl_arrival_airport_city.map


In [158]:
df_airport_city=load_csv('./dataset/i94prtl_arrival_airport_city.map', separator='=', header=None)
df_airport_city.columns=['city_code','city_name_with_state']
df_airport_city['city_code']=replace_series_value(df_airport_city['city_code'], "'|\t")
df_airport_city['city_name_with_state']=replace_series_value(df_airport_city['city_name_with_state'], "'|\t")
df_airport_city['city_name']=df_airport_city['city_name_with_state'].apply(lambda x : x.split(',')[0])
df_airport_city['state_code']=df_airport_city['city_name_with_state'].apply(lambda x : x.split(',')[-1])
df_airport_city.head(5)

Processing: Replacing Series Value ['|	] to []
Processing: Replacing Series Value ['|	] to []


Unnamed: 0,city_code,city_name_with_state,city_name,state_code
0,ALC,"ALCAN, AK",ALCAN,AK
1,ANC,"ANCHORAGE, AK",ANCHORAGE,AK
2,BAR,"BAKER AAF - BAKER ISLAND, AK",BAKER AAF - BAKER ISLAND,AK
3,DAC,"DALTONS CACHE, AK",DALTONS CACHE,AK
4,PIZ,"DEW STATION PT LAY DEW, AK",DEW STATION PT LAY DEW,AK
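Splitting on `','` and taking the last piece keeps the space that follows the comma, so the stored `state_code` values start with a space: the Use Case #2 output later shows `' NY'`, and its query joins on `trim(...)` to compensate. A minimal sketch that splits on the last comma and strips both parts, assuming entries without a comma should get a missing state rather than the whole string; `split_city_state` is hypothetical:

```python
import pandas as pd

def split_city_state(city_name_with_state: pd.Series) -> pd.DataFrame:
    """Split "CITY, ST" on the last comma and strip surrounding whitespace from both parts."""
    # n=1 splits only on the last comma; reindex guarantees both columns exist
    parts = city_name_with_state.str.rsplit(",", n=1, expand=True).reindex(columns=[0, 1])
    return pd.DataFrame({
        "city_name": parts[0].str.strip(),
        # rows without a comma have no state part and end up as missing values
        "state_code": parts[1].str.strip(),
    })

result = split_city_state(pd.Series(["ALCAN, AK", "NEW YORK, NY   ", "UNKNOWN"]))
```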


##### 2.5.3: Data Exploration and Cleaning (Mode of Transport)

In [159]:
!wc -l "./dataset/i94model_mode_of_transport.map"

3 ./dataset/i94model_mode_of_transport.map


In [160]:
df_mode=load_csv('./dataset/i94model_mode_of_transport.map', separator='=', header=None)
df_mode.columns=['mode_code','mode_name']
df_mode['mode_name']=replace_series_value(df_mode['mode_name'], "'|\t")
df_mode.head(5)

Processing: Replacing Series Value ['|	] to []


Unnamed: 0,mode_code,mode_name
0,1,Air
1,2,Sea
2,3,Land
3,9,Not reported


##### 2.5.4: Data Exploration and Cleaning (Arrival Airport State)

In [161]:
!wc -l "./dataset/i94addrl_arrival_airport_state.map"

54 ./dataset/i94addrl_arrival_airport_state.map


In [162]:
df_airport_state=load_csv('./dataset/i94addrl_arrival_airport_state.map', separator='=', header=None)
df_airport_state.columns=['state_code','state_name']
df_airport_state['state_code']=replace_series_value(df_airport_state['state_code'], "'|\t")
df_airport_state['state_name']=replace_series_value(df_airport_state['state_name'], "'|\t")
df_airport_state.head(5)

Processing: Replacing Series Value ['|	] to []
Processing: Replacing Series Value ['|	] to []


Unnamed: 0,state_code,state_name
0,AL,ALABAMA
1,AK,ALASKA
2,AZ,ARIZONA
3,AR,ARKANSAS
4,CA,CALIFORNIA


##### 2.5.5: Data Exploration and Cleaning (Travelling Purpose)

In [163]:
df_purpose=load_csv('./dataset/i94visa_travelling_purpose.map', separator='=', header=None)
df_purpose.columns=['purpose_code','purpose_name']
# df_purpose['state_code']=replace_series_value(df_purpose['state_code'], "'|\t")
# df_purpose['state_name']=replace_series_value(df_purpose['state_name'], "'|\t")
df_purpose.head(5)

Unnamed: 0,purpose_code,purpose_name
0,1,Business
1,2,Pleasure
2,3,Student


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

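The tables are broken down into `fact` and `dimension` tables, following a `snowflake schema` design (for example, `arrival_city_code` links on to `arrival_state_code` through `state_code`).
This design was chosen because the fact table stores pre-aggregated monthly counts, so most questions reduce to a `sum` over the fact table plus a few joins to the dimensions.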

The following is the `fact` table:
1. Arrival Summary Table 

<br/>

The following are the `dimension` tables:
1. Arrival State Code Table 
2. Arrival City Code Table 
3. Arrival Country Code Table 
4. Season Table 
5. Temperature Table 
6. Population Table 
7. Airport Table 

<br/>

![Data_Schema](./screenshots/database_schema.png)
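The fact table holds one row per arrival month, arrival state, arrival port and citizenship, with travellers split by visa purpose (`1` Business, `2` Pleasure, `3` Student in the purpose mapping above). The notebook builds it in Spark (`df_immigration_agg`); purely to illustrate the shape, a minimal pandas sketch on toy rows, assuming raw columns named `arrival_month`, `i94addr`, `i94port`, `i94cit` and `i94visa`, and assuming "Pleasure" maps to `leisure_travellers`. `summarise_arrivals` is hypothetical:

```python
import pandas as pd

KEYS = ["arrival_month", "i94addr", "i94port", "i94cit"]
# assumption: Pleasure (code 2) is what the fact table calls leisure
PURPOSE_COLUMNS = {1: "business_travellers", 2: "leisure_travellers", 3: "student_travellers"}

def summarise_arrivals(raw: pd.DataFrame) -> pd.DataFrame:
    """Aggregate one row per arrival record into counts per month, state, port and citizenship."""
    flags = raw[KEYS].copy()
    flags["total_travellers"] = 1
    for code, column in PURPOSE_COLUMNS.items():
        # 1 when the record's visa purpose matches, else 0, so summing gives a count
        flags[column] = (raw["i94visa"] == code).astype(int)
    # note: groupby drops rows whose keys contain missing values
    return flags.groupby(KEYS, as_index=False).sum()

raw = pd.DataFrame({
    "arrival_month": ["January", "January", "January", "May"],
    "i94addr": ["NY", "NY", "NY", "FL"],
    "i94port": ["NYC", "NYC", "NYC", "MIA"],
    "i94cit": [135, 135, 135, 209],
    "i94visa": [1, 2, 2, 3],
})
summary = summarise_arrivals(raw)
```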

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

The following are the necessary steps:
1. Create the `fact` and `dimension` tables in `PostgreSQL`
2. Using `Spark`, load the `I94 Immigration` data, clean it and ingest it into `PostgreSQL`
3. Using `Pandas`, load the `World Temperature`, `US City Demographic` and `Airport Code` data, clean it and ingest it into `PostgreSQL`
4. Once all the data is ingested into the respective tables, check the total row count of each table to verify that the data was ingested correctly
5. Finally, once the data is verified, run a few queries to check the quality of the data

![Data_Pipeline](./screenshots/data_pipeline.png)

### Step 4: Run Pipelines to Model the Data 

#### Utility

In [164]:
def create_database_connection(database_host, database_name, database_user, database_pass):
    conn = psycopg2.connect(f"host={database_host} dbname={database_name} user={database_user} password={database_pass}")
    return conn

In [165]:
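def convert_dataframe_to_tuples(insert_dataframe):
    if len(insert_dataframe) == 0:
        return [], ""
    
    # psycopg2 sends a float NaN as 'NaN'::float rather than NULL, so replace missing values with None first
    cleaned_dataframe = insert_dataframe.astype(object).where(insert_dataframe.notnull(), None)
    insert_tuples = [tuple(x) for x in cleaned_dataframe.values]
    insert_columns = ','.join(list(insert_dataframe.columns))
    return insert_tuples, insert_columns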

In [166]:
def save_dataframe_to_database(insert_query, insert_tuples):
    total_rows = len(insert_tuples)
    print(f"Executing: {insert_query}")
    print(f"Executing: Inserting [{total_rows}] rows")
    # stays None if connecting fails, so the finally block does not raise a NameError
    conn = None
    try:
        print(f"Executing: Connecting to [{DATABASE_NAME}] with {DATABASE_USER}@{DATABASE_HOST}")
        conn = create_database_connection(database_host=DATABASE_HOST, database_name=DATABASE_NAME, database_user=DATABASE_USER, database_pass=DATABASE_PASS)
        cur = conn.cursor()
        # run the parameterised query once per tuple, then commit as a single transaction
        cur.executemany(insert_query, insert_tuples)
        conn.commit()
    except Exception as e:
        print(f"Error saving dataframe to database: {e}")
    finally:
        if conn is not None:
            conn.close()
        print("--")

In [208]:
arrival_summary_table_insert = ("""
INSERT INTO arrival_summary (arrival_month ,arrival_state ,arrival_city ,traveller_citizenship ,
    total_travellers ,business_travellers ,leisure_travellers ,student_travellers)
                 VALUES (%s, %s, %s, %s, %s, %s, %s, %s);
""")

def load_arrival_summary(df_us_travellers):
    total_rows = df_us_travellers.count()
    if total_rows <= 0:
        print("Executing: Loading Arrival Summary Fails - dataframe has no data")
        return
    
    !echo "Executing: cat ./immigrate_data/*.csv | psql {DATABASE_POSTGRESQL_URL} \
        -c 'COPY arrival_summary(arrival_month, arrival_state, arrival_city, traveller_citizenship, total_travellers ,business_travellers ,leisure_travellers ,student_travellers) from stdin CSV'"
    print(f"Executing: Inserting [{total_rows}] rows")
    
    # rename frame to align with database column names
    spark = create_spark_session()
    df_us_travellers_renamed = df_us_travellers.filter(col('i94cit').isNotNull())\
                    .select(col('arrival_month').alias('arrival_month'),
                                    col('i94addr').alias('arrival_state'),
                                    col('i94port').alias('arrival_city'),
                                    col('i94cit').cast(IntegerType()).alias('traveller_citizenship'),
                                    col('total_travellers').alias('total_travellers'),
                                    col('business_travellers').alias('business_travellers'),
                                    col('leisure_travellers').alias('leisure_travellers'),
                                    col('student_travellers').alias('student_travellers'))
    
    # output dataframe to csv to be loaded with COPY command
    !rm -rfv "./immigrate_data"
    df_us_travellers_renamed.write.csv("immigrate_data")
    
    # load csv to postgresql with COPY command
    ! echo "Executing: Connecting to [{DATABASE_NAME}] with {DATABASE_USER}@{DATABASE_HOST}"
    ! cat ./immigrate_data/*.csv | psql "{DATABASE_POSTGRESQL_URL}" \
        -c 'COPY arrival_summary(arrival_month, arrival_state, arrival_city, traveller_citizenship, total_travellers ,business_travellers ,leisure_travellers ,student_travellers) from stdin CSV'
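The function above shells out to `cat ... | psql`, which puts the connection URL, password included, on the command line and in the cell output. A minimal in-process alternative, assuming an open psycopg2 connection: psycopg2's `cursor.copy_expert(sql, file)` streams each Spark `part-*.csv` file into a `COPY ... FROM STDIN` statement. `copy_csv_parts` is a hypothetical helper, and the caller still has to commit:

```python
import glob
import os

def copy_csv_parts(cursor, table, columns, csv_dir):
    """Stream every Spark part-*.csv file in csv_dir into `table` via COPY FROM STDIN.

    `cursor` is expected to be a psycopg2 cursor (anything with copy_expert(sql, file) works).
    Returns the number of files copied; committing is left to the caller.
    """
    copy_sql = f"COPY {table} ({', '.join(columns)}) FROM STDIN WITH (FORMAT csv)"
    # the pattern skips Spark's hidden .part-*.csv.crc checksum files
    paths = sorted(glob.glob(os.path.join(csv_dir, "part-*.csv")))
    for path in paths:
        with open(path, "r", encoding="utf-8") as csv_file:
            cursor.copy_expert(copy_sql, csv_file)
    return len(paths)
```

Usage would be along the lines of opening `conn = create_database_connection(...)`, calling `copy_csv_parts(conn.cursor(), "arrival_summary", [...], "./immigrate_data")` with the same column list as the `COPY` above, then `conn.commit()`.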

In [190]:
season_table_insert = ("""
    INSERT INTO season (season_month, season)
                     VALUES (%s, %s)
    ON CONFLICT(season_month) 
    DO update set 
        season = EXCLUDED.season;
    """)

temperature_table_insert = ("""
INSERT INTO temperature (city ,measurement_month ,total_measurements ,min_temperature ,max_temperature ,avg_temperature)
                 VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT(city, measurement_month) 
DO update set 
    total_measurements = EXCLUDED.total_measurements,
    min_temperature = EXCLUDED.min_temperature,
    max_temperature = EXCLUDED.max_temperature,
    avg_temperature = EXCLUDED.avg_temperature;
""")

def load_season_and_temperature(df_us_temperature):
    if len(df_us_temperature) <= 0:
        print("Executing: Loading Season and Temperature Fails - dataframe has no data")
        return
    
    df_season_insert = df_us_temperature[['month_name', 'season']].drop_duplicates()
    season_tuples, season_columns  = convert_dataframe_to_tuples(df_season_insert)
    save_dataframe_to_database(season_table_insert, season_tuples)
    
    df_temperature_insert = df_us_temperature[['city','month_name', 'total_recordings', 'min_temperature', 'max_temperature', 'avg_temperature']]
    temperature_tuples, temperature_columns  = convert_dataframe_to_tuples(df_temperature_insert)
    save_dataframe_to_database(temperature_table_insert, temperature_tuples)

In [191]:
population_table_insert = ("""
INSERT INTO population (state ,city ,median_age ,male_population ,female_population ,total_population ,race_diversity)
                 VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT(state, city) 
DO update set 
    median_age = EXCLUDED.median_age,
    male_population = EXCLUDED.male_population,
    female_population = EXCLUDED.female_population,
    total_population = EXCLUDED.total_population,
    race_diversity = EXCLUDED.race_diversity;
""")

def load_population(df_us_population):
    if len(df_us_population) <= 0:
        print("Executing: Loading Population Fails - dataframe has no data")
        return
    
    df_population_insert = df_us_population[['state_code', 'city', 'median_age', 'total_male',
       'total_female', 'total_population', 'total_race']].drop_duplicates()
    population_tuples, population_columns  = convert_dataframe_to_tuples(df_population_insert)
    save_dataframe_to_database(population_table_insert, population_tuples)

In [192]:
airport_table_insert = ("""
INSERT INTO airport (airport_id ,gps_code ,iata_code ,local_code ,airport_name ,airport_description ,airport_state ,airport_city ,airport_coordinates_x ,airport_coordinates_y)
                 VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT(airport_id) 
DO update set 
    gps_code = EXCLUDED.gps_code,
    iata_code = EXCLUDED.iata_code,
    local_code = EXCLUDED.local_code,
    airport_name = EXCLUDED.airport_name,
    airport_description = EXCLUDED.airport_description,
    airport_state = EXCLUDED.airport_state,
    airport_city = EXCLUDED.airport_city,
    airport_coordinates_x = EXCLUDED.airport_coordinates_x,
    airport_coordinates_y = EXCLUDED.airport_coordinates_y;
""")

def load_airport(df_us_airport):
    if len(df_us_airport) <= 0:
        print("Executing: Loading Airport Fails - dataframe has no data")
        return
    
    df_airport_insert = df_us_airport[['id', 'gps_code', 'iata_code', 'local_code',
       'name', 'type', 'state', 'city', 'coordinates_x', 'coordinates_y']].drop_duplicates()
    airport_tuples, airport_columns  = convert_dataframe_to_tuples(df_airport_insert)
    save_dataframe_to_database(airport_table_insert, airport_tuples)

In [193]:
arrival_country_table_insert = ("""
INSERT INTO arrival_country_code (country_code, country_name)
                 VALUES (%s, %s)
ON CONFLICT(country_code) 
DO update set 
    country_name = EXCLUDED.country_name;
""")

def load_arrival_country(df_arrival_country):
    if len(df_arrival_country) <= 0:
        print("Executing: Loading Arrival Country Fails - dataframe has no data")
        return
    
    df_arrival_country_insert = df_arrival_country[['country_code', 'country_name']]
    arrival_country_tuples, arrival_country_columns  = convert_dataframe_to_tuples(df_arrival_country_insert)
    save_dataframe_to_database(arrival_country_table_insert, arrival_country_tuples)

In [194]:
arrival_state_table_insert = ("""
INSERT INTO arrival_state_code (state_code, state_name)
                 VALUES (%s, %s)
ON CONFLICT(state_code) 
DO update set 
    state_name = EXCLUDED.state_name;
""")

def load_arrival_state(df_arrival_state):
    if len(df_arrival_state) <= 0:
        print("Executing: Loading Arrival State Fails - dataframe has no data")
        return
    
    df_arrival_state_insert = df_arrival_state[['state_code', 'state_name']]
    arrival_state_tuples, arrival_state_columns  = convert_dataframe_to_tuples(df_arrival_state_insert)
    save_dataframe_to_database(arrival_state_table_insert, arrival_state_tuples)

In [195]:
arrival_city_table_insert = ("""
INSERT INTO arrival_city_code (city_code ,city_name ,state_code)
                 VALUES (%s, %s, %s)
ON CONFLICT(city_code) 
DO update set 
    city_name = EXCLUDED.city_name,
    state_code = EXCLUDED.state_code;
""")

def load_arrival_city(df_arrival_city):
    if len(df_arrival_city) <= 0:
        print("Executing: Loading Arrival City Fails - dataframe has no data")
        return
    
    df_arrival_city_insert = df_arrival_city[['city_code', 'city_name', 'state_code']]
    arrival_city_tuples, arrival_city_columns  = convert_dataframe_to_tuples(df_arrival_city_insert)
    save_dataframe_to_database(arrival_city_table_insert, arrival_city_tuples)
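Every `*_table_insert` statement above follows the same upsert shape: insert all columns, and on a key conflict overwrite the non-key columns with their `EXCLUDED` values. A minimal sketch of generating such strings, assuming table and column names come from trusted code (they are interpolated, not parameterised); `build_upsert` is hypothetical:

```python
def build_upsert(table, columns, conflict_columns):
    """Build an INSERT ... ON CONFLICT statement with psycopg2-style %s placeholders.

    Identifiers are interpolated directly, so they must come from trusted code, not user input.
    """
    placeholders = ", ".join(["%s"] * len(columns))
    updates = [f"{column} = EXCLUDED.{column}" for column in columns if column not in conflict_columns]
    # overwrite every non-key column; with no such column, simply skip the duplicate row
    action = ("DO UPDATE SET\n    " + ",\n    ".join(updates)) if updates else "DO NOTHING"
    return (f"INSERT INTO {table} ({', '.join(columns)})\n"
            f"VALUES ({placeholders})\n"
            f"ON CONFLICT ({', '.join(conflict_columns)})\n"
            f"{action};")

state_upsert = build_upsert("arrival_state_code", ["state_code", "state_name"], ["state_code"])
```

For instance, `build_upsert("temperature", [...], ["city", "measurement_month"])` with the temperature columns would produce the same shape as `temperature_table_insert`.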

#### 4.1 Create the data model
Build the data pipelines to create the data model.

##### 4.1.1 Create Tables

In [209]:
!python3 create_tables.py

Processing: Connecting to studentdb@127.0.0.1
Executing: Dropping Table - DROP table if exists arrival_summary
Executing: Dropping Table - DROP table if exists arrival_country_code
Executing: Dropping Table - DROP table if exists arrival_state_code
Executing: Dropping Table - DROP table if exists arrival_city_code
Executing: Dropping Table - DROP table if exists season
Executing: Dropping Table - DROP table if exists temperature
Executing: Dropping Table - DROP table if exists population
Executing: Dropping Table - DROP table if exists airport
Executing: Creating Table - 
    CREATE TABLE IF NOT EXISTS arrival_summary(
        arrival_id SERIAL primary key,
        arrival_month varchar(45) NOT NULL,
        arrival_state varchar(45) NULL,
        arrival_city varchar(45) NULL,
        traveller_citizenship int NULL,
        total_travellers int NULL,
        business_travellers int NULL,
        leisure_travellers int NULL,
        student_travellers int NULL,
        UNIQUE(arrival_i

##### 4.1.2 Ingest Data Into Tables

In [175]:
# load arrival_summary - df_immigration_agg
# load arrival_state_code - df_airport_state
# load arrival_city_code - df_airport_city
# load arrival_country_code - df_traveller_country
# load season - df_us_temp_agg
# load temperature - df_us_temp_agg
# load population - df_demographic_agg
# load airport - df_us_airport_agg

In [236]:
df_airport_city.columns[:]

Index(['city_code', 'city_name_with_state', 'city_name', 'state_code'], dtype='object')

In [210]:
load_arrival_summary(df_immigration_agg)

Executing: cat ./immigrate_data/*.csv | psql postgresql://student:student@127.0.0.1:5432/travellers         -c 'COPY arrival_summary(arrival_month, arrival_state, arrival_city, traveller_citizenship, total_travellers ,business_travellers ,leisure_travellers ,student_travellers) from stdin CSV'
Executing: Inserting [810989] rows
removed './immigrate_data/.part-00002-cfe74b34-56c7-46f1-a919-a3776c5ad198-c000.csv.crc'
removed './immigrate_data/part-00019-cfe74b34-56c7-46f1-a919-a3776c5ad198-c000.csv'
removed './immigrate_data/part-00046-cfe74b34-56c7-46f1-a919-a3776c5ad198-c000.csv'
removed './immigrate_data/.part-00010-cfe74b34-56c7-46f1-a919-a3776c5ad198-c000.csv.crc'
removed './immigrate_data/.part-00001-cfe74b34-56c7-46f1-a919-a3776c5ad198-c000.csv.crc'
removed './immigrate_data/part-00027-cfe74b34-56c7-46f1-a919-a3776c5ad198-c000.csv'
removed './immigrate_data/part-00017-cfe74b34-56c7-46f1-a919-a3776c5ad198-c000.csv'
removed './immigrate_data/.part-00007-cfe74b34-56c7-46f1-a919-a3776

In [211]:
load_season_and_temperature(df_us_temp_agg)

Executing: 
    INSERT INTO season (season_month, season)
                     VALUES (%s, %s)
    ON CONFLICT(season_month) 
    DO update set 
        season = EXCLUDED.season;
    
Executing: Inserting [12] rows
Executing: Connecting to [travellers] with student@127.0.0.1
--
Executing: 
INSERT INTO temperature (city ,measurement_month ,total_measurements ,min_temperature ,max_temperature ,avg_temperature)
                 VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT(city, measurement_month) 
DO update set 
    total_measurements = EXCLUDED.total_measurements,
    min_temperature = EXCLUDED.min_temperature,
    max_temperature = EXCLUDED.max_temperature,
    avg_temperature = EXCLUDED.avg_temperature;

Executing: Inserting [2976] rows
Executing: Connecting to [travellers] with student@127.0.0.1
--


In [212]:
load_population(df_demographic_agg)

Executing: 
INSERT INTO population (state ,city ,median_age ,male_population ,female_population ,total_population ,race_diversity)
                 VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT(state, city) 
DO update set 
    median_age = EXCLUDED.median_age,
    male_population = EXCLUDED.male_population,
    female_population = EXCLUDED.female_population,
    total_population = EXCLUDED.total_population,
    race_diversity = EXCLUDED.race_diversity;

Executing: Inserting [595] rows
Executing: Connecting to [travellers] with student@127.0.0.1
--


In [213]:
load_airport(df_us_airport_agg)

Executing: 
INSERT INTO airport (airport_id ,gps_code ,iata_code ,local_code ,airport_name ,airport_description ,airport_state ,airport_city ,airport_coordinates_x ,airport_coordinates_y)
                 VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT(airport_id) 
DO update set 
    gps_code = EXCLUDED.gps_code,
    iata_code = EXCLUDED.iata_code,
    local_code = EXCLUDED.local_code,
    airport_name = EXCLUDED.airport_name,
    airport_description = EXCLUDED.airport_description,
    airport_state = EXCLUDED.airport_state,
    airport_city = EXCLUDED.airport_city,
    airport_coordinates_x = EXCLUDED.airport_coordinates_x,
    airport_coordinates_y = EXCLUDED.airport_coordinates_y;

Executing: Inserting [22747] rows
Executing: Connecting to [travellers] with student@127.0.0.1
--


In [214]:
load_arrival_country(df_traveller_country)

Executing: 
INSERT INTO arrival_country_code (country_code, country_name)
                 VALUES (%s, %s)
ON CONFLICT(country_code) 
DO update set 
    country_name = EXCLUDED.country_name;

Executing: Inserting [289] rows
Executing: Connecting to [travellers] with student@127.0.0.1
--


In [215]:
load_arrival_state(df_airport_state)

Executing: 
INSERT INTO arrival_state_code (state_code, state_name)
                 VALUES (%s, %s)
ON CONFLICT(state_code) 
DO update set 
    state_name = EXCLUDED.state_name;

Executing: Inserting [55] rows
Executing: Connecting to [travellers] with student@127.0.0.1
--


In [216]:
load_arrival_city(df_airport_city)

Executing: 
INSERT INTO arrival_city_code (city_code ,city_name ,state_code)
                 VALUES (%s, %s, %s)
ON CONFLICT(city_code) 
DO update set 
    city_name = EXCLUDED.city_name,
    state_code = EXCLUDED.state_code;

Executing: Inserting [660] rows
Executing: Connecting to [travellers] with student@127.0.0.1
--


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

##### 4.2.1 Check Total Data Size

In [218]:
!python3 check_tables.py

Executing: Checking Table - 
    select count(*) from arrival_summary

Executing: found [808585] records
--
Executing: Checking Table - 
    select count(*) from arrival_country_code

Executing: found [289] records
--
Executing: Checking Table - 
    select count(*) from arrival_state_code

Executing: found [55] records
--
Executing: Checking Table - 
    select count(*) from arrival_city_code

Executing: found [660] records
--
Executing: Checking Table - 
    select count(*) from season

Executing: found [12] records
--
Executing: Checking Table - 
    select count(*) from temperature

Executing: found [2976] records
--
Executing: Checking Table - 
    select count(*) from population

Executing: found [595] records
--
Executing: Checking Table - 
    select count(*) from airport

Executing: found [22747] records
--
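Most counts above match the row counts printed during loading, but `arrival_summary` holds 808,585 rows against the 810,989 reported by `load_arrival_summary`. That function counts `df_us_travellers` before dropping rows with a null `i94cit`, which accounts for the 2,404-row difference. A minimal sketch of automating the comparison instead of checking by eye, assuming any DB-API 2.0 connection (psycopg2 here, `sqlite3` for a local check) and trusted table names; `check_row_counts` is hypothetical:

```python
def check_row_counts(conn, expected):
    """Compare SELECT COUNT(*) for each table against an expected row count.

    `expected` maps table name -> expected rows. Table names are interpolated into SQL,
    so they must come from trusted code. Returns {table: (expected, actual)} for mismatches.
    """
    mismatches = {}
    cur = conn.cursor()
    try:
        for table, expected_rows in expected.items():
            cur.execute(f"SELECT COUNT(*) FROM {table}")
            actual_rows = cur.fetchone()[0]
            if actual_rows != expected_rows:
                mismatches[table] = (expected_rows, actual_rows)
    finally:
        cur.close()
    return mismatches
```

A call such as `check_row_counts(conn, {"season": 12, "temperature": 2976})` returns an empty dict when every table matches.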


#### 4.3 Data dictionary
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

##### 4.3.1: Arrival Summary Table

The `Arrival Summary` table holds monthly counts of travellers arriving in the country, broken down by arrival state, arrival city, citizenship and purpose of visit

| Name | Description | Type |
| --- | --- | --- |
| arrival_id  | auto increment key | integer |
| arrival_month   | month in which the travellers arrived | varchar |
| arrival_state   | state the travellers arrived in | varchar |
| arrival_city   | city code the travellers arrived in (see `arrival_city_code`) | varchar |
| traveller_citizenship   | country code of the travellers' citizenship (see `arrival_country_code`) | integer |
| total_travellers   | total travellers for the month | integer |
| business_travellers   | total travellers for the month visiting for business | integer |
| leisure_travellers   | total travellers for the month visiting for leisure | integer |
| student_travellers   | total travellers for the month visiting for study | integer |

##### 4.3.2: Arrival Country Code Table

The `Arrival Country Code` table maps each country code to its country name

| Name | Description | Type |
| --- | --- | --- |
| country_id  | auto increment key | integer |
| country_code   | code representation for the country | integer  |
| country_name   | name of the country | varchar |

##### 4.3.3: Arrival State Code Table

The `Arrival State Code` table maps each state code to its state name

| Name | Description | Type |
| --- | --- | --- |
| state_id  | auto increment key | integer |
| state_code   | code representation for the state | varchar |
| state_name   | name of the state | varchar |

##### 4.3.4: Arrival City Code Table

The `Arrival City Code` table maps each city code to its city name and state

| Name | Description | Type |
| --- | --- | --- |
| city_id  | auto increment key | integer |
| city_code   | code representation for the city | varchar |
| city_name   | name of the city | varchar |
| state_code   | code of the state the city belongs to | varchar |

##### 4.3.5: Season Table

The `Season` table consists of information about the season of the respective months

| Name | Description | Type |
| --- | --- | --- |
| season_id  | auto increment key | integer |
| season_month   | name of the month (e.g. `January`) | varchar |
| season   | the season of the month | varchar |

##### 4.3.6: Temperature Table

The `Temperature` table consists of information regarding the temperature measured across the years for the respective cities

| Name | Description | Type |
| --- | --- | --- |
| temperature_id  | auto increment key | integer |
| city | city that the temperature was taken | varchar |
| measurement_month | month that the temperature was taken | varchar |
| total_measurements | total number of temperature recordings taken for the month over the years | varchar |
| min_temperature | minimum temperature recorded for the city in that month | decimal |
| max_temperature | maximum temperature recorded for the city in that month | decimal |
| avg_temperature | average temperature recorded for the city in that month | decimal |

##### 4.3.7: Population Table

The `Population` table consists of information regarding the total number of people residing in the respective cities

| Name | Description | Type |
| --- | --- | --- |
| population_id  | auto increment key | integer |
| state | state that the population belongs to | varchar |
| city | city that the population belongs to | varchar |
| median_age | median age of the city's residents | decimal |
| male_population | total males in the city | integer |
| female_population | total females in the city | integer |
| total_population | total population in the city | integer |
| race_diversity | number of different races in the city | integer |

##### 4.3.8: Airport Table

The `Airport` table consists of information about airports in the US

| Name | Description | Type |
| --- | --- | --- |
| airport_id | identifier to identify the airport | varchar |
| gps_code | GPS code representation for the airport | varchar |
| iata_code | IATA code representation for the airport | varchar |
| local_code | local code representation for the airport | varchar |
| airport_name | name of the airport | varchar |
| airport_description | type of airport | varchar |
| airport_state | state in which the airport is situated | varchar |
| airport_city | city in which the airport is situated | varchar |
| airport_coordinates_x | longitude of the airport | decimal |
| airport_coordinates_y | latitude of the airport | decimal |

#### 4.4 Use Case

##### 4.4.1: Use Case #1 - Top 5 Visiting Countries

Find the 5 countries whose citizens visited the US most frequently in 2016

In [221]:
!python3 select_queries.py -c 1

Executing: Selecting #1 - 
USE CASE 1: Find the Top 5 Countries Who Most Frequented US in 2016

Executing: Selecting #1 - 
select s.traveller_citizenship, c.country_name, sum(s.total_travellers) as yearly_travellers
from arrival_summary s
    left join arrival_country_code c on s.traveller_citizenship = c.country_code
group by s.traveller_citizenship, c.country_name
order by yearly_travellers desc
limit 5;

Executing: found [5] records
(135, 'UNITED KINGDOM', 4531534)
(209, 'JAPAN', 3278033)
(245, 'CHINA, PRC', 3128257)
(582, 'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)', 2617070)
(148, None, 2051390)
--
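In the output above, code `148` comes back with `None` as its country name. Because of the `left join`, travellers are kept even when their citizenship code has no usable entry in `arrival_country_code`, so the `None` most likely points to a gap in the lookup table. The sketch below reproduces that behaviour on hypothetical rows and then lists the unmatched codes so they can be investigated. It uses an in-memory SQLite database in place of PostgreSQL.

```python
import sqlite3

# In-memory SQLite stands in for PostgreSQL; all rows are hypothetical.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table arrival_summary (traveller_citizenship integer, total_travellers integer);
    create table arrival_country_code (country_code integer, country_name text);
    insert into arrival_summary values (135, 100), (135, 50), (148, 70), (209, 80);
    insert into arrival_country_code values (135, 'UNITED KINGDOM'), (209, 'JAPAN');
""")

# Same shape as the Use Case #1 query above.
query = """
select s.traveller_citizenship, c.country_name, sum(s.total_travellers) as yearly_travellers
from arrival_summary s
    left join arrival_country_code c on s.traveller_citizenship = c.country_code
group by s.traveller_citizenship, c.country_name
order by yearly_travellers desc
limit 5;
"""
rows = conn.execute(query).fetchall()
for row in rows:
    print(row)
# (135, 'UNITED KINGDOM', 150)
# (209, 'JAPAN', 80)
# (148, None, 70)

# Codes with no country name: candidates for a lookup-table fix.
missing = [code for code, name, _ in rows if name is None]
print(missing)  # [148]
```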


##### 4.4.2: Use Case #2 - Top 5 Most Visited Cities

Find the 5 cities that travellers visited most frequently in 2016.

In [222]:
!python3 select_queries.py -c 2

Executing: Selecting #2 - 
USE CASE 2: Find the Top 5 Cities That Are Most Visited in 2016

Executing: Selecting #2 - 
select s.arrival_city, cc.city_name, cc.state_code, 
    sc.state_name, sum(s.total_travellers) as yearly_travellers
from arrival_summary s
    left join arrival_city_code cc on s.arrival_city = cc.city_code
    left join arrival_state_code sc on trim(sc.state_code) = trim(cc.state_code)
group by  s.arrival_city, cc.city_name, cc.state_code, 
    sc.state_name
order by yearly_travellers desc
limit 5

Executing: found [5] records
('NYC', 'NEW YORK', ' NY', 'NEW YORK', 6671232)
('MIA', 'MIAMI', ' FL', 'FLORIDA', 5121183)
('LOS', 'LOS ANGELES', ' CA', 'CALIFORNIA', 4598326)
('SFR', 'SAN FRANCISCO', ' CA', 'CALIFORNIA', 2308671)
('HHW', 'HONOLULU', ' HI', 'HAWAII', 2248751)
--
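The `state_code` values in the output have a leading space (`' NY'`), which is why the query has to `trim()` both sides of the join. An alternative is to clean the codes once at load time. The sketch below does that, assuming (hypothetically) that the city lookup is parsed from label lines of the form `'NYC'	=	'NEW YORK, NY'`. The function name `parse_city_code` is illustrative.

```python
import re

# Matches lines such as:  'NYC'	=	'NEW YORK, NY'
CITY_LINE = re.compile(r"^\s*'(?P<code>[^']+)'\s*=\s*'(?P<label>[^']*)'")

def parse_city_code(line):
    """Return (city_code, city_name, state_code) with whitespace stripped, or None."""
    match = CITY_LINE.match(line)
    if match is None:
        return None
    label = match.group("label")
    # Split on the last comma only, so city names containing commas stay intact.
    city, _, state = label.rpartition(",")
    if not city:  # no comma: the whole label is the city name
        return match.group("code").strip(), label.strip(), None
    return match.group("code").strip(), city.strip(), state.strip()

print(parse_city_code("'NYC'\t=\t'NEW YORK, NY'"))  # ('NYC', 'NEW YORK', 'NY')
```

If the codes are stripped during the ETL, the join could compare `sc.state_code = cc.state_code` directly, and the planner can still use any index on those columns.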


##### 4.4.3: Use Case #3 - Relationship Between Season and Travelling Patterns

Study whether there is a relationship between the season and the purpose of travel.

In [223]:
!python3 select_queries.py -c 3

Executing: Selecting #3 - 
USE CASE 3: Study the relationship between season and the different types of travellers

Executing: Selecting #3 - 
select a.arrival_month, s.season,
	sum(a.total_travellers) as total_travellers,
	sum(a.business_travellers) as business_travellers,
	sum(a.leisure_travellers) as leisure_travellers,
	sum(a.student_travellers) as student_travellers
from arrival_summary a
	left join season s on  a.arrival_month = s.season_month
group by a.arrival_month, s.season
order by EXTRACT(MONTH FROM to_date(a.arrival_month, 'Month'))

Executing: found [12] records
('January', 'winter', 2847924, 522890, 1939899, 385135)
('February', 'winter', 2570543, 446667, 2077101, 46775)
('March', 'winter', 3157072, 446208, 2607405, 103459)
('April', 'spring', 3096313, 522079, 2530868, 43366)
('May', 'spring', 3444249, 530473, 2858037, 55739)
('June', 'spring', 3574469, 487098, 3009651, 77720)
('July', 'summer', 4261837, 415603, 3728464, 117770)
('August', 'summer', 4097428, 375838, 3281
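Monthly totals differ, so seasons are easier to compare when each purpose is expressed as a share of its month's total. The sketch below does this for the rows shown above. Only January to July are complete, because the output is cut off from August. The variable names are illustrative.

```python
# Rows copied from the Use Case 3 output above. Only January to July are
# complete; the output is cut off from August onwards.
rows = [
    ("January", "winter", 2847924, 522890, 1939899, 385135),
    ("February", "winter", 2570543, 446667, 2077101, 46775),
    ("March", "winter", 3157072, 446208, 2607405, 103459),
    ("April", "spring", 3096313, 522079, 2530868, 43366),
    ("May", "spring", 3444249, 530473, 2858037, 55739),
    ("June", "spring", 3574469, 487098, 3009651, 77720),
    ("July", "summer", 4261837, 415603, 3728464, 117770),
]

shares = {}
for month, season, total, business, leisure, student in rows:
    # Sanity check: in these rows the total equals the sum of the three purposes.
    if total != business + leisure + student:
        raise ValueError(f"{month}: purposes do not add up to the total")
    shares[month] = {
        "season": season,
        "business": business / total,
        "leisure": leisure / total,
        "student": student / total,
    }

for month, s in shares.items():
    print(f"{month:<9} {s['season']:<6} business={s['business']:.1%} "
          f"leisure={s['leisure']:.1%} student={s['student']:.1%}")
```

Over these seven months, the leisure share is highest in July (about 87.5%) and the student share is highest in January (about 13.5%). All twelve months would be needed before drawing any seasonal conclusion.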

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.

#### 5.1: Tools and Technologies Used

The following tools and libraries are used: <br />
1) Pandas 

> `Pandas` is used to load the `World Temperature Data`, `US City Demographic Data` and `Airport Code Data`. To use `Pandas`, ensure that `pip` is available and install the `pandas` library with `pip`.
<br/><br/>


2) Spark 

> `Spark` is used to load the `I94 Immigration Data`, which is in parquet format and contains millions of rows. To run `Spark`, ensure that a Spark cluster is available or that Spark is set up in standalone mode.
<br/><br/>

3) PostgreSQL

> `PostgreSQL` is used to store the processed data for future analysis. To run `PostgreSQL`, ensure that the PostgreSQL database server is installed.
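The prerequisites listed above can be checked from Python before running the notebook. The small sketch below uses only the standard library. The package list uses import names, so `psycopg2` may have been installed from PyPI as `psycopg2-binary`. Looking for the `psql` client on `PATH` assumes PostgreSQL was installed with its command-line tools.

```python
import importlib.util
import shutil

def missing_python_packages(packages):
    """Return the top-level import names from `packages` that cannot be found."""
    return [name for name in packages if importlib.util.find_spec(name) is None]

# Import names for the libraries above (psycopg2 is imported in the notebook).
required = ["pandas", "pyspark", "psycopg2"]
print("missing Python packages:", missing_python_packages(required))

# Command-line tools are looked up on PATH; 'psql' is the PostgreSQL client.
print("psql on PATH:", shutil.which("psql") is not None)
```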

#### 5.2: How often should the data be updated

1) `arrival_summary` table update frequency (monthly)

> Because the data is mainly used for analytics and the `arrival_summary` table holds monthly traveller counts, the table can be updated monthly.

2) `arrival_*` related table update frequency (yearly)

> The remaining arrival-related tables, such as `arrival_country_code`, `arrival_state_code` and `arrival_city_code`, are more static and can be refreshed yearly to pick up any changes.

3) `season` and `temperature` table update frequency (monthly/yearly)

> As the weather data is recorded monthly, these tables can be refreshed either monthly or yearly to pick up new records.

4) `population` table update frequency (ad hoc)

> As population data is not readily available, the table can only be updated when the relevant organisation publishes new figures.

5) `airport` table update frequency (yearly)

> As the airport data is relatively static, it can be refreshed yearly to pick up any changes.
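These update frequencies can be captured in a small lookup table that a scheduler checks before each run. The sketch below uses hypothetical names (`REFRESH_MONTHS`, `tables_due`). Choosing the monthly option for `season` and `temperature` is an assumption, since the list allows either monthly or yearly.

```python
from datetime import date

# Refresh intervals in months, taken from the list above. "season" and
# "temperature" allow monthly or yearly; the monthly option is used here.
# "population" is updated ad hoc, so it has no fixed interval.
REFRESH_MONTHS = {
    "arrival_summary": 1,
    "arrival_country_code": 12,
    "arrival_state_code": 12,
    "arrival_city_code": 12,
    "season": 1,
    "temperature": 1,
    "airport": 12,
    "population": None,
}

def months_between(earlier, later):
    """Whole calendar months from `earlier` to `later` (day of month ignored)."""
    return (later.year - earlier.year) * 12 + (later.month - earlier.month)

def tables_due(last_refreshed, today):
    """Tables whose refresh interval has elapsed; never-refreshed tables are due."""
    due = []
    for table, interval in REFRESH_MONTHS.items():
        if interval is None:
            continue  # ad hoc: refreshed only when new figures are published
        last = last_refreshed.get(table)
        if last is None or months_between(last, today) >= interval:
            due.append(table)
    return due

last = {table: date(2016, 1, 1) for table in REFRESH_MONTHS}
print(tables_due(last, date(2016, 2, 1)))  # ['arrival_summary', 'season', 'temperature']
```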

#### 5.3: Scenarios

##### 5.3.1: Scenario 1 - The data was increased by 100x

>The data volume would grow to roughly one hundred million records (one million x 100). Assuming each record takes approximately 1 KB, that is about 100 GB (100 million x 1 KB). This is far more than fits in the memory of a typical single machine, so loading all of it into `Pandas` in one notebook is no longer practical.
<br /><br />
We would need to run `Spark` to process the data and ingest it into our storage, because a single notebook would run out of memory, and eventually storage, as the data keeps growing. `Spark` breaks a large job into smaller tasks that can run in parallel across the machines of a cluster, so each task stays within what an individual machine can handle. Pairing it with a distributed file system such as HDFS or Amazon S3 also keeps the data highly available through replication: if one machine storing the data crashes, another one still holds a copy.
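The size estimate in this scenario can be checked with a quick calculation. The sketch below takes the text's baseline of one million rows and assumes about 1 KB per record. It also estimates how many input partitions Spark would create, using the default `spark.sql.files.maxPartitionBytes` of 128 MB. This is only an approximation: the actual split also depends on file layout and settings such as `spark.sql.files.openCostInBytes`.

```python
import math

records = 1_000_000 * 100        # the text's baseline of one million rows, scaled 100x
bytes_per_record = 1_000         # assumed ~1 KB per record
total_bytes = records * bytes_per_record
print(f"{total_bytes / 1e9:.0f} GB")  # 100 GB

# Default split size for Spark's file-based sources: 128 MB.
partition_bytes = 128 * 1024 * 1024
print("about", math.ceil(total_bytes / partition_bytes), "input partitions")  # about 746
```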

##### 5.3.2: Scenario 2 - The data populates a dashboard that must be updated on a daily basis by 7am every day

>We can use Airflow and schedule a DAG every morning, early enough that it finishes before 7am, to update the data required for the dashboard. In addition, failed runs can trigger notifications (for example email alerts or a failure callback), so that we know the update job needs to be re-triggered manually and can investigate why it could not complete automatically.
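The dashboard must be up to date *by* 7am, so the job has to start early enough to finish before then. The sketch below works backwards from the deadline. It assumes a hypothetical expected runtime of 90 minutes and a 30-minute safety buffer. The resulting cron expression is the kind of value that can be set as an Airflow DAG's schedule. The helper names are illustrative.

```python
from datetime import date, datetime, time, timedelta

def latest_start(deadline, expected_runtime, buffer):
    """Latest start time of day that still finishes `buffer` before `deadline`."""
    # Any reference date works; only the time of day is returned.
    reference = datetime.combine(date(2000, 1, 2), deadline)
    return (reference - expected_runtime - buffer).time()

def daily_cron(start):
    """Cron expression ("minute hour * * *") that fires once a day at `start`."""
    return f"{start.minute} {start.hour} * * *"

# Hypothetical figures: the update takes about 90 minutes, plus a 30-minute buffer.
start = latest_start(time(7, 0), timedelta(hours=1, minutes=30), timedelta(minutes=30))
print(start, daily_cron(start))  # 05:00:00 0 5 * * *
```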

##### 5.3.3: Scenario 3 - The database needed to be accessed by 100+ people

>We will probably need a load balancer in front of the database, for example spreading read queries across read replicas, so that the service is not overwhelmed by 100+ users. Alternatively, if the data is static and rarely changes, we can introduce a caching mechanism so that results are returned quickly without querying the database each time.
<br /><br />
Another point to note is that if the data is not sensitive, we might also consider cloud services such as AWS Elastic Load Balancing, which scales automatically with incoming traffic and can be paired with Auto Scaling for the services behind it. That way, we do not have to worry about users overwhelming the services that retrieve the data.
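The sketch below shows the caching idea, assuming results may be up to a few minutes stale. The class, the 300-second TTL and the stand-in query function are hypothetical. This in-process cache is not thread-safe and is not shared between application instances. With many concurrent users, a shared cache service would usually take its place.

```python
import time

class TTLCache:
    """Cache query results for `ttl` seconds so repeated requests skip the database."""

    def __init__(self, ttl, clock=time.monotonic):
        self.ttl = ttl
        self.clock = clock
        self._entries = {}  # key -> (stored_at, value)

    def get_or_compute(self, key, compute):
        now = self.clock()
        entry = self._entries.get(key)
        if entry is not None and now - entry[0] < self.ttl:
            return entry[1]  # still fresh: no database call
        value = compute()
        self._entries[key] = (now, value)
        return value

calls = []

def run_top5_query():
    calls.append(1)  # stands in for a real database round trip
    return ["UNITED KINGDOM", "JAPAN"]  # hypothetical result

fake_now = [0.0]
cache = TTLCache(ttl=300, clock=lambda: fake_now[0])
cache.get_or_compute("use_case_1", run_top5_query)
cache.get_or_compute("use_case_1", run_top5_query)  # served from the cache
fake_now[0] = 301
cache.get_or_compute("use_case_1", run_top5_query)  # expired, so recomputed
print(len(calls))  # 2
```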

#### Step 6: Clean Up Tables

Drop all the tables that were created.

In [None]:
!python3 drop_tables.py
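The contents of `drop_tables.py` are not shown here. The sketch below shows what such a script could do, assuming it drops the tables named in this write-up. It uses SQLite so the example runs without a server. Against PostgreSQL, the same statements would go through `psycopg2`, where `psycopg2.sql.Identifier` is the usual way to quote a table name.

```python
import sqlite3

# Table names taken from this write-up; the real drop_tables.py may differ.
TABLES = [
    "arrival_summary", "arrival_country_code", "arrival_state_code",
    "arrival_city_code", "season", "temperature", "population", "airport",
]

def drop_all(conn, tables=TABLES):
    """Drop each table if it exists; missing tables are skipped without error."""
    with conn:  # the connection context manager commits when the block succeeds
        for name in tables:
            # Table names cannot be bound as query parameters; these come from a fixed list.
            conn.execute(f'drop table if exists "{name}"')

conn = sqlite3.connect(":memory:")
conn.execute("create table airport (airport_id text)")
drop_all(conn)
print(conn.execute("select name from sqlite_master where type = 'table'").fetchall())  # []
```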