# Project Title
### Data Engineering Capstone Project

#### Project Summary
##### As part of this project, we model immigration and weather data for the United States to understand how temperature has affected the travel patterns of passengers arriving in the United States over the past 50 years.

The project follows the following steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [None]:
# Do all imports and installs here
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
import pandas as pd

### Step 1: Scope the Project and Gather Data

#### Scope: 
----------------------
##### As part of this project, we model immigration and weather data for the United States to understand how temperature has affected the travel patterns of passengers arriving in the United States over the past 50 years, i.e. since the 1970s. More specifically, we want to answer the questions below:
1. Temperature influence over the years on the choice of destination city in the U.S.
2. Temperature influence over the years on the choice of arrival date of travellers aggregated across the various date dimensions i.e. year, month, quarter etc.

#### Source Datasets:
-------------------------------
##### The datasets below are used across the project.
1. i94_apr16_sub.sas7bdat: This immigration data comes from the US National Tourism and Trade Office and contains the information recorded on the I94 form, which is the arrival/departure report card for international visitors.
2. GlobalLandTemperaturesByCity.csv: This dataset contains the world temperature recorded over various years for various cities across countries. We only look at the temperature data for the United States since the 1970s.
3. I94PORT_lookup.csv: This file is generated based on the dictionary provided for the i94port column in the I94_SAS_Labels_Descriptions.SAS file. Since we are only interested in US port of entry cities and states, non-US ports of entry such as Canada, Mexico, Belgium etc., as well as invalid, collapsed and no-port-code entries, have been dropped while creating the file.
4. I94CIT_I94RES_lookup.csv: This file is generated based on the dictionary provided for the i94cit and i94res columns in the I94_SAS_Labels_Descriptions.SAS file.

#### Target tables:
----------------------
##### Below are the target table details:
1. Fact Table : fact_immigration
2. Dimension Tables: dim_temperature, dim_date
3. Format: parquet

#### ETL Pipeline Design:
-------------------------
##### The ETL pipeline leverages Spark's fast in-memory processing and loads the data as follows:
1. Create a SparkSession and read the i94_apr16_sub.sas7bdat, GlobalLandTemperaturesByCity.csv, I94PORT_lookup.csv and I94CIT_I94RES_lookup.csv input files to create spark dataframes.
2. Select the desired columns for each of the target tables and perform the required transformations.
3. Save the transformed dataframe with the required partitioning in parquet format.


In [None]:
# Read in the i94 immigration data which is in sas format and load it in a pandas dataframe.
fname_i94 = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_i94 = pd.read_sas(fname_i94, 'sas7bdat', encoding="ISO-8859-1")

In [None]:
# Explore the first 5 rows of the i94 travel data.
df_i94.head()
# Explore the columns and the data types for i94 travel data.
df_i94.info()
# Explore the value counts for i94 travel data.
df_i94.count()

In [None]:
# Read in the global temperature data which is in csv format and load it in a pandas dataframe.
fname_global_temp = '../../data2/GlobalLandTemperaturesByCity.csv'
df_global_temp = pd.read_csv(fname_global_temp,parse_dates=['dt'])

In [None]:
# Explore temperature data to check the columns and the corresponding value counts.
df_global_temp.info()
# Explore the first 5 rows of the global temperature data.
df_global_temp.head()
# Explore the value counts for global temperature data.
df_global_temp.count()

In [None]:
# Since we are only interested in exploring U.S. data from 1970 onwards, apply a filter where
# Country = 'United States' and dt >= 1st Jan 1970.
df_us_temp = df_global_temp[df_global_temp['Country'] == 'United States']
df_us_temp = df_us_temp[df_us_temp['dt'] >= '1970-01-01']



In [None]:
# Explore us temperature data to check the columns and the corresponding value counts.
df_us_temp.info()
# Explore the first 5 rows of the us temperature data.
df_us_temp.head()
# Explore the value counts for us temperature data.
df_us_temp.count()

In [None]:
# Read in the I94 port data which is in csv format and load it in a pandas dataframe.
fname_I94port = 'I94PORT_lookup.csv'
df_I94port = pd.read_csv(fname_I94port,delimiter=";")

In [None]:
# Explore the value counts for the I94 port data along with the first 5 rows
df_I94port.count()
df_I94port.info()
df_I94port.head()

In [None]:
# Read in the I94 country data which is in csv format and load it in a pandas dataframe.
fname_I94cit_res = 'I94CIT_I94RES_lookup.csv'
df_I94cit_res = pd.read_csv(fname_I94cit_res,quotechar="'")

In [None]:
# Explore the value counts for the I94 country data along with the first 5 rows
df_I94cit_res.count()
df_I94cit_res.info()
df_I94cit_res.head()

In [None]:
# Instantiate a PySpark session and read the i94 immigration data into a Spark dataframe
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()
df_spark_i94 =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [None]:
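# Write the i94 immigration data to a local parquet folder and read it back.
# mode("overwrite") lets this cell be re-run without failing on an existing folder.
df_spark_i94.write.mode("overwrite").parquet("sas_data")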
df_spark_i94=spark.read.parquet("sas_data")

In [None]:
# Convert the us temp pandas dataframe to spark dataframe 
df_spark_us_temp = spark.createDataFrame(df_us_temp)


In [None]:
# Convert the i94 port pandas dataframe to spark dataframe 
mySchema_i94port = StructType([ StructField("code", StringType(), True)\
                       ,StructField("city", StringType(), True)\
                       ,StructField("state", StringType(), True)])

df_spark_i94port = spark.createDataFrame(df_I94port,schema=mySchema_i94port)


In [None]:
# Convert the country pandas dataframe to spark dataframe 
mySchema_i94cit_res = StructType([ StructField("code", LongType(), True)\
                       ,StructField("country", StringType(), True)])
df_spark_i94cit_res = spark.createDataFrame(df_I94cit_res,schema=mySchema_i94cit_res)

In [None]:
# Check the schema of the i94 immigration data along with the value counts
df_spark_i94.printSchema()
print(df_spark_i94.count())
df_spark_i94.show(5,truncate=False)




##### Let's select only the columns below from the immigration dataset. They will be part of the final data model and will be explored further.
1. i94cit
2. i94res
3. i94port
4. arrdate
5. depdate
6. i94mode
7. i94visa
8. gender
9. biryear
10. cicid
11. i94bir


In [None]:
# Select a subset of columns from the immigration table, drop duplicate rows and tally the record count.
df_spark_i94 = df_spark_i94.select("cicid","i94cit","i94res","i94port","arrdate","depdate","i94mode",
                   "i94visa","gender","i94bir","biryear").distinct()

df_spark_i94.count()


In [None]:
# Check the schema of the us temperature data along with the value counts
df_spark_us_temp.printSchema()
print(df_spark_us_temp.count())
df_spark_us_temp.show(5,truncate=False)


In [None]:
# Check the schema of the i94 port data along with the value counts
df_spark_i94port.printSchema()
print(df_spark_i94port.count())
df_spark_i94port.show(5,truncate=False)


In [None]:
# Check the schema of the i94 country data along with the value counts
df_spark_i94cit_res.printSchema()
print(df_spark_i94cit_res.count())
df_spark_i94cit_res.show(5,truncate=False)


In [None]:
# Register the spark dataframes below as temporary views.
df_spark_i94.createOrReplaceTempView('immigration_tbl')
df_spark_i94cit_res.createOrReplaceTempView('country_tbl')
df_spark_i94port.createOrReplaceTempView('port_of_entry_tbl')
df_spark_us_temp.createOrReplaceTempView('us_temp_tbl')

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Clean the data

In [None]:
# Let's check the cicid column and count its distinct values:
spark.sql(""" select count(distinct cicid)
              from immigration_tbl""").show()

##### Let's translate the code values in each of the fields below in the i94 immigration data and add a suitable description based on the data dictionary provided in the workspace:
1. i94cit,i94res ==> Will be used to add country of origin and residence 
2. i94mode ==> Will be used to add the actual mode of transport
3. i94visa ==> Will be used to add the visa type.

In [None]:
# Check for country of origin codes in the immigration dataset that are missing from the lookup table and get the counts
spark.sql("""select distinct i94cit
             from immigration_tbl where
             i94cit not in (
             select distinct code from country_tbl)""").show()

spark.sql("""select count(*)
             from immigration_tbl where
             i94cit not in (
             select distinct code from country_tbl)""").show()

In [None]:
# Check for country of residence codes in the immigration dataset that are missing from the lookup table and get the counts
spark.sql("""select distinct i94res
             from immigration_tbl where
             i94res not in (
             select distinct code from country_tbl)""").show()

spark.sql("""select count(*)
             from immigration_tbl where
             i94res not in (
             select distinct code from country_tbl)""").show()

##### It looks like about 386189 rows have a country of origin code that is not in the lookup table. Since all rows have a valid country of residence code, we will not drop these records.
##### We will use an inner join to determine the country of residence and a left outer join to determine the country of origin, i.e. country of residence is mandatory but country of origin is not.
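
The difference between the two join types can be illustrated with a minimal plain-Python sketch (not Spark). The sample rows, the `country_lookup` dictionary and the helper names are hypothetical and only mimic the shape of the immigration and country tables.

```python
# Hypothetical sample data: country code -> country name lookup.
country_lookup = {101: "ALBANIA", 111: "FRANCE"}

# Hypothetical immigration rows; 999 stands for a code missing from the lookup.
rows = [
    {"cicid": 1, "i94cit": 101, "i94res": 101},
    {"cicid": 2, "i94cit": 999, "i94res": 111},
    {"cicid": 3, "i94cit": 111, "i94res": 999},
]

def left_join_origin(rows, lookup):
    """Left outer join: keep every row; an unmatched origin becomes None."""
    return [dict(r, i94_country_origin=lookup.get(r["i94cit"])) for r in rows]

def inner_join_residence(rows, lookup):
    """Inner join: keep only rows whose residence code is in the lookup."""
    return [dict(r, i94_country_residence=lookup[r["i94res"]])
            for r in rows if r["i94res"] in lookup]

result = inner_join_residence(left_join_origin(rows, country_lookup), country_lookup)
for r in result:
    print(r["cicid"], r["i94_country_origin"], r["i94_country_residence"])
```

In this sketch the row with the unknown origin code is kept with an empty origin, while the row with the unknown residence code is dropped.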

In [None]:
# Add the country of residence and country of origin columns in the immigration dataset.
df_spark_i94 = spark.sql("""select imm.*, cntry.country as i94_country_origin
             from immigration_tbl imm
             left outer join country_tbl cntry
             on imm.i94cit = cntry.code""")
df_spark_i94.createOrReplaceTempView('immigration_tbl')

df_spark_i94 = spark.sql("""select imm.*, cntry.country as i94_country_residence
             from immigration_tbl imm
             inner join country_tbl cntry
             on imm.i94res = cntry.code""")

df_spark_i94.createOrReplaceTempView('immigration_tbl')



In [None]:
# Validate the country of residence and origin columns
# along with the row count to check that no records were dropped from the i94 immigration dataset
spark.sql("select * from immigration_tbl").limit(5).show()           
spark.sql("select count(*) from immigration_tbl").show()           


In [None]:
# Get the count of records from the immigration table where the port of entry is one of a few
# sample non-US cities or invalid ports of entry
spark.sql("""select count(*)
             from immigration_tbl 
             where trim(i94port) in ('AUH','888','CLG')""").show()


##### Since we are only interested in visitors travelling to the U.S. through a valid U.S. port of entry, we will do an inner join to eliminate non-U.S. cities.

In [None]:
# Keep only the records with a valid U.S. port of entry in the immigration dataset.
df_spark_i94 = spark.sql("""select imm.*
             from immigration_tbl imm
             inner join port_of_entry_tbl prt
             on trim(imm.i94port) = trim(prt.code)""")

df_spark_i94.createOrReplaceTempView('immigration_tbl')


In [None]:
# Validate the port of entry along with the row count from the i94 immigration dataset.
# Check that the record count for the sample non-US cities or invalid ports of entry is 0.
spark.sql("select * from immigration_tbl").limit(5).show()           
spark.sql("select count(*) from immigration_tbl").show()
spark.sql("""select count(*)
             from immigration_tbl 
             where trim(i94port) in ('AUH','888','CLG')""").show()


In [None]:
# Check the distinct modes of travel in the immigration dataset and their counts
spark.sql("""select i94mode,count(*)
             from immigration_tbl
             group by i94mode""").show()

##### We will only keep records where the mode of travel is by land, air and sea. We will drop the records where i94mode is either null or 9.

In [None]:
# Add the mode of travel column in the immigration dataset for air, sea and land.
df_spark_i94 = spark.sql("""select *,case 
                            when i94mode = 1.0 then 'air'
                            when i94mode = 2.0 then 'sea'
                            when i94mode = 3.0 then 'land'
                            else 'Nan' end as i94_mode_travel
                            from immigration_tbl
                            where i94mode in (1.0,2.0,3.0)""")

df_spark_i94.createOrReplaceTempView('immigration_tbl')

In [None]:
# Validate the mode of travel records along with the row count to check that only 7422
# records were dropped from a total of 2973390 records from the i94 immigration dataset.
spark.sql("""select *
             from immigration_tbl""").limit(5).show()
spark.sql("""select count(*)
             from immigration_tbl""").show()


In [None]:
# Check the distinct visa types in the immigration dataset and their counts
spark.sql("""select i94visa,count(*)
             from immigration_tbl
             group by i94visa""").show()

In [None]:
# Add the visa type description column in the immigration dataset for business, pleasure and student.
df_spark_i94 = spark.sql("""select *,case 
                            when i94visa = 1.0 then 'business'
                            when i94visa = 2.0 then 'pleasure'
                            when i94visa = 3.0 then 'student'
                            else 'Nan' end as i94_visa_type
                            from immigration_tbl
                            where i94visa in (1.0,2.0,3.0)""")

df_spark_i94.createOrReplaceTempView('immigration_tbl')

In [None]:
# Validate the visa type in the records along with the row count to check that none of the
# records were dropped from the i94 immigration dataset. Number of records = 2965968.
spark.sql("""select *
             from immigration_tbl""").limit(5).show()
spark.sql("""select count(*)
             from immigration_tbl""").show()


##### Let's translate the fields below in the i94 immigration data:
arrdate, depdate: Since a SAS numeric date represents the number of days since 1st Jan 1960, we need to convert these numeric dates into yyyy-mm-dd format.
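
As a quick sanity check of the conversion rule, here is a minimal plain-Python sketch, separate from the Spark pipeline. The helper name `sas_to_date` and the sample value are illustrative assumptions.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # day 0 for SAS numeric dates

def sas_to_date(sas_days):
    """Convert a SAS numeric date (days since 1960-01-01) to a date; None stays None."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))

print(sas_to_date(20545.0).isoformat())  # 2016-04-01
```

A value of 20545 maps to 1st April 2016, which matches the April 2016 extract used here.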

In [None]:
# Convert arrival and departure date from SAS numeric format to yyyy-mm-dd date format.
# The SAS day counts are stored as doubles, so cast them to int before calling date_add.
df_spark_i94 = spark.sql("""select *,
                   date_add('1960-01-01', cast(arrdate as int)) as i94_arrival_date,
                   date_add('1960-01-01', cast(depdate as int)) as i94_dep_date
                   from immigration_tbl""")
df_spark_i94.createOrReplaceTempView('immigration_tbl')
spark.sql("select * from immigration_tbl").limit(5).show()

In [None]:
# Let's take a look at the min and max of the arrival date
spark.sql("""select min(i94_arrival_date),max(i94_arrival_date)
                    from immigration_tbl""").limit(5).show()


In [None]:
# Let's see if we have any rows where both the arrival and departure dates are null
spark.sql("""select *
             from immigration_tbl
             where i94_arrival_date is null
             and i94_dep_date is null""").limit(5).show()

In [None]:
# Let's see if we have any rows where the arrival date is null but the departure date
# is not null
spark.sql("""select *
             from immigration_tbl
             where i94_arrival_date is null
             and i94_dep_date is not null""").limit(5).show()

In [None]:
# Check the gender column
spark.sql("""select gender, count(*)
             from immigration_tbl
             group by gender""").show()

##### There are records with null gender values, which appear to be missing information. We will allow these records to be part of the final data model.


In [None]:
# Check the i94bir and biryear column to see if there are any invalid records and the range of 
# values present in the field
spark.sql("""select max(i94bir), min(i94bir)
             from immigration_tbl""").show()

spark.sql("""select count(*)
             from immigration_tbl
             where i94bir is null""").show()

spark.sql("""select max(biryear), min(biryear)
             from immigration_tbl""").show()

spark.sql("""select count(*)
             from immigration_tbl
             where biryear is null""").show()


##### It looks like i94bir contains negative values for age, which is incorrect. We will use the biryear column to calculate the age where i94bir is < 0.
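
The fallback rule can be sketched in plain Python as below. The helper name `derive_age` is hypothetical, and measuring the age relative to the arrival year is an assumption of this sketch.

```python
def derive_age(i94bir, biryear, arrival_year):
    """Return i94bir unless it is negative; then fall back to arrival_year - biryear.

    None inputs are passed through as None, similar to SQL null handling.
    """
    if i94bir is not None and i94bir < 0:
        if biryear is None or arrival_year is None:
            return None
        return float(arrival_year) - biryear
    return i94bir

print(derive_age(25.0, 1991.0, 2016))   # valid age is kept
print(derive_age(-1.0, 1980.0, 2016))   # negative age is recomputed from biryear
```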

In [None]:
# Add the age column in the immigration dataset based on the i94bir and biryear fields.
# Where i94bir is negative, derive the age as of the arrival year from biryear.
df_spark_i94 = spark.sql("""select *,case 
                            when i94bir < 0.0 then (double(year(i94_arrival_date))-biryear)
                            else i94bir end as i94_age
                            from immigration_tbl""")

df_spark_i94.createOrReplaceTempView('immigration_tbl')

In [None]:
# Validate the age field in the records along with the row count to check that none of the
# records were dropped from the i94 immigration dataset. Number of records = 2965968
spark.sql("""select *
             from immigration_tbl""").limit(5).show()
spark.sql("""select count(*)
             from immigration_tbl""").show()


##### Let's take a look at the temperature dataset.

In [None]:
# Select the min and max dates when the temperature was recorded
spark.sql("""select min(dt),max(dt)
             from us_temp_tbl""").show()

In [None]:
# Check if there are any rows where the temperature was recorded as NaN.
spark.sql("""select *
             from us_temp_tbl
             where AverageTemperature = 'NaN'
             or AverageTemperatureUncertainty = 'NaN'  """).show(truncate=False)

##### Let's clean up the records where the AverageTemperature column is NaN.

In [None]:
# Drop the records where the AverageTemperature column is NaN.
df_spark_us_temp = spark.sql("""select *
             from us_temp_tbl
             where NOT(AverageTemperature = 'NaN')""")
df_spark_us_temp.createOrReplaceTempView("us_temp_tbl")

In [None]:
# Validate the individual records and the total number of records in the us_temp_table
spark.sql("""select *
             from us_temp_tbl""").limit(5).show()
spark.sql("""select count(*)
             from us_temp_tbl""").show()


In [None]:
# Check if the rest of the columns contain any null values.
spark.sql("""select count(*)
             from us_temp_tbl
             where Latitude = 'NaN'
             or Longitude = 'NaN'
             or dt = 'NaN'
             or city = 'NaN'""").show()

spark.sql("""select count(*)
             from us_temp_tbl
             where Latitude is null
             or Longitude is null
             or dt is null
             or city is null""").show()

In [None]:
# Check if there are multiple rows for a given combination of dt, city, latitude, longitude.
# The record count should be equal to the total record count in us_temp_table.
spark.sql("""select distinct dt,city,Latitude,Longitude
             from us_temp_tbl""").count()

In [None]:
# Add the port of entry code and state columns in the temperature dataset.
df_spark_us_temp = spark.sql("""select temp.*, trim(prt.code) as i94_port_code, trim(prt.state) as state
             from us_temp_tbl temp
             inner join port_of_entry_tbl prt
             on trim(lower(temp.city)) = trim(lower(prt.city))""")

df_spark_us_temp.createOrReplaceTempView('us_temp_tbl')


In [None]:
# Validate the individual records and the total number of records in the us_temp_table
spark.sql("""select *
             from us_temp_tbl""").limit(5).show()
spark.sql("""select count(*)
             from us_temp_tbl""").show()

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
##### fact_immigration: The table will contain the immigration details for travellers travelling to the US.
1. i94_cicid
2. i94_country_origin
3. i94_country_residence
4. i94_port_code
5. i94_arrival_date
6. i94_dep_date
7. i94_mode_travel
8. i94_visa_type
9. i94_gender 
10. i94_age
11. year

##### dim_temperature: The table will contain the temperature for USA across various cities post 1970.
1. i94_port_code
2. city 
3. state 
4. date
5. avg_temperature 
6. avg_temperature_un 
7. latitude 
8. longitude
9. year

##### dim_date : The table will contain various date dimensions for the arrival dates from the immigration dataset.
1. date
2. year
3. month
4. day
5. week
6. weekday
7. quarter

#### 3.2 Mapping Out Data Pipelines
##### fact_immigration:
1. Read the following datasets and convert them into Spark dataframes: i94_apr16_sub.sas7bdat, I94PORT_lookup.csv, I94CIT_I94RES_lookup.csv
2. Get the actual country names for the country of origin and country of residence code values by joining the i94_apr16_sub.sas7bdat and I94CIT_I94RES_lookup.csv dataframes.
3. Filter out the invalid ports of entry by joining the i94_apr16_sub.sas7bdat and I94PORT_lookup.csv dataframes.
4. Replace the mode of travel and visa type codes with the actual values. Keep only the records where the mode of travel is land, air or sea.
5. Get the age of the traveller using the age and the birth year information.
6. Get the gender and cicid of the traveller.
7. Convert the arrival date and the departure date from SAS numeric format to yyyy-mm-dd format and filter out the invalid records.
8. Select the required columns to create the fact table and write it in parquet format partitioned by year of arrival date and i94 port code.
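
When Spark writes with `partitionBy`, each partition column becomes a `column=value` directory level under the output path. The plain-Python sketch below only illustrates that layout. The helper `partition_dir` is hypothetical and the port code is a sample value.

```python
from pathlib import PurePosixPath

def partition_dir(base, **partitions):
    """Build a Hive-style partition directory path such as base/year=2016/i94_port_code=NYC."""
    path = PurePosixPath(base)
    for column, value in partitions.items():
        path = path / f"{column}={value}"
    return str(path)

print(partition_dir("output/fact_immigration", year=2016, i94_port_code="NYC"))
```

Queries that filter on year or port code can then skip whole directories instead of scanning every file.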

##### dim_temperature:
1. Read the GlobalLandTemperaturesByCity.csv file and convert it to a Spark dataframe.
2. Check the temperature column for null values and filter out records where the average temperature value is null.
3. Check the other required columns for null values.
4. Join the GlobalLandTemperaturesByCity.csv dataframe with the I94PORT_lookup.csv dataframe to get the i94 port of entry code and state based on the city column.
5. Select the required columns to create the temperature dimension table and write in parquet format partitioned by year of the date when the temperature was recorded.
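
The cleaning and lookup steps for the temperature data can be sketched in plain Python as follows. The sample rows and helper names are hypothetical. One difference to keep in mind is that NaN never equals itself in Python, so `math.isnan` is used, whereas Spark SQL treats `NaN = NaN` as true.

```python
import math

def normalize_city(name):
    """Case- and whitespace-insensitive join key for city names."""
    return name.strip().lower()

def clean_and_tag(temps, ports):
    """Drop rows with a missing temperature and attach the port code and state by city.

    Simplification: a dict keeps one port per city, while a SQL inner join
    would emit one row per matching port.
    """
    port_by_city = {normalize_city(p["city"]): p for p in ports}
    tagged = []
    for t in temps:
        temp = t["AverageTemperature"]
        if temp is None or math.isnan(temp):
            continue  # missing measurement
        port = port_by_city.get(normalize_city(t["City"]))
        if port is None:
            continue  # city has no port of entry in the lookup
        tagged.append(dict(t, i94_port_code=port["code"].strip(), state=port["state"].strip()))
    return tagged

# Hypothetical sample rows.
ports = [{"code": "ANC", "city": "ANCHORAGE ", "state": " AK"}]
temps = [
    {"dt": "1970-01-01", "City": "Anchorage", "AverageTemperature": -12.3},
    {"dt": "1970-02-01", "City": "Anchorage", "AverageTemperature": float("nan")},
    {"dt": "1970-01-01", "City": "Springfield", "AverageTemperature": 1.0},
]
cleaned = clean_and_tag(temps, ports)
print(cleaned)
```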

##### dim_date
1. Get the arrival date from immigration fact table and extract various date dimensions such as year, month, day, week etc.
2. Create the date dimension table and write it in parquet format partitioned by year of arrival date.
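
The date attributes can be cross-checked with the plain-Python sketch below. It assumes Spark's documented conventions: `weekofyear` follows ISO 8601 weeks and `dayofweek` numbers days from 1 (Sunday) to 7 (Saturday). The helper name `date_dimensions` is hypothetical.

```python
from datetime import date

def date_dimensions(d):
    """Derive the dim_date attributes for a single date."""
    return {
        "date": d.isoformat(),
        "year": d.year,
        "month": d.month,
        "day": d.day,
        "week": d.isocalendar()[1],         # ISO 8601 week number
        "weekday": d.isoweekday() % 7 + 1,  # 1 = Sunday ... 7 = Saturday
        "quarter": (d.month - 1) // 3 + 1,
    }

print(date_dimensions(date(2016, 4, 1)))
```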

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# Select the required columns to create the fact_immigration table.
df_spark_i94 = spark.sql("""select cicid as i94_cicid,
             i94_country_origin,
             i94_country_residence,
             i94port as i94_port_code,
             i94_arrival_date,
             i94_dep_date,
             i94_mode_travel,
             i94_visa_type,
             gender as i94_gender,
             i94_age,
             year(i94_arrival_date) as year
             from immigration_tbl""").distinct()

In [None]:
# Select the required columns to create the dim_temperature table.
df_spark_us_temp = spark.sql("""select i94_port_code,
             city,
             state,
             dt as date,
             AverageTemperature as avg_temperature,
             AverageTemperatureUncertainty as avg_temperature_un,
             Latitude as latitude,
             Longitude as longitude,
             year(dt) as year
             from us_temp_tbl """).distinct()

In [None]:
# Select the required columns to create the dim_date table.
df_spark_date = df_spark_i94.selectExpr("i94_arrival_date as date").distinct()
df_spark_date = df_spark_date.withColumn("year",F.year("date"))
df_spark_date = df_spark_date.withColumn("month",F.month("date"))
df_spark_date = df_spark_date.withColumn("day",F.dayofmonth("date"))
df_spark_date = df_spark_date.withColumn("week",F.weekofyear("date"))
df_spark_date = df_spark_date.withColumn("weekday",F.dayofweek("date"))
df_spark_date = df_spark_date.withColumn("quarter",F.quarter("date"))

In [None]:
# write fact_immigration table to parquet files partitioned by year, i94_port_code 
df_spark_i94.write.mode("append").partitionBy("year","i94_port_code").parquet("output/fact_immigration/")

In [None]:
# write dim_temperature table to parquet files partitioned by year.
df_spark_us_temp.write.mode("append").partitionBy("year").parquet("output/dim_temperature/")

In [None]:
# write dim_date table to parquet files partitioned by year.
df_spark_date.write.mode("append").partitionBy("year").parquet("output/dim_date/")

#### 4.2 Data Quality Checks
* Check the total record counts for each of the 3 target tables
* Check the sample records for each of the 3 target tables
* Check if the mandatory columns in each of the 3 tables do not contain any null values
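
The null check on mandatory columns boils down to counting missing values per column and flagging any column with a non-zero count. Here is a minimal plain-Python sketch of that logic, using hypothetical sample rows and helper names.

```python
def null_counts(rows, mandatory_columns):
    """Count rows where each mandatory column is missing (None)."""
    return {col: sum(1 for r in rows if r.get(col) is None) for col in mandatory_columns}

def failed_columns(rows, mandatory_columns):
    """Return the mandatory columns that contain at least one null."""
    return [col for col, n in null_counts(rows, mandatory_columns).items() if n > 0]

# Hypothetical sample rows shaped like dim_date records.
sample = [
    {"date": "2016-04-01", "year": 2016},
    {"date": None, "year": 2016},
]
print(null_counts(sample, ["date", "year"]))
print(failed_columns(sample, ["date", "year"]))
```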

In [None]:
# The record count for fact immigration table should be 2965968
print(df_spark_i94.count())
# Check the sample record
df_spark_i94.show(5,truncate=False)

In [None]:
# The record count for dimension temperature table should be 51449
print(df_spark_us_temp.count())
# Check the sample record
df_spark_us_temp.show(5,truncate=False)


In [None]:
# The record count for dimension date table should be equal to the number of distinct arrival 
# dates in fact immigration table.
print(df_spark_i94.select("i94_arrival_date").distinct().count())
print(df_spark_date.count())
# Check the sample record
df_spark_date.show(5,truncate=False)


In [None]:
# Check that the mandatory columns in each of the 3 tables do not contain any null values.
# Each entry carries the table name so that the output identifies the table being checked.

dq = [("fact_immigration", df_spark_i94, ["i94_cicid","i94_country_residence","i94_port_code","i94_arrival_date"]),
      ("dim_temperature", df_spark_us_temp, ["i94_port_code","city","date","avg_temperature","latitude","longitude"]),
      ("dim_date", df_spark_date, ["date"])]

for tblName,df,colNames in dq:
    for colName in colNames:
        count = df.where(F.col(colName).isNull()).count()
        print("The null record count: " + str(count))
        if (count == 0):
            print("Data quality check passed for column " + tblName + "." + colName)
        else:
            print("Data quality check failed for column " + tblName + "." + colName)



#### 4.3 Data dictionary 
##### fact_immigration: The table will contain the immigration details for travellers travelling to the US.
1. i94_cicid = unique id assigned to each row obtained from i94_apr16_sub.sas7bdat(cicid)
2. i94_country_origin = country of origin obtained from I94CIT_I94RES_lookup.csv(value)
3. i94_country_residence = country of residence obtained from I94CIT_I94RES_lookup.csv(value)
4. i94_port_code = 3 character i94 port code specifying the port of entry in the USA obtained from i94_apr16_sub.sas7bdat(i94port)
5. i94_arrival_date = Arrival date in the USA obtained from i94_apr16_sub.sas7bdat(arrdate)
6. i94_dep_date = Departure date from the USA obtained from i94_apr16_sub.sas7bdat(depdate)
7. i94_mode_travel = Mode of travel obtained from i94_apr16_sub.sas7bdat(i94mode)
8. i94_visa_type = Visa type issued obtained from i94_apr16_sub.sas7bdat(i94visa)
9. i94_gender = Gender of the traveller obtained from i94_apr16_sub.sas7bdat(gender)
10. i94_age = Age of the traveller obtained from i94_apr16_sub.sas7bdat(i94bir and biryear)
11. year = Year of arrival date

##### dim_temperature: The table will contain the temperature for USA across various cities post 1970.
1. i94_port_code = 3 character i94 port code from the immigration dataset obtained from I94PORT_lookup.csv(code)
2. city = City where the temperature was recorded obtained from GlobalLandTemperaturesByCity.csv(city). 
3. state = State details for the i94 port from the immigration dataset obtained from I94PORT_lookup.csv(state)
4. date = Date when the temperature was recorded obtained from GlobalLandTemperaturesByCity.csv(dt). 
5. avg_temperature = Average temperature in the city obtained from GlobalLandTemperaturesByCity.csv(AverageTemperature). 
6. avg_temperature_un = Average temperature uncertainty in the city obtained from GlobalLandTemperaturesByCity.csv(AverageTemperatureUncertainty).
7. latitude = latitude where the temperature was recorded obtained from GlobalLandTemperaturesByCity.csv(Latitude).
8. longitude = longitude where the temperature was recorded obtained from GlobalLandTemperaturesByCity.csv(Longitude).
9. year = year of the date when the temperature was recorded.

##### dim_date : The table will contain various date dimensions for the arrival dates from the immigration dataset.
1. date = arrival date obtained from i94_apr16_sub.sas7bdat(arrdate) 
2. year = year of arrival date
3. month =  month of arrival date
4. day = day of arrival date 
5. week = week of the year of arrival date
6. weekday = day of the week of arrival date 
7. quarter = quarter of arrival date


### Step 5: Complete Project Write Up
* The i94 immigration data for April 2016 alone contains close to 3 million records, and the volume will keep growing every year. To handle such a large record volume, Spark is used for its in-memory processing capabilities, which provide a significant performance boost while running the ETL pipeline.
* The final tables are stored in parquet format with dimensional modelling so that the data can be easily queried and joined to extract meaningful insights, again using Spark's SQL-like capabilities.
* The Parquet file format is binary and compressed, hence it occupies less space. Since it is a columnar storage format, column-wise queries are faster.
* Update Frequency: Since the end goal is to study the impact of temperature on the travel pattern of people coming to the United States over the years, which is analytical in nature, the data can be updated on a monthly basis. We don't need the data in real time.
* In case the data was increased by 100x, we would again store the data in parquet format on Amazon S3, which is cheap.
* In case the data populates a dashboard that must be updated on a daily basis by 7am every day, we would automate the data pipeline and data quality checks using Apache Airflow.
* In case the database needed to be accessed by 100+ people, we would host the tables on Amazon Redshift, whose MPP capabilities and columnar storage would boost the query performance.