# Udacity Data Engineering Capstone Project

#### Project Summary
This project will aggregate disparate data sources related to U.S. immigration and create a datalake for use in analyzing immigration trends.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [51]:
# Do all imports and installs here
import pandas as pd
import configparser
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import (year as y, \
                                   month as m, \
                                   dayofmonth as dm, \
                                   hour as h, \
                                   weekofyear as wk, \
                                   dayofweek as dw, \
                                   date_format as dt)
from pyspark.sql.functions import udf, col, sum, avg
from pyspark.sql.types import IntegerType
import datetime
import matplotlib.pyplot as plt

In [2]:
config = configparser.ConfigParser()
config.read('aws.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['dend']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['dend']['AWS_SECRET_ACCESS_KEY']

output_data = "s3a://udacity-dend-capstone-s3/"


### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc>
This is the capstone project as provided by Udacity.  This project will used U.S. immigration data, and the supplimental data sets described below:
 - **I94 Immigration Data:** This data is a sample dataset from the [US National Tourism and Trade Office](https://www.trade.gov/national-travel-and-tourism-office). It contains international visitor information and will be primarily used for the fact table.
 - **World Temperature Data:** This dataset contains historical temperature date from 1850 to 2013 came from [Kaggle](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).
 - **U.S. City Demographic Data:** This dataset contains demographic data for major U.S. cities and is sourced from [OpenSoft](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).
 - **Airport Code Table:** This is a simple table of airport codes and corresponding cities and is sourced from [datahub](https://datahub.io/core/airport-codes#data).

Initial research and modeling will be perfomed in a Jupyter notebook. 


#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

#### Immigration Data

The immingration data contains international visitor/traveler information including visa types, mode of transportation, age and gender of the individual, state visited, arrival and departure date, and port of entry from April, 2016.  The data set is based off information provided on the i94 immigration form.  More information regainf form i94 can be found [here](https://i94.cbp.dhs.gov/I94/#/home).

In [4]:
#Gather immigration data
spark = SparkSession.builder.\
    config("spark.jars.repositories", "https://repos.spark-packages.org/").\
    config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
    config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"). \
enableHiveSupport().getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

df_spark.write.parquet("sas_data")
#df_spark = spark.read.parquet("sas_data")

#### Temperature Data
The temperature dataset contains average temperature for cities around the world, and their location by latitude and longitude ina monthly timeseries.  
This data was initially gathered by [Berkely Earth](https://berkeleyearth.org/about/), and was repackaged into the data set in kaggle which we are using here.  We could extract month and year from the date to aid in later analysis, or by country

In [5]:
df_t = pd.read_csv('GlobalLandTemperaturesByCity.csv')
df_t.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


#### Airport Data

The airport dataset contains airport name, code, type, location informations.  The airport codes may refer to either IATA airport code, a three-letter code which is used in passenger reservation, ticketing and baggage-handling systems, or the ICAO airport code which is a four letter code used by ATC systems and for airports that do not have an IATA airport code.  Since we are dealing with civilians who are travelling, we will want to use the IATA code.

In [6]:
df_a = pd.read_csv('airport-codes_csv.csv')
df_a.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


#### U.S. City Demographic Data
As the name implies, this dataset contains basid demograpgic data for US Cities with a population greater or equal to 65,000.   Included in this dataset are population counts by gender, veteran status, race and foreign born.  The data here was sourced through the Census Bureau Data API but is not endorsed or certified by the Census Bureau. These could be used to calulate relative percentages.

In [7]:
df_c = pd.read_csv('us-cities-demographics.csv',delimiter=';')
df_c.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


### Label Data

This is an accessory data set for the I94 data.  The data is in a SAS library file.  The information here, labels, can be used in the dimension tables to provide additional context and detail.  We can create a function extract the pertinent label data to a dataframe which can be utilized later.

### Step 2: Explore and Assess the Data

#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.


##### Immigration Data

The immigaration data has few areas of concern.  First the date is in a SAS format and will need to be converted to datetime forat we can work with. Furthermore the datatpe is being infered as a float data type, which is not indicative of the actual data.  This problem endemic and will need to be corrected prior to final state.  Secondly, there are many gaps in the data; Gender, for example.  These will need to be addressed as well.

In [8]:
# Read in the sample data and 
df_i = pd.read_csv('immigration_data_sample.csv') #sample set
#df_i = pd.read_parquet('sas_data') #full set
df_i.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [9]:
df_i.infer_objects().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 29 columns):
Unnamed: 0    1000 non-null int64
cicid         1000 non-null float64
i94yr         1000 non-null float64
i94mon        1000 non-null float64
i94cit        1000 non-null float64
i94res        1000 non-null float64
i94port       1000 non-null object
arrdate       1000 non-null float64
i94mode       1000 non-null float64
i94addr       941 non-null object
depdate       951 non-null float64
i94bir        1000 non-null float64
i94visa       1000 non-null float64
count         1000 non-null float64
dtadfile      1000 non-null int64
visapost      382 non-null object
occup         4 non-null object
entdepa       1000 non-null object
entdepd       954 non-null object
entdepu       0 non-null float64
matflag       954 non-null object
biryear       1000 non-null float64
dtaddto       1000 non-null object
gender        859 non-null object
insnum        35 non-null float64
airline       967 non

The SAS date is easily corrected.  The date is represented in the number of days since epoch - in this case 1/1/1960.

In [10]:
def sas_dt_to_datetime(dt):
    return pd.to_timedelta(dt, unit='D') + pd.Timestamp('1960-1-1')

df_i['arrdate'] = sas_dt_to_datetime(df_i['arrdate'])
df_i['depdate'] = sas_dt_to_datetime(df_i['depdate'])

print('Earliest date: {}; Latest date: {}'.format(df_i['arrdate'].min(),df_i['arrdate'].max()))


Earliest date: 2016-04-01 00:00:00; Latest date: 2016-04-30 00:00:00


##### Temperature Data
The weather data, while comprehensive, will be excluded from the model going forward as the data within that dataset is not within the bounds of the data range in the i94 data.

As seen below this date range of this data does not overlap the imigration data provided:

In [11]:
df_t = pd.read_csv('GlobalLandTemperaturesByCity.csv')
df_t = df_t.dropna(subset=['AverageTemperature']).sort_values(by = ['dt'], ascending = False)
df_t.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
4679742,2013-09-01,27.767,1.309,Masaya,Nicaragua,12.05N,85.48W
2689314,2013-09-01,28.406,1.104,Guantánamo,Cuba,20.09N,75.07W
3218901,2013-09-01,19.716,1.208,Irapuato,Mexico,20.09N,100.66W
5562972,2013-09-01,22.678,1.136,Overland Park,United States,39.38N,93.64W
2402872,2013-09-01,18.83,1.188,Fresnillo,Mexico,23.31N,102.23W


In [12]:
df_t.infer_objects().info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 8235082 entries, 4679742 to 0
Data columns (total 7 columns):
dt                               object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                             object
Country                          object
Latitude                         object
Longitude                        object
dtypes: float64(2), object(5)
memory usage: 502.6+ MB


In [13]:
print('Earliest date: {}; Latest date: {}'.format(df_t['dt'].min(),df_t['dt'].max()))


Earliest date: 1743-11-01; Latest date: 2013-09-01


##### Demographic Data

In [14]:
df_c = pd.read_csv('us-cities-demographics.csv',delimiter=';')
df_c.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [15]:
df_c.infer_objects().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int64
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 271.1+ KB


In [16]:
#can we flatten demo to state level using spark?
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import col

sdf = spark.read.option("header",True).csv('us-cities-demographics.csv', sep=';')
sdf_dim = sdf[['State Code', 'State']].drop_duplicates()
sdf = sdf.withColumn("Male Population",col("Male Population").cast("int"))
sdf = sdf.withColumn("Female Population",col("Female Population").cast("int"))
sdf.printSchema()

tmp = sdf.groupBy("State Code", "State").sum("Male Population", "Female Population")
tmp.toPandas().head()


root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



Unnamed: 0,State Code,State,sum(Male Population),sum(Female Population)
0,MT,Montana,438535,467935
1,NC,North Carolina,7330525,7970470
2,MD,Maryland,3139755,3420890
3,CO,Colorado,7273095,7405250
4,CT,Connecticut,2123435,2231661


##### Airport Data

THere are a number of gaps in the airport data. Since the data model will use the iata_code as a key, we will want to remove any blanks.  Additionally we can exclude any iso_country code outside of the US. The IATA code for airports is a 3 digit code, and airlines a two digit code.  More info can be fond [here]('https://www.iata.org/en/publications/directories/code-search/)

In [17]:
df_a.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [18]:
df_a.infer_objects().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
ident           55075 non-null object
type            55075 non-null object
name            55075 non-null object
elevation_ft    48069 non-null float64
continent       27356 non-null object
iso_country     54828 non-null object
iso_region      55075 non-null object
municipality    49399 non-null object
gps_code        41030 non-null object
iata_code       9189 non-null object
local_code      28686 non-null object
coordinates     55075 non-null object
dtypes: float64(1), object(11)
memory usage: 5.0+ MB


In [19]:
df_a.isnull().sum()

ident               0
type                0
name                0
elevation_ft     7006
continent       27719
iso_country       247
iso_region          0
municipality     5676
gps_code        14045
iata_code       45886
local_code      26389
coordinates         0
dtype: int64

In [20]:
#invalid iata codes
df_a.loc[df_a['iata_code'].apply(lambda x : len(str(x)) != 3)]

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
18899,EK_4,small_airport,VejrÃ¸,,EU,DK,DK-85,,EK_4,-,-,"11.375, 55.035"
41010,SBES,small_airport,SÃ£o Pedro da Aldeia Airport,61.0,SA,BR,BR-RJ,SÃ£o Pedro Da Aldeia,SBES,0,,"-42.09260177612305, -22.81290054321289"
41020,SBGP,small_airport,EMBRAER - Unidade GaviÃ£o Peixoto Airport,1998.0,SA,BR,BR-SP,GaviÃ£o Peixoto,SBGP,0,,"-48.40510177612305, -21.773700714111328"
41048,SBLS,small_airport,Lagoa Santa Airport,2795.0,SA,BR,BR-MG,Lagoa Santa,SBLS,0,,"-43.896400451660156, -19.66160011291504"
41772,SDDJ,small_airport,Fazenda Santa Maria Airport,1525.0,SA,BR,BR-SP,MonÃ§Ãµes,SDDJ,0,,"-50.07939910888672, -20.848899841308594"
41784,SDDV,small_airport,Usina Catanduva Airport,1860.0,SA,BR,BR-SP,Palmares Paulista,SDDV,0,,"-48.84469985961914, -21.12689971923828"
41817,SDFC,small_airport,Fazenda ConstÃ¢ncia Airport,1673.0,SA,BR,BR-SP,Altair,SDFC,0,,"-49.1880989074707, -20.48859977722168"
41843,SDGC,small_airport,GarÃ§a Airport,2182.0,SA,BR,BR-SP,GarÃ§a,SDGC,0,,"-49.65610122680664, -22.1835994720459"
41921,SDJC,small_airport,Jaboticabal Airport,2024.0,SA,BR,BR-SP,Jaboticabal,SDJC,0,,"-48.284698486328125, -21.229999542236328"
41933,SDJO,small_airport,SÃ£o Joaquim da Barra Airport,2136.0,SA,BR,BR-SP,SÃ£o Joaquim Da Barra,SDJO,0,,"-47.842201232910156, -20.593299865722656"


##### Label Data

The label data provides additional context. This function can be used to extract label data details for various codes in the immigration dataset

In [21]:
import re
def get_labels (file_txt, label_type):
    pattern = r"({}*[^;]+)".format(label_type)
    lbl_set = re.search(pattern,file_txt)
    return extract_labels_to_df(lbl_set.group(0))

def extract_labels_to_df(labels):
    labels = labels.replace('\t','').split('\n')
    for i, lbl in enumerate(labels):
        if i == 0:
            key = label_type[i]
            print(key)
        else:
            label_data = lbl.replace(" = ","=").replace("\'","").strip().split("=")
            label_data.insert(0,key)
            try:
                df_lbl.loc[len(df_lbl)] = [x.strip() for x in label_data]
            except:
                continue
            
df_lbl = pd.DataFrame(columns=["key","code", "value"])
with open("I94_SAS_Labels_Descriptions.SAS") as f:
    data = f.read()

label_type = ['i94cntyl','i94mode']
for i, lbl in enumerate(label_type):
    get_labels(data,lbl) 

df_spark_label = spark.createDataFrame(df_lbl)
df_spark_label.printSchema()

i94cntyl
i94cntyl
root
 |-- key: string (nullable = true)
 |-- code: string (nullable = true)
 |-- value: string (nullable = true)




#### Cleaning Steps
Document steps necessary to clean the data

In [100]:
# Performing cleaning tasks here
#UDF to convert SAS Date
convertDtUDF = udf(lambda dt: ((datetime.timedelta(dt) + datetime.datetime(1960, 1, 1)).date()) if dt else '')

#Clean Immigrations data
df_spark_immigration = spark.read.parquet("sas_data")
df_spark_immigration = df_spark_immigration.dropna(subset=['cicid','arrdate'])
df_spark_immigration = df_spark_immigration.withColumn('arrdate', convertDtUDF(df_spark_immigration['arrdate'])) \
                                           .withColumn('depdate', convertDtUDF(df_spark_immigration['depdate'])) \
                                           .withColumn("cicid",col("cicid").cast('integer')) \
                                           .withColumn("i94bir",col("i94bir").cast('integer')) \
                                           .withColumn("biryear",col("biryear").cast('integer'))
df_spark_immigration.printSchema()
#df_spark_immigration.na.fill("U",["gender"])



root
 |-- cicid: integer (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: string (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: integer (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable

In [23]:
#Clean airport data
df_spark_airport = spark.read.options(header=True).csv('airport-codes_csv.csv')
df_spark_airport.dropna(subset=['iata_code']).drop_duplicates()
df_spark_airport = df_spark_airport.filter(df_spark_airport['iso_country'] == "US")
df_spark_airport = df_spark_airport.where("length(iata_code) = 3")
df_spark_airport.na.fill("").drop_duplicates()
df_spark_airport.count()


2019

In [59]:
#Clean demographic data
df_spark_demog = spark.read.options(header=True,sep=';').csv('us-cities-demographics.csv')
df_spark_demog.drop_duplicates(subset=["City", "State", "Race"])
df_spark_demog.dropna(subset=["City", "State Code"])
df_spark_demog =  df_spark_demog.withColumnRenamed("Median Age","median_age")\
                                .withColumnRenamed("Male Population","male_population")\
                                .withColumnRenamed("Female Population","female_population")\
                                .withColumnRenamed("Total Population","total_population")\
                                .withColumnRenamed("Number of Veterans","number_of_veterans")\
                                .withColumnRenamed("Number of Veterans","number_of_veterans")\
                                .withColumnRenamed("Average Household Size","avg_household_size")\
                                .withColumnRenamed("State Code","state_code")
df_spark_demog.withColumn("male_population",col("male_population").cast("int")) \
            .withColumn("male_population",col("male_population").cast("int")) \
            .withColumn("number_of_veterans",col("number_of_veterans").cast("int")) \
            .withColumn("Foreign-born",col("Foreign-born").cast("int")) \
            .withColumn("avg_household_size",col("avg_household_size").cast("int")) \
            .withColumn("total_population",col("total_population").cast("int")) 

df_spark_demog.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- median_age: string (nullable = true)
 |-- male_population: string (nullable = true)
 |-- female_population: string (nullable = true)
 |-- total_population: string (nullable = true)
 |-- number_of_veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- avg_household_size: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



### Step 3: Define the Data Model


#### 3.1 Conceptual Data Model
The conceptual data model is as follows
 - Fact Table: based on the i94 immigration data
 - Dimension tables:
    - Traveller: This contains demographic data about the individual traveller
    - Port Information: details about the port through which the traveller entered
    - Destination Demographics: details about the stated destination of the traveller
    - Arrival Date: Contains Date information regarding the arrival date  
    - Mode: Mode of travel




#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

- Gather data into spark dataframes: 
    a dataframe will be create for each source dataset, including additional information from the label data
- Clean and repare data
    cleaning steps will be performed to ensure the data is of an acceptable quality
- Create star schema data lake
    - extract dimension table data from sources
    - create fact table 
    - write dataframes to parquet
- Perform data validations


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [25]:
# Write code here
print("extract columns to create traveller table...")
#Traveler Dim table
df_dim_traveler = df_spark_immigration[['cicid','i94cit','i94res','i94bir','biryear','gender']]
df_spark_immigration = df_spark_immigration.drop('i94cit','i94res','i94bir','biryear','gender')
#citizenship info
df_dim_traveler = df_dim_traveler.join(df_spark_label, (df_dim_traveler.i94cit == df_spark_label.code))
df_dim_traveler = df_dim_traveler.drop('i94cit','key')
df_dim_traveler = df_dim_traveler.withColumnRenamed('value','cit').withColumnRenamed('code','i94cit')
#residence info
df_dim_traveler = df_dim_traveler.join(df_spark_label.alias("lbl"), (df_dim_traveler.i94cit == df_spark_label.alias("lbl").code))
df_dim_traveler = df_dim_traveler.drop('i94res','key')
df_dim_traveler = df_dim_traveler.withColumnRenamed('value','res').withColumnRenamed('code','i94res')

#data validation dict
dv_dict = {"traveller" : df_dim_traveler.count()}

#write to S3
df_dim_traveler.write.mode('overwrite').parquet(os.path.join(output_data, 'dim_traveller/traveller.parquet'))

extract columns to create traveller table...


In [83]:
#Date Dim table
spark.conf.set("spark.sql.parquet.enableVectorizedReader","false")
print("extract columns to create time table...")
time_table = df_spark_immigration.select('arrdate') \
                .withColumn('day', dm('arrdate')) \
                .withColumn('week', wk('arrdate')) \
                .withColumn('month', m('arrdate')) \
                .withColumn('year', y('arrdate')) \
                .withColumn('weekday', dw('arrdate')) \
                .dropDuplicates()
#DV
dv_dict['time'] = time_table.count()
#write to s3
time_table.write.mode('overwrite').partitionBy("year", "month").parquet(os.path.join(output_data, 'dim_time/time.parquet'))

extract columns to create time table...


In [48]:
#Port Dim Table
print("extract columns to create port table...")
df_dim_port = spark.read.option("header", True).csv("airport-codes_csv.csv")
df_dim_port.dropna(subset='iata_code')

#ports = [data[0] for data in df_spark_immigration.select('i94port').collect()]
#df_dim_port.filter(df_dim_port("iata_code").isin(ports))
#df_dim_port.filter(col("iata_code").isin(ports))

#DV
dv_dict['port'] = df_dim_port.count()

#write to s3
df_dim_port.write.mode('overwrite').parquet(os.path.join(output_data, 'dim_port/port.parquet'))

extract columns to create port table...


In [60]:
#Destination demographics dim table
print("extract columns to create demographics table...")
df_spark_demog = df_spark_demog.groupBy("state_code", "State") \
            .agg(avg("male_population").alias("male_population_avg"), 
               avg("female_population").alias("female_population_avg"),
               avg("number_of_veterans").alias("number_of_veterans_avg"),
               avg("Foreign-born").alias("foreign_born_avg"),
               avg("avg_household_size").alias("avg_household_size"),
               avg("total_population").alias("avg_total_population"))

dv_dict['demo'] = df_spark_demog.count()
df_spark_demog.write.mode('overwrite').parquet(os.path.join(output_data, 'dim_demo/demographics.parquet'))

extract columns to create demographics table...


In [61]:
#Mode Dim table
print("extract columns to create mode table...")
df_dim_mode = df_spark_label.select('code','value').where(df_spark_label.key == 'i94mode')

#DV
dv_dict['mode'] = df_dim_mode.count()

#write to s3
df_dim_mode.write.mode('overwrite').parquet(os.path.join(output_data, 'dim_mode/mode.parquet'))

extract columns to create mode table...


In [84]:
#Fact table
print("extract columns to create fact table...")
df_fact = df_spark_immigration.select('cicid', 'i94yr', 'i94mon', 'i94port', 'i94mode', 'i94addr', 'arrdate', 'depdate', 'dtaddto', 'airline', 'fltno', 'i94visa', 'visatype').drop_duplicates()

#DV
dv_dict['fact'] = df_fact.count()

#write to s3
df_fact.write.mode('overwrite').parquet(os.path.join(output_data, 'fact_immigrations/immigrations.parquet'))

extract columns to create fact table...


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [90]:
# Perform quality checks here
dv_fact = spark.read.parquet(os.path.join(output_data, 'fact_immigrations/immigrations.parquet'))
test = dv_dict['fact'] == dv_fact.count()
print("Data validation for {} table = {}".format("fact", test))

Data validation for fact table = True


In [91]:
#test Join
dv_traveller = spark.read.parquet(os.path.join(output_data, 'dim_traveller/traveller.parquet'))

test_table = dv_fact.join(dv_traveller, (dv_fact.cicid == dv_traveller.cicid))

test_table.count()

2710124

#### 4.3 Data dictionary 
## Dimension Tables

### Traveller
| field   | type   | key  | source                      |
|---------|--------|------|-----------------------------|
| cicid   | int    | (PK) | i94 Immigration data        |
| i94bir  | int    |      | i94 Immigration data        |
| biryear | int    |      | i94 Immigration data        |
| gender  | string |      | i94 Immigration data        |
| i94cit  | string |      | i94 Immigration data        |
| cit     | string |      | I94_SAS_Labels_Descriptions |
| i94res  | string |      | i94 Immigration data        |
| res     | string |      | I94_SAS_Labels_Descriptions |

### Time
| field   | type   | key  | source               |
|---------|--------|------|----------------------|
| arrdate | int    | (PK) | i94 Immigration data |
| day     | int    |      | derived from arrdate |
| week    | int    |      | derived from arrdate |
| month   | string |      | derived from arrdate |
| year    | string |      | derived from arrdate |
| weekday | string |      | derived from arrdate |

### Port
| field        | type   | key  | source            |
|--------------|--------|------|-------------------|
| ident        | string |      | airport-codes_csv |
| type         | string |      | airport-codes_csv |
| name         | string |      | airport-codes_csv |
| elevation_ft | string |      | airport-codes_csv |
| continent    | string |      | airport-codes_csv |
| iso_country  | string |      | airport-codes_csv |
| iso_region   | string |      | airport-codes_csv |
| municipality | string |      | airport-codes_csv |
| gps_code     | string |      | airport-codes_csv |
| iata_code    | string | (PK) | airport-codes_csv |
| local_code   | string |      | airport-codes_csv |
| coordinates  | string |      | airport-codes_csv |

### Demographics
| field                  | type   | key  | source                 |
|------------------------|--------|------|------------------------|
| City                   | string |      | us-cities-demographics |
| State                  | string |      | us-cities-demographics |
| median_age             | string |      | us-cities-demographics |
| male_population        | string |      | us-cities-demographics |
| female_population      | string |      | us-cities-demographics |
| total_population       | string |      | us-cities-demographics |
| number_of_veterans     | string |      | us-cities-demographics |
| Foreign-born           | string |      | us-cities-demographics |
| Average Household Size | string |      | us-cities-demographics |
| state_code             | string | (PK) | us-cities-demographics |
| Race                   | string |      | us-cities-demographics |
| Count                  | string |      | us-cities-demographics |

### Mode
| field                  | type   | key  | source                      |
|------------------------|--------|------|-----------------------------|
| code                   | string | (PK) | i94 Immigration data        |
| value                  | string |      | I94_SAS_Labels_Descriptions |

## Fact Table
| field    | type    | key      | source               |
|----------|---------|----------|----------------------|
| cicid    | integer | (PK)(FK) | i94 Immigration data |
| i94yr    | double  |          | i94 Immigration data |
| i94mon   | double  |          | i94 Immigration data |
| i94port  | string  | (FK)     | i94 Immigration data |
| i94mode  | double  | (FK)     | i94 Immigration data |
| i94addr  | string  | (FK)     | i94 Immigration data |
| arrdate  | string  | (FK)     | i94 Immigration data |
| depdate  | string  |          | i94 Immigration data |
| dtaddto  | string  |          | i94 Immigration data |
| airline  | string  |          | i94 Immigration data |
| fltno    | string  |          | i94 Immigration data |
| i94visa  | double  |          | i94 Immigration data |
| visatype | string  |          | i94 Immigration data |

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
This project uses pandas and spark to analyse and to transform and load the data.  Pandas was initially used to view and analyze the data due to its ease of use.  Spark was subsequently used sue to its robust toolset.  furthermore, should the datasets grow so a signigicantly large size. Spark's ability to scale to cluster processing and its lazy evaluation make it a natual choice for handling large datasets.


* Propose how often the data should be updated and why.
 - This data should be updated on a monthly basis.  The i94 data is already segmented by `i94yr` and `i94mon` on the file.
 - For ongoing data loads at this scale, a simple cron job would suffice.  Howver, moving to an orchestration tool such as Airflow would be beneficial due to the may advanced features

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
    - The data should be moved to cloud storage for both staging and for the data lake, Apache Cassandra would be beneficial due to its distributed storage
    - A spark chuster would be required to handle the data since it can use parrallel processing, additionally the memory useage of spark would require significantly more resources than what is available on local hardware.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    - Apache Airflow is an ideal choice for scheduling in monitoring
 * The database needed to be accessed by 100+ people.
    - A chould based datawarehouse such ar Resdift or Snowflake that can handle many concurrent users would be a good solution