# Project Title
### Data Engineering Capstone Project

#### Project Summary
I94 immigration data and city temperature data are used to create a database that is optimized for querying and analyzing immigration events. An ETL pipeline is built from these two data sources to populate the database. Finally, the database is used to relate immigration behaviour to the temperatures at the destination.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import psycopg2

In [2]:
from sql_queries import airport_insert, demographic_insert, immigration_insert, temperature_insert

### Step 1: Scope the Project and Gather Data

#### Scope 
This project aims to enrich the US I94 immigration data with further data, such as demographics and temperature data, to provide a wider basis for analyzing immigration.

#### Describe and Gather Data 
The I94 immigration data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace. A sample file in CSV format is available for a quick look at the data before reading it all in. The entire dataset does not have to be used, only what is needed to accomplish the goal set at the beginning of the project.

In [None]:
# Read in the data here
i94_path = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_i94 = pd.read_sas(i94_path, 'sas7bdat', encoding="ISO-8859-1")

In [None]:
pd.options.display.max_columns = None

In [None]:
df_i94.head()

## World Temperature Data

In [None]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = pd.read_csv(fname)

In [None]:
df_temp.head()

In [None]:
# List all unique country names in the temperature data to find the name used for the United States
set(df_temp["Country"].values)

In [None]:
df_temp_us = df_temp[df_temp["Country"] == "United States"]
df_temp_us.head()

## U.S. City Demographic Data

In [None]:
df_demographics = pd.read_csv("us-cities-demographics.csv", delimiter=";")

In [None]:
df_demographics.head()

## Airport Code Table

In [None]:
df_airport_codes = pd.read_csv("airport-codes_csv.csv")

In [None]:
df_airport_codes.head()

In [None]:
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [None]:
#write to parquet
df_spark.write.parquet("sas_data")
df_spark=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

In [None]:
df_i94.describe()

In [None]:
df_temp_us.describe()

In [None]:
df_airport_codes.describe()
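
Beyond `describe()`, missing values and duplicate rows can be counted directly. The following is a minimal sketch; the helper name `summarize_quality` and the small sample frame are illustrative assumptions and not part of the project code.

```python
import pandas as pd


def summarize_quality(df: pd.DataFrame) -> dict:
    """Return simple data quality indicators for a DataFrame (hypothetical helper)."""
    return {
        "rows": len(df),
        "duplicate_rows": int(df.duplicated().sum()),
        # Number of missing values per column, keeping only columns that have any
        "missing_per_column": {
            col: int(n) for col, n in df.isna().sum().items() if n > 0
        },
    }


# Illustrative sample data, not taken from the real files
sample = pd.DataFrame(
    {"iata_code": ["LAX", None, "JFK", "JFK"], "elevation_ft": [125.0, 10.0, 13.0, 13.0]}
)
report = summarize_quality(sample)
print(report)
```

Applied to the notebook's frames, a call such as `summarize_quality(df_airport_codes)` would show which columns need the cleaning steps that follow.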

#### Cleaning Steps
Document steps necessary to clean the data.

In [None]:
# Get port locations from SAS text file
with open("./I94_SAS_Labels_Descriptions.SAS") as f:
    content = f.readlines()
content = [x.strip() for x in content]
ports = content[302:962]
splitted_ports = [port.split("=") for port in ports]
port_codes = [x[0].replace("'","").strip() for x in splitted_ports]
port_locations = [x[1].replace("'","").strip() for x in splitted_ports]
port_cities = [x.split(",")[0] for x in port_locations]
port_states = [x.split(",")[-1].strip() for x in port_locations]
df_port_locations = pd.DataFrame({"port_code" : port_codes, "port_city": port_cities, "port_state": port_states})
df_port_locations.head(20)
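
The cell above splits each line on `=` and `,` and trusts that every line in the slice is a port entry. As a hedged alternative, each line can be validated with a regular expression before it is split. This is only a sketch: the function name `parse_port_line`, the sample lines, and the assumed line format `'CODE' = 'CITY, STATE'` are inferred from the parsing logic above and are not taken from the SAS file itself. The pattern would still need to be applied only to the port section of the file, since other sections may use the same shape.

```python
import re

# Matches lines such as:  'ALC' = 'ALCAN, AK'
# (assumed line format, inferred from the split("=") parsing above)
PORT_LINE = re.compile(r"^\s*'(?P<code>[^']+)'\s*=\s*'(?P<location>[^']*)'")


def parse_port_line(line):
    """Return (port_code, city, state) or None if the line is not a port entry."""
    match = PORT_LINE.match(line)
    if match is None:
        return None
    code = match.group("code").strip()
    location = match.group("location").strip()
    # Split on the last comma so that city names containing commas stay intact
    city, sep, state = location.rpartition(",")
    if not sep:
        # No comma: there is no state part, so the entry is "irregular"
        return code, location, None
    return code, city.strip(), state.strip()


# Illustrative lines in the assumed format
sample_lines = [
    "   'ALC'\t=\t'ALCAN, AK             '",
    "   'XXX'\t=\t'NOT REPORTED/UNKNOWN  '",
    "/* a comment line */",
]
parsed = [p for p in map(parse_port_line, sample_lines) if p is not None]
print(parsed)
```

Unlike the cell above, this sketch marks entries without a state as `None` instead of repeating the city, and it takes everything before the last comma as the city.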

In [None]:
# Print the first and last port to check that all port lines in the file are covered
print(f"First port in SAS file: {df_port_locations['port_city'].values[0]}, last port {df_port_locations['port_city'].values[-1]}")
# Entries without a "city, state" pair parse to identical city and state values; treat them as irregular
irregular_ports_df = df_port_locations[df_port_locations["port_city"] == df_port_locations["port_state"]]
irregular_ports = list(set(irregular_ports_df["port_code"].values))
print(irregular_ports)

In [None]:
# drop all irregular ports from i94 data
print(f"i94 data contains {len(df_i94)} rows before cleaning.")
df_i94_filtered = df_i94[~df_i94["i94port"].isin(irregular_ports)]

In [None]:
print(f"i94 data contains {len(df_i94_filtered)} rows after removing irregular ports.")
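# Reassign instead of using inplace=True: df_i94_filtered is a slice of df_i94,
# and in-place changes on a slice trigger pandas' SettingWithCopyWarning
df_i94_filtered = df_i94_filtered.drop(columns=["insnum", "entdepu", "occup", "visapost"])
df_i94_filtered = df_i94_filtered.dropna()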

In [None]:
print(f"i94 data contains {len(df_i94_filtered)} rows after removing NaN values.")

In [None]:
# Drop rows with missing temperature values (reassign, since df_temp_us is a slice of df_temp)
df_temp_us = df_temp_us.dropna()

In [None]:
df_airport_codes.dropna(subset=['iata_code'], inplace=True)

In [None]:
df_airport_codes.head()

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

##### Tables:
| table name | columns | description | type |
| ------- | ---------- | ----------- | ---- |
| airports | iata_code - name - type - local_code - coordinates - city | stores information related to airports | dimension table |
| demographics | city - state - media_age - male_population - female_population - total_population - num_veterans - foreign_born - average_household_size - state_code - race - count | stores demographics data for cities | dimension table |
| immigrations | cicid - year - month - cit - res - iata - arrdate - mode - addr - depdate - bir - visa - coun - dtadfil - visapost - occup - entdepa - entdepd - entdepu - matflag - biryear - dtaddto - gender - insnum - airline - admnum - fltno - visatype | stores all i94 immigrations data | fact table |
| temperature | timestamp - average_temperature - average_temperatur_uncertainty - city - country - latitude - longitude | stores temperature information | dimension table |



#### 3.2 Mapping Out Data Pipelines

List the steps necessary to pipeline the data into the chosen data model

1. Create tables by executing `create_tables.py`.
2. Join the port city from the SAS port locations onto the airport data.
3. Insert the data into the tables.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# After running create_tables.py, insert the data into the database
conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=student password=student")
cur = conn.cursor()

In [None]:
df_airport_codes = df_airport_codes.merge(df_port_locations, left_on="iata_code", right_on="port_code")

In [None]:
df_airport_codes.head()

In [None]:
df_airport_codes.drop(columns=["port_code"], inplace=True)
df_airport_codes = df_airport_codes[["iata_code", "name", "type", "local_code", "coordinates", "port_city", "elevation_ft", "continent", "iso_country", "iso_region", "municipality", "gps_code"]]

In [None]:
for index, row in df_airport_codes.iterrows():
    cur.execute(airport_insert, list(row.values))
    conn.commit()

In [None]:
for index, row in df_demographics.iterrows():
    cur.execute(demographic_insert, list(row.values))
    conn.commit()

In [None]:

for index, row in df_i94_filtered.iterrows():
    cur.execute(immigration_insert, list(row.values))
    conn.commit()

In [None]:
for index, row in df_temp_us.iterrows():
    cur.execute(temperature_insert, list(row.values))
    conn.commit()
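
The loops above send one `INSERT` and one `commit` per row, which can become slow for the immigration data. A minimal sketch of a batched alternative follows. The helper name `insert_in_batches` is an assumption, and the demonstration uses an in-memory SQLite database as a stand-in for PostgreSQL so that it runs without a server. Note that SQLite uses `?` placeholders, while psycopg2 statements use `%s`.

```python
import sqlite3
from itertools import islice


def insert_in_batches(cur, insert_sql, rows, batch_size=1000):
    """Insert rows with cursor.executemany in batches; return the number of rows sent.

    `cur` is any DB-API cursor; `rows` is an iterable of parameter sequences
    matching the placeholders in `insert_sql`.
    """
    iterator = iter(rows)
    total = 0
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            break
        cur.executemany(insert_sql, batch)
        total += len(batch)
    return total


# Demonstration with an in-memory SQLite table (hypothetical table and rows)
demo_conn = sqlite3.connect(":memory:")
demo_cur = demo_conn.cursor()
demo_cur.execute("CREATE TABLE demo_airports (iata_code TEXT, name TEXT)")
demo_rows = [
    ("LAX", "Los Angeles International"),
    ("JFK", "John F Kennedy International"),
    ("SEA", "Seattle Tacoma International"),
]
inserted = insert_in_batches(
    demo_cur, "INSERT INTO demo_airports VALUES (?, ?)", demo_rows, batch_size=2
)
demo_conn.commit()  # a single commit after all batches instead of one per row
demo_cur.execute("SELECT COUNT(*) FROM demo_airports")
stored = demo_cur.fetchone()[0]
print(inserted, stored)
```

With the notebook's objects, a call would presumably look like `insert_in_batches(cur, airport_insert, df_airport_codes.itertuples(index=False, name=None))` followed by one `conn.commit()`. With psycopg2 most of the gain comes from committing once; for faster bulk inserts, `psycopg2.extras.execute_batch` or `psycopg2.extras.execute_values` may be worth evaluating.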

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here
cur.execute("SELECT COUNT(*) FROM airports")
# COUNT(*) always returns exactly one row, so check the counted value rather than cur.rowcount
if cur.fetchone()[0] < 1:
    print("No data found in table airports")

In [None]:
cur.execute("SELECT COUNT(*) FROM demographics")
# Check the counted value; cur.rowcount is always 1 for a COUNT(*) query
if cur.fetchone()[0] < 1:
    print("No data found in table demographics")

In [None]:
cur.execute("SELECT COUNT(*) FROM immigrations")
# Check the counted value; cur.rowcount is always 1 for a COUNT(*) query
if cur.fetchone()[0] < 1:
    print("No data found in table immigrations")

In [None]:
cur.execute("SELECT COUNT(*) FROM temperature")
# Check the counted value; cur.rowcount is always 1 for a COUNT(*) query
if cur.fetchone()[0] < 1:
    print("No data found in table temperature")
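
The four count checks above follow the same pattern and can be expressed as one reusable function. The sketch below is an assumption about how that could look: the helper names `count_rows` and `find_empty_tables` are hypothetical, and the demonstration runs against an in-memory SQLite database as a stand-in for the PostgreSQL connection used in this notebook.

```python
import sqlite3

# Tables defined in the data model above
EXPECTED_TABLES = ("airports", "demographics", "immigrations", "temperature")


def count_rows(cur, table):
    """Return the number of rows in `table` using any DB-API cursor."""
    if table not in EXPECTED_TABLES:
        # Table names cannot be passed as query parameters, so restrict them to a known list
        raise ValueError(f"Unexpected table name: {table}")
    cur.execute(f"SELECT COUNT(*) FROM {table}")
    return cur.fetchone()[0]


def find_empty_tables(cur, tables=EXPECTED_TABLES):
    """Return the names of all tables that contain no rows."""
    return [table for table in tables if count_rows(cur, table) < 1]


# Demonstration against an in-memory SQLite database (stand-in for PostgreSQL)
demo_conn = sqlite3.connect(":memory:")
demo_cur = demo_conn.cursor()
for name in EXPECTED_TABLES:
    demo_cur.execute(f"CREATE TABLE {name} (id INTEGER)")
demo_cur.execute("INSERT INTO airports VALUES (1)")
empty = find_empty_tables(demo_cur)
print(empty)
```

In the notebook, `find_empty_tables(cur)` could be called once after all inserts, and a non-empty result would indicate that a load step failed.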

#### 4.3 Data dictionary 

**Fact Table** - I94 immigration data joined with the city temperature data on i94port
Columns:
   - i94yr = 4 digit year
   - i94mon = numeric month
   - i94cit = 3 digit code of origin country
   - i94port = 3 character code of destination USA city
   - arrdate = arrival date in the USA
   - i94mode = 1 digit travel code
   - depdate = departure date from the USA
   - i94visa = reason for immigration
   - AverageTemperature = average temperature of destination city
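
In the I94 SAS files, `arrdate` and `depdate` are stored as SAS numeric dates, which count days since 1960-01-01 (stated here as an assumption about the source files, since the notebook does not convert them). A minimal sketch of the conversion, with the hypothetical helper name `sas_date_to_date`:

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS counts dates as days since 1960-01-01


def sas_date_to_date(sas_days):
    """Convert a SAS numeric date (days since 1960-01-01) to a datetime.date."""
    if sas_days is None or sas_days != sas_days:  # None or NaN
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_date_to_date(20545.0))  # 2016-04-01
```

On a DataFrame this could be applied column-wise, for example `df_i94_filtered["arrdate"].map(sas_date_to_date)`.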

**Dimension Table** - I94 immigration data Events
Columns:
   - i94yr = 4 digit year
   - i94mon = numeric month
   - i94cit = 3 digit code of origin country
   - i94port = 3 character code of destination USA city
   - arrdate = arrival date in the USA
   - i94mode = 1 digit travel code
   - depdate = departure date from the USA
   - i94visa = reason for immigration

**Dimension Table** - temperature data
Columns:
- i94port = 3 character code of destination city (mapped from cleaned up immigration data)
- AverageTemperature = average temperature
- City = city name
- Country = country name
- Latitude = latitude
- Longitude = longitude
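
The `i94port` column of the temperature table has to be derived from the city name. One way to do this, sketched here under the assumption that port cities and temperature cities differ only in letter case and surrounding whitespace, is a case-insensitive lookup built from the parsed port codes and port cities. The function names and sample values are illustrative.

```python
def build_city_to_port(port_codes, port_cities):
    """Map upper-cased, stripped city names to port codes (first code wins on duplicates)."""
    mapping = {}
    for code, city in zip(port_codes, port_cities):
        key = city.strip().upper()
        mapping.setdefault(key, code)
    return mapping


def city_to_port(city, mapping):
    """Return the port code for a city name, or None if the city is unknown."""
    return mapping.get(city.strip().upper())


# Illustrative values; the real lists come from the parsed SAS labels file
sample_mapping = build_city_to_port(["LOS", "NYC"], ["LOS ANGELES", "NEW YORK"])
print(city_to_port("Los Angeles", sample_mapping))  # LOS
print(city_to_port("Springfield", sample_mapping))  # None
```

Cities that map to `None` have no matching port and would be dropped or handled separately before the join on `i94port`.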

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project:
  1. I used Spark since it can easily handle multiple file formats (SAS, CSV, etc.) that contain large amounts of data. Spark SQL was used to read the input files into dataframes, which were then manipulated via standard SQL join operations to create the tables.
    
* Propose how often the data should be updated and why.
    1. Since the raw files are published monthly, the data should continue to be pulled monthly.
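
For a monthly update, the input file name has to be derived from the month being loaded. The sketch below assumes that all monthly files follow the naming pattern of `i94_apr16_sub.sas7bdat` used earlier in this notebook; the function name `i94_filename` is hypothetical.

```python
MONTH_ABBREVIATIONS = ("jan", "feb", "mar", "apr", "may", "jun",
                       "jul", "aug", "sep", "oct", "nov", "dec")


def i94_filename(year, month):
    """Build the file name for one month, following the pattern of i94_apr16_sub.sas7bdat."""
    if not 1 <= month <= 12:
        raise ValueError(f"month must be in 1..12, got {month}")
    return f"i94_{MONTH_ABBREVIATIONS[month - 1]}{year % 100:02d}_sub.sas7bdat"


print(i94_filename(2016, 4))  # i94_apr16_sub.sas7bdat
```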
    
### Scenarios
* Write a description of how you would approach the problem differently under the following scenarios:
    1. The data was increased by 100x.
        - Use Amazon Redshift: it is an analytical database that is optimized for aggregation and read-heavy workloads.
    2. The data populates a dashboard that must be updated on a daily basis by 7am every day.
        - Airflow can be used here: schedule the pipeline as a DAG, configure retries, and send emails on failures.
        - Run daily quality checks; if they fail, send emails to operators and freeze the dashboards.
    3. The database needed to be accessed by 100+ people.
        - Redshift can help here since it offers scaling capabilities (such as concurrency scaling) and good read performance.