# ETL Pipeline of Immigration, Temperature, US Demographics
### Data Engineering Capstone Project

#### Project Summary
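This project builds an ETL pipeline that combines I94 immigration data, global land temperature data and US city demographics data into an analytics database made up of a fact table and dimension tables. The pipeline uses Apache Spark for processing and Amazon S3 for storage.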

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

#### Import Libraries

In [1]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import os
import configparser
import datetime, time
import datetime as dt

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import isnan, when, count, col, udf, dayofmonth, dayofweek, month, year, weekofyear, lit, explode, regexp_extract, isnull, desc, sum, to_date, regexp_replace, to_timestamp, date_format
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import date_add as d_add
from pyspark.sql.functions import avg
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType, StringType, IntegerType, FloatType
from pyspark.sql import types as T
from pyspark.sql import functions as F
from pyspark.sql import Row
from pyspark.sql import GroupedData, HiveContext

import requests
requests.packages.urllib3.disable_warnings()

import importlib
import tools
import etl_functions

#### Load AWS Credentials

In [2]:
config = configparser.ConfigParser()
config.read('config.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

#### Create Spark Session

In [7]:
# Set the environment variables before the first session is created so Spark picks them up
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
# Create a single session: getOrCreate() reuses an existing session, so the
# spark.jars.packages setting must be supplied when the session is first built
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

# Step 1: Scope the Project and Gather Data

## Project Scope 
<hr></hr>

To create the analytics database, the following steps will be carried out:
* Use Spark to load the data into dataframes.
* Exploratory data analysis of I94 immigration dataset to identify missing values and strategies for data cleaning.
* Exploratory data analysis of demographics dataset to identify missing values and strategies for data cleaning.
* Exploratory data analysis of global land temperatures by city dataset to identify missing values and strategies for data cleaning.
* Perform data cleaning functions on all the datasets.
* Create dimension tables.
    * Create immigration calendar dimension table from the I94 immigration dataset. This table links to the fact table through the arrdate field.
    * Create country dimension table from the I94 immigration and the global temperatures datasets. The global land temperatures data is aggregated at country level. The table links to the fact table through the country of residence code, allowing analysts to study the correlation between the climate of the country of residence and immigration to US states.
    * Create US demographics dimension table from the US cities demographics data. This table links to the fact table through the state code field.
    
* Create fact table from the clean I94 immigration dataset and the visa_type dimension.

<p>
The technologies used in this project are <b>Amazon S3 and Apache Spark</b>. Data will be read and staged from the customer's repository using Spark.
</p>

While the whole project has been implemented in this notebook, provision has been made to run the ETL on a Spark cluster through etl.py. The etl.py script reads data from S3 and uses Spark to create fact and dimension tables that are loaded back into S3.

## Data Load and Descriptions
---
#### I94 Immigration Data Description 
<hr style="background-color: #b7d0e2;"/> 

This data comes from the US National Tourism and Trade Office. In the past, all foreign visitors to the U.S. arriving via air or sea were required to complete the paper Customs and Border Protection Form I-94 Arrival/Departure Record or Form I-94W Nonimmigrant Visa Waiver Arrival/Departure Record, and this dataset is derived from these forms.

This dataset forms the core of the data warehouse. The customer repository holds a year's worth of data for 2016, divided by month. For this project the data is in a folder located at ../../data/18-83510-I94-Data-2016/. Each month's data is stored in the SAS binary database storage format <i>sas7bdat</i>. For this project we have chosen to work with the data for April. However, the data extraction, transformation and loading utility functions have been designed to work with any month's worth of data.
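Because the monthly files share a naming pattern (visible in the file listing in Step 2), choosing a month can be reduced to building a path. The following is a minimal sketch, assuming the `i94_<mon>16_sub.sas7bdat` pattern holds for all twelve files. The helper name `i94_file_path` is hypothetical and is not part of the project's utility modules.

```python
import os

# Directory named in the text above
I94_DIR = "../../data/18-83510-I94-Data-2016/"

MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]


def i94_file_path(month, base_dir=I94_DIR):
    """Return the path of the 2016 I94 file for a month number (1-12).

    Hypothetical helper: assumes the i94_<mon>16_sub.sas7bdat naming pattern.
    """
    if not 1 <= month <= 12:
        raise ValueError("month must be between 1 and 12")
    file_name = "i94_{}16_sub.sas7bdat".format(MONTHS[month - 1])
    return os.path.join(base_dir, file_name)


april_path = i94_file_path(4)
print(april_path)
```

With the spark-sas7bdat package loaded, a path like this could be read with `spark.read.format('com.github.saurfang.sas.spark').load(april_path)`.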

#### Load Immigration Dataset

In [8]:
# Option 1) Full dataset
# Load the immigration dataset using workspace folder in parquet format & create dataframe
df_immigration =spark.read.load('./sas_data')


# Option 2) Smaller sample immigration dataset to use for testing purposes
#file_name = "immigration_data_sample.csv"
#df_immigration = spark.read.csv(file_name, inferSchema=True, header=True)

In [5]:
# Option used when reading the DATA folder......see differences

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df_immigration =spark.read.load('./sas_data')
print("Completed without error")

Completed without error


In [5]:
# Use with Option 2 only to drop extra unused column when using smaller immigration sample csv file
df_immigration=df_immigration.drop('_c0')

In [6]:
# Total number of records
df_immigration.count()

3096313

In [24]:
# View first 5 records
df_immigration.limit(5).toPandas().head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [6]:
# List all columns
df_immigration.columns

['cicid',
 'i94yr',
 'i94mon',
 'i94cit',
 'i94res',
 'i94port',
 'arrdate',
 'i94mode',
 'i94addr',
 'depdate',
 'i94bir',
 'i94visa',
 'count',
 'dtadfile',
 'visapost',
 'occup',
 'entdepa',
 'entdepd',
 'entdepu',
 'matflag',
 'biryear',
 'dtaddto',
 'gender',
 'insnum',
 'airline',
 'admnum',
 'fltno',
 'visatype']

<b><i>Immigration Data Dictionary</i></b>

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">cicid</td><td class="tg-0pky">Unique record ID</td>
 <tr><td class="tg-0pky">i94yr</td><td class="tg-0pky">4 digit year</td>
 <tr><td class="tg-0pky">i94mon</td><td class="tg-0pky">Numeric month</td>
 <tr><td class="tg-0pky">i94cit</td><td class="tg-0pky">3 digit code for traveler's birth country</td>
 <tr><td class="tg-0pky">i94res</td><td class="tg-0pky">3 digit code for traveler's country of residence </td>
 <tr><td class="tg-0pky">i94port</td><td class="tg-0pky">Port of admission</td>
 <tr><td class="tg-0pky">arrdate</td><td class="tg-0pky">Arrival Date into the US</td>
 <tr><td class="tg-0pky">i94mode</td><td class="tg-0pky">Mode of transportation; 1 = Air; 2 = Sea; 3 = Land; 9 = Unknown</td>
 <tr><td class="tg-0pky">i94addr</td><td class="tg-0pky">US State of arrival</td>
 <tr><td class="tg-0pky">depdate</td><td class="tg-0pky">Departure Date from the US</td>
 <tr><td class="tg-0pky">i94bir</td><td class="tg-0pky">Age of respondent in years</td>
 <tr><td class="tg-0pky">i94visa</td><td class="tg-0pky">Visa codes</td>
 <tr><td class="tg-0pky">count</td><td class="tg-0pky">Field used for summary statistics</td>
 <tr><td class="tg-0pky">dtadfile</td><td class="tg-0pky">Character Date Field - Date added to I-94 Files</td>
 <tr><td class="tg-0pky">visapost</td><td class="tg-0pky">Department of State where Visa was issued </td>
 <tr><td class="tg-0pky">occup</td><td class="tg-0pky">Occupation that will be conducted in U.S</td>
 <tr><td class="tg-0pky">entdepa</td><td class="tg-0pky">Arrival Flag</td>
 <tr><td class="tg-0pky">entdepd</td><td class="tg-0pky">Departure Flag</td>
 <tr><td class="tg-0pky">entdepu</td><td class="tg-0pky">Update Flag</td>
 <tr><td class="tg-0pky">matflag</td><td class="tg-0pky">Match flag</td>
 <tr><td class="tg-0pky">biryear</td><td class="tg-0pky">4 digit year of birth</td>
 <tr><td class="tg-0pky">dtaddto</td><td class="tg-0pky">Character Date Field; Date to which admitted to U.S.</td>
 <tr><td class="tg-0pky">gender</td><td class="tg-0pky">Gender</td>
 <tr><td class="tg-0pky">insnum</td><td class="tg-0pky">INS number</td>
 <tr><td class="tg-0pky">airline</td><td class="tg-0pky">Arrival Airline</td>
 <tr><td class="tg-0pky">admnum</td><td class="tg-0pky">Admission Number</td>
 <tr><td class="tg-0pky">fltno</td><td class="tg-0pky">Arrival Airline Flight number</td>
 <tr><td class="tg-0pky">visatype</td><td class="tg-0pky">Class of admission</td>
</table>

#### US City Demographic Data Description 
<hr style="background-color: #b7d0e2;"/> 
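This dataset describes the demographics of US cities. The file us-cities-demographics.csv is semicolon-delimited and contains 2,891 records. Each record gives a city's population figures together with the number of residents in one race category.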

In [6]:
# Load data via CSV file in workspace
file_name = "us-cities-demographics.csv"
df_demographics = spark.read.csv(file_name, inferSchema=True, header=True, sep=';')

In [5]:
# Total number of records
df_demographics.count()

2891

In [5]:
# View first 5 records
df_demographics.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


In [6]:
# List out all columns
df_demographics.columns

['City',
 'State',
 'Median Age',
 'Male Population',
 'Female Population',
 'Total Population',
 'Number of Veterans',
 'Foreign-born',
 'Average Household Size',
 'State Code',
 'Race',
 'Count']

<b><i>US City Demographics Data Dictionary</i></b>

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">City</td><td class="tg-0pky">City Name</td>
 <tr><td class="tg-0pky">State</td><td class="tg-0pky">State of city</td>
 <tr><td class="tg-0pky">Median Age</td><td class="tg-0pky">Median age of population</td>
 <tr><td class="tg-0pky">Male Population</td><td class="tg-0pky">Male Population of city</td>
 <tr><td class="tg-0pky">Female Population</td><td class="tg-0pky">Female Population of city</td>
 <tr><td class="tg-0pky">Total Population</td><td class="tg-0pky">Total Population of city</td>
 <tr><td class="tg-0pky">Number of Veterans</td><td class="tg-0pky">Veteran Population in city</td>
 <tr><td class="tg-0pky">Foreign-born</td><td class="tg-0pky">Number of residents born outside the United States</td>
 <tr><td class="tg-0pky">Average Household Size</td><td class="tg-0pky">Average city household size</td>
 <tr><td class="tg-0pky">State Code</td><td class="tg-0pky">Code of US State</td>
 <tr><td class="tg-0pky">Race</td><td class="tg-0pky">Race category of city residents</td>
 <tr><td class="tg-0pky">Count</td><td class="tg-0pky">Number of city residents in that race category</td>
</table>

#### World Temperature Data Description 
<hr style="background-color: #b7d0e2;"/> 
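This dataset contains monthly average land temperatures by city. The file GlobalLandTemperaturesByCity.csv contains 8,599,212 records, with dates going back as far as 1743.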

In [7]:
file_name = 'GlobalLandTemperaturesByCity.csv'
df_temperature = spark.read.csv(file_name, header=True, inferSchema=True)

In [6]:
df_temperature.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [7]:
df_temperature.count()

8599212

<b><i>World Temperature Data Dictionary</i></b>

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">dt</td><td class="tg-0pky">Date</td>
 <tr><td class="tg-0pky">AverageTemperature</td><td class="tg-0pky">Average land temperature in degrees Celsius</td>
 <tr><td class="tg-0pky">AverageTemperatureUncertainty</td><td class="tg-0pky">95% confidence interval around the average</td>
 <tr><td class="tg-0pky">City</td><td class="tg-0pky">City Name</td>
 <tr><td class="tg-0pky">Country</td><td class="tg-0pky">Country Name</td>
 <tr><td class="tg-0pky">Latitude</td><td class="tg-0pky">Latitude of City</td>
 <tr><td class="tg-0pky">Longitude</td><td class="tg-0pky">Longitude of City</td>
</table>

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

In [6]:
# Number of immigration files in the I94 dataset archive
files = os.listdir('../../data/18-83510-I94-Data-2016/')
print("There are a total of {} files in the I94 dataset.".format(len(files)))
files

There are a total of 12 files in the I94 dataset.


['i94_apr16_sub.sas7bdat',
 'i94_sep16_sub.sas7bdat',
 'i94_nov16_sub.sas7bdat',
 'i94_mar16_sub.sas7bdat',
 'i94_jun16_sub.sas7bdat',
 'i94_aug16_sub.sas7bdat',
 'i94_may16_sub.sas7bdat',
 'i94_jan16_sub.sas7bdat',
 'i94_oct16_sub.sas7bdat',
 'i94_jul16_sub.sas7bdat',
 'i94_feb16_sub.sas7bdat',
 'i94_dec16_sub.sas7bdat']

In [7]:
# Size of the I94 files
for file in files:
    size = os.path.getsize('{}/{}'.format('../../data/18-83510-I94-Data-2016/', file))
    print('{} - dim(bytes): {}'.format(file, size))

i94_apr16_sub.sas7bdat - dim(bytes): 471990272
i94_sep16_sub.sas7bdat - dim(bytes): 569180160
i94_nov16_sub.sas7bdat - dim(bytes): 444334080
i94_mar16_sub.sas7bdat - dim(bytes): 481296384
i94_jun16_sub.sas7bdat - dim(bytes): 716570624
i94_aug16_sub.sas7bdat - dim(bytes): 625541120
i94_may16_sub.sas7bdat - dim(bytes): 525008896
i94_jan16_sub.sas7bdat - dim(bytes): 434176000
i94_oct16_sub.sas7bdat - dim(bytes): 556269568
i94_jul16_sub.sas7bdat - dim(bytes): 650117120
i94_feb16_sub.sas7bdat - dim(bytes): 391905280
i94_dec16_sub.sas7bdat - dim(bytes): 523304960


#### Immigration Data Clean
1) Convert the DOUBLE columns to INTEGER, since these columns hold whole numbers (codes, years, counts) for which double is not a suitable type

In [61]:
# View dataset schema
df_immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [8]:
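# Convert non-null doubles to Python ints. IntegerType is 32-bit, so large values
# such as admnum overflow; LongType would preserve them.
toInt = udf(lambda x: int(x) if x is not None else x, IntegerType())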

for colname, coltype in df_immigration.dtypes:
    if coltype == 'double':
        df_immigration = df_immigration.withColumn(colname, toInt(colname))
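One caveat with converting every double column to `IntegerType` is that the type is 32-bit, and `admnum` values such as 56582670000 do not fit. The sketch below uses plain Python (not Spark) to illustrate 32-bit wraparound. How Spark itself handles out-of-range values may differ by version, so treat this only as an illustration of the size limit. The `admnum` values shown after conversion later in this notebook (for example 748099785 and several negative numbers) are consistent with this kind of wraparound.

```python
import ctypes


def to_int32(value):
    """Mimic storing a number in a 32-bit signed integer (wraps on overflow)."""
    return ctypes.c_int32(int(value)).value


admnum = 56582670000.0   # admnum of the first sample record, as displayed above
wrapped = to_int32(admnum)

print(wrapped)                 # differs from the original admission number
print(wrapped == int(admnum))  # False: the value did not survive the conversion
print(to_int32(2016.0))        # small values such as i94yr are unaffected
```

A 64-bit type avoids the problem. In Spark this could be done with `col('admnum').cast('long')` or a UDF declared with `LongType()`.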

In [8]:
# Verify schema is showing updated data type of INTEGER
df_immigration.printSchema()

root
 |-- cicid: integer (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: integer (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: integer (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- dtadfile: integer (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: integer (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: integer (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: integ

2) Convert arrdate and depdate from SAS date format to date format. SAS stores a date as the number of days since Jan 1, 1960.

In [9]:
# Function to convert arrdate & depdate to datetime format using udf

def convert_to_datetime(date):

    if date is not None:
        return pd.Timestamp('1960-1-1')+pd.to_timedelta(date, unit='D')
convert_to_datetime_udf = udf(convert_to_datetime, DateType())
df_immigration = df_immigration.withColumn('arrdate',convert_to_datetime_udf(col('arrdate')))
df_immigration = df_immigration.withColumn('depdate',convert_to_datetime_udf(col('depdate')))
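The conversion rule can be checked outside Spark with the standard library. This is a minimal sketch of the same arithmetic as the UDF above; the function name `sas_to_date` is only illustrative.

```python
from datetime import date, timedelta

# SAS counts dates as days elapsed since 1 January 1960
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(days):
    """Convert a SAS numeric date to a datetime.date, passing None through."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


# arrdate and depdate of the first sample record shown earlier
print(sas_to_date(20566.0))  # 2016-04-22
print(sas_to_date(20573.0))  # 2016-04-29
```

Both results match the converted values in the first row of the preview that follows.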

In [16]:
# Confirming arrdate & depdate is now in yyyy-mm-dd format.
df_immigration.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,4084316,2016,4,209,209,HHW,2016-04-22,1,HI,2016-04-29,...,,M,1955,7202016,F,,JL,748099785,00782,WT
1,4422636,2016,4,582,582,MCA,2016-04-23,1,TX,2016-04-24,...,,M,1990,10222016,M,,*GA,-127284582,XBLNG,B2
2,1195600,2016,4,148,112,OGG,2016-04-07,1,FL,2016-04-27,...,,M,1940,7052016,M,,LH,-54106415,00464,WT
3,5291768,2016,4,297,297,LOS,2016-04-28,1,CA,2016-05-07,...,,M,1991,10272016,M,,QR,300415518,00739,B2
4,985523,2016,4,111,111,CHM,2016-04-06,3,NY,2016-04-09,...,,M,1997,7042016,F,,,-627100327,LAND,WT


3) i94 Null Check, missing values

In [8]:
# Raw, code to find if any columns have nulls
df_immigration.select(*[
    (
        F.count(F.when((F.isnan(c) | F.col(c).isNull()), c)) if t not in ("timestamp", "date")
        else F.count(F.when(F.col(c).isNull(), c))
    ).alias(c)
    for c, t in df_immigration.dtypes if c in df_immigration.columns
]).show()

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|  occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender| insnum|airline|admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|    0|    0|     0|     0|     0|      0|      0|    239| 152592| 142457|   802|      0|    0|       1| 1881250|3088187|    238| 138429|3095921| 138429|    802|    477|414269|2982605|  83627|     0|19549|       0|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+---

<u>i94 Null Analysis </u>  
*entdepu (99% empty) = Update flag; apprehended, overstayed, or adjusted to permanent residence.  
*insnum (96% empty) = INS number  
*occup (99% empty) = Occupation to be conducted in the US.  
  
These columns are more than 90% empty and will therefore be removed from the i94 dataset.  
The remaining columns have between 0% and about 61% nulls, so they still contain a substantial amount of data and will be kept in the dataset.
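The 90% rule can be applied mechanically to the null counts printed above. The sketch below copies the non-zero counts from that output into a dictionary and selects the columns over the threshold. The function name and the `threshold` parameter are illustrative, not part of the project code.

```python
total_rows = 3096313  # record count of the April 2016 dataset

# Non-zero null counts copied from the output above
null_counts = {
    "i94mode": 239, "i94addr": 152592, "depdate": 142457, "i94bir": 802,
    "dtadfile": 1, "visapost": 1881250, "occup": 3088187, "entdepa": 238,
    "entdepd": 138429, "entdepu": 3095921, "matflag": 138429, "biryear": 802,
    "dtaddto": 477, "gender": 414269, "insnum": 2982605, "airline": 83627,
    "fltno": 19549,
}


def columns_above_threshold(counts, total, threshold=0.90):
    """Return the sorted names of columns whose null fraction exceeds threshold."""
    return sorted(name for name, n in counts.items() if n / total > threshold)


to_drop = columns_above_threshold(null_counts, total_rows)
print(to_drop)
for name in to_drop:
    print("{}: {:.2f}% null".format(name, 100 * null_counts[name] / total_rows))
```

Only `entdepu`, `insnum` and `occup` exceed the threshold. The next highest column, `visapost`, is about 61% null and is kept.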

In [10]:
#Removing the >90% null columns in the i94 dataset
cols = ['occup', 'entdepu','insnum']

df_new_immigration = df_immigration.drop(*cols)

In [12]:
# display the new schema with the 3 columns dropped
df_new_immigration.printSchema()

root
 |-- cicid: integer (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: date (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: date (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- dtadfile: integer (nullable = true)
 |-- visapost: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: integer (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: integer (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)



4) i94 Duplicate Check

In [11]:
# i94 duplicate check using cicid as the identifier, since it is unique per record
df_final_immigration = df_new_immigration.dropDuplicates(['cicid'])

In [15]:
df_final_immigration.count()

3096313

<u> i94 Duplicate Analysis </u>  
No duplicates found in i94 dataset.

In summary, 3 columns were dropped due to nulls and no rows were removed.

#### Demographics Data Clean

1) Null check, missing values

In [6]:
# Now checking Demographics for nulls
df_demographics.select(*[
    (
        F.count(F.when((F.isnan(c) | F.col(c).isNull()), c)) if t not in ("timestamp", "date")
        else F.count(F.when(F.col(c).isNull(), c))
    ).alias(c)
    for c, t in df_demographics.dtypes if c in df_demographics.columns
]).show()

+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|City|State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|Race|Count|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|   0|    0|         0|              3|                3|               0|                13|          13|                    16|         0|   0|    0|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+



In [10]:
# Table showing null columns and how much is null% within that column

# View columns with missing data
nulls_df = pd.DataFrame(data= df_demographics.toPandas().isnull().sum(), columns=['null values'])
nulls_df = nulls_df.reset_index()
nulls_df.columns = ['column', 'null values']

# calculate % missing values
nulls_df['% missing'] = 100*nulls_df['null values']/df_demographics.count()
nulls_df[nulls_df['% missing']>0]

Unnamed: 0,column,null values,% missing
3,Male Population,3,0.10377
4,Female Population,3,0.10377
6,Number of Veterans,13,0.449671
7,Foreign-born,13,0.449671
8,Average Household Size,16,0.553442


<u>Demographics null analysis</u>  
*Male Population (3 empty) = Male population of city  
*Female Population (3 empty) = Female population of city  
*Number of Veterans (13 empty) = Veteran population in city  
*Foreign-born (13 empty) = Number of residents born outside the United States  
*Average Household Size (16 empty) = Average household size of city  
  
Five columns contain nulls, each with fewer than 1% empty values.
The columns are therefore kept intact and only the rows with nulls are removed.

In [12]:
# drop ROWS(not columns) with missing values
subset_cols = [
    'Male Population',
    'Female Population',
    'Number of Veterans',
    'Foreign-born',
    'Average Household Size'
]
    
df_new_demographics = df_demographics.dropna(subset=subset_cols)
    
rows_dropped = df_demographics.count()-df_new_demographics.count()
print("Rows in Demographic dataset dropped with missing values: {}".format(rows_dropped))

Rows in Demographic dataset dropped with missing values: 16


2) Demographic Duplicate check

In [13]:
# Duplicate check for demographics using city + state + state code + race.
# Together these columns identify a row, so a repeated combination is a true duplicate

df_final_demographics = df_new_demographics.drop_duplicates(subset=['City', 'State', 'State Code', 'Race'])
rows_dropped = df_new_demographics.count()-df_final_demographics.count()
print("Rows in Demographic dataset dropped due to duplicate row entries: {}".format(rows_dropped))

Rows in Demographic dataset dropped due to duplicate row entries: 0


<u>Demographics Duplicate Analysis</u>  
Using unique identifiers of (City, State, State Code, Race), no duplicate row entries were found.

#### Temperature Data Clean

1) Convert dt column datatype: timestamp -> string

In [14]:
df_new_temperature = df_temperature.withColumn("dt",col("dt").cast(StringType()))

In [13]:
df_new_temperature.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



2) Null check, missing values

In [32]:
# Final null check on 3rd table: World Temperature

df_new_temperature.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_new_temperature.columns]).show()

+---+------------------+-----------------------------+----+-------+--------+---------+
| dt|AverageTemperature|AverageTemperatureUncertainty|City|Country|Latitude|Longitude|
+---+------------------+-----------------------------+----+-------+--------+---------+
|  0|            364130|                       364130|   0|      0|       0|        0|
+---+------------------+-----------------------------+----+-------+--------+---------+



In [15]:
# Since only 4% of the Average Temperature column is null, the column is worth keeping
# So drop the row entries with the missing values in Ave Temp

total_records = df_new_temperature.count()
print(f'Total records in Temp dataframe before null removal: {total_records:,}')    
    
df_null_temperature = df_new_temperature.dropna(subset=['AverageTemperature'])
    
rows_dropped = df_new_temperature.count()-df_null_temperature.count()
print("Rows in Temperature dataset dropped with missing values: {}".format(rows_dropped))

total_records2 = df_null_temperature.count()
print(f'Total records in Temp dataframe after null removal: {total_records2:,}')  

Total records in Temp dataframe before null removal: 8,599,212
Rows in Temperature dataset dropped with missing values: 364130
Total records in Temp dataframe after null removal: 8,235,082


<u>Temperature Null Analysis</u>  
Only about 4% of the AverageTemperature values are null (364,130 of 8,599,212), so the column is worth keeping.

The 364,130 rows with a missing AverageTemperature are dropped instead.

3) Temperature Duplicate Check & Remove

In [16]:
# Need a unique identifier that isolates duplicates in Temp data:  use dt + city + country

df_final_temperature = df_null_temperature.drop_duplicates(subset=['dt','City','Country'])
rows_dropped = df_null_temperature.count()-df_final_temperature.count()
print("Rows in Temperature dataset dropped due to duplicate row entries: {}".format(rows_dropped))

print("Total records in Temperature dataset after duplicate clean:{}".format(df_final_temperature.count()))

Rows in Temperature dataset dropped due to duplicate row entries: 44299
Total records in Temperature dataset after duplicate clean:8190783


<u>Temperature Dataset Duplicate Analysis</u>  
Using unique identifiers of (dt, city, country), 44,299 duplicate row entries were found and dropped.
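The idea behind a composite-key duplicate check can be sketched in plain Python: two rows count as duplicates when they agree on every key field, even if other fields differ. The sample rows below are made up for illustration (only the first reading is taken from the preview above). This sketch keeps the first row it sees, and which row Spark's `drop_duplicates` keeps should not be relied on.

```python
def drop_duplicate_rows(rows, key_fields):
    """Keep the first row seen for each combination of key_fields."""
    seen = set()
    unique = []
    for row in rows:
        key = tuple(row[field] for field in key_fields)
        if key not in seen:
            seen.add(key)
            unique.append(row)
    return unique


# Illustrative rows; the second one is an invented duplicate of the first key
rows = [
    {"dt": "1743-11-01", "City": "Århus", "Country": "Denmark", "AverageTemperature": 6.068},
    {"dt": "1743-11-01", "City": "Århus", "Country": "Denmark", "AverageTemperature": 6.1},
    {"dt": "1744-04-01", "City": "Århus", "Country": "Denmark", "AverageTemperature": 5.8},
]

deduplicated = drop_duplicate_rows(rows, ["dt", "City", "Country"])
print(len(rows) - len(deduplicated), "duplicate row(s) dropped")
```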

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [17]:
# Before creating tables in jupyter test environment,
# Create a temporary Parquet directory for tables to be stored locally for notebook purposes
output_data = "tables/"

#### Create timedate dimension table
A unique identifier is generated with monotonically_increasing_id() and added as a new id column.
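IDs produced this way are unique and increasing but not necessarily consecutive. The Spark documentation describes the current implementation as placing the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits of a 64-bit integer. The sketch below reproduces that layout in plain Python to show why IDs jump between partitions. The function `monotonic_id` is illustrative and does not call Spark.

```python
def monotonic_id(partition_id, row_in_partition):
    """Compose an ID with the partition in the upper bits and the row number in the lower 33 bits."""
    return (partition_id << 33) + row_in_partition


print(monotonic_id(0, 0))  # first row of partition 0
print(monotonic_id(0, 4))  # fifth row of partition 0
print(monotonic_id(1, 0))  # first row of partition 1 starts at 2**33
```

Because of these gaps, the id column is best treated as an opaque key rather than a row counter.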

In [18]:
def create_immigration_time(df, output_data):
    """This function creates an immigration time table based on arrival date
    
    params 
        df:          spark dataframe of immigration events
        output_data: path to write dimension dataframe to
    """
    
    # create initial calendar df from arrdate column
    time_df = df.select(['arrdate'])
    
    # expand df by adding other calendar columns
    time_df = time_df.withColumn('arrival_day', dayofmonth('arrdate'))
    time_df = time_df.withColumn('arrival_week', weekofyear('arrdate'))
    time_df = time_df.withColumn('arrival_month', month('arrdate'))
    time_df = time_df.withColumn('arrival_year', year('arrdate'))
    time_df = time_df.withColumn('arrival_weekday', dayofweek('arrdate'))

    # create an id field in calendar df
    time_df = time_df.withColumn('id', monotonically_increasing_id())
    
    # write the calendar dimension to parquet file
    partition_columns = ['arrival_year', 'arrival_month', 'arrival_week']
    time_df.write.parquet(output_data + "time", partitionBy=partition_columns, mode="overwrite")
    
    return time_df

In [19]:
time_df = create_immigration_time(df_final_immigration, output_data)
time_df.limit(5).toPandas()

Unnamed: 0,arrdate,arrival_day,arrival_week,arrival_month,arrival_year,arrival_weekday,id
0,2016-04-13,13,15,4,2016,4,0
1,2016-04-25,25,17,4,2016,2,1
2,2016-04-08,8,14,4,2016,6,2
3,2016-04-25,25,17,4,2016,2,3
4,2016-04-02,2,13,4,2016,7,4
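The calendar columns can be cross-checked with the standard library. This sketch assumes Spark's conventions: `weekofyear` follows ISO 8601 week numbering, and `dayofweek` runs from 1 (Sunday) to 7 (Saturday). The function name `calendar_attributes` is illustrative.

```python
from datetime import date


def calendar_attributes(d):
    """Derive the calendar columns of the time dimension for one date."""
    return {
        "arrival_day": d.day,
        "arrival_week": d.isocalendar()[1],         # ISO 8601 week number
        "arrival_month": d.month,
        "arrival_year": d.year,
        "arrival_weekday": d.isoweekday() % 7 + 1,  # 1 = Sunday ... 7 = Saturday
    }


first_row = calendar_attributes(date(2016, 4, 13))
print(first_row)
```

For 2016-04-13 this gives week 15 and weekday 4, matching the first row of the output above.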


#### Create country dimension table

In [21]:
# Build a country code --> country name dictionary from the SAS labels file,
# then use the dictionary to write a CSV file

# Opening the SAS Label Descriptions File 
with open("I94_SAS_Labels_Descriptions.SAS") as sas_file:
        file_contents = sas_file.readlines()

In [29]:
# Extract the country code --> country name pairs from the SAS file

dictcountry_code = {}

# Loop over the lines of the SAS file that hold the country mapping
for input_countries in file_contents[10:298]:
    # Split each line into its code and country parts
    pair = input_countries.split('=')
    # Strip whitespace and quotes from the code and country values
    code, country = pair[0].strip(), pair[1].strip().strip("'")
    # Store the pair in the dictionary
    dictcountry_code[code] = country

In [30]:
# Converting the country code and country name from the list into the dataframe
df_dictcountry_code = pd.DataFrame(list(dictcountry_code.items()), columns=['code', 'Name'])

# Previewing the first 5 rows of the country code dataframe
df_dictcountry_code.head(5)

Unnamed: 0,code,Name
0,236,AFGHANISTAN
1,101,ALBANIA
2,316,ALGERIA
3,102,ANDORRA
4,324,ANGOLA
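
The parsing loop above relies on a fixed line range and assumes every line contains exactly one `=`. A slightly more defensive variant can be sketched as follows. `parse_label_line` is a hypothetical helper, and the sample lines are assumptions modelled on the codes shown in the preview, not copied from the SAS file.

```python
def parse_label_line(line):
    """Parse one "code = 'label'" line; return None when the line holds no mapping."""
    if "=" not in line:
        return None
    # Split on the first "=" only, so labels that contain "=" stay intact
    code, label = line.split("=", 1)
    code = code.strip().strip("'")
    label = label.strip().rstrip(";").strip().strip("'").strip()
    if not code:
        return None
    return code, label


# Hypothetical sample lines in the style of the label file
sample_lines = [
    "   236 =  'AFGHANISTAN'\n",
    "   101 =  'ALBANIA'\n",
    "/* a comment line without a mapping */\n",
]

country_codes = {}
for line in sample_lines:
    parsed = parse_label_line(line)
    if parsed is not None:
        code, country = parsed
        country_codes[code] = country
```

Lines without a mapping are skipped instead of raising an `IndexError`, which makes the loop less sensitive to the exact slice boundaries.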


In [None]:
# Converting dataframe to CSV file
df_dictcountry_code.to_csv('i94res.csv', index=False)

In [20]:
def create_country(df, temp_df, output_data):
    """This function creates a country dimension from the immigration and global land temperatures data.
    
    :param df: spark dataframe of immigration events
    :param temp_df: spark dataframe of global land temperatures data.
    :param output_data: path to write dimension dataframe to
    :return: spark dataframe representing country dimension
    """
    # get the aggregated temperature data
    agg_temp = tools.aggregate_temperature_data(temp_df).toPandas()
    # load the i94res to country mapping data
    mapping_codes = pd.read_csv('i94res.csv')
    
    @udf('string')
    def get_country_average_temperature(name):
        # countries without temperature data get a null average
        avg_temp = agg_temp[agg_temp['Country']==name]['average_temperature']
        
        if not avg_temp.empty:
            return str(avg_temp.iloc[0])
        
        return None
    
    @udf('string')
    def get_country_name(code):
        # codes missing from the mapping get a null name instead of raising an IndexError
        names = mapping_codes[mapping_codes['code']==code]['Name']
        
        if not names.empty and isinstance(names.iloc[0], str):
            return names.iloc[0].title()
        return None
        
    
    # select i94res column
    dim_df = df.select(['i94res']).distinct() \
                .withColumnRenamed('i94res', 'country_code')
    
    # create country_name column
    dim_df = dim_df.withColumn('country_name', get_country_name(dim_df.country_code))
    
    # create average_temperature column
    dim_df = dim_df.withColumn('average_temperature', get_country_average_temperature(dim_df.country_name))
    
    # write the dimension to a parquet file
    dim_df.write.parquet(output_data + "country", mode="overwrite")
    
    return dim_df

In [21]:
country_df = create_country(df_final_immigration, df_final_temperature, output_data)
country_df.limit(5).toPandas()

Unnamed: 0,country_code,country_name,average_temperature
0,243,Burma,26.0168399893
1,516,Trinidad And Tobago,
2,251,Israel,19.0076965915
3,296,United Arab Emirates,26.5726805482
4,513,Barbados,
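
The country dimension is built from two lookups: country code to country name, then country name to average temperature. Countries without temperature data, such as Trinidad And Tobago above, end up with an empty `average_temperature`. The same logic can be sketched in plain Python with dictionaries. `build_country_rows` is a hypothetical helper, the sample values are taken from the preview above, and code 999 is a made-up unknown code.

```python
def build_country_rows(codes, code_to_name, name_to_avg_temp):
    """Return one row per distinct country code, with None where a lookup fails."""
    rows = []
    for code in sorted(set(codes)):
        raw_name = code_to_name.get(code)
        name = raw_name.title() if raw_name else None
        rows.append({
            "country_code": code,
            "country_name": name,
            "average_temperature": name_to_avg_temp.get(name),
        })
    return rows


# Values taken from the preview above; 999 is a made-up code with no mapping
code_to_name = {243: "BURMA", 251: "ISRAEL", 516: "TRINIDAD AND TOBAGO"}
name_to_avg_temp = {"Burma": 26.0168399893, "Israel": 19.0076965915}

country_rows = build_country_rows([251, 243, 516, 243, 999], code_to_name, name_to_avg_temp)
```

In Spark, the same result could also be obtained by joining the distinct codes against the mapping and temperature dataframes rather than calling a Python UDF for every row.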


#### Create Demographics dimension table

In [22]:
def create_demographics(df, output_data):
    """This function creates a us demographics dimension table from the us cities demographics data.
    
    params 
        df: spark dataframe of US demographics survey data
        output_data: path to write dimension dataframe to
    """
    dim_df = df.withColumnRenamed('Median Age','median_age') \
            .withColumnRenamed('Male Population', 'male_population') \
            .withColumnRenamed('Female Population', 'female_population') \
            .withColumnRenamed('Total Population', 'total_population') \
            .withColumnRenamed('Number of Veterans', 'number_of_veterans') \
            .withColumnRenamed('Foreign-born', 'foreign_born') \
            .withColumnRenamed('Average Household Size', 'average_household_size') \
            .withColumnRenamed('State Code', 'state_code')
    # Add an id column
    dim_df = dim_df.withColumn('id', monotonically_increasing_id())
    
    # write dimension to parquet file
    dim_df.write.parquet(output_data + "demographics", mode="overwrite")
    
    return dim_df

In [23]:
demographics_df = create_demographics(df_final_demographics, output_data)
demographics_df.limit(5).toPandas()

Unnamed: 0,City,State,median_age,male_population,female_population,total_population,number_of_veterans,foreign_born,average_household_size,state_code,Race,Count,id
0,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723,0
1,Wilmington,North Carolina,35.5,52346,63601,115947,5908,7401,2.24,NC,Asian,3152,1
2,Tampa,Florida,35.3,175517,193511,369028,20636,58795,2.47,FL,Hispanic or Latino,95154,2
3,Gastonia,North Carolina,36.9,35527,39023,74550,3537,5715,2.67,NC,Asian,2788,3
4,Tyler,Texas,33.9,50422,53283,103705,4813,8225,2.59,TX,American Indian and Alaska Native,1057,4


#### Create Visatype dimension table

In [24]:
def create_visa_type(df, output_data):
    """This function creates a visa type dimension from the immigration data.
    
    params:
        df:          spark dataframe of immigration events
        output_data: path to write dimension dataframe to
    """
    # create visatype df from visatype column, distinct because need to classify each type only once
    visatype_df = df.select(['visatype']).distinct()
    
    # add an id column
    visatype_df = visatype_df.withColumn('visa_type_key', monotonically_increasing_id())
    
    # write dimension to parquet file
    visatype_df.write.parquet(output_data + "visatype", mode="overwrite")
    
    return visatype_df

In [25]:
visatype_df = create_visa_type(df_final_immigration, output_data)
visatype_df.limit(5).toPandas()

Unnamed: 0,visatype,visa_type_key
0,F2,103079215104
1,B2,369367187456
2,F1,498216206336
3,WB,738734374912
4,M1,747324309504
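
The keys above are unique but far from consecutive. According to the Spark documentation for `monotonically_increasing_id`, the current implementation stores the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. A small sketch that decodes the keys shown in the preview (`decode_spark_id` is a hypothetical helper name):

```python
RECORD_BITS = 33  # the lower 33 bits hold the record number within a partition


def decode_spark_id(value):
    """Split an id into (partition_id, record_number)."""
    partition_id = value >> RECORD_BITS
    record_number = value & ((1 << RECORD_BITS) - 1)
    return partition_id, record_number


# Keys copied from the visatype preview above
visa_type_keys = {
    "F2": 103079215104,
    "B2": 369367187456,
    "F1": 498216206336,
    "WB": 738734374912,
    "M1": 747324309504,
}
decoded = {visa: decode_spark_id(key) for visa, key in visa_type_keys.items()}
```

Each of these keys decodes to record number 0 of a different partition, which explains the large gaps. Because the values depend on partitioning, they may change when the dimension is rebuilt, so the fact table should be regenerated together with it.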


#### Create Immigration Fact Table

In [26]:
# The visatype dimension table must be created before running this cell

def create_immigration(df, output_data):
    """This function creates the immigration fact table from the immigration data.
    
    params 
        df: spark dataframe of immigration events
        output_data: path to write the fact dataframe to; the visatype dimension is read back from this path
    """

    # get visa_type dimension
    dim_df = get_visa_type_dimension(output_data).toPandas()
    
    @udf('string')
    def get_visa_key(visa_type):
        """user defined function to get visa key
        
        params 
            visa_type: US non-immigrant visa type
        """
        key_series = dim_df[dim_df['visatype']==visa_type]['visa_type_key']
        
        if not key_series.empty:
            return str(key_series.iloc[0])
        
        return None
    
    
    # rename columns to align with data model
    df = df.withColumnRenamed('cicid','record_id') \
            .withColumnRenamed('i94res', 'country_residence_code') \
            .withColumnRenamed('i94addr', 'state_code') 
    
    # create visa_type key
    df = df.withColumn('visa_type_key', get_visa_key('visatype'))
    
    
    # remove the visatype column because it now lives in the visa_type dimension table
    df = df.drop('visatype')
    
    # write fact table to parquet file
    df.write.parquet(output_data + "immigration_fact", mode="overwrite")
    
    return df

# Supplemental function to retrieve visa_type_dimension table used to execute get_visa_key udf within 
# create_immigration function above

def get_visa_type_dimension(output_data):
    return spark.read.parquet(output_data + "visatype")

In [27]:
immigration_fact_df = create_immigration(df_final_immigration, output_data)
immigration_fact_df.limit(5).toPandas()

Unnamed: 0,record_id,i94yr,i94mon,i94cit,country_residence_code,i94port,arrdate,i94mode,state_code,depdate,...,entdepa,entdepd,matflag,biryear,dtaddto,gender,airline,admnum,fltno,visa_type_key
0,243257,2016,4,130,130,PHO,2016-04-02,1,NY,2016-04-07,...,G,O,M,1972,6302016,F,DL,-359003515,47,884763262976
1,289649,2016,4,245,245,DET,2016-04-02,1,GA,2016-04-11,...,G,O,M,1957,10012016,M,DL,-1921760482,582,369367187456
2,1498315,2016,4,582,582,SEA,2016-04-08,1,FL,2016-04-16,...,G,O,M,1976,10072016,F,Y4,-1459106882,976,369367187456
3,2305457,2016,4,104,104,SFR,2016-04-13,1,CA,2016-04-21,...,G,O,M,1974,7112016,M,KL,255357485,605,738734374912
4,4702607,2016,4,135,135,ORL,2016-04-25,1,FL,2016-04-28,...,O,O,M,1967,7232016,,BA,-873770311,2037,884763262976


In [29]:
immigration_fact_df.printSchema()

root
 |-- record_id: integer (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- country_residence_code: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- state_code: string (nullable = true)
 |-- depdate: date (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- dtadfile: integer (nullable = true)
 |-- visapost: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: integer (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: integer (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visa_type_key: string (nullable = true)

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 


#### 1st Data Quality Test (Source/Count check for completeness): Table Load Success Test

Count the total number of records in every dimension and fact table to confirm that each table loaded correctly.

The record counts should match the numbers expected from the data cleaning and table creation steps.


In [31]:
# The code for this quality test is imported from the etl_functions file (function "quality_checks")
# The dataframes used here are the final ones produced during table creation

table_dfs = {
    'immigration_fact': immigration_fact_df,
    'visa_type_dim': visatype_df,
    'time_dim': time_df,
    'demographics_dim': demographics_df,
    'country_dim': country_df
}
for table_name, table_df in table_dfs.items():
    # count quality check for table
    etl_functions.quality_checks(table_df, table_name)

PASSED 1st Data Quality test for immigration_fact table with 1,000 records found.
PASSED 1st Data Quality test for visa_type_dim table with 10 records found.
PASSED 1st Data Quality test for time_dim table with 1,000 records found.
PASSED 1st Data Quality test for demographics_dim table with 2,875 records found.
PASSED 1st Data Quality test for country_dim table with 91 records found.
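
The body of `etl_functions.quality_checks` is not shown in this notebook. As a rough sketch of what such a count check could look like (this is an assumption, not the actual implementation), the hypothetical helper below takes a record count, fails on empty tables, and can optionally compare against an expected count:

```python
def check_row_count(record_count, table_name, expected_count=None):
    """Return (passed, message) for a simple completeness check on one table."""
    if record_count == 0:
        return False, f"FAILED 1st Data Quality test for {table_name} table with zero records."
    if expected_count is not None and record_count != expected_count:
        return False, (
            f"FAILED 1st Data Quality test for {table_name} table: "
            f"expected {expected_count:,} records, found {record_count:,}."
        )
    return True, (
        f"PASSED 1st Data Quality test for {table_name} table "
        f"with {record_count:,} records found."
    )


# Counts taken from the output above
passed, message = check_row_count(1000, "immigration_fact")
empty_passed, empty_message = check_row_count(0, "country_dim")
mismatch_passed, _ = check_row_count(2875, "demographics_dim", expected_count=2891)
```

With a Spark dataframe, the count passed in would come from `table_df.count()`. The expected count of 2891 in the last call is a made-up number for illustration.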


#### 2nd Data Quality Test (Integrity Constraint): Primary Key Duplicate Check

The 2nd test checks that there are no duplicate primary keys or IDs in the dimension tables.

In [28]:
# check for duplicate ID entries in time dimension table
# convert the Spark dataframe to pandas in order to use duplicated()

time_df_new = time_df.toPandas()
has_duplicates = time_df_new['id'].duplicated().any()

if not has_duplicates:
    print('PASSED: No duplicate IDs in time dim table.')
else:
    print('FAILED: Duplicate IDs found in time dim table.')

PASSED: No duplicate IDs in time dim table.


In [80]:
# check for duplicate ID entries in demographics dimension table
demographics_df_new = demographics_df.toPandas()
has_duplicates = demographics_df_new['id'].duplicated().any()

if not has_duplicates:
    print('PASSED: No duplicate IDs in demographics dim table.')
else:
    print('FAILED: Duplicate IDs found in demographics dim table.')

PASSED: No duplicate IDs in demographics dim table.


In [37]:
# check for duplicate visa type key entries in visatype dimension table
visatype_df_new = visatype_df.toPandas()
has_duplicates = visatype_df_new['visa_type_key'].duplicated().any()

if not has_duplicates:
    print('PASSED: No duplicate visa_type_keys in visatype dim table.')
else:
    print('FAILED: Duplicate visa_type_keys found in visatype dim table.')

PASSED: No duplicate visa_type_keys in visatype dim table.


In [33]:
# check for duplicate country code entries in country dimension table
country_df_new = country_df.toPandas()
has_duplicates = country_df_new['country_code'].duplicated().any()

if not has_duplicates:
    print('PASSED: No duplicate country codes in country dim table.')
else:
    print('FAILED: Duplicate country codes found in country dim table.')

PASSED: No duplicate country codes in country dim table.
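
Generated `id` columns are unique by construction, so a duplicate check is most informative when it targets the natural key of a table, which may span several columns. Below is a minimal sketch in plain Python. `find_duplicate_keys` is a hypothetical helper, and the sample rows are assumptions modelled on the previews above.

```python
from collections import Counter


def find_duplicate_keys(rows, key_columns):
    """Return the sorted list of key tuples that occur more than once."""
    counts = Counter(tuple(row[column] for column in key_columns) for row in rows)
    return sorted(key for key, occurrences in counts.items() if occurrences > 1)


# Hypothetical rows modelled on the time and demographics previews
time_rows = [
    {"arrdate": "2016-04-13", "id": 0},
    {"arrdate": "2016-04-25", "id": 1},
    {"arrdate": "2016-04-08", "id": 2},
    {"arrdate": "2016-04-25", "id": 3},
]
demographics_rows = [
    {"City": "Quincy", "state_code": "MA", "Race": "White"},
    {"City": "Wilmington", "state_code": "NC", "Race": "Asian"},
    {"City": "Gastonia", "state_code": "NC", "Race": "Asian"},
]

duplicate_ids = find_duplicate_keys(time_rows, ["id"])
duplicate_dates = find_duplicate_keys(time_rows, ["arrdate"])
duplicate_demographics = find_duplicate_keys(demographics_rows, ["City", "state_code", "Race"])
```

In this sample the `id` column has no duplicates while `arrdate` does, which shows why the choice of key column matters for the check.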


#### 3rd Data Quality Test (Script Unit Test): Null Check on Primary Keys
Test to confirm that the table-creation scripts work correctly: primary keys are populated (never null) in their designated tables and columns.

In [32]:
country_df.createOrReplaceTempView("dim_country")
demographics_df.createOrReplaceTempView("dim_demographics")
time_df.createOrReplaceTempView("dim_time")
visatype_df.createOrReplaceTempView("dim_visatype")
immigration_fact_df.createOrReplaceTempView("fact_immigration")

In [33]:
# Function to check for null values
def nullValueCheck(spark_ctxt, tables_to_check):
    """
    Perform null value checks on specific columns of the given tables and raise a
    ValueError when null values are encountered.

    spark_ctxt: spark session in which the tables are registered as temp views
    tables_to_check: dictionary of (table, columns) pairs specifying, for each table,
    which columns are to be checked for null values.
    """  
    for table in tables_to_check:
        print(f"Performing primary key null value check on table {table}...")
        for column in tables_to_check[table]:
            returnedVal = spark_ctxt.sql(f"""SELECT COUNT(*) as nbr FROM {table} WHERE {column} IS NULL""")
            if returnedVal.head()[0] > 0:
                raise ValueError(f"3rd Data quality check failed! Found NULL values in {column} column of table {table}!")
        print(f"Table {table} passed 3rd Data Quality Test.")

In [34]:
tables_to_check = { 'fact_immigration' : ['record_id'], "dim_visatype":['visa_type_key'], 'dim_time':['arrdate'],\
                    'dim_demographics': ['City','state_code'],\
                    'dim_country':['country_code']}

nullValueCheck(spark, tables_to_check)

Performing primary key null value check on table fact_immigration...
Table fact_immigration passed 3rd Data Quality Test.
Performing primary key null value check on table dim_visatype...
Table dim_visatype passed 3rd Data Quality Test.
Performing primary key null value check on table dim_time...
Table dim_time passed 3rd Data Quality Test.
Performing primary key null value check on table dim_demographics...
Table dim_demographics passed 3rd Data Quality Test.
Performing primary key null value check on table dim_country...
Table dim_country passed 3rd Data Quality Test.
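
A related integrity check is whether every `visa_type_key` in the fact table exists in the visatype dimension. The schema above shows the fact-table key as a string, while the dimension preview shows integer keys, so the two sides need to be normalised before comparing. Below is a minimal sketch in plain Python. `find_orphan_keys` is a hypothetical helper, and the sample lists are an illustrative subset, not the full tables.

```python
def find_orphan_keys(fact_keys, dim_keys):
    """Return fact keys (as integers) that have no match in the dimension."""
    known = {int(key) for key in dim_keys}
    # Null keys are skipped here; they are a separate concern from orphans
    present = {int(key) for key in fact_keys if key is not None}
    return sorted(present - known)


# Illustrative subset: dimension keys as integers, fact keys as strings
dim_keys = [103079215104, 369367187456, 738734374912]
fact_keys = ["369367187456", "738734374912", "884763262976", None]

orphans = find_orphan_keys(fact_keys, dim_keys)
```

In this subset the key 884763262976 is reported as an orphan only because the sample dimension list is incomplete; the real dimension holds 10 rows.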


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

<b><i>Time Dimension Table Data Dictionary</i></b>

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">arrdate(Primary Key)</td><td class="tg-0pky">Arrival Date into the US</td>
 <tr><td class="tg-0pky">arrival_day</td><td class="tg-0pky">Day of the month of arrival</td>
 <tr><td class="tg-0pky">arrival_week</td><td class="tg-0pky">Week of the year of arrival</td>
 <tr><td class="tg-0pky">arrival_month</td><td class="tg-0pky">Month of arrival</td>
 <tr><td class="tg-0pky">arrival_year</td><td class="tg-0pky">Year of arrival</td>
 <tr><td class="tg-0pky">arrival_weekday</td><td class="tg-0pky">Day of the week of arrival (1 = Sunday, 7 = Saturday)</td>
 <tr><td class="tg-0pky">id</td><td class="tg-0pky">Unique monotonically increasing id created</td>
</table>

<b><i>Country Dimension Table Data Dictionary</i></b>

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">country_code(Primary Key)</td><td class="tg-0pky">Unique country code</td>
 <tr><td class="tg-0pky">country_name</td><td class="tg-0pky">Name of country</td>
 <tr><td class="tg-0pky">average_temperature</td><td class="tg-0pky">Average temperature of country</td>
</table>

<b><i>Demographics Dimension Table Data Dictionary</i></b>

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">state_code (Primary Key)</td><td class="tg-0pky">Two-letter US state code of the city</td>
 <tr><td class="tg-0pky">city</td><td class="tg-0pky">City Name</td>
 <tr><td class="tg-0pky">state</td><td class="tg-0pky">State of city</td>
 <tr><td class="tg-0pky">median_age</td><td class="tg-0pky">Median age of population</td>
 <tr><td class="tg-0pky">male_population</td><td class="tg-0pky">Male Population of city</td>
 <tr><td class="tg-0pky">female_population</td><td class="tg-0pky">Female Population of city</td>
 <tr><td class="tg-0pky">total_population</td><td class="tg-0pky">Total Population of city</td>
 <tr><td class="tg-0pky">number_of_veterans</td><td class="tg-0pky">Veteran Population in city</td>
 <tr><td class="tg-0pky">foreign_born</td><td class="tg-0pky">Number of foreign-born residents in city</td>
 <tr><td class="tg-0pky">average_household_size</td><td class="tg-0pky">Average city household size</td>
 <tr><td class="tg-0pky">race</td><td class="tg-0pky">Race or ethnicity group of city residents</td>
 <tr><td class="tg-0pky">count</td><td class="tg-0pky">Number of city residents in the given race group</td>
 <tr><td class="tg-0pky">id</td><td class="tg-0pky">Unique monotonically increasing id created</td>
</table>

<b><i>Visa_type Dimension Table Data Dictionary</i></b>

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">visa_type_key(Primary Key)</td><td class="tg-0pky">Unique id for each visa type</td>
 <tr><td class="tg-0pky">visatype</td><td class="tg-0pky">Visa type code (e.g. B2, F1)</td>
</table>

<b><i>Immigration Fact Table Data Dictionary</i></b>

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">record_id (cicid)</td><td class="tg-0pky">Unique record ID</td>
 <tr><td class="tg-0pky">i94yr</td><td class="tg-0pky">4 digit year</td>
 <tr><td class="tg-0pky">i94mon</td><td class="tg-0pky">Numeric month</td>
 <tr><td class="tg-0pky">i94cit</td><td class="tg-0pky">3 digit code for traveler's country of citizenship</td>
 <tr><td class="tg-0pky">country_residence_code (i94res)</td><td class="tg-0pky">3 digit code for traveler's country of residence </td>
 <tr><td class="tg-0pky">i94port</td><td class="tg-0pky">Port of admission</td>
 <tr><td class="tg-0pky">arrdate</td><td class="tg-0pky">Arrival Date into the US</td>
 <tr><td class="tg-0pky">i94mode</td><td class="tg-0pky">Mode of transportation; 1 = Air; 2 = Sea; 3 = Land; 9 = Unknown</td>
 <tr><td class="tg-0pky">state_code (i94addr)</td><td class="tg-0pky">US State of arrival</td>
 <tr><td class="tg-0pky">depdate</td><td class="tg-0pky">Departure Date from the US</td>
 <tr><td class="tg-0pky">i94bir</td><td class="tg-0pky">Age of respondent in years</td>
 <tr><td class="tg-0pky">i94visa</td><td class="tg-0pky">Visa codes collapsed into three categories; 1 = Business; 2 = Pleasure; 3 = Student</td>
 <tr><td class="tg-0pky">count</td><td class="tg-0pky">Field used for summary statistics</td>
 <tr><td class="tg-0pky">dtadfile</td><td class="tg-0pky">Character Date Field - Date added to I-94 Files</td>
 <tr><td class="tg-0pky">visapost</td><td class="tg-0pky">Department of State where Visa was issued </td>
 <tr><td class="tg-0pky">entdepa</td><td class="tg-0pky">Arrival Flag</td>
 <tr><td class="tg-0pky">entdepd</td><td class="tg-0pky">Departure Flag</td>
 <tr><td class="tg-0pky">matflag</td><td class="tg-0pky">Match flag</td>
 <tr><td class="tg-0pky">biryear</td><td class="tg-0pky">4 digit year of birth</td>
 <tr><td class="tg-0pky">dtaddto</td><td class="tg-0pky">Character Date Field; Date to which admitted to U.S.</td>
 <tr><td class="tg-0pky">gender</td><td class="tg-0pky">Gender</td>
 <tr><td class="tg-0pky">insnum</td><td class="tg-0pky">INS number</td>
 <tr><td class="tg-0pky">airline</td><td class="tg-0pky">Arrival Airline</td>
 <tr><td class="tg-0pky">admnum</td><td class="tg-0pky">Admission Number</td>
 <tr><td class="tg-0pky">fltno</td><td class="tg-0pky">Arrival Airline Flight number</td>
 <tr><td class="tg-0pky">visa_type_key</td><td class="tg-0pky">Key referencing the visa type dimension table</td>
</table>

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

In [1]:
from distutils.dir_util import copy_tree
copy_tree("../../data/18-83510-I94-Data-2016/", "./data")

['./data/i94_apr16_sub.sas7bdat',
 './data/i94_sep16_sub.sas7bdat',
 './data/i94_nov16_sub.sas7bdat',
 './data/i94_mar16_sub.sas7bdat',
 './data/i94_jun16_sub.sas7bdat',
 './data/i94_aug16_sub.sas7bdat',
 './data/i94_may16_sub.sas7bdat',
 './data/i94_jan16_sub.sas7bdat',
 './data/i94_oct16_sub.sas7bdat',
 './data/i94_jul16_sub.sas7bdat',
 './data/i94_feb16_sub.sas7bdat',
 './data/i94_dec16_sub.sas7bdat']
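
The `distutils` package used in the cell above was deprecated in Python 3.10 and removed in Python 3.12. On newer interpreters, a similar copy can be sketched with `shutil.copytree` (its `dirs_exist_ok` argument requires Python 3.8+). `copy_data_dir` is a hypothetical helper, and the demonstration uses a throwaway temporary directory with a placeholder file rather than the real I94 data.

```python
import shutil
import tempfile
from pathlib import Path


def copy_data_dir(src, dst):
    """Copy a directory tree and return the sorted list of copied file paths."""
    shutil.copytree(src, dst, dirs_exist_ok=True)
    return sorted(str(path) for path in Path(dst).rglob("*") if path.is_file())


# Demonstration on a temporary directory holding one empty placeholder file
with tempfile.TemporaryDirectory() as tmp:
    source_dir = Path(tmp) / "source"
    source_dir.mkdir()
    (source_dir / "example.sas7bdat").write_bytes(b"")
    copied = copy_data_dir(source_dir, Path(tmp) / "data")
    copied_names = [Path(path).name for path in copied]
```

Like `copy_tree`, this merges into an existing destination directory instead of failing when it already exists.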