# Top of the Lake
### Data Engineering Capstone Project

#### Project Summary
Combining immigration, travel, weather, and demographics data sources to create a data model for further analysis of foreign tourism to the United States.

* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Project Write Up

In [1]:
# imports and installs
import os
import sys
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import configparser
import datetime
import glob
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql import types as t
from collections import Counter
from pyspark.sql import DataFrame
from functools import reduce

%matplotlib inline

Left align tables

In [2]:
%%html
<style>
    table {
        display: inline-block
    }
</style>

### Step 1: Scope the Project and Gather Data

#### Scope 
The project combines four datasets covering immigration and travel, weather, and city demographics.
- [I94 Immigration Data](https://travel.trade.gov/research/reports/i94/historical/2016.html): 2016 immigration data to the United States from the U.S. National Travel and Tourism Office
- [World Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data): Earth surface temperature data from Kaggle
- [U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/information/): Demographics of all U.S. cities and census-designated places with a population greater than or equal to 65,000
- [Airport Code Table](https://datahub.io/core/airport-codes#data): Airport codes and their corresponding cities

Data is modeled using a **data lake** hosted on Amazon Web Services (AWS). Data is first loaded and processed into dimension and analytics tables using Spark and **schema-on-read**. The data is then saved to S3 as partitioned `parquet` files to be easily loaded back into Spark dataframe objects on demand. For the saved analytics tables, a **snowflake schema** of one fact table with accompanying dimension tables is used. The Spark process is deployed on an AWS EMR cluster.

#### Describe and Gather Data 
Initial steps include gathering the datasets and performing a high-level review of the data and quality.

In [None]:
# read in sample immigration data to scope the full dataset
df = pd.read_csv('data/immigration_data_sample.csv', index_col=0).reset_index(drop=True)
df.head()

In [None]:
df.info()

In [None]:
# read in airport data for initial review
df_air = pd.read_csv('data/airport-codes_csv.csv')
df_air.head()

In [None]:
df_air.info()

In [None]:
# read in temperature data for initial review
df_temp = pd.read_csv('data/GlobalLandTemperaturesByCity.csv.gz', compression='gzip')
df_temp.head()

In [None]:
df_temp.info()

In [None]:
# read in city demographics data for initial review
df_demo = pd.read_csv('data/us-cities-demographics.csv', sep=';')
df_demo.head()

In [None]:
df_demo.info()

### Step 2: Explore and Assess the Data

Identify data quality issues, like missing values, duplicate data, etc.

In [3]:
# instantiate a SparkSession object
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.3,com.amazonaws:aws-java-sdk-pom:1.10.34") \
    .enableHiveSupport() \
    .getOrCreate()

#### 2.1 Explore and clean the immigration dataset

In [8]:
# read in sample immigration data to a Spark dataframe
df_imm = spark.read.csv('data/immigration_data_sample.csv', header=True)
df_imm.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- cicid: string (nullable = true)
 |-- i94yr: string (nullable = true)
 |-- i94mon: string (nullable = true)
 |-- i94cit: string (nullable = true)
 |-- i94res: string (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: string (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: string (nullable = true)
 |-- i94bir: string (nullable = true)
 |-- i94visa: string (nullable = true)
 |-- count: string (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: string (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)

The following cells read in every monthly 2016 I94 SAS file and union the months into a single, full-year Spark dataframe.

In [None]:
def get_files(filepath):
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.sas7bdat'))
        for fpath in files :
            all_files.append(os.path.abspath(fpath))
    
    return all_files

In [None]:
sas_dir = '../../data/18-83510-I94-Data-2016'
sas_filepaths = get_files(sas_dir)

In [None]:
frames = []
for fp in sas_filepaths:
    # get_files already returns absolute paths, so each file is loaded directly
    df = spark.read.format('com.github.saurfang.sas.spark') \
        .load(fp) \
        .select('i94mon', 'i94cit', 'i94port', 'arrdate', 'depdate',
                'i94mode', 'i94bir', 'i94visa', 'gender', 'airline', 'visatype')
    frames.append(df)

In [None]:
df_imm = reduce(DataFrame.union, frames)

Helper functions to be used in Spark UDFs

In [4]:
def convert_date(d):
    '''
    Convert a SAS date to a datetime object

    INPUT
    d: number of days since the SAS epoch (1960-01-01)

    OUTPUT
    Datetime object
    '''
    epoch = datetime.datetime(1960, 1, 1)
    if d is not None:
        return epoch + datetime.timedelta(days=d)
    else:
        return None
    
date_func = f.udf(lambda d: convert_date(d), t.DateType())
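As a sanity check of the SAS epoch arithmetic, the pure-Python sketch below (no Spark required) converts a date to its SAS day count and back. `sas_to_date` and `to_sas_days` are hypothetical helpers written only for this illustration; they return `datetime.date` rather than `datetime.datetime`. Spark SQL's built-in `date_add` function is a possible UDF-free alternative, not shown here.

```python
import datetime

SAS_EPOCH = datetime.date(1960, 1, 1)  # SAS dates count days from this date

def sas_to_date(days):
    """Convert a SAS day count (int or float) to a datetime.date; None passes through."""
    if days is None:
        return None
    return SAS_EPOCH + datetime.timedelta(days=int(days))

def to_sas_days(d):
    """Hypothetical inverse helper: datetime.date -> SAS day count."""
    return (d - SAS_EPOCH).days

# 2016-04-01 round-trips through the SAS representation
days = to_sas_days(datetime.date(2016, 4, 1))
print(days, sas_to_date(days))  # 20545 2016-04-01
```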

In [5]:
def map_trip_purpose(tp):
    '''
    Map trip purpose codes to strings

    INPUT
    tp: integer identifier of trip purpose

    OUTPUT
    String identifier of trip purpose, or None for null or unrecognized codes
    '''
    purposes = {1: 'Business', 2: 'Pleasure', 3: 'Student'}
    return purposes.get(tp)

trip_purpose_func = f.udf(map_trip_purpose, t.StringType())

In [6]:
# UDFs that convert a collected list into a {value: count} dictionary (MapType column)
int_counter_udf = f.udf(lambda s: dict(Counter(s)), t.MapType(t.IntegerType(), t.IntegerType()))
str_counter_udf = f.udf(lambda s: dict(Counter(s)), t.MapType(t.StringType(), t.IntegerType())) 

In [7]:
def calc_median(lst):
    '''
    Calculate median from list of values

    INPUT
    lst: list of values

    OUTPUT
    Calculated median as a float, or None for an empty list
    '''
    if not lst:
        # the group had no non-null values to collect
        return None
    return float(np.median(lst))

median_func = f.udf(calc_median, t.DoubleType())

Check null counts by variable

In [9]:
df_imm.select([f.count(f.when(f.col(c).isNull(), c)).alias(c) for c in df_imm.columns]).toPandas().T

| column | null count |
| ------ | ---------- |
| _c0 | 0 |
| cicid | 0 |
| i94yr | 0 |
| i94mon | 0 |
| i94cit | 0 |
| i94res | 0 |
| i94port | 0 |
| arrdate | 0 |
| i94mode | 0 |
| i94addr | 59 |


Performed the following transformations on the immigration dataset:
- Filtered to air entry mode only (`i94mode == 1`)
- Removed unknown or unidentified entry ports (`XXX`, `888`, `UNK`)
- Selected and cast variables to the correct data types
- Dropped duplicate records
- Converted arrival and departure variables to date fields before creating a length of stay variable
- Converted trip purpose to a string field
- Replaced 'X' in the gender field with None
- Dropped fields no longer needed (the arrival and departure dates)

In [10]:
imm_table = df_imm \
    .filter(f.col('i94mode') == 1) \
    .filter(~f.col('i94port').isin(['XXX', '888', 'UNK'])) \
    .select(
        f.col('i94mon').cast(t.IntegerType()).alias('month'),
        f.col('i94cit').cast(t.IntegerType()).alias('country_id'),
        f.col('i94port').alias('airport_cd'),
        f.col('arrdate').cast(t.IntegerType()).alias('arrival_date'), 
        f.col('depdate').cast(t.IntegerType()).alias('departure_date'),
        f.col('i94bir').cast(t.IntegerType()).alias('age'),
        f.col('i94visa').cast(t.IntegerType()).alias('trip_purpose'),
        f.col('gender'),
        f.col('airline').alias('airline_cd'),
        f.col('visatype').alias('visa_type')) \
    .dropDuplicates() \
    .withColumn('arrival_date', date_func(f.col('arrival_date'))) \
    .withColumn('departure_date', date_func(f.col('departure_date'))) \
    .withColumn('trip_purpose', trip_purpose_func(f.col('trip_purpose'))) \
    .withColumn('length_of_stay', f.datediff(f.col('departure_date'), f.col('arrival_date'))) \
    .withColumn('gender', f.when(f.col('gender') == 'X', None).otherwise(f.col('gender'))) \
    .drop('arrival_date', 'departure_date')

The data is aggregated by month and airport. Where appropriate, a MapType field further segments an aggregation by a categorical variable. For example, the gender field becomes a MapType in which the gender codes (such as `M` and `F`) are keys and visit counts are values. Other aggregations include the average length of stay and the median visitor age.
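Each MapType aggregation boils down to collecting a group's values into a list and counting them. A minimal pure-Python sketch of that idea, assuming rows are plain `(month, airport_cd, value)` tuples (the sample rows are made up), mirrors `collect_list` (which skips nulls) followed by the `Counter`-based UDF:

```python
from collections import Counter, defaultdict

def map_counts(rows):
    """Group (month, airport_cd, value) rows and count values per group.

    Nulls are skipped, mirroring Spark's collect_list, so a group whose
    values are all None ends up with an empty dict.
    """
    collected = defaultdict(list)
    for month, airport_cd, value in rows:
        values = collected[(month, airport_cd)]  # create the group even if value is None
        if value is not None:
            values.append(value)
    return {key: dict(Counter(values)) for key, values in collected.items()}

# hypothetical sample rows
rows = [
    (4, 'ATL', 'M'), (4, 'ATL', 'F'), (4, 'ATL', 'M'),
    (4, 'JFK', 'F'), (4, 'JFK', None),
]
print(map_counts(rows))  # {(4, 'ATL'): {'M': 2, 'F': 1}, (4, 'JFK'): {'F': 1}}
```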

In [11]:
# GENDER counts by month and airport
# imm_table.filter(f.col('gender').isNotNull()) \
#     .groupBy('year', 'month', 'entry_port_cd') \
#     .pivot('gender') \
#     .agg(f.countDistinct('immigration_id')) \
#     .fillna(0) \
#     .withColumnRenamed('F', 'female_cnt') \
#     .withColumnRenamed('M', 'male_cnt') \
#     .show(5)

gender_table = imm_table.groupBy('month', 'airport_cd') \
    .agg(f.collect_list('gender').alias('gender')) \
    .withColumn('gender', str_counter_udf(f.col('gender'))) \
    .alias('gender_table')

In [12]:
# AVG LENGTH OF STAY by month and airport
stay_table = imm_table.groupBy('month', 'airport_cd') \
    .agg(f.round(f.mean('length_of_stay'),2).alias('avg_stay')) \
    .alias('stay_table')

In [13]:
# MEDIAN AGE by month and airport
age_table = imm_table.groupBy('month', 'airport_cd') \
    .agg(median_func(f.collect_list(f.col('age'))).alias('median_age')) \
    .alias('age_table')

# imm_table.groupBy('year', 'month', 'entry_port_cd').agg(f.round(f.mean('age'),2).alias('avg_age')).show(5)

In [14]:
# TRIP PURPOSE counts by month and airport
# imm_table.groupBy('year', 'month', 'entry_port_cd') \
#     .pivot('trip_purpose') \
#     .agg(f.count('immigration_id')) \
#     .fillna(0) \
#     .withColumnRenamed('Business', 'business_trip_cnt') \
#     .withColumnRenamed('Pleasure', 'pleasure_trip_cnt') \
#     .withColumnRenamed('Student', 'student_trip_cnt') \
#     .show(5)

trip_table = imm_table.groupBy('month', 'airport_cd') \
    .agg(f.collect_list('trip_purpose').alias('trip_purpose')) \
    .withColumn('trip_purpose', str_counter_udf(f.col('trip_purpose'))) \
    .alias('trip_table')

In [15]:
# VISA TYPE counts by month and airport
visa_table = imm_table.groupBy('month', 'airport_cd') \
    .agg(f.collect_list('visa_type').alias('visa_type')) \
    .withColumn('visa_type', str_counter_udf(f.col('visa_type'))) \
    .alias('visa_table')

In [16]:
# COUNTRY counts by month and airport
country_table = imm_table.groupBy('month', 'airport_cd') \
    .agg(f.collect_list('country_id').alias('countries')) \
    .withColumn('countries', int_counter_udf(f.col('countries'))) \
    .alias('country_table')

In [17]:
# AIRLINE counts by month and airport
airline_table = imm_table.groupBy('month', 'airport_cd') \
    .agg(f.collect_list('airline_cd').alias('airlines')) \
    .withColumn('airlines', str_counter_udf(f.col('airlines'))) \
    .alias('airline_table')

In [18]:
# TOTAL count by month and airport
cnt_table = imm_table.groupBy('month', 'airport_cd').count().alias('cnt_table')

After aggregating the immigration data, the individual groupings are left-joined on month and airport to form one analytical table, and the Spark `monotonically_increasing_id` function adds a unique primary key.
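The PySpark documentation describes the current implementation of `monotonically_increasing_id` as placing the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits, so the IDs are unique and increasing but not consecutive. The plain-Python sketch below models that documented layout (it is not Spark's code) to show where the gaps come from:

```python
def mono_id(partition_index, row_in_partition):
    """Model of the documented ID layout: partition ID in the upper 31 bits,
    record number within the partition in the lower 33 bits."""
    return (partition_index << 33) + row_in_partition

# two partitions holding three rows each
ids = [mono_id(p, r) for p in range(2) for r in range(3)]
print(ids)  # [0, 1, 2, 8589934592, 8589934593, 8589934594]
```

Because the values depend on how the data is partitioned, the same month/airport row may receive a different ID if the table is rebuilt, so the key is best treated as stable only within one ETL run.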

In [19]:
# full immigration table
immigration_table = cnt_table \
    .join(gender_table, on=[
        f.col('cnt_table.month') == f.col('gender_table.month'), \
        f.col('cnt_table.airport_cd') == f.col('gender_table.airport_cd')], how='left') \
    .join(stay_table, on=[
        f.col('cnt_table.month') == f.col('stay_table.month'), \
        f.col('cnt_table.airport_cd') == f.col('stay_table.airport_cd')], how='left') \
    .join(age_table, on=[
        f.col('cnt_table.month') == f.col('age_table.month'), \
        f.col('cnt_table.airport_cd') == f.col('age_table.airport_cd')], how='left') \
    .join(trip_table, on=[
        f.col('cnt_table.month') == f.col('trip_table.month'), \
        f.col('cnt_table.airport_cd') == f.col('trip_table.airport_cd')], how='left') \
    .join(visa_table, on=[
        f.col('cnt_table.month') == f.col('visa_table.month'), \
        f.col('cnt_table.airport_cd') == f.col('visa_table.airport_cd')], how='left') \
    .join(country_table, on=[
        f.col('cnt_table.month') == f.col('country_table.month'), \
        f.col('cnt_table.airport_cd') == f.col('country_table.airport_cd')], how='left') \
    .join(airline_table, on=[
        f.col('cnt_table.month') == f.col('airline_table.month'), \
        f.col('cnt_table.airport_cd') == f.col('airline_table.airport_cd')], how='left') \
    .select(
        cnt_table.month,
        cnt_table.airport_cd,
        gender_table.gender,
        stay_table.avg_stay,
        age_table.median_age,
        trip_table.trip_purpose,
        visa_table.visa_type,
        country_table.countries,
        airline_table.airlines) \
    .withColumn('immigration_id', f.monotonically_increasing_id())

immigration_table.printSchema()

root
 |-- month: integer (nullable = true)
 |-- airport_cd: string (nullable = true)
 |-- gender: map (nullable = true)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = true)
 |-- avg_stay: double (nullable = true)
 |-- median_age: double (nullable = true)
 |-- trip_purpose: map (nullable = true)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = true)
 |-- visa_type: map (nullable = true)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = true)
 |-- countries: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = true)
 |-- airlines: map (nullable = true)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = true)
 |-- immigration_id: long (nullable = false)



#### 2.2 Explore and clean the airport dataset

In [20]:
# read in airport data to a Spark dataframe (no schema inference, so every column loads as a string)
df_air = spark.read.csv('data/airport-codes_csv.csv', header=True)
df_air.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [21]:
df_air.select('type').where(f.col('type').like('%airport%')).distinct().show()

+--------------+
|          type|
+--------------+
| large_airport|
|medium_airport|
| small_airport|
+--------------+



Performed the following transformations on the airport dataset:
- Dropped rows with a null IATA code
- Filtered to U.S. airports of type small, medium, or large
- Split the coordinates field to create separate latitude and longitude fields
- Extracted the state code from the iso_region field by splitting on a hyphen
- Selected and cast needed variables
- Dropped duplicate records and rows with a null city

In [22]:
airport_table = df_air.na.drop(subset=['iata_code']) \
    .filter(f.col('iso_country') == 'US') \
    .where(f.col('type').like('%airport%')) \
    .withColumn('coord_split', f.split(f.col('coordinates'), ',')) \
    .withColumn('state_split', f.split(f.col('iso_region'), '-')) \
    .select(
        f.col('iata_code').alias('airport_cd'),
        f.col('type'),
        f.col('name'),
        f.col('state_split')[1].alias('state'),
        f.col('municipality').alias('city'),
        f.col('coord_split')[1].cast(t.DoubleType()).alias('latitude'),
        f.col('coord_split')[0].cast(t.DoubleType()).alias('longitude')) \
    .dropDuplicates() \
    .na.drop(subset=['city'])

airport_table.printSchema()

root
 |-- airport_cd: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



#### 2.3 Explore and clean the city demographics dataset

In [23]:
# read in city demographics data to a Spark dataframe (no schema inference, so every column loads as a string)
df_demo = spark.read.csv('data/us-cities-demographics.csv', header=True, sep=';')
df_demo.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [24]:
df_demo.select('Race').distinct().show()

+--------------------+
|                Race|
+--------------------+
|Black or African-...|
|  Hispanic or Latino|
|               White|
|               Asian|
|American Indian a...|
+--------------------+



In [None]:
df_demo.groupBy('City').pivot('Race').agg({'Count':'sum'}).show(5)

Selected, cast, and renamed the desired data fields

In [25]:
df_city = df_demo.select(
        f.col('City').alias('city'),
        f.col('State Code').alias('state'),
        f.col('Median Age').cast(t.DoubleType()).alias('median_age'),
        f.col('Male Population').cast(t.IntegerType()).alias('male_pop'),
        f.col('Female Population').cast(t.IntegerType()).alias('female_pop'),
        f.col('Total Population').cast(t.IntegerType()).alias('total_pop'),
        f.col('Number of Veterans').cast(t.IntegerType()).alias('veterans'),
        f.col('Foreign-born').cast(t.IntegerType()).alias('foreign_born'),
        f.col('Average Household Size').cast(t.DoubleType()).alias('avg_hh_size')) \
    .dropDuplicates()

Created a pivot table on race, grouping by city and state

In [26]:
df_city_pivot = df_demo.groupBy('City', 'State Code').pivot('Race').agg({'Count': 'sum'}) \
    .withColumnRenamed('City', 'city') \
    .withColumnRenamed('State Code', 'state') \
    .withColumnRenamed('American Indian and Alaska Native', 'native_am_pop') \
    .withColumnRenamed('Asian', 'asian_pop') \
    .withColumnRenamed('Black or African-American', 'black_pop') \
    .withColumnRenamed('Hispanic or Latino', 'hispanic_pop') \
    .withColumnRenamed('White', 'white_pop')
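The demographics file stores one row per city, state, and race, which is why the race counts are pivoted into columns. A plain-Python sketch of the same long-to-wide reshaping, summing `Count` per race like the `{'Count': 'sum'}` aggregation; `pivot_race` is hypothetical and the rows are made up:

```python
from collections import defaultdict

def pivot_race(rows):
    """Pivot (city, state, race, count) rows into {(city, state): {race: total}}."""
    wide = defaultdict(lambda: defaultdict(int))
    for city, state, race, count in rows:
        wide[(city, state)][race] += int(count)  # counts arrive as strings in the raw CSV
    return {key: dict(races) for key, races in wide.items()}

rows = [
    ('Springfield', 'IL', 'White', '1000'),
    ('Springfield', 'IL', 'Asian', '50'),
    ('Springfield', 'MO', 'White', '900'),
]
print(pivot_race(rows))
```

Grouping by both city and state keeps same-named cities in different states apart. One difference from the Spark pivot: a race missing for a city is simply absent here, whereas Spark fills that column with null.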

Merged the two tables to create one dimension table

In [27]:
demo_table = df_city.join(df_city_pivot, 
                          on=[df_city.city == df_city_pivot.city, df_city.state == df_city_pivot.state]) \
    .select(df_city.city,
        df_city.state,
        df_city.median_age,
        df_city.male_pop,
        df_city.female_pop,
        df_city_pivot.native_am_pop.cast(t.IntegerType()),
        df_city_pivot.asian_pop.cast(t.IntegerType()),
        df_city_pivot.black_pop.cast(t.IntegerType()),
        df_city_pivot.hispanic_pop.cast(t.IntegerType()),
        df_city_pivot.white_pop.cast(t.IntegerType()),
        df_city.veterans,
        df_city.foreign_born,
        df_city.avg_hh_size)

demo_table.printSchema()

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- male_pop: integer (nullable = true)
 |-- female_pop: integer (nullable = true)
 |-- native_am_pop: integer (nullable = true)
 |-- asian_pop: integer (nullable = true)
 |-- black_pop: integer (nullable = true)
 |-- hispanic_pop: integer (nullable = true)
 |-- white_pop: integer (nullable = true)
 |-- veterans: integer (nullable = true)
 |-- foreign_born: integer (nullable = true)
 |-- avg_hh_size: double (nullable = true)



#### 2.4 Explore and clean the global land temperature by city dataset

In [28]:
# read in temperature data to a Spark dataframe (no schema inference, so every column loads as a string)
df_temp = spark.read.csv('data/GlobalLandTemperaturesByCity.csv.gz', header=True)
df_temp.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



Check for fields with null values

In [29]:
df_temp.select([f.count(f.when(f.col(c).isNull(), c)).alias(c) for c in df_temp.columns]).toPandas().T

| column | null count |
| ------ | ---------- |
| dt | 0 |
| AverageTemperature | 364130 |
| AverageTemperatureUncertainty | 364130 |
| City | 0 |
| Country | 0 |
| Latitude | 0 |
| Longitude | 0 |


Created helper functions to be used in Spark UDFs

In [30]:
def cel_to_fah(c):
    '''
    Convert celsius to fahrenheit

    INPUT
    c: numeric (float) temperature in Celsius

    OUTPUT
    Float value for temperature in Fahrenheit
    '''
    if c is not None:
        return c*(9/5)+32
    else:
        return None

temp_conver_func = f.udf(lambda c: cel_to_fah(c), t.DoubleType())

In [31]:
def convert_latlong(c):
    '''
    Convert cardinal directions to +/- latitude and longitude coordinates

    INPUT
    c: latitude or longitude string with the cardinal direction as the last character (e.g. '57.05N')

    OUTPUT
    Positive or negative coordinate value, or None if the input is null or the direction is unrecognized
    '''
    if c is None:
        return None
    num = c[:-1]
    quad = c[-1].upper()
    if quad in ('S', 'W'):
        return float(num) * -1
    elif quad in ('N', 'E'):
        return float(num)
    return None
    
latlong_func = f.udf(lambda c: convert_latlong(c), t.DoubleType())

The most recent year of data available is 2013

In [32]:
df_temp.filter(f.col('Country') == 'United States') \
    .withColumn('dim_date', f.to_date(f.col('dt'))) \
    .withColumn('year', f.year('dim_date')) \
    .agg({'year': 'max'}).collect()[0]

Row(max(year)=2013)

Performed the following transformations on the temperature dataset:
- Filtered to U.S. records
- Converted the `dt` field to a date
- Filtered to records from 1950 onward
- Dropped any records missing an average temperature
- Selected, cast, and renamed useful fields
- Converted the average temperature field from Celsius to Fahrenheit
- Converted latitude and longitude from cardinal-direction strings (e.g. `57.05N`) to signed decimal values
- Dropped duplicate records
- Calculated the median average temperature by month, city, and latitude/longitude coordinates
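The core of the temperature transformation is a unit conversion followed by a per-month median. A pure-Python sketch of those steps, assuming readings are `(date, celsius, city)` tuples (made-up values) and using the standard library's `statistics.median` in place of the NumPy-based UDF; it omits the coordinate conversion and duplicate removal:

```python
import datetime
from collections import defaultdict
from statistics import median

def c_to_f(c):
    """Celsius to Fahrenheit, rounded to 2 decimals like the Spark pipeline."""
    return round(c * 9 / 5 + 32, 2)

def monthly_median_f(readings, min_year=1950):
    """Median Fahrenheit temperature per (month, city), skipping nulls and pre-min_year rows."""
    groups = defaultdict(list)
    for dt, celsius, city in readings:
        if celsius is None or dt.year < min_year:
            continue
        groups[(dt.month, city)].append(c_to_f(celsius))
    return {key: float(median(vals)) for key, vals in groups.items()}

readings = [
    (datetime.date(1949, 1, 1), 5.0, 'Austin'),   # dropped: before 1950
    (datetime.date(1990, 1, 1), 10.0, 'Austin'),
    (datetime.date(2000, 1, 1), 20.0, 'Austin'),
    (datetime.date(2010, 1, 1), None, 'Austin'),  # dropped: missing value
]
print(monthly_median_f(readings))  # {(1, 'Austin'): 59.0}
```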

In [33]:
temp_table = df_temp.filter(f.col('Country') == 'United States') \
    .withColumn('dim_date', f.to_date(f.col('dt'))) \
    .filter(f.year(f.col('dim_date')) >= 1950) \
    .na.drop(subset=['AverageTemperature']) \
    .select(
        f.month(f.col('dim_date')).alias('month'),
        f.col('AverageTemperature').cast(t.DoubleType()).alias('avg_temp'),
        f.col('City').alias('city'),
        f.col('Latitude').alias('latitude'),
        f.col('Longitude').alias('longitude')) \
    .withColumn('avg_temp', f.round(temp_conver_func(f.col('avg_temp')), 2)) \
    .withColumn('latitude', latlong_func(f.col('latitude'))) \
    .withColumn('longitude', latlong_func(f.col('longitude'))) \
    .dropDuplicates() \
    .groupBy('month', 'city', 'latitude', 'longitude') \
    .agg(median_func(f.collect_list(f.col('avg_temp'))).alias('median_avg_temp'))

temp_table.printSchema()

root
 |-- month: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- median_avg_temp: double (nullable = true)



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
A **data lake** with a snowflake schema, applied on read, is used because the dimension tables relate to one another at more than one level. The central immigration fact table links to the airport dimension through `airport_cd`, and the airport dimension in turn links to the city demographics and temperature dimensions through its city (and state) columns, forming one-to-many relationships.
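To make the snowflake hops concrete, the pure-Python sketch below uses small dictionaries in place of the tables and walks from a fact row to its airport, then to that airport's city demographics and monthly temperature. It assumes the joins use the shared `airport_cd`, `city`, `state`, and `month` columns; every value is a placeholder, not project data, and `enrich` is a hypothetical helper.

```python
# Placeholder rows standing in for the parquet tables (not real project data)
immigration = [{'month': 4, 'airport_cd': 'ATL', 'avg_stay': 10.0}]
airport = {'ATL': {'city': 'Atlanta', 'state': 'GA'}}        # keyed by airport_cd
demographics = {('Atlanta', 'GA'): {'foreign_born': 1}}      # keyed by (city, state)
temperature = {(4, 'Atlanta'): {'median_avg_temp': 1.0}}     # keyed by (month, city)

def enrich(fact):
    """Follow fact -> airport -> demographics/temperature; a missing match adds nothing (left join)."""
    ap = airport.get(fact['airport_cd'], {})
    city, state = ap.get('city'), ap.get('state')
    return {
        **fact,
        **ap,
        **demographics.get((city, state), {}),
        **temperature.get((fact['month'], city), {}),
    }

print(enrich(immigration[0]))
```

Because the temperature table has no state column, a join on city name alone may match same-named cities in different states; the latitude and longitude columns could help disambiguate.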

#### Fact Table
**immigration**
- immigration_id: long
- month: integer
- airport_cd: string
- gender: map
    - key: string
    - value: integer
- avg_stay: double
- median_age: double
- trip_purpose: map
    - key: string
    - value: integer
- visa_type: map
    - key: string
    - value: integer
- countries: map
    - key: integer
    - value: integer
- airlines: map
    - key: string
    - value: integer

#### Dimension Tables
**airport**
- airport_cd: string
- type: string
- name: string
- state: string
- city: string
- latitude: double
- longitude: double
    
**demographics**
- city: string
- state: string
- median_age: double
- male_pop: integer
- female_pop: integer
- native_am_pop: integer
- asian_pop: integer
- black_pop: integer
- hispanic_pop: integer
- white_pop: integer
- veterans: integer
- foreign_born: integer
- avg_hh_size: double
    
**temperature** 
- month: integer  
- city: string  
- latitude: double  
- longitude: double  
- median_avg_temp: double  

![data-model](img/udacity_capstone_data_model.png)

#### 3.2 Mapping Out Data Pipelines
The data pipeline is centered around Spark and AWS EMR and consists of the following steps:
1. Using Infrastructure-as-Code (IaC), create a new EMR cluster with Spark installed
2. Copy the ETL pipeline from S3 onto the EMR cluster
3. Submit the ETL pipeline to `spark-submit` to perform data transformations and save output tables to S3
4. Terminate the EMR cluster
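Steps 2 and 3 can be expressed as EMR steps that run through `command-runner.jar` on the master node. The sketch below only builds the step definitions as plain dictionaries in the shape accepted by boto3's EMR client (`run_job_flow(Steps=...)` or `add_job_flow_steps(JobFlowId=..., Steps=...)`). It is not the project's `emr.py`; `build_etl_steps`, the bucket name, and the local path are placeholders.

```python
def build_etl_steps(bucket, script='etl.py', local_dir='/home/hadoop'):
    """Return EMR step definitions that copy the ETL script from S3 and spark-submit it.

    bucket, script, and local_dir are placeholders for illustration.
    """
    local_path = f'{local_dir}/{script}'
    return [
        {
            'Name': 'Copy ETL script from S3',
            'ActionOnFailure': 'TERMINATE_CLUSTER',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': ['aws', 's3', 'cp', f's3://{bucket}/{script}', local_path],
            },
        },
        {
            'Name': 'Run ETL with spark-submit',
            'ActionOnFailure': 'TERMINATE_CLUSTER',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': ['spark-submit', local_path],
            },
        },
    ]

steps = build_etl_steps('my-capstone-bucket')
```

Step 4 could be handled by passing `'KeepJobFlowAliveWhenNoSteps': False` in the `Instances` argument of `run_job_flow`, so the cluster shuts down once the last step finishes.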

### Step 4: Run ETL to Model the Data
#### 4.1 Move data files to S3
Copy raw data files and the `etl.py` script to S3

In [None]:
!python copy_to_s3.py
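The contents of `copy_to_s3.py` are not part of this notebook. A minimal sketch of what such a script might do, assuming boto3 is installed and AWS credentials are configured; `plan_uploads` and `upload_all` are hypothetical, and the bucket and prefix are placeholders. Planning is pure Python so the key mapping can be checked without AWS, and the upload uses boto3's `upload_file(Filename, Bucket, Key)`.

```python
import os

def plan_uploads(local_dir, prefix=''):
    """Map every file under local_dir to an S3 key that preserves its relative path."""
    plan = []
    for root, _dirs, files in os.walk(local_dir):
        for name in sorted(files):
            path = os.path.join(root, name)
            rel = os.path.relpath(path, local_dir).replace(os.sep, '/')
            plan.append((path, prefix + rel))
    return plan

def upload_all(local_dir, bucket, prefix=''):
    """Upload each planned file with boto3 (imported lazily so planning needs no AWS)."""
    import boto3
    s3 = boto3.client('s3')
    for path, key in plan_uploads(local_dir, prefix):
        s3.upload_file(path, bucket, key)
```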

#### 4.2 Run Pipeline
Create an EMR cluster, move the `etl.py` pipeline script from S3 onto the cluster, and execute the pipeline using `spark-submit`

In [None]:
!python emr.py

#### 4.3 Data Quality Checks
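The following data quality checks confirm data completeness (null counts and duplicate keys), expected data types, and that the fact and dimension tables join as expected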
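Most of these checks repeat two patterns: null counts and duplicate keys. A small pure-Python sketch of a reusable checker, written against lists of dict rows so it runs without Spark; `run_quality_checks` and its arguments are hypothetical.

```python
from collections import Counter

def run_quality_checks(rows, key_cols, required_cols=()):
    """Return a list of failure messages (empty if all checks pass).

    - the table must not be empty
    - required_cols must contain no nulls
    - key_cols must uniquely identify each row
    """
    failures = []
    if not rows:
        failures.append('table is empty')
    for col in required_cols:
        nulls = sum(1 for r in rows if r.get(col) is None)
        if nulls:
            failures.append(f'{col}: {nulls} null value(s)')
    key_counts = Counter(tuple(r.get(c) for c in key_cols) for r in rows)
    dupes = [k for k, n in key_counts.items() if n > 1]
    if dupes:
        failures.append(f'{len(dupes)} duplicate key(s) on {key_cols}')
    return failures
```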

*Immigration table*

In [34]:
# null counts
immigration_table.select([f.count(f.when(f.col(c).isNull(), c)) \
                          .alias(c) for c in immigration_table.columns]).toPandas()

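|   | month | airport_cd | gender | avg_stay | median_age | trip_purpose | visa_type | countries | airlines | immigration_id |
| - | ----- | ---------- | ------ | -------- | ---------- | ------------ | --------- | --------- | -------- | -------------- |
| 0 | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 |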


In [36]:
# number of duplicate month/airport combinations (each pair should appear once)
immigration_table.groupBy('month', 'airport_cd').count().filter('count >= 2').count()

0

*Airports table*

In [37]:
# null counts
airport_table.select([f.count(f.when(f.col(c).isNull(), c)) \
                          .alias(c) for c in airport_table.columns]).toPandas()

|   | airport_cd | type | name | state | city | latitude | longitude |
| - | ---------- | ---- | ---- | ----- | ---- | -------- | --------- |
| 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |


In [38]:
# number of airports
airport_table.select(f.col('airport_cd')).distinct().count()

1865

In [39]:
# number of duplicate airports
airport_table.groupBy("airport_cd").count().filter("count >= 2").count()

0

*City demographics table*

In [40]:
# null counts
demo_table.select([f.count(f.when(f.col(c).isNull(), c)) \
                          .alias(c) for c in demo_table.columns]).toPandas()

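|   | city | state | median_age | male_pop | female_pop | native_am_pop | asian_pop | black_pop | hispanic_pop | white_pop | veterans | foreign_born | avg_hh_size |
| - | ---- | ----- | ---------- | -------- | ---------- | ------------- | --------- | --------- | ------------ | --------- | -------- | ------------ | ----------- |
| 0 | 0 | 0 | 0 | 1 | 1 | 57 | 13 | 12 | 0 | 7 | 7 | 7 | 8 |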


In [41]:
# dtype of city field == string
demo_table.schema['city'].dataType == t.StringType()

True

*Temperature table*

In [42]:
# null counts
temp_table.select([f.count(f.when(f.col(c).isNull(), c)) \
                          .alias(c) for c in temp_table.columns]).toPandas()

|   | month | city | latitude | longitude | median_avg_temp |
| - | ----- | ---- | -------- | --------- | --------------- |
| 0 | 0 | 0 | 0 | 0 | 0 |


In [43]:
# 12 months
temp_table.select(f.col('month')).distinct().count() == 12

True

In [44]:
# dtype of median_avg_temp field == double
temp_table.schema['median_avg_temp'].dataType == t.DoubleType()

True

*Joining of immigration table with airport table*

In [45]:
immigration_table.join(airport_table, 
                        on=[immigration_table.airport_cd == airport_table.airport_cd]) \
    .select(
        immigration_table.month,
        airport_table.airport_cd) \
    .distinct().count()

29

#### 4.4 Data dictionary 

`immigration`

| column name    | data type | description | origin |
| -------------- | --------- | ----------- | ------ |
| immigration_id | long | monotonically increasing and unique ID | PySpark generated |
| month | integer | integer representation of month of the year | 2016 I-94 immigration data |
| airport_cd | string | IATA airport code | 2016 I-94 immigration data |
| gender | map (key: string, value: integer) | visit counts by gender in key value pairs | 2016 I-94 immigration data |
| avg_stay | double | average length of stay in days | 2016 I-94 immigration data |
| median_age | double | median age of visitors | 2016 I-94 immigration data |
| trip_purpose | map (key: string, value: integer) | visit counts by trip purpose in key value pairs | 2016 I-94 immigration data |
| visa_type | map (key: string, value: integer) | visit counts by visa type in key value pairs | 2016 I-94 immigration data |
| countries | map (key: integer, value: integer) | visit counts by country of citizenship (I-94 numeric country code) in key value pairs | 2016 I-94 immigration data |
| airlines | map (key: string, value: integer) | visit counts by airline in key value pairs | 2016 I-94 immigration data |


`airport`

| column name    | data type | description | origin |
| -------------- | --------- | ----------- | ------ |
| airport_cd | string | IATA airport code | airport location information and attributes dataset from datahub.io |
| type | string | type of airport (small, medium, large) | airport location information and attributes dataset from datahub.io |
| name | string | airport name | airport location information and attributes dataset from datahub.io |
| state | string | two letter U.S. state code of airport location | airport location information and attributes dataset from datahub.io |
| city | string | U.S. city of airport location | airport location information and attributes dataset from datahub.io |
| latitude | double | latitudinal coordinate of airport location | airport location information and attributes dataset from datahub.io |
| longitude | double | longitudinal coordinate of airport location | airport location information and attributes dataset from datahub.io |


`demographics`

| column name    | data type | description | origin |
| -------------- | --------- | ----------- | ------ |
| city | string | U.S. city name | city demographics data from the U.S. Census Bureau's 2015 American Community Survey |
| state | string | two letter U.S. state code | city demographics data from the U.S. Census Bureau's 2015 American Community Survey |
| median_age | double | median age of city residents | city demographics data from the U.S. Census Bureau's 2015 American Community Survey |
| male_pop | integer | male population | city demographics data from the U.S. Census Bureau's 2015 American Community Survey |
| female_pop | integer | female population | city demographics data from the U.S. Census Bureau's 2015 American Community Survey |
| native_am_pop | integer | American Indian and Alaska Native population | city demographics data from the U.S. Census Bureau's 2015 American Community Survey |
| asian_pop | integer | Asian population | city demographics data from the U.S. Census Bureau's 2015 American Community Survey |
| black_pop | integer | Black or African-American population | city demographics data from the U.S. Census Bureau's 2015 American Community Survey |
| hispanic_pop | integer | Hispanic or Latino population | city demographics data from the U.S. Census Bureau's 2015 American Community Survey |
| white_pop | integer | White population | city demographics data from the U.S. Census Bureau's 2015 American Community Survey |
| veterans | integer | number of veterans | city demographics data from the U.S. Census Bureau's 2015 American Community Survey |
| foreign_born | integer | number of foreign born residents | city demographics data from the U.S. Census Bureau's 2015 American Community Survey |
| avg_hh_size | double | average household size | city demographics data from the U.S. Census Bureau's 2015 American Community Survey |


`temperature`

| column name    | data type | description | origin |
| -------------- | --------- | ----------- | ------ |
| month | integer | integer representation of month of the year | 1950-2013 temperature data from the Berkeley Earth Surface Temperature Study |
| city | string | U.S. city name | 1950-2013 temperature data from the Berkeley Earth Surface Temperature Study |
| latitude | double | latitudinal coordinate of city | 1950-2013 temperature data from the Berkeley Earth Surface Temperature Study |
| longitude | double | longitudinal coordinate of city | 1950-2013 temperature data from the Berkeley Earth Surface Temperature Study |
| median_avg_temp | double | median of the monthly average temperatures (°C) for the city and month | 1950-2013 temperature data from the Berkeley Earth Surface Temperature Study |


#### 4.5 Save Analytics Tables
As the final step of the ETL pipeline, each analytics table is saved to S3 as parquet files and then read back into Spark from the saved copy.

In [None]:
output_data = 's3n://sking-udacity-capstone/'

In [None]:
# write immigration table to parquet; overwrite so the pipeline can be re-run on each refresh
imm_output = output_data + 'parquet-files/immigration'
immigration_table.write.mode('overwrite').partitionBy('month','airport_cd').parquet(imm_output)
# reload the saved copy so downstream steps read from the partitioned parquet files
immigration_table = spark.read.parquet(imm_output)

In [None]:
# write airport table to parquet files (overwriting any previous run)
airport_output = output_data + 'parquet-files/airports'
airport_table.write.mode('overwrite').parquet(airport_output)
airport_table = spark.read.parquet(airport_output)

In [None]:
# write demographics table to parquet files (overwriting any previous run)
demo_output = output_data + 'parquet-files/demographics'
demo_table.write.mode('overwrite').partitionBy('city','state').parquet(demo_output)
demo_table = spark.read.parquet(demo_output)

In [None]:
# write temperature table to parquet files (overwriting any previous run)
temp_output = output_data + 'parquet-files/temperature'
temp_table.write.mode('overwrite').partitionBy('month','city').parquet(temp_output)
temp_table = spark.read.parquet(temp_output)
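
For reference, `partitionBy` writes Hive-style directories: one `column=value` path segment per partition column, in the order the columns are passed, with null values stored under `__HIVE_DEFAULT_PARTITION__`. The pure-Python sketch below is not Spark code; `partition_dir` and `count_partition_dirs` are hypothetical helpers and the rows are made-up placeholders. It reproduces that layout to show where a row lands and how many leaf directories a given choice of partition columns produces. Spark's escaping of special characters in values is not modeled.

```python
from typing import Iterable, Mapping, Sequence

# Directory value Spark/Hive use when a partition column is null.
NULL_PARTITION = "__HIVE_DEFAULT_PARTITION__"


def partition_dir(base: str, partition_cols: Sequence[str], row: Mapping[str, object]) -> str:
    """Return the Hive-style directory that `row` would be written under.

    One `column=value` segment per partition column, in partitionBy order.
    Escaping of special characters in values is not modeled.
    """
    segments = [
        f"{col}={NULL_PARTITION if row[col] is None else row[col]}"
        for col in partition_cols
    ]
    return "/".join([base.rstrip("/")] + segments)


def count_partition_dirs(rows: Iterable[Mapping[str, object]], partition_cols: Sequence[str]) -> int:
    """Number of leaf directories: one per distinct combination of partition values."""
    return len({tuple(row[col] for col in partition_cols) for row in rows})


if __name__ == "__main__":
    # Made-up placeholder rows, not values from the demographics dataset.
    rows = [
        {"city": "Austin", "state": "TX", "median_age": 30.0},
        {"city": "Dallas", "state": "TX", "median_age": 31.0},
        {"city": "Boise", "state": "ID", "median_age": None},
    ]
    base = "s3n://sking-udacity-capstone/parquet-files/demographics"
    print(partition_dir(base, ["city", "state"], rows[0]))
    # s3n://sking-udacity-capstone/parquet-files/demographics/city=Austin/state=TX
    print(count_partition_dirs(rows, ["city", "state"]))  # 3: one directory per city
    print(count_partition_dirs(rows, ["state"]))          # 2
```

Partitioning a small table such as `demographics` by `city` and `state` therefore yields one directory, holding at least one small file, per city; partition columns with fewer distinct values keep the file count lower.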

### Step 5: Project Write Up

#### 5.1 Rationale
The project relies heavily on Apache Spark and AWS. A data lake model was chosen for the following reasons:
1. On-demand cloud-based technologies are easily scaled and can cut down on processing costs and overhead
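2. Spark is a widely adopted engine for distributed big data processing, and its SQL and MLlib libraries support advanced downstream analytics and machine learning
3. Schema-on-read lets raw data be ingested in its original format (e.g. CSV, JSON, parquet) regardless of perceived value, with structure applied only when the data is read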
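
Schema-on-read can be illustrated with a small pure-Python sketch (not the project's Spark code): the raw CSV text is kept exactly as it arrived, and each consumer applies its own schema, here a hypothetical mapping of column names to converter functions, only at read time. The values are made up.

```python
import csv
import io
from typing import Callable, Dict, Iterator, Optional

# Raw text is stored exactly as it arrived; nothing is typed at write time.
# (Made-up values for illustration.)
RAW_CSV = (
    "city,state,median_age,male_pop\n"
    "City A,TX,34.5,\n"
    "City B,CA,31.0,150000\n"
)

# A schema here is a hypothetical mapping of column name -> converter function.
Schema = Dict[str, Callable[[str], object]]


def read_with_schema(raw: str, schema: Schema) -> Iterator[Dict[str, Optional[object]]]:
    """Apply `schema` while reading: cast listed columns, map blanks to None, skip the rest."""
    for record in csv.DictReader(io.StringIO(raw)):
        row: Dict[str, Optional[object]] = {}
        for col, cast in schema.items():
            value = record.get(col)
            row[col] = cast(value) if value not in (None, "") else None
        yield row


if __name__ == "__main__":
    # Two consumers read the same raw data with different schemas.
    analytics_schema: Schema = {"city": str, "median_age": float, "male_pop": int}
    lookup_schema: Schema = {"city": str, "state": str}
    for row in read_with_schema(RAW_CSV, analytics_schema):
        print(row)
    # {'city': 'City A', 'median_age': 34.5, 'male_pop': None}
    # {'city': 'City B', 'median_age': 31.0, 'male_pop': 150000}
    print(list(read_with_schema(RAW_CSV, lookup_schema)))
```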

#### 5.2 Refreshing Data
Data has been aggregated at the city and month level. As such, it is recommended that data be refreshed and updated on a monthly cadence. As the data is designed to inform marketing and advertising considerations, monthly updates should be sufficient to capture key changes in the underlying data.
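
A monthly refresh job needs to know which month to rebuild. One possible approach, sketched below under the assumption that the job runs shortly after each month closes, is to target the last complete calendar month before the run date (`month_to_refresh` is a hypothetical helper). When writing the result back with Spark 2.3 or later, setting `spark.sql.sources.partitionOverwriteMode` to `dynamic` lets an `overwrite` replace only the `month` partitions present in the new data instead of the whole table.

```python
import datetime as dt
from typing import Tuple


def month_to_refresh(run_date: dt.date) -> Tuple[int, int]:
    """Return (year, month) of the last complete calendar month before run_date."""
    first_of_this_month = run_date.replace(day=1)
    # Stepping back one day from the 1st lands on the last day of the previous month.
    last_day_prev_month = first_of_this_month - dt.timedelta(days=1)
    return last_day_prev_month.year, last_day_prev_month.month


if __name__ == "__main__":
    print(month_to_refresh(dt.date(2016, 5, 2)))  # (2016, 4)
    print(month_to_refresh(dt.date(2017, 1, 1)))  # (2016, 12)
```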

#### 5.3 Scenarios
_1. The data increased by 100x_  
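> The selected data model lends itself well to sudden and significant increases in data size. Spark distributes processing across the nodes of the EMR cluster, and the cluster can be resized with additional worker nodes, so a 100x increase in data can be absorbed by scaling out rather than redesigning the pipeline. S3 storage likewise grows without any capacity provisioning.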
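
A rough way to reason about the 100x case is to estimate how many input partitions Spark creates and how many "waves" of tasks the cluster needs to process them. The sketch assumes Spark's default `spark.sql.files.maxPartitionBytes` of 128 MB and one task per core, and it ignores other factors Spark also weighs when splitting files (such as `spark.sql.files.openCostInBytes` and default parallelism). The data sizes and cluster shapes are hypothetical.

```python
import math

# Spark's default value for spark.sql.files.maxPartitionBytes (128 MB).
DEFAULT_MAX_PARTITION_BYTES = 128 * 1024 * 1024


def estimate_input_partitions(total_bytes: int,
                              max_partition_bytes: int = DEFAULT_MAX_PARTITION_BYTES) -> int:
    """Approximate number of input partitions for splittable files such as parquet."""
    return max(1, math.ceil(total_bytes / max_partition_bytes))


def estimate_waves(partitions: int, executors: int, cores_per_executor: int) -> int:
    """Number of rounds of tasks when each core runs one task at a time."""
    slots = executors * cores_per_executor
    return math.ceil(partitions / slots)


if __name__ == "__main__":
    gib = 1024 ** 3
    base = estimate_input_partitions(6 * gib)      # 48 partitions
    scaled = estimate_input_partitions(600 * gib)  # 4800 partitions (100x)
    print(estimate_waves(base, executors=4, cores_per_executor=4))     # 3
    print(estimate_waves(scaled, executors=4, cores_per_executor=4))   # 300
    print(estimate_waves(scaled, executors=40, cores_per_executor=4))  # 30
```

In this hypothetical, 100x more data on the same cluster needs 100x as many waves, while ten times as many executors cuts that tenfold; on EMR the second change is a cluster resize rather than a code change.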

_2. The data populates a daily-updated dashboard_  
> The selected data model is intended to provide data aggregated by month and city. However, current-month statistics could be recalculated daily as new data becomes available. A single day of new data is unlikely to shift the pre-computed statistics significantly, but a daily refresh would be straightforward to implement by scheduling the existing data pipeline with a workflow orchestration tool such as Apache Airflow.
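
The daily recomputation only needs to touch the current month. As a minimal pure-Python sketch (the `(arrival_date, city)` record shape and the `month_to_date_counts` helper are hypothetical; in the pipeline this would be a Spark aggregation triggered by a daily scheduled task, for example in Airflow), the job keeps only records from the first of the run date's month through the run date and recounts arrivals per city, leaving earlier months untouched.

```python
import datetime as dt
from collections import Counter
from typing import Iterable, Tuple

# Hypothetical record shape: (arrival_date, city).
Arrival = Tuple[dt.date, str]


def month_to_date_counts(arrivals: Iterable[Arrival], run_date: dt.date) -> Counter:
    """Count arrivals per city from the 1st of run_date's month through run_date."""
    month_start = run_date.replace(day=1)
    return Counter(
        city for arrival_date, city in arrivals
        if month_start <= arrival_date <= run_date
    )


if __name__ == "__main__":
    # Made-up records for illustration.
    arrivals = [
        (dt.date(2016, 3, 31), "New York"),  # previous month: ignored
        (dt.date(2016, 4, 1), "New York"),
        (dt.date(2016, 4, 2), "Miami"),
        (dt.date(2016, 4, 2), "New York"),
        (dt.date(2016, 4, 5), "Miami"),      # after the run date: not yet counted
    ]
    print(month_to_date_counts(arrivals, dt.date(2016, 4, 2)))
    # Counter({'New York': 2, 'Miami': 1})
```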

_3. The database is accessed by 100+ people_  
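> Because the project uses a data lake rather than a relational database, there is no database server whose connection limits or query load must be managed. The data pipeline builds the analytics tables and saves the output as parquet files to S3, which is designed to serve many concurrent readers. Anyone with S3 read permissions can then load these files, for example into their own Spark session or through Amazon Athena.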
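
Read access could be granted with an S3 bucket policy. The sketch below builds a read-only policy document for the output prefix; the account ID `123456789012` and role name are placeholders, and granting access through a shared IAM role or group (rather than listing 100+ individual users) is an assumption about how access would be managed. `s3:ListBucket` is granted on the bucket ARN and `s3:GetObject` on the object ARNs, because S3 evaluates those actions against different resources.

```python
import json
from typing import List


def read_only_bucket_policy(bucket: str, prefix: str, principal_arns: List[str]) -> dict:
    """Build an S3 bucket policy allowing listing of `bucket` and reading objects under `prefix`."""
    prefix = prefix.strip("/")
    principal = {"AWS": principal_arns}
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # ListBucket applies to the bucket itself.
                "Sid": "AllowList",
                "Effect": "Allow",
                "Principal": principal,
                "Action": "s3:ListBucket",
                "Resource": f"arn:aws:s3:::{bucket}",
            },
            {
                # GetObject applies to object keys, so the ARN includes the key prefix.
                "Sid": "AllowReadParquet",
                "Effect": "Allow",
                "Principal": principal,
                "Action": "s3:GetObject",
                "Resource": f"arn:aws:s3:::{bucket}/{prefix}/*",
            },
        ],
    }


if __name__ == "__main__":
    policy = read_only_bucket_policy(
        "sking-udacity-capstone",
        "parquet-files",
        ["arn:aws:iam::123456789012:role/analyst-read-only"],  # placeholder principal
    )
    print(json.dumps(policy, indent=2))
```

The resulting JSON could be applied with, for example, boto3's `put_bucket_policy(Bucket=..., Policy=json.dumps(policy))`.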