# Project Title
### Data Engineering Capstone Project

#### Project Summary
- This project combines four data sets: immigration data, airport codes, demographics of US cities and global temperature data. The goal is to create a schema that can be used to derive correlations, trends and analytics. For example, one could examine whether the average temperature of a migrant's country of residence correlates with their choice of US state, and what the current demographic makeup of that state is.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [None]:
# Do all imports and installs here
! pip install -U numpy
! pip install missingno

: 

In [None]:
import configparser
import datetime as dt
import os
import time

import matplotlib.pyplot as plt
import pandas as pd
import requests
from pyspark.sql import GroupedData, HiveContext, Row, SparkSession, SQLContext
from pyspark.sql import functions as F
# Explicit imports instead of `import *`, which would shadow builtins such as sum, min and max
from pyspark.sql.functions import (
    avg, col, count, date_format, dayofmonth, dayofweek, isnan, lit,
    monotonically_increasing_id, month, udf, upper, weekofyear, when, year,
)
from pyspark.sql.functions import date_add as d_add
from pyspark.sql.types import (
    DoubleType, FloatType, IntegerType, StringType, StructField, StructType,
)

# Local helper modules
import create_tables as ct
import tools

# Suppress urllib3 warnings raised through requests
requests.packages.urllib3.disable_warnings()

: 

In [None]:
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.config("spark.python.worker.memory", "15g") \
.enableHiveSupport().getOrCreate()

df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

: 

In [None]:
# Optionally write the SAS data to parquet and read it back
# df.write.parquet("sas_data")
# df = spark.read.parquet("sas_data")

: 

### Step 1: Scope the Project and Gather Data

#### Scope 
##### Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

- In this project I will gather the data from four sources. I will load this data into staging dataframes. I will clean the raw data, write it to parquet files and perform an ETL process using a Spark cluster. Then I will write the data into Fact & Dimension tables to form a star schema. The star schema can then be used by the relevant parties to perform data analytics, correlation and ad-hoc reporting in an effective and efficient manner.

#### Describe and Gather Data 
##### Describe the data sets you're using. Where did it come from? What type of information is included? 

- i94 Immigration Sample Data: Sample data of immigration records from the US National Tourism and Trade Office. This data source will serve as the Fact table in the schema. This data comes from https://travel.trade.gov/research/reports/i94/historical/2016.html.
- World Temperature Data: This dataset contains temperature data for various cities from the 1700s to 2013. Although the data is only recorded until 2013, we can use it as an approximate gauge of temperature in 2017. This data comes from https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data.
- US City Demographic Data: Data about the demographics of US cities. This dataset includes information on the population of all US cities such as race, household size and gender. This data comes from https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/.
- Airport Codes: This table contains the airport codes for the airports in corresponding cities. This data comes from https://datahub.io/core/airport-codes#data.

##### TEMPERATURE DATA

In [None]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
temperature_df = pd.read_csv(fname)

: 

In [None]:
temperature_df.head()

: 

##### Data Dictionary

Feature                       |Description
:-----------------------------|:-----------
dt                            |Date
AverageTemperature            |Average temperature in Celsius
AverageTemperatureUncertainty |95% confidence interval around average temperature
City                          |Name of city
Country                       |Name of country
Latitude                      |Latitude of city
Longitude                     |Longitude of city

##### AIRPORT CODES

In [None]:
airport_codes = 'airport-codes_csv.csv'
airport_df = pd.read_csv(airport_codes)

: 

In [None]:
airport_df.head()

: 


##### Data Dictionary

Feature       |Description
:-------------|:-----------
ident         |Unique identifier
type          |Airport type
name          |Airport name
elevation_ft  |Airport altitude
continent     |Continent
iso_country   |ISO Code of the airport's country
iso_region    |ISO Code for the airport's region
municipality  |City/Municipality where the airport is located
gps_code      |Airport GPS Code
iata_code     |Airport IATA Code
local_code    |Airport local code
coordinates   |Airport coordinates

##### IMMIGRATION DATA

In [None]:
immigration_data = 'immigration_data_sample.csv'
# The sample file is a CSV, so read it with pandas; the cleaning helpers below expect a pandas DataFrame
immigration_df = pd.read_csv(immigration_data)

: 

In [None]:
immigration_df.head()

: 

##### Data Dictionary

Feature  |Description
:--------|:-----------
cicid    |Unique ID
i94yr    |4-digit year
i94mon   |Numeric month
i94cit   |3-digit code for immigrant country of birth
i94res   |3-digit code for immigrant country of residence
i94port  |Port of admission
arrdate  |Arrival Date in the USA
i94mode  |Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)
i94addr  |USA State of arrival
depdate  |Departure Date from the USA
i94bir   |Age of Respondent in Years
i94visa  |Visa codes collapsed into three categories
count    |Field used for summary statistics
dtadfile |Character Date Field - Date added to I-94 Files
visapost |Department of State where Visa was issued
occup    |Occupation that will be performed in U.S.
entdepa  |Arrival Flag - admitted or paroled into the U.S.
entdepd  |Departure Flag - Departed, lost I-94 or is deceased
entdepu  |Update Flag - Either apprehended, overstayed, adjusted to perm residence
matflag  |Match flag - Match of arrival and departure records
biryear  |4-digit year of birth
dtaddto  |Character Date Field - Date to which admitted to U.S. (allowed to stay until)
gender   |Non-immigrant sex
insnum   |INS number
airline  |Airline used to arrive in U.S.
admnum   |Admission Number
fltno    |Flight number of Airline used to arrive in U.S.
visatype |Class of admission legally admitting the non-immigrant to temporarily stay in U.S.
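
The `arrdate` and `depdate` columns originate from a SAS file, and SAS stores dates as the number of days since 1960-01-01. The sketch below converts such a value to a calendar date. It assumes these two columns follow that SAS convention; the helper name `sas_to_date` is hypothetical and is not part of the project's `tools` module.

```python
from datetime import date, timedelta
from typing import Optional

# SAS date values count days from this epoch.
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days: Optional[float]) -> Optional[date]:
    """Convert a SAS day count to a date; return None for missing values."""
    # NaN is the only float that is not equal to itself.
    if sas_days is None or sas_days != sas_days:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20545.0))  # 2016-04-01
```

A conversion like this would be needed before deriving day, week, month and year attributes for a time dimension.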

###### Immigration Country Mapping

##### US CITIES DEMOGRAPHICS

In [None]:
us_cities_demographics = 'us-cities-demographics.csv'
demographics_df = spark.read.csv(us_cities_demographics, inferSchema=True, header=True, sep=';')

: 

In [None]:
demographics_df.limit(5).toPandas()

: 

##### Data Dictionary

Feature                       |Description
:-----------------------------|:-----------
City |City Name
State |US State of the City
Median Age |The median population age
Male Population |Male population total
Female Population |Female population total
Total Population |Total population
Number of Veterans |Number of veterans living in the city
Foreign-born |Number of residents who were born outside the United States
Average Household Size |Average number of people per household in the city
State Code |Code of the state
Race |Race class
Count |Number of individuals in each race

### Step 2: Explore and Assess the Data

 - Please refer to the "Explore & Assess Data" notebook for data exploration and analysis
 

##### Data Cleaning Steps Required:
- Drop columns containing over 90% missing values
- Drop duplicate values
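
The `tools` module is not shown in this notebook. As a rough guide, the two cleaning steps above might be implemented in pandas along the following lines. The function bodies, the `threshold` parameter and the toy data are assumptions for illustration, not the project's actual code.

```python
import numpy as np
import pandas as pd


def eliminate_missing_data(df: pd.DataFrame, threshold: float = 0.9) -> pd.DataFrame:
    """Drop columns whose share of missing values exceeds `threshold`."""
    missing_share = df.isna().mean()  # fraction of missing values per column
    keep = missing_share[missing_share <= threshold].index
    return df[keep]


def drop_duplicate_rows(df: pd.DataFrame) -> pd.DataFrame:
    """Drop rows that exactly repeat an earlier row."""
    return df.drop_duplicates().reset_index(drop=True)


# Toy frame: `mostly_empty` is 95% missing, and the city values repeat.
sample = pd.DataFrame({
    "city": ["A", "B"] * 10,
    "mostly_empty": [1.0] + [np.nan] * 19,
})
cleaned = drop_duplicate_rows(eliminate_missing_data(sample))
print(cleaned)
```

Dropping sparse columns first matters here: once `mostly_empty` is gone, rows that differed only in that column become duplicates and are removed in the second step.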


In [None]:
# Drop columns with over 90% missing values
clean_temperature = tools.eliminate_missing_data(temperature_df)

: 

In [None]:
clean_temperature = tools.drop_duplicate_rows(clean_temperature)

: 

In [None]:
start_date = "2010-01-01"
end_date = "2020-01-01"

after_start_date = clean_temperature["dt"] >= start_date
before_end_date = clean_temperature["dt"] <= end_date
between_two_dates = after_start_date & before_end_date
clean_temperature = clean_temperature.loc[between_two_dates]

: 

In [None]:
clean_temperature.head()

: 

In [None]:
# Drop columns with over 90% missing values
clean_airport_codes = tools.eliminate_missing_data(airport_df)

: 

In [None]:
clean_airport_codes = tools.drop_duplicate_rows(clean_airport_codes)

: 

In [None]:
clean_airport_codes.head()

: 

In [None]:
# Drop columns with over 90% missing values
clean_immigration = tools.eliminate_missing_data(immigration_df)

: 

In [None]:
clean_immigration = tools.drop_duplicate_rows(clean_immigration)

: 

In [None]:
clean_immigration.head()

: 

In [None]:
# Drop columns with over 90% missing values
clean_demographics = tools.eliminate_missing_data(demographics_df.toPandas())

: 

In [None]:
clean_demographics = tools.drop_duplicate_rows(clean_demographics)

: 

In [None]:
clean_demographics.head()

: 

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
##### Map out the conceptual data model and explain why you chose that model:

In accordance with Kimball Dimensional Modelling Techniques, laid out in this document 
(http://www.kimballgroup.com/wp-content/uploads/2013/08/2013.09-Kimball-Dimensional-Modeling-Techniques11.pdf), 
the following modelling steps have been taken:

- 1. Select the Business Process:
    - The business process is the immigration department admitting migrants into the country. Each admission generates an event that is captured as a fact in the fact table.

- 2. Declare the Grain:
    - The grain identifies exactly what is represented in a single fact table row.
    - In this project, the grain is declared as a single occurrence of a migrant entering the USA.

- 3. Identify the Dimensions:
    - Dimension tables provide context around an event or business process.
    - The dimensions identified in this project are:
        - dim_migrant
        - dim_status
        - dim_visa
        - dim_temperature
        - dim_country
        - dim_state
        - dim_time
        - dim_airport
        

- 4. Identify the Facts:
    - Fact tables focus on the occurrences of a singular business process, and have a one-to-one relationship with the events described in the grain.
    - The fact table identified in this project is:
        - fact_immigration
    
For this application, I have developed a set of Fact and Dimension tables, written to parquet files, that form a Star Schema.
This Star Schema can be used by Data Analysts and other relevant business professionals to gain deeper insight into various immigration figures, trends and statistics recorded historically.
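
To illustrate how a star schema like this is queried, the sketch below builds a tiny in-memory SQLite database with a cut-down `fact_immigration` and `dim_state` and counts arrivals per state. The column names (`state_code`, `state_name`) and all rows are invented for illustration, and SQLite is used only because it ships with Python; the project's real tables are produced by the `create_tables` module.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Hypothetical, heavily simplified versions of one dimension and the fact table.
conn.executescript("""
    CREATE TABLE dim_state (
        state_code TEXT PRIMARY KEY,
        state_name TEXT
    );
    CREATE TABLE fact_immigration (
        cicid INTEGER PRIMARY KEY,
        state_code TEXT REFERENCES dim_state(state_code)
    );
    INSERT INTO dim_state VALUES ('CA', 'California'), ('NY', 'New York');
    INSERT INTO fact_immigration VALUES (1, 'CA'), (2, 'CA'), (3, 'NY');
""")

# One join from the fact table to a dimension, then an aggregate.
rows = conn.execute("""
    SELECT s.state_name, COUNT(*) AS arrivals
    FROM fact_immigration AS f
    JOIN dim_state AS s ON s.state_code = f.state_code
    GROUP BY s.state_name
    ORDER BY arrivals DESC
""").fetchall()

print(rows)
conn.close()
```

Because every dimension hangs directly off the fact table, most analytical questions need only one join per dimension involved.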

![alt_text](./Conceptual_Data_Model_5.png)

#### 3.2 Mapping Out Data Pipelines
##### List the steps necessary to pipeline the data into the chosen data model:

- 1. Load the data into staging tables
- 2. Create Dimension tables
- 3. Create Fact table
- 4. Write data into parquet files
- 5. Perform data quality checks
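
Steps 1 to 3 and 5 can be sketched on a toy example. The snippet below uses pandas rather than Spark so that it runs without a cluster, and it leaves out the parquet write. The `visatype` values are made up, and the surrogate key column `visa_id` is an assumption about how a dimension such as `dim_visa` might be keyed; it is not the output of `ct.create_visa_dimension`.

```python
import pandas as pd

# Step 1: a staging table with one row per arrival (toy data).
staging = pd.DataFrame({
    "cicid": [1, 2, 3, 4],
    "visatype": ["B2", "WT", "B2", "F1"],
})

# Step 2: a dimension holding each distinct value once, with a surrogate key.
dim_visa = (
    staging[["visatype"]]
    .drop_duplicates()
    .sort_values("visatype")
    .reset_index(drop=True)
)
dim_visa["visa_id"] = dim_visa.index + 1

# Step 3: the fact table references the dimension through the surrogate key.
fact = staging.merge(dim_visa, on="visatype", how="left")[["cicid", "visa_id"]]

# Step 5: a basic quality check, every fact row must resolve to a dimension row.
all_keys_resolved = bool(fact["visa_id"].notna().all())

print(dim_visa)
print(fact)
print(all_keys_resolved)
```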
    

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
output_path = "tables/"

: 

In [None]:
clean_immigration.head()

: 

In [None]:
# query plan

# comes from clean_immigration

# create schema
immigration_schema = StructType([StructField("0", IntegerType(), True)\
                          ,StructField("cicid", FloatType(), True)\
                          ,StructField("i94yr", FloatType(), True)\
                          ,StructField("i94mon", FloatType(), True)\
                          ,StructField("i94cit", FloatType(), True)\
                          ,StructField("i94res", FloatType(), True)\
                          ,StructField("i94port", StringType(), True)\
                          ,StructField("arrdate", FloatType(), True)\
                          ,StructField("i94mode", FloatType(), True)\
                          ,StructField("i94addr", StringType(), True)\
                          ,StructField("depdate", FloatType(), True)\
                          ,StructField("i94bir", FloatType(), True)\
                          ,StructField("i94visa", FloatType(), True)\
                          ,StructField("count", FloatType(), True)\
                          ,StructField("dtadfile", StringType(), True)\
                          ,StructField("visapost", StringType(), True)\
                          ,StructField("entdepa", StringType(), True)\
                          ,StructField("entdepd", StringType(), True)\
                          ,StructField("matflag", StringType(), True)\
                          ,StructField("biryear", FloatType(), True)\
                          ,StructField("dtaddto", StringType(), True)\
                          ,StructField("gender", StringType(), True)\
                          ,StructField("airline", StringType(), True)\
                          ,StructField("admnum", DoubleType(), True)  # 11-digit admission numbers lose digits in a 32-bit float
                          ,StructField("fltno", StringType(), True)\
                          ,StructField("visatype", StringType(), True)])

immigration_spark = spark.createDataFrame(clean_immigration, schema=immigration_schema)

immigration_spark.toPandas().head()

: 

In [None]:
# create schema
temperature_schema = StructType([StructField("dt", StringType(), True)\
                          ,StructField("AverageTemperature", FloatType(), True)\
                          ,StructField("AverageTemperatureUncertainty", FloatType(), True)\
                          ,StructField("City", StringType(), True)\
                          ,StructField("Country", StringType(), True)\
                          ,StructField("Latitude", StringType(), True)\
                          ,StructField("Longitude", StringType(), True)])

temperature_spark = spark.createDataFrame(clean_temperature, schema=temperature_schema)

temperature_spark.toPandas().head()

: 

In [None]:
# create schema
demographics_schema = StructType([StructField("City", StringType(), True)\
                        ,StructField("State", StringType(), True)\
                        ,StructField("Median Age", FloatType(), True)\
                        ,StructField("Male Population", FloatType(), True)\
                        ,StructField("Female Population", FloatType(), True)\
                        ,StructField("Total Population", IntegerType(), True)\
                        ,StructField("Number of Veterans", FloatType(), True)\
                        ,StructField("Foreign-born", FloatType(), True)\
                        ,StructField("Average Household Size", FloatType(), True)\
                        ,StructField("State Code", StringType(), True)\
                        ,StructField("Race", StringType(), True)\
                        ,StructField("Count", IntegerType(), True)])

demographics_spark = spark.createDataFrame(clean_demographics, schema=demographics_schema)

demographics_spark.toPandas().head()

: 

In [None]:
# create schema
airport_codes_schema = StructType([StructField("ident", StringType(), True)\
                        ,StructField("type", StringType(), True)\
                        ,StructField("name", StringType(), True)\
                        ,StructField("elevation_ft", FloatType(), True)\
                        ,StructField("continent", StringType(), True)\
                        ,StructField("iso_country", StringType(), True)\
                        ,StructField("iso_region", StringType(), True)\
                        ,StructField("municipality", StringType(), True)\
                        ,StructField("gps_code", StringType(), True)\
                        ,StructField("iata_code", StringType(), True)\
                        ,StructField("local_code", StringType(), True)\
                        ,StructField("coordinates", StringType(), True)])

airport_codes_spark = spark.createDataFrame(clean_airport_codes, schema=airport_codes_schema)

airport_codes_spark.toPandas().head()

: 

##### 1. Create dim_migrant

In [None]:
migrant =  ct.create_migrant_dimension(immigration_spark, output_path)

: 

In [None]:
migrant = spark.read.parquet("tables/migrant")
migrant.toPandas().head()

: 

##### 2. Create dim_status

In [None]:
status = ct.create_status_dimension(immigration_spark, output_path)

: 

In [None]:
status = spark.read.parquet("tables/status")
status.toPandas().head()

: 

##### 3. Create dim_visa

In [None]:
visa = ct.create_visa_dimension(immigration_spark, output_path)

: 

In [None]:
visa = spark.read.parquet("tables/visa")
visa.toPandas().head()

: 

##### 4. Create dim_state

In [None]:
state = ct.create_state_dimension(demographics_spark, output_path)

: 

In [None]:
state = spark.read.parquet("tables/state")
state.toPandas().head()

: 

##### 5. Create dim_time

In [None]:
time = ct.create_time_dimension(immigration_spark, output_path)

: 

In [None]:
time = spark.read.parquet("tables/time")
time.toPandas().head()

: 

##### 6. Create dim_airport

In [None]:
airport = ct.create_airport_dimension(airport_codes_spark, output_path)

: 

In [None]:
airport = spark.read.parquet("tables/airport")
airport.toPandas().head()

: 

##### 7. Create dim_temperature

In [None]:
temperature = ct.create_temperature_dimension(temperature_spark, output_path)

: 

In [None]:
temperature = spark.read.parquet("tables/temperature")
temperature.toPandas().head()

: 

##### 8. Create dim_country

In [None]:
country_names = spark.read.parquet("./i94res_country_mapping")
country_names.toPandas().head() 

: 

In [None]:
country = ct.create_country_dimension(country_names, output_path)

: 

In [None]:
country = spark.read.parquet("./tables/country")
country.toPandas().head()

: 

##### 9. Create fact_immigration

In [None]:
# join country and temperature on the upper-cased country name
country_temp = country.select(["*"])\
            .join(temperature, (country.country == upper(temperature.country)), how='full')\
            .select([country.country_code, country.country, temperature.temperature_id, temperature.average_temperature, temperature.average_temperature_uncertainty])

country_temp.write.mode("overwrite").parquet("tables/country_temperature_mapping")

: 

In [None]:
immigration = ct.create_immigration_fact(immigration_spark, output_path, spark)

: 

In [None]:
immigration = spark.read.parquet("tables/immigration")

immigration.toPandas().head()

: 

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
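
A source/count check of the kind listed above can be very small. The sketch below assumes only that each table exposes a `count()` method, as Spark DataFrames do. `check_row_count` and the `FakeTable` stand-in are hypothetical; they are not the `tools.perform_quality_check` helper used later in this notebook.

```python
class FakeTable:
    """Stand-in for a Spark DataFrame; only count() is needed here."""

    def __init__(self, n_rows):
        self._n_rows = n_rows

    def count(self):
        return self._n_rows


def check_row_count(table, table_name, min_rows=1):
    """Return True if the table holds at least `min_rows` rows."""
    n_rows = table.count()
    passed = n_rows >= min_rows
    status = "passed" if passed else "FAILED"
    print(f"Data quality check {status} for {table_name}: {n_rows} rows")
    return passed


toy_tables = {"visa": FakeTable(3), "state": FakeTable(0)}
results = {name: check_row_count(tbl, name) for name, tbl in toy_tables.items()}
```

Returning a boolean instead of only printing makes it possible for a scheduler or test runner to fail the pipeline when a table comes back empty.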
 
Run Quality Checks

##### 1. Check table columns

In [None]:
airport = spark.read.parquet("tables/airport")
airport.toPandas().head()

: 

In [None]:
country = spark.read.parquet("tables/country")
country.toPandas().head()

: 

In [None]:
temperature = spark.read.parquet("tables/temperature")
temperature.toPandas().head()

: 

In [None]:
migrant = spark.read.parquet("tables/migrant")
migrant.toPandas().head()

: 

In [None]:
state = spark.read.parquet("tables/state")
state.toPandas().head()

: 

In [None]:
status = spark.read.parquet("tables/status")
status.toPandas().head()

: 

In [None]:
time = spark.read.parquet("tables/time")
time.toPandas().head()

: 

In [None]:
visa = spark.read.parquet("tables/visa")
visa.toPandas().head()

: 

In [None]:
immigration = spark.read.parquet("tables/immigration")
immigration.toPandas().head()

: 

##### 2. Check Record Count

In [None]:
tables = {
    "airport": airport,
    "country": country,
    "temperature": temperature,
    "migrant": migrant,
    "state": state,
    "status": status,
    "time": time,
    "visa": visa,
    "immigration": immigration
}

for table_name, table in tables.items():
    tools.perform_quality_check(table, table_name)

: 

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

- Please refer to Data_Dictionary.txt

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

##### 1. Clearly state the rationale for the choice of tools and technologies for the project:

- This project makes use of the following data processing tools:
    - Apache Spark, because of its ability to process massive amounts of data as well as the use of its unified analytics engine and convenient APIs
    - Pandas, due to its convenient dataframe manipulation functions
    - Matplotlib, to plot data and gain further insights

##### 2. Propose how often the data should be updated and why:

- The immigration (i94) data set is updated monthly, hence all relevant data should be updated monthly as well

##### 3. Write a description of how you would approach the problem differently under the following scenarios:

##### 3.1 The data was increased by 100x:
- If the data was increased by 100x, a single machine and pandas-based cleaning would no longer be adequate. I would run the Spark jobs on a distributed cluster such as Amazon EMR and store the results in a scalable data store such as Amazon Redshift or Apache Cassandra.

##### 3.2 The data populates a dashboard that must be updated on a daily basis by 7am every day:
- If the data had to populate a dashboard daily, I would manage the ETL pipeline as a DAG in Apache Airflow, scheduled to complete before 7am. This would ensure that the pipeline runs on time and that data quality checks pass, and it would provide a convenient means of notification should the pipeline fail.

##### 3.3 The database needed to be accessed by 100+ people:
- If the data needed to be accessed by many people simultaneously, I would move the analytics database to Amazon Redshift which can handle massive request volumes and is easily scalable.