# Data Engineering Capstone Project

### Executive Summary

The business objective of this project is to support an analytical data warehouse in `Amazon Redshift` (a cloud data warehouse) for `U.S. Customs and Border Protection`. The pipeline processes the raw data from the source, transforms it, and loads it into a data lake to provide analytics-ready data about visitors entering the U.S. The analytics team at `U.S. Customs and Border Protection` can build reports on this data to support better resource allocation, and thereby a smoother, hassle-free entry experience for visitors to the United States.



The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import os
import re
import configparser
from datetime import datetime, timedelta

import pandas as pd
import pyarrow.parquet as pq
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col, when, lower, isnull, isnan, count, year, month,
                                   dayofmonth, hour, weekofyear, dayofweek, to_date)
from pyspark.sql.types import StructField, StructType, IntegerType, DoubleType

# Pattern for dates stored as strings (YYYY-MM-DD), used by the helper functions below.
# Spark's date_format function is deliberately not imported, so that this name
# refers to the pattern string when it is passed to strftime/strptime.
date_format = "%Y-%m-%d"

pd.set_option('display.max_rows', 5000)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 1000)

In [2]:
def cast_type(df, dict_obj):
    """Converts the column to the desired type
     Arguments:   df : Spark dataframe 
                  dict_obj : Dictionary object
                                  ( Key= column name ; 
                                    value = Type to be converted to)     
    """

    #k=Key (Column name) ; #v=Value (Type)
    for k,v in dict_obj.items():
        if k in df.columns:
            df = df.withColumn(k, df[k].cast(v))
    return df



def rename_columns(df, dict_map):
    '''Rename the columns of the dataset
      Arguments: df: Spark dataframe to be processed.
                     dict_map: key=old_name value= new_name
    '''
    df = df.select([col(c).alias(dict_map.get(c, c)) for c in df.columns])
    return df



def convert_sas_date(df, cols):
    """
    Convert dates in the SAS datatype to a date in a string format YYYY-MM-DD 
    Args:
        df   : Spark Dataframe
        cols : List of columns in the SAS date format to be converted
    """

    # User-defined function: SAS dates count days from 1960-01-01
    convert_sas_udf = udf(lambda x: x if x is None
                          else (timedelta(days=x) + datetime(1960, 1, 1)).date().strftime(date_format))

    for c in [c for c in cols if c in df.columns]:
        df = df.withColumn(c, convert_sas_udf(df[c]))
    return df


def date_diff(date1, date2):
    '''
    Calculates the difference in days between two dates (date2 - date1).
    Returns None if either date is missing.
    '''
    if date1 is None or date2 is None:
        return None
    a = datetime.strptime(date1, date_format)
    b = datetime.strptime(date2, date_format)
    return (b - a).days


def change_field_value_condition(df, change_list):
    '''
    Helper function used to rename column values based on condition.
    
    Args:
        df (:obj:`SparkDataFrame`): Spark dataframe to be processed.
        change_list (:obj: `list`): List of tuples in the format (field, old value, new value)
    '''
    for field, old, new in change_list:
        df = df.withColumn(field, when(df[field] == old, new).otherwise(df[field]))
    return df



# User-defined functions wrapped with Spark's udf: one capitalizes the first letter of each word in a string,
# the other calculates the difference in days between two dates.
capitalize_udf = udf(lambda x: x if x is None else x.title())
date_diff_udf = udf(date_diff)
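
The SAS date arithmetic used in `convert_sas_date` can be checked in plain Python, outside Spark. This is a minimal sketch: `sas_to_iso` and `SAS_EPOCH` are hypothetical names introduced for illustration, and the sample values 20574 and 20582 are the `arrdate` and `depdate` of the first row in the immigration sample shown in Step 2.

```python
from datetime import date, timedelta

# SAS stores dates as the number of days since 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)


def sas_to_iso(days):
    """Convert a SAS day count to a 'YYYY-MM-DD' string (None stays None)."""
    if days is None:
        return None
    return (SAS_EPOCH + timedelta(days=int(days))).isoformat()


print(sas_to_iso(20574))    # 2016-04-30
print(sas_to_iso(20582.0))  # 2016-05-08
print(sas_to_iso(None))     # None
```

The first result agrees with the `dtadfile` value `20160430` on the same row, which suggests the 1960-01-01 epoch is the right one for this data.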



## Step 1: Scope the Project and Gather Data


### The Architecture

The complete solution is cloud-based, built on `Amazon Web Services (AWS)`.

In this project, the datasets are loaded into an EMR cluster, preprocessed with Apache Spark, and stored back as fact and dimension tables in an AWS S3 bucket in Parquet format. Data quality checks are then performed on the processed data. The analytics team can pull the data into an `OLAP data warehouse` or `BI apps` for analysis.

The main objective of this project is to develop an ETL pipeline that processes large volumes of source data using `Apache Spark` and stores the staging data back in an `Amazon S3` bucket, so that multiple users can access it concurrently.

Technologies Used: `Python`, `Apache Spark`, `Amazon S3`

![](architecture.png)

https://github.com/fpcarneiro/data-engineer-project/blob/master/Capstone%20Project%20Template.ipynb


### Data Source

`I94 Immigration Data`: 
This data comes from the US National Tourism and Trade Office. It contains immigration records, partitioned by month for each year.

`World Temperature Data`: 
This dataset comes from Kaggle. It includes temperature recordings of cities around the world over a period of time.


`US City Demographic Data`: 
This dataset comes from OpenDataSoft. It includes population information for US cities, such as race and gender.

## Step 2: Explore and Assess the Data

### Creating a Spark Session

In [7]:
from pyspark.sql import SQLContext

#creating a spark session
spark=SparkSession.builder.master("local").getOrCreate()

sc = spark.sparkContext
# using SQLContext to read parquet file

sqlContext = SQLContext(sc)

### Immigration Data

In [8]:
# Reading parquet file
immigration = sqlContext.read.parquet('data/part-00013-b9542815-7a8d-45fc-9c67-c9c5007ad0d4-c000.snappy.parquet')
immigration.show(2)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

In [35]:
immigration.count()

235125

#### ` Checking Null Values`

In [9]:
print('\n #### The total number of rows are #### \n')
print(immigration.count())

#checking null values
print('\n #### The count of Missing values by column are #### \n')
immigration.select([count(when(isnull(c), c)).alias(c) for c in immigration.columns]).show()


 #### The total number of rows are #### 

235125

 #### The count of Missing values by column are #### 

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+------+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost| occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+------+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+
|    0|    0|     0|     0|     0|      0|      0|    130|  53704|  31818|   783|      0|    0|       0|  158785|234200|    130|  27807| 235096|  27807|    783|    311|  4428|122404|  82585|     0|18486|       0|
+-----+-----+------+------+------+-------+

#### `Dropping unnecessary columns`
    
    1. Irrelevant columns
    2. Columns with more than 60% missing values

In [10]:
print('#### Number of columns in the Immigration data Before Dropping')
print(len(immigration.columns))


print('#### Number of columns in the Immigration data After Dropping')
immigration=immigration.drop("count", "entdepd", "matflag", "entdepu","dtaddto", "biryear", "admnum","insnum","entdepa","occup","visapost")
print(len(immigration.columns))

#### Number of columns in the Immigration data Before Dropping
28
#### Number of columns in the Immigration data After Dropping
17
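
The 60% rule can be made explicit with a small calculation. The sketch below is plain Python and uses the null counts and row count printed in the missing-values output above; `columns_above_threshold` is a hypothetical helper name, and treating the rule as "share of nulls strictly greater than 0.6" is an assumption.

```python
# Null counts taken from the missing-values output (only columns with nulls).
TOTAL_ROWS = 235125
null_counts = {
    "i94mode": 130, "i94addr": 53704, "depdate": 31818, "i94bir": 783,
    "visapost": 158785, "occup": 234200, "entdepa": 130, "entdepd": 27807,
    "entdepu": 235096, "matflag": 27807, "biryear": 783, "dtaddto": 311,
    "gender": 4428, "insnum": 122404, "airline": 82585, "fltno": 18486,
}


def columns_above_threshold(counts, total, threshold=0.6):
    """Return the columns whose share of missing values exceeds `threshold`."""
    return [name for name, nulls in counts.items() if nulls / total > threshold]


sparse_columns = columns_above_threshold(null_counts, TOTAL_ROWS)
print(sparse_columns)  # ['visapost', 'occup', 'entdepu']
```

Under this reading, only three of the dropped columns qualify by sparsity. The others (for example `insnum`, at roughly 52% missing) fall under the "irrelevant columns" criterion.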


#### `Type Casting`

In [11]:
print(immigration.printSchema())

#Numeric columns
integer_cols = ['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'arrdate', 'i94mode', 'i94bir', 'i94visa', 'count', 
                    'biryear', 'dtadfile', 'depdate']
    
#Date Columns
date_cols = ['arrdate', 'depdate']
    
# Type Casting columns to NUMERIC
immigration = cast_type(immigration, dict(zip(integer_cols, len(integer_cols)*[IntegerType()])))
    
# Convert SAS date to a format of YYYY-MM-DD
immigration = convert_sas_date(immigration, date_cols)

print('############## After Type Casting ')
print(immigration.printSchema())

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)

None
############## After Type Casting 
root
 |-- cicid: integer (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: integer (n

#### `Feature Engineering of Stay Column`

In [12]:
# Create a new column to store the length of the visitor's stay in the US
# Stay = Departure Date - Arrival Date
immigration = immigration.withColumn('stay', date_diff_udf(immigration.arrdate, immigration.depdate))
    
#Converting the derived column to integer
immigration = cast_type(immigration, {'stay': IntegerType()})
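
As a worked example of the stay calculation, here is the same logic in plain Python for the first sample row of the immigration data. This is a sketch only: `stay_in_days` and `DATE_FORMAT` are hypothetical names, and the `%Y-%m-%d` pattern is assumed from the YYYY-MM-DD format described for the converted dates.

```python
from datetime import datetime

DATE_FORMAT = "%Y-%m-%d"  # assumed pattern for the converted date strings


def stay_in_days(arrival, departure):
    """Days between two 'YYYY-MM-DD' strings; None if either date is missing."""
    if arrival is None or departure is None:
        return None
    delta = datetime.strptime(departure, DATE_FORMAT) - datetime.strptime(arrival, DATE_FORMAT)
    return delta.days


# First sample row: arrdate 20574 -> 2016-04-30, depdate 20582 -> 2016-05-08
print(stay_in_days("2016-04-30", "2016-05-08"))  # 8
# The raw SAS day counts give the same answer without any string parsing
print(20582 - 20574)                             # 8
print(stay_in_days("2016-04-30", None))          # None
```

Since SAS dates are already day counts, subtracting the raw columns before conversion would be one way to avoid a Python UDF. Spark's built-in `datediff` function in `pyspark.sql.functions` is another option worth considering.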

### Creating a Date Dimension Table

In [23]:
# Taking distinct arrival dates
arrdate = immigration.select('arrdate').distinct()

# Taking distinct departure dates
depdate = immigration.select('depdate').distinct()

# Union of both sets. union() keeps duplicates, so de-duplicate again and
# drop the null that comes from missing departure dates (date is the primary key)
dates = arrdate.union(depdate).distinct().dropna()

dates = dates.withColumn("year", year(dates.arrdate))
dates = dates.withColumn("month", month(dates.arrdate))
dates = dates.withColumn("day", dayofmonth(dates.arrdate))
dates = dates.withColumn("weekofyear", weekofyear(dates.arrdate))
dates = dates.withColumn("dayofweek", dayofweek(dates.arrdate))
dates = dates.withColumnRenamed('arrdate', 'date')
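
To make the meaning of the derived columns concrete, the sketch below reproduces them in plain Python for a single date. `date_attributes` is a hypothetical helper; the mapping reflects Spark's documented conventions, where `weekofyear` follows ISO 8601 and `dayofweek` runs from 1 (Sunday) to 7 (Saturday).

```python
from datetime import date


def date_attributes(iso_date):
    """Derive date-dimension attributes from a 'YYYY-MM-DD' string."""
    d = date.fromisoformat(iso_date)
    return {
        "date": iso_date,
        "year": d.year,
        "month": d.month,
        "day": d.day,
        # ISO 8601 week number, the convention Spark's weekofyear follows
        "weekofyear": d.isocalendar()[1],
        # Spark's dayofweek counts 1 = Sunday ... 7 = Saturday
        "dayofweek": d.isoweekday() % 7 + 1,
    }


print(date_attributes("2016-04-30"))
# {'date': '2016-04-30', 'year': 2016, 'month': 4, 'day': 30, 'weekofyear': 17, 'dayofweek': 7}
```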
        

### Global Temperature Data

    In the main immigration dataset, every country is identified by a country code.
    We aggregate the temperature by country, then join the result with a lookup table on the country name to obtain the country code.

In [16]:
global_temperature = spark.read.csv('GlobalLandTemperaturesByCity.csv',header=True)
global_temperature.show(5)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows



####  `Aggregating data by Country`
    The dataset holds many records per country (one per city and date), so the temperature is averaged per country

In [17]:
countries = global_temperature.groupby(["Country"]).agg({"AverageTemperature": "avg", 
                                 "Latitude": "first",
                                 "Longitude": "first"})\
    .withColumnRenamed('avg(AverageTemperature)', 'Temperature')\
    .withColumnRenamed('first(Latitude)', 'Latitude')\
    .withColumnRenamed('first(Longitude)', 'Longitude')

In [18]:
# Rename countries to match the lookup datasets
change_countries = [("Country", "Congo (Democratic Republic Of The)", "Congo"), 
                        ("Country", "Côte D'Ivoire", "Ivory Coast")]
countries = change_field_value_condition(countries, change_countries)
countries = countries.withColumn('Country_Lower', lower(countries.Country))

countries.show(5)

+--------+------------------+--------+---------+-------------+
| Country|       Temperature|Latitude|Longitude|Country_Lower|
+--------+------------------+--------+---------+-------------+
|    Chad|27.189829394812683|   8.84N|   15.41E|         chad|
|Paraguay|22.784014312977117|  24.92S|   58.52W|     paraguay|
|  Russia|  3.34726798287354|  53.84N|   91.36E|       russia|
|   Yemen| 25.76840766445382|  13.66N|   45.41E|        yemen|
| Senegal| 25.98417669449083|  15.27N|   17.50W|      senegal|
+--------+------------------+--------+---------+-------------+
only showing top 5 rows



#### `Loading the lookup table to extract country code`

In [24]:
res = spark.read.csv("LookUpTables/I94CIT_I94RES.csv",header=True)
print(res.printSchema())
res.show(5)

root
 |-- Code: string (nullable = true)
 |-- I94CTRY: string (nullable = true)

None
+----+-----------+
|Code|    I94CTRY|
+----+-----------+
| 582|     MEXICO|
| 236|AFGHANISTAN|
| 101|    ALBANIA|
| 316|    ALGERIA|
| 102|    ANDORRA|
+----+-----------+
only showing top 5 rows



In [25]:
#converting code column to integer    
res = cast_type(res, {"Code": IntegerType()})
#creating a new column as lower case of I94CTRY
res = res.withColumn('Country_Lower', lower(res.I94CTRY))
res.show(2) 

+----+-----------+-------------+
|Code|    I94CTRY|Country_Lower|
+----+-----------+-------------+
| 582|     MEXICO|       mexico|
| 236|AFGHANISTAN|  afghanistan|
+----+-----------+-------------+
only showing top 2 rows



In [18]:
 
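# Rename lookup values so they match the country names in the temperature dataset
change_res = [("I94CTRY", "BOSNIA-HERZEGOVINA", "BOSNIA AND HERZEGOVINA"), 
                  ("I94CTRY", "INVALID: CANADA", "CANADA"),
                  ("I94CTRY", "CHINA, PRC", "CHINA"),
                  ("I94CTRY", "GUINEA-BISSAU", "GUINEA BISSAU"),
                  ("I94CTRY", "INVALID: PUERTO RICO", "PUERTO RICO"),
                  ("I94CTRY", "INVALID: UNITED STATES", "UNITED STATES")]

res = change_field_value_condition(res, change_res)

# Recompute the join key so it reflects the renamed values
res = res.withColumn('Country_Lower', lower(res.I94CTRY))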



#### `Joining the countries dataset with lookup table to create the country dimension table`

In [26]:
res = res.join(countries, res.Country_Lower == countries.Country_Lower, how="left")
res = res.withColumn("Country", when(isnull(res["Country"]), capitalize_udf(res.I94CTRY)).otherwise(res["Country"]))   
res = res.drop("I94CTRY", "Country_Lower")
res.show(5)

+----+---------+------------------+--------+---------+
|Code|  Country|       Temperature|Latitude|Longitude|
+----+---------+------------------+--------+---------+
| 532|    Aruba|              null|    null|     null|
| 110|  Finland| 3.711644535691719|  60.27N|   25.95E|
| 438|Australia| 16.70146214247643|  34.56S|  138.16E|
| 113|   Greece|16.347482714233163|  37.78N|   24.41E|
| 126| Portugal|14.749674965924589|  39.38N|    8.32W|
+----+---------+------------------+--------+---------+
only showing top 5 rows
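
The join logic above (match on a lower-cased name, keep unmatched lookup rows, and fall back to a title-cased lookup name) can be sketched in plain Python. `build_country_rows` is a hypothetical helper, and the two rows are taken from the output above; only Finland is given a temperature entry to keep the sketch short.

```python
# Lookup rows: (code, I94 country name), values as in the output above.
lookup = [(532, "ARUBA"), (110, "FINLAND")]

# Aggregated temperatures keyed by lower-cased country name.
temperature_by_country = {"finland": ("Finland", 3.711644535691719)}


def build_country_rows(lookup_rows, temperatures):
    """Left-join the lookup onto the temperatures using a lower-cased name key."""
    rows = []
    for code, raw_name in lookup_rows:
        match = temperatures.get(raw_name.lower())
        if match is None:
            # No temperature data: keep the row and title-case the lookup name
            rows.append((code, raw_name.title(), None))
        else:
            name, temperature = match
            rows.append((code, name, temperature))
    return rows


for row in build_country_rows(lookup, temperature_by_country):
    print(row)
# (532, 'Aruba', None)
# (110, 'Finland', 3.711644535691719)
```

A left join keeps every country code from the lookup table, so immigration rows can always be joined to the dimension even when no temperature is available.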



### U.S. City Demographic Data


In [28]:
us_cities_demographics = spark.read.csv("LookUpTables/us-cities-demographics.csv",header=True,sep=';')
print(us_cities_demographics.printSchema())
us_cities_demographics.show(5)

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)

None
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+--------------

#### `Type Casting`

In [29]:
integer_cols=['Count', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', 'Foreign-born']
float_cols=['Median Age', 'Average Household Size']

us_cities_demographics = cast_type(us_cities_demographics, dict(zip(integer_cols, len(integer_cols)*[IntegerType()])))
us_cities_demographics = cast_type(us_cities_demographics, dict(zip(float_cols, len(float_cols)*[DoubleType()])))


#### `Aggregating the data`

In [30]:

print('#################### \n Aggregating the data \n')

#Aggregating the data by CITY and STATE
aggregate_df = us_cities_demographics.groupBy(["City", "State", "State Code"]).agg({"Median Age": "first", 
 "Male Population": "first",
 "Female Population": "first", 
 "Total Population": "first", 
 "Number of Veterans": "first",
 "Foreign-born": "first", 
 "Average Household Size": "first"})

aggregate_df.show(5)


#################### 
 Aggregating the data 

+------------+-----------+----------+-----------------------+------------------------+-----------------+-------------------------+-------------------+----------------------+-----------------------------+
|        City|      State|State Code|first(Total Population)|first(Female Population)|first(Median Age)|first(Number of Veterans)|first(Foreign-born)|first(Male Population)|first(Average Household Size)|
+------------+-----------+----------+-----------------------+------------------------+-----------------+-------------------------+-------------------+----------------------+-----------------------------+
|   Rockville|   Maryland|        MD|                  66998|                   35793|             38.1|                     1990|              25047|                 31205|                          2.6|
|Delray Beach|    Florida|        FL|                  66261|                   34042|             47.9|                     4232|        

#### `Pivoting the columns`

In [31]:
# Pivot Table to transform values of the column Race to different columns
pivoted_df = us_cities_demographics.groupby(["City", "State", "State Code"]).pivot("Race").sum("Count")
pivoted_df.show(5)

+------------+----------+----------+---------------------------------+-----+-------------------------+------------------+------+
|        City|     State|State Code|American Indian and Alaska Native|Asian|Black or African-American|Hispanic or Latino| White|
+------------+----------+----------+---------------------------------+-----+-------------------------+------------------+------+
|Delray Beach|   Florida|        FL|                             null| 1696|                    21138|              6397| 40980|
|   Rockville|  Maryland|        MD|                              594|17370|                     7533|              9197| 41692|
| Jersey City|New Jersey|        NJ|                             3356|67610|                    65051|             79718| 99300|
|    Alhambra|California|        CA|                              687|44067|                     1905|             31386| 20811|
|  Cincinnati|      Ohio|        OH|                             3362| 7633|                   13
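
For readers less familiar with pivoting, the transformation can be illustrated in plain Python on a handful of rows. This is a sketch, not the Spark implementation: `pivot_by_race` is a hypothetical helper, and the input rows are a small subset consistent with the counts in the output above.

```python
from collections import defaultdict

# (city, state code, race, count): a few rows consistent with the output above
rows = [
    ("Delray Beach", "FL", "Asian", 1696),
    ("Delray Beach", "FL", "White", 40980),
    ("Rockville", "MD", "Asian", 17370),
    ("Rockville", "MD", "American Indian and Alaska Native", 594),
]


def pivot_by_race(records):
    """Turn one row per (city, race) into one row per city with a column per race."""
    races = sorted({race for _, _, race, _ in records})
    totals = defaultdict(lambda: defaultdict(int))
    for city, state, race, count in records:
        totals[(city, state)][race] += count
    # A race missing for a city stays None, like the nulls in the Spark output
    return {key: {race: by_race.get(race) for race in races}
            for key, by_race in totals.items()}


pivoted = pivot_by_race(rows)
print(pivoted[("Delray Beach", "FL")])
# {'American Indian and Alaska Native': None, 'Asian': 1696, 'White': 40980}
```

The `None` for Delray Beach corresponds to the `null` in the pivoted Spark output, which is later replaced with 0.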

#### `Joining aggregated and pivoted data to extract summary stats as well as stats for each race`

In [32]:
# Join the aggregated DataFrame to the pivoted DataFrame,
# then rename the columns
demographics = aggregate_df.join(other=pivoted_df, on=["City", "State", "State Code"], how="inner")\
.withColumnRenamed('first(Total Population)', 'TotalPopulation')\
.withColumnRenamed('first(Female Population)', 'FemalePopulation')\
.withColumnRenamed('first(Male Population)', 'MalePopulation')\
.withColumnRenamed('first(Median Age)', 'MedianAge')\
.withColumnRenamed('first(Number of Veterans)', 'NumberVeterans')\
.withColumnRenamed('first(Foreign-born)', 'ForeignBorn')\
.withColumnRenamed('first(Average Household Size)', 'AverageHouseholdSize')\
.withColumnRenamed('Hispanic or Latino', 'HispanicOrLatino')\
.withColumnRenamed('Black or African-American', 'BlackOrAfricanAmerican')\
.withColumnRenamed('American Indian and Alaska Native', 'AmericanIndianAndAlaskaNative')


numeric_cols = ['TotalPopulation', 'FemalePopulation', 'MedianAge', 'NumberVeterans', 'ForeignBorn', 'MalePopulation', 
'AverageHouseholdSize','AmericanIndianAndAlaskaNative', 'Asian', 'BlackOrAfricanAmerican', 
'HispanicOrLatino', 'White']

# Fill the null values with 0
demographics = demographics.fillna(0, numeric_cols)

demographics.show(2)


+------------+--------+----------+---------------+----------------+---------+--------------+-----------+--------------+--------------------+-----------------------------+-----+----------------------+----------------+-----+
|        City|   State|State Code|TotalPopulation|FemalePopulation|MedianAge|NumberVeterans|ForeignBorn|MalePopulation|AverageHouseholdSize|AmericanIndianAndAlaskaNative|Asian|BlackOrAfricanAmerican|HispanicOrLatino|White|
+------------+--------+----------+---------------+----------------+---------+--------------+-----------+--------------+--------------------+-----------------------------+-----+----------------------+----------------+-----+
|Delray Beach| Florida|        FL|          66261|           34042|     47.9|          4232|      16639|         32219|                2.35|                            0| 1696|                 21138|            6397|40980|
|   Rockville|Maryland|        MD|          66998|           35793|     38.1|          1990|      25047|    

## Step 3: Define the Data Model

#### 3.1 Conceptual Data Model

A star schema was chosen to maintain data integrity and keep analytical queries simple.

After assessing the data from the different datasets, we have `Immigration`, `Country` and `Demographics` tables, which are modelled as follows:

1. `Immigration Table` - The center of the data warehouse. It is the fact table, holding information about 
    each visit, such as arrival date, departure date, gender, airline and type of visa.
    *Primary Key = cicid (unique identifier for the visitor)*
    
    
2.  `Date Dimension Table` - Created from the Immigration table, using the unique 
      arrival and departure dates.
      *Primary Key = date*
      
        
3.  `The STATE dimension table` contains an aggregation of the demographics dataset by the State column. It contains the 
      overall statistics (Median Age, Male Population, Female Population, Total Population, Number of Veterans, Foreign-born)
      and the population counts for each race (BlackOrAfricanAmerican, White, AmericanIndianAndAlaskaNative, 
      HispanicOrLatino, Asian).
      *Primary Key = Code (state code)*
      
      
4. `The COUNTRY dimension` completes our star schema model. It has the average temperature, latitude and longitude 
    for each country.
    *Primary Key = Code (country code)*

#### 3.2 Mapping Out Data Pipelines

The `ETL` pipeline `Extract`s the data from the repository, `Transform`s it using Spark, and `Load`s it into S3 buckets in Parquet format.

## Step 4: Run Pipelines to Model the Data
#### 4.1 Create the data model

A Jupyter notebook was used to explore the data. The actual pipeline was built as Python scripts.

In [None]:
#run etl_process_using_spark.py inside etl_scripts

#### 4.2 Data Quality Checks
The following data quality checks help ensure the pipeline ran as expected:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here
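
One possible shape for these checks is sketched below. It is not the project's actual implementation: `evaluate_table` is a hypothetical helper that works on three counts per table, and the comments show how those counts could be obtained from a Spark DataFrame. The 235125 row count comes from the sample file above; the other numbers are illustrative inputs, not measured results.

```python
# With a Spark DataFrame `df` and a key column name `key`, the inputs could come from:
#   row_count     = df.count()
#   distinct_keys = df.select(key).distinct().count()
#   null_keys     = df.where(df[key].isNull()).count()


def evaluate_table(name, row_count, distinct_keys, null_keys):
    """Return a list of human-readable failures for one table (empty if it passes)."""
    failures = []
    if row_count == 0:
        failures.append(f"{name}: table is empty")
    if null_keys > 0:
        failures.append(f"{name}: {null_keys} rows have a null primary key")
    if distinct_keys != row_count:
        failures.append(f"{name}: primary key is not unique")
    return failures


# Illustrative inputs: a passing fact table and an empty dimension table
print(evaluate_table("immigration", 235125, 235125, 0))  # []
print(evaluate_table("country", 0, 0, 0))                # ['country: table is empty']
```

Keeping the evaluation separate from the Spark calls makes the rules easy to unit test, which also covers the "unit tests for the scripts" item in the list above.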

#### 4.3 Data dictionary 

`Immigration Table `

|ColumnName|Description|
|-----|-------|
|CICID|Primary Key|
|I94YR|Year|
|I94MON|Month|
|I94CIT|3 digit for the country code where the visitor was born. This is a FK to the COUNTRY dimension table|
|I94RES|3 digit for the country code where the visitor resides in. This is a FK to the COUNTRY dimension table|
|ARRDATE|Arrival date in the USA. This is a FK to the DATE dimension table|
|I94MODE|Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)|
|I94ADDR|State of arrival. This is a FK to the STATE dimension table|
|DEPDATE|Departure date from the USA. This is a FK to the DATE dimension table|
|I94BIR|Age of Respondent in Years|
|I94VISA|Visa codes collapsed into three categories: (1 = Business; 2 = Pleasure; 3 = Student)|
|BIRYEAR|4 digit year of birth|
|GENDER|Gender|
|AIRLINE|Airline used to arrive in U.S.|
|FLTNO|Flight number of Airline used to arrive in U.S.|
|VISATYPE|Class of admission legally admitting the non-immigrant to temporarily stay in U.S.|
|STAY|Number of days in the US|


`DATE TABLE`

|ColumnName|Description|
|-----|-------|
|date|Date in the format YYYY-MM-DD. This is the PK.|
|day|Two digit day|
|month|Two digit month|
|year|Four digit for the year|
|weekofyear|The week of the year|
|dayofweek|The day of the week|

`COUNTRY TABLE`

|ColumnName|Description|
|-----|-------|
|Code|Country Code. This is the PK.|
|Country|Country Name|
|Temperature|Average temperature of the country between 1743 and 2013|
|Latitude|GPS Latitude|
|Longitude|GPS Longitude|


`STATE TABLE`

|ColumnName|Description|
|-----|-------|
|Code|Primary Key. This is the code of the State as in I94ADDR lookup table|
|State|Name of the state|
|BlackOrAfricanAmerican|Number of residents of the race Black Or African American|
|White|Number of residents of the race White|
|ForeignBorn|Number of residents born outside the United States|
|AmericanIndianAndAlaskaNative|Number of residents of the race American Indian And Alaska Native|
|HispanicOrLatino|Number of residents of the race Hispanic Or Latino|
|Asian|Number of residents of the race Asian|
|NumberVeterans|Number of residents that are war veterans|
|FemalePopulation|Female population|
|MalePopulation|Male population|
|TotalPopulation|Total population|

## Step 5: Complete Project Write Up

#### `Clearly state the rationale for the choice of tools and technologies for the project`

The whole solution was implemented on Amazon Web Services because cloud computing provides a low-cost, scalable, and highly reliable infrastructure platform with no up-front costs.

In particular, we use the following services:

`S3`: Provides relatively cheap, easy-to-use storage with scalability, high availability, security, and performance. 

`Spark`: A widely used framework for big data processing, with built-in modules for streaming, SQL, machine learning and graph processing. Spark provides an interface for programming entire clusters with implicit data parallelism and fault tolerance.

`EMR`: A cloud-native big data platform that allows teams to process vast amounts of data quickly and cost-effectively at scale using Spark. EMR is easy to use, secure, elastic and low-cost, which makes it a good fit for our project.

#### `Propose how often the data should be updated and why`

Since we receive one file per month, it seems reasonable to update the model monthly.

#### `Write a description of how you would approach the problem differently under the following scenarios`

##### `The data was increased by 100x:`

The biggest advantage of running Spark on a cluster is that it can scale up and down. We can add more nodes to the cluster to handle the larger volume.

##### `The data populates a dashboard that must be updated on a daily basis by 7am every day`
The schedule interval of the Airflow DAG could be changed to daily, with the run scheduled overnight so that the data is available by 7am.

##### `The database needed to be accessed by 100+ people.`

Cloud services such as Amazon Redshift are highly available, and the cluster can be resized elastically to serve more concurrent users.