# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [None]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.sql import functions as g
from pyspark.sql.functions import col
import functionss as f
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
config("spark.memory.offHeap.enabled","true").\
config("spark.memory.offHeap.size","10g").\
enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?
- In this project I will gather the data from four sources. I will load this data into staging dataframes. I will clean the raw data, write it to parquet files and perform an ETL process using a Spark cluster. Then I will write the data into Fact & Dimension tables to form a star schema. The star schema can then be used by the relevant parties to perform data analytics, correlation and ad-hoc reporting in an effective and efficient manner.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included?
* i94 Immigration Sample Data: Sample data of immigration records from the US National Tourism and Trade Office. This data source will serve as the Fact table in the schema. This data comes from https://travel.trade.gov/research/reports/i94/historical/2016.html.
* World Temperature Data (world_temperature): This dataset contains temperature data for various cities from the 1700s to 2013. Although the data is only recorded until 2013, we can use it as an average/gauge of temperature in 2017. This data comes from https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data.
* US City Demographic Data: Data about the demographics of US cities. This dataset includes information on the population of all US cities such as race, household size and gender. This data comes from https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/.
* Airport Codes: This table contains the airport codes for the airports in corresponding cities. This data comes from https://datahub.io/core/airport-codes#data.

# IMMIGRATION DATA

In [None]:
pd.read_csv("immigration_data_sample.csv",nrows=100).head()

### IMMIGRATION DATA

### Data Dictionary

|Feature|Description|
|--------|-----------|
|cicid|Unique ID|
|i94yr|year|
|i94mon|month|
|i94cit|3 digit code for immigrant country of birth|
|i94res|3 digit code for immigrant country of residence|
|i94port|Port of admission|
|arrdate|Arrival Date in the USA|
|i94mode|Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)|
|i94addr|USA State of arrival|
|depdate|Departure Date from the USA|
|i94bir|Age of Respondent in Years|
|i94visa|Visa codes collapsed into three categories|
|count|Field used for summary statistics|
|dtadfile|Character Date Field - Date added to I-94 Files|
|visapost|Department of State post where the visa was issued|
|occup|Occupation that will be performed in the U.S.|
|entdepa|Arrival Flag - admitted or paroled into the U.S.|
|entdepd|Departure Flag - Departed, lost I-94 or is deceased|
|entdepu|Update Flag - Either apprehended, overstayed, adjusted to perm residence|
|matflag|Match flag - Match of arrival and departure records|
|biryear|4 digit year of birth|
|dtaddto|Character Date Field - Date to which admitted to U.S. (allowed to stay until)|
|gender|Non-immigrant sex|
|insnum|INS number|
|airline|Airline used to arrive in U.S.|
|admnum|Admission Number|
|fltno|Flight number of Airline used to arrive in U.S.|
|visatype|Class of admission legally admitting the non-immigrant to temporarily stay in the U.S.|
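
The `arrdate` and `depdate` columns are commonly stored as SAS numeric dates (a count of days since 1 January 1960) rather than calendar dates. Assuming that is the case here (the source data is read through the `spark-sas7bdat` package, but this notebook does not state the encoding), a minimal sketch of the conversion looks like this. The helper name `sas_days_to_date` is hypothetical and is not part of the project's `functionss` module.

```python
import math
from datetime import date, timedelta
from typing import Optional

# Assumption: dates are SAS numeric dates, i.e. days since 1 January 1960.
SAS_EPOCH = date(1960, 1, 1)


def sas_days_to_date(sas_days: Optional[float]) -> Optional[date]:
    """Convert a SAS numeric date to a Python date; missing values stay None."""
    if sas_days is None or math.isnan(sas_days):
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


# Illustrative values only, in the style of the arrdate/depdate columns.
print(sas_days_to_date(20545.0))  # 2016-04-01
print(sas_days_to_date(None))     # None
```

In Spark, the same arithmetic can be expressed with the SQL function `date_add` applied to the date `1960-01-01`.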

# US CITIES DEMOGRAPHICS

In [None]:
pd.read_csv("us-cities-demographics.csv",sep=";",nrows=100).head()

### US CITIES DEMOGRAPHICS

### Data Dictionary

|Feature|Description|
|--------|-----------|
|City|City Name|
|State|US State of the City|
|Median Age|The median population age|
|Male Population|Male population total|
|Female Population|Female population total|
|Total Population|Total population|
|Number of Veterans|Number of veterans living in the city|
|Foreign-born|Number of residents who were born outside the United States|
|Average Household Size|Average number of people per household in the city|
|State Code|Code of the state|
|Race|Race class|
|Count|Number of individuals in each race|

# GlobalLandTemperaturesByCity

In [None]:
pd.read_csv("GlobalLandTemperaturesByCity.csv",nrows=100).head()

### TEMPERATURE DATA

### Data Dictionary

|Feature|Description|
|--------|-----------|
|dt|Date|
|AverageTemperature|Average temperature in degrees Celsius|
|AverageTemperatureUncertainty|95% confidence interval around average temperature|
|City|Name of city|
|Country|Name of country|
|Latitude|Latitude of city|
|Longitude|Longitude of city|

# AIRPORT CODES

In [None]:
pd.read_csv("airport-codes_csv.csv",nrows=100).head()

### AIRPORT CODES
### Data Dictionary

|Feature|Description|
|--------|-----------|
|ident|Unique identifier|
|type|Airport type|
|name|Airport name|
|elevation_ft|Airport elevation in feet|
|continent|Continent|
|iso_country|ISO Code of the airport's country|
|iso_region|ISO Code for the airport's region|
|municipality|City/Municipality where the airport is located|
|gps_code|Airport GPS Code|
|iata_code|Airport IATA Code|
|local_code|Airport local code|
|coordinates|Airport coordinates|

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.
#### Cleaning Steps
Document steps necessary to clean the data

# IMMIGRATION DATA

In [None]:
immigration_data=spark.read.parquet("sas_data")
pd.read_csv("immigration_data_sample.csv",nrows=100).head()

In [None]:
immigration_data.printSchema()

In [None]:
f.explore_data(immigration_data)
cleaned_immigration_data=f.clean_null_values(immigration_data)

# US CITIES DEMOGRAPHICS

In [None]:
demographics_data=spark.read.option("delimiter", ";").csv("us-cities-demographics.csv",header=True)
pd.read_csv("us-cities-demographics.csv",sep=";",nrows=100).head()

In [None]:
demographics_data.printSchema()

In [None]:
f.explore_data(demographics_data)
cleaned_demographics_data=f.clean_null_values(demographics_data)

# GlobalLandTemperaturesByCity

In [None]:
temprature_data=spark.read.csv("GlobalLandTemperaturesByCity.csv",header=True,inferSchema=True)
pd.read_csv("GlobalLandTemperaturesByCity.csv",nrows=100).head()

In [None]:
temprature_data.printSchema()

In [None]:
f.explore_data(temprature_data)
cleaned_temprature_data=f.clean_null_values(temprature_data)

# AIRPORT CODES

In [None]:
airport_data=spark.read.csv("airport-codes_csv.csv",header=True,inferSchema=True)
pd.read_csv("airport-codes_csv.csv",nrows=100).head()

In [None]:
airport_data.printSchema()

In [None]:
f.explore_data(airport_data)
cleaned_airport_data=f.clean_null_values(airport_data)
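
The helpers `f.explore_data` and `f.clean_null_values` live in the project's own module and are not shown in this notebook. As a rough illustration of the kind of checks this step calls for (missing values and duplicate rows), here is a plain-Python sketch. The function names and the sample rows are hypothetical, and the real helpers operate on Spark dataframes and may behave differently.

```python
from collections import Counter
from typing import Dict, Iterable, List, Optional

Row = Dict[str, Optional[str]]


def count_missing(rows: Iterable[Row]) -> Counter:
    """Count, per column, how many rows hold None or an empty string."""
    missing: Counter = Counter()
    for row in rows:
        for column, value in row.items():
            if value is None or value == "":
                missing[column] += 1
    return missing


def drop_duplicates(rows: Iterable[Row]) -> List[Row]:
    """Keep the first occurrence of each fully identical row."""
    seen = set()
    unique = []
    for row in rows:
        # Column names are unique, so sorting the items gives a stable key.
        key = tuple(sorted(row.items()))
        if key not in seen:
            seen.add(key)
            unique.append(row)
    return unique


# Hypothetical rows shaped like the airport codes data.
sample = [
    {"ident": "00A", "iata_code": "", "municipality": "Bensalem"},
    {"ident": "00A", "iata_code": "", "municipality": "Bensalem"},
    {"ident": "KJFK", "iata_code": "JFK", "municipality": None},
]
print(dict(count_missing(sample)))   # {'iata_code': 2, 'municipality': 1}
print(len(drop_duplicates(sample)))  # 2
```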

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
<img title="" alt="" src="ER.png">

#### 3.2 Mapping Out Data Pipelines

> #### List the steps necessary to pipeline the data into the chosen data model
> - ##### Load the data into staging dataframes
> - ##### Clean null values and duplicates from the data
> - ##### Create Dimension tables
> - ##### Create Fact table
> - ##### Write data into parquet files
> - ##### Perform data quality checks
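
The dimension and fact steps above can be sketched in plain Python as follows. This is only an illustration of the star-schema idea under assumed names: `build_dimension`, `build_fact`, and the `visa_id` key are hypothetical, and the project's actual tables are built by the `f.process_*` functions.

```python
from typing import Dict, List


def build_dimension(rows: List[Dict[str, str]], column: str) -> Dict[str, int]:
    """Assign a surrogate key to each distinct value of `column`."""
    keys: Dict[str, int] = {}
    for row in rows:
        value = row[column]
        if value not in keys:
            keys[value] = len(keys) + 1
    return keys


def build_fact(
    rows: List[Dict[str, str]], column: str, dim: Dict[str, int], key_name: str
) -> List[Dict[str, object]]:
    """Replace the descriptive column with a foreign key into the dimension."""
    fact = []
    for row in rows:
        fact_row: Dict[str, object] = {k: v for k, v in row.items() if k != column}
        fact_row[key_name] = dim[row[column]]
        fact.append(fact_row)
    return fact


# Hypothetical staging rows with two of the immigration columns.
staging = [
    {"cicid": "1", "visatype": "B2"},
    {"cicid": "2", "visatype": "WT"},
    {"cicid": "3", "visatype": "B2"},
]
dim_visa = build_dimension(staging, "visatype")
fact_immigration = build_fact(staging, "visatype", dim_visa, "visa_id")
print(dim_visa)             # {'B2': 1, 'WT': 2}
print(fact_immigration[2])  # {'cicid': '3', 'visa_id': 1}
```

Keeping the descriptive values in the dimension and only the key in the fact table is what lets analysts join and filter without repeating text in every fact row.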

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

#### Create immigration fact table

In [None]:
f.process_visa(cleaned_immigration_data,"data/")

#### Create demographics dimension table

In [None]:
f.process_demographic(cleaned_demographics_data,"data/")

#### Create temperature dimension table

In [None]:
f.process_Temperature(cleaned_temprature_data,"data/")

#### Create airport dimension table

In [None]:
f.process_airport(cleaned_airport_data,"data/")

#### Create visa dimension table

In [None]:
f.process_visa(cleaned_immigration_data,"data/")

#### Create applicant nationality and admission port dimension tables

In [None]:
applicant_nationalty=spark.read.csv("applicant_nationalty_table.csv",header=True)
admission_port=spark.read.csv("admission_port_table.csv",header=True)

In [None]:
admission_port.printSchema()

In [None]:
f.process_applicant_nationality(applicant_nationalty,"data/")

In [None]:
f.process_addmission_port(admission_port,"data/")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
immigration=spark.read.parquet("data/fact_immigration")
visa = spark.read.parquet("data/dim_visa")
temperature = spark.read.parquet("data/dim_temprature")
demographic = spark.read.parquet("data/dim_demographic")
admission_port = spark.read.parquet("data/dim_addmission_port")
airport = spark.read.parquet("data/dim_airport")
applicant_nationality = spark.read.parquet("data/dim_applicant_nationality")

In [None]:
immigration.show(2,vertical=True)

In [None]:
# Perform quality checks here
tables = {
    "immigration": immigration,
    "airport": airport,
    "demographic": demographic,
    "temperature": temperature,
    "applicant_nationality": applicant_nationality,
    "admission_port": admission_port,
    "visa": visa}

for table_name, table in tables.items():
    f.perform_quality_check(table, table_name)
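
`f.perform_quality_check` is defined outside this notebook. A minimal sketch of a row-count check of the kind listed above might look like the following; `check_not_empty`, `run_checks`, and `FakeTable` are hypothetical names, and the check relies only on the `count()` method that Spark dataframes provide.

```python
from typing import Any, Dict, List


def check_not_empty(table: Any, table_name: str) -> bool:
    """Return True when the table has at least one row.

    `table` only needs a `count()` method, as a Spark DataFrame has.
    """
    row_count = table.count()
    if row_count == 0:
        print(f"Data quality check failed: {table_name} has no rows")
        return False
    print(f"Data quality check passed: {table_name} has {row_count} rows")
    return True


def run_checks(tables: Dict[str, Any]) -> List[str]:
    """Run the check on every table and return the names that failed."""
    return [name for name, table in tables.items() if not check_not_empty(table, name)]


class FakeTable:
    """Stand-in for a Spark DataFrame so the sketch runs without a cluster."""

    def __init__(self, rows: int) -> None:
        self._rows = rows

    def count(self) -> int:
        return self._rows


failed = run_checks({"immigration": FakeTable(3), "visa": FakeTable(0)})
print(failed)  # ['visa']
```

Returning the failed table names, rather than only printing, makes it easy to stop the pipeline when any check fails.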

#### 4.3 Data dictionary 
Please see `data_dictionary.txt`.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
    * The data was increased by 100x.
    * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    * The database needed to be accessed by 100+ people.

### Clearly state the rationale for the choice of tools and technologies for the project:
- #### This project makes use of the following Big Data processing technologies:
- #### Apache Spark, because of its ability to process massive amounts of data, its unified analytics engine, and its convenient APIs
- #### Pandas, due to its convenient dataframe manipulation functions for gaining further insights

### Propose how often the data should be updated and why:
- #### The immigration (i94) data set is updated monthly, hence all relevant data should be updated monthly as well

### The data was increased by 100x:
- #### If the data was increased by 100x I would use more sophisticated and appropriate frameworks to perform processing and storage functions, such as Amazon Redshift, Amazon EMR or Apache Cassandra.

### The data populates a dashboard that must be updated on a daily basis by 7am every day:
- #### If the data had to populate a dashboard daily, I would manage the ETL pipeline in a DAG from Apache Airflow. This would ensure that the pipeline runs in time, that data quality checks pass, and provide a convenient means of notification should the pipeline fail.

### The database needed to be accessed by 100+ people:
- #### If the data needed to be accessed by many people simultaneously, I would move the analytics database to Amazon Redshift which can handle massive request volumes and is easily scalable.