### Data Engineering Capstone Project
### Process immigration data and correlate it with temperature data using Spark

In [1]:
import re
import pandas as pd
import psycopg2
from collections import defaultdict
from datetime import datetime, timedelta
from pyspark.sql.functions import udf

In [None]:
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_im = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")

In [None]:
df = df_im
df.head()

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(
    "spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11").enableHiveSupport().getOrCreate()

def get_temp_data(path="../../data2/GlobalLandTemperaturesByCity.csv"):
    return spark.read.format("csv").option("header", "true").load(path)

def get_imm_data(path='../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'):
    return spark.read.format('com.github.saurfang.sas.spark').load(path)

# Test
# get_temp_data().show()
# get_imm_data().show()

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

1. Inspecting the data with pandas showed that AverageTemperature may contain NaN or "" (an empty string).
2. The City in the temperature data and the city in the I94_SAS_Labels_Descriptions.SAS file don't always match.
3. The i94port column in the immigration data contains XXX.

#### Cleaning Steps
 * Drop temperature rows where AverageTemperature is NaN or "".
 * Drop immigration rows where i94port is null or XXX.


In [3]:
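def clean_temp_data(df_temp_data):
    # Drop rows with a missing or empty temperature reading
    df_temp_data = df_temp_data.filter(df_temp_data.AverageTemperature != 'NaN')
    df_temp_data = df_temp_data.filter(df_temp_data.AverageTemperature != "")
    # Map each city to an I94 port code; map_i94port returns None for unmatched cities
    df_temp_data = df_temp_data.withColumn("i94_port", map_i94port(df_temp_data.City))
    # Keep only the cities that mapped to a port
    df_temp_data = df_temp_data.filter(df_temp_data.i94_port.isNotNull())
    return df_temp_data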

def clean_imm_data(imm_data):
    # String comparison is case-sensitive, so match the upper-case 'XXX' placeholder
    imm_data = imm_data.filter(imm_data.i94port.isNotNull() & (imm_data.i94port != 'XXX'))
    return imm_data

# Test
# clean_temp_data(get_temp_data())
# clean_imm_data(get_imm_data())

### Pre pipeline Mapping and definition
1. Create the city-to-port mapping from the I94_SAS_Labels_Descriptions.SAS file.
   Manual inspection shows that the mapping data resides in the slice lines[303:893].
   The output is a dictionary with the schema {City: I94_port}.
2. Create a UDF that maps a city to its i94 port using the mapping created in the previous step.
   Manual inspection shows that city names in the temperature data do not exactly match the
   city names in the immigration data, so the mapping uses a simple substring search.


In [4]:
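port_city_map = {}
with open("I94_SAS_Labels_Descriptions.SAS") as fd:
    lines = fd.readlines()
    # Splitting a label line on ' puts the port code at index 1 and the city label at index 3
    port_city_map = {x.split("'")[3].strip().lower(): x.split("'")[1] for x in lines[303:893]}

@udf()
def map_i94port(col):
    # Return the first port whose label contains the city name, or None if nothing matches
    if col is None:
        return None
    city = col.lower()
    for key, value in port_city_map.items():
        if city in key:
            return value
    return None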
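
To make the parsing and the substring search concrete, here is a minimal Spark-free sketch. The two sample lines are hypothetical and only imitate the quoted `'CODE' = 'CITY, STATE'` layout that the `split("'")` indexing relies on; `parse_port_line` and `match_port` are illustrative helper names, not part of the pipeline.

```python
# Hypothetical sample lines in the quoted layout the split("'") indexing expects.
sample_lines = [
    "   'ANC'\t=\t'ANCHORAGE, AK         '\n",
    "   'BOS'\t=\t'BOSTON, MA            '\n",
]

def parse_port_line(line):
    """Split on single quotes: index 1 is the port code, index 3 the city label."""
    parts = line.split("'")
    return parts[3].strip().lower(), parts[1]

# {lower-case city label: port code}
sample_map = dict(parse_port_line(line) for line in sample_lines)

def match_port(city, mapping):
    """Return the first port whose label contains the city name, else None."""
    if city is None:
        return None
    needle = city.lower()
    for label, port in mapping.items():
        if needle in label:
            return port
    return None

print(sample_map)
print(match_port("Boston", sample_map))  # matched by substring
print(match_port("Paris", sample_map))   # no label contains "paris"
print(match_port("Ton", sample_map))     # false positive: "ton" is inside "boston, ma"
```

The last call shows the main weakness of substring matching: a short city name can match an unrelated label, so the resulting mapping is worth spot-checking.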


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

In [5]:
def get_imm_temp_fact_df(df_imm, df_temp_data):
    df_imm.createOrReplaceTempView("imm_data_table")
    df_temp_data.createOrReplaceTempView("temp_data_table")

    fact_table = spark.sql('''
    SELECT
           i94port,
           arrdate,
           depdate,
           i94visa as visa_type,
           AverageTemperature as temp,
           i94yr as year,
           i94mon as month,
           i94cit as city
    FROM imm_data_table JOIN temp_data_table ON imm_data_table.i94port = temp_data_table.i94_port
    ''')
    fact_table.createOrReplaceTempView("temp_imm_data_table")
    return fact_table

def save_df_parquet(df_imm, df_temp_data, fact_table):
    # Save all data as parquet files
    fact_table.write.mode('overwrite').partitionBy("i94port", "visa_type").parquet( "i94_temp_fact.parquet")
    df_imm.write.mode('overwrite').partitionBy("i94port", "i94visa").parquet( "i94_imm_dim.parquet")
    df_temp_data.write.mode('overwrite').partitionBy("i94_port", "City").parquet( "port_temp_dim.parquet")


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [6]:
# load data
temp_data = get_temp_data()
imm_data = get_imm_data()

In [7]:
# Clean data
temp_data = clean_temp_data(temp_data)
imm_data = clean_imm_data(imm_data)

In [8]:
# Load fact data
fact_data = get_imm_temp_fact_df(imm_data, temp_data)

In [9]:
fact_data.count()

10644246266
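
The fact table is far larger than either input because the join matches on the port code only, so each immigration row is paired with every temperature reading recorded for that port. A minimal sketch of that row-count arithmetic, using made-up port lists and a hypothetical helper `inner_join_row_count`:

```python
from collections import Counter

def inner_join_row_count(left_keys, right_keys):
    """Rows produced by an inner equi-join: sum over keys of left count * right count."""
    left = Counter(left_keys)
    right = Counter(right_keys)
    return sum(n * right[key] for key, n in left.items())

# Made-up example: 4 immigration rows, 6 temperature rows.
imm_ports = ["NYC", "NYC", "NYC", "LOS"]
temp_ports = ["NYC"] * 4 + ["LOS"] * 2

joined_rows = inner_join_row_count(imm_ports, temp_ports)
print(joined_rows)  # 3 * 4 + 1 * 2 = 14
```

If one row per arrival is the goal, a narrower join condition (for example, also matching on month) would be one way to limit this growth; that is a suggestion, not something the pipeline above does.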

In [None]:
# Save all data
save_df_parquet(imm_data, temp_data, fact_data)

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
def data_quality_check(test):
    count = test['df'].count()
    if count != test['expected']:
        print(f"[FAIL]: Data quality check failed for {test['name']} with {count} records, expected {test['expected']}")
        return
    print(f"[PASS]: Data quality check passed for {test['name']} with {count} records")

test_cases = [
    {"name": "imm_data", "df":imm_data, "expected": 3096313},
    {"name": "temp_data", "df":temp_data, "expected": 540912},
    {"name": "fact_data", "df":fact_data, "expected": 10644246266}
]
for test in test_cases:
    data_quality_check(test)
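
The count checks above cover completeness. The integrity checks listed earlier (missing values, unique keys) could be sketched along the following lines. This is a Spark-free illustration over plain dictionaries with made-up rows and hypothetical helper names; on a Spark DataFrame the null check would more likely be written as `df.filter(df[column].isNull()).count()`.

```python
def count_nulls(rows, column):
    """Number of rows whose value for `column` is missing (None)."""
    return sum(1 for row in rows if row.get(column) is None)

def count_duplicate_keys(rows, key_columns):
    """Number of rows that repeat a key tuple already seen earlier."""
    seen = set()
    duplicates = 0
    for row in rows:
        key = tuple(row.get(c) for c in key_columns)
        if key in seen:
            duplicates += 1
        else:
            seen.add(key)
    return duplicates

# Made-up rows for illustration only.
sample_rows = [
    {"i94port": "NYC", "arrdate": 20545.0},
    {"i94port": None, "arrdate": 20546.0},
    {"i94port": "NYC", "arrdate": 20545.0},
]

null_ports = count_nulls(sample_rows, "i94port")
duplicate_rows = count_duplicate_keys(sample_rows, ["i94port", "arrdate"])
print(null_ports, duplicate_rows)
```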

#### 4.3 Data dictionary 
The dictionaries below describe the columns in each table of the data model.

In [None]:
Immigration_temperature_fact_schema = {
    "year": "year of the immigration record (i94yr)",
    "month": "month of the immigration record (i94mon)",
    "city": "city code (i94cit)",
    "i94port": "destination US city i94 port code",
    "arrdate": "arrival date in the USA",
    "depdate": "departure date from the USA",
    "visa_type": "type of visa (i94visa)",
    "temp": "average temperature (AverageTemperature)",
}

temperature_data_schema = {
    "AverageTemperature": "average temperature",
    "City": "city name",
    "Country": "country name",
    "Latitude": "latitude",
    "Longitude": "longitude"
}

Immigration_data_schema = {
    "i94yr": "year",
    "i94mon": "month",
    "i94cit": "city",
    "i94port": "I94 port code",
    "arrdate": "arrival date",
    "i94mode": "travel mode",
    "depdate": "departure date",
    "i94visa": "type of visa"
}
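
The `arrdate` and `depdate` columns come from a SAS file. Assuming they hold SAS date values (a number of days counted from 1960-01-01, which is how SAS stores dates), they can be converted to calendar dates as sketched below. `sas_days_to_date` is a hypothetical helper, not part of the pipeline above.

```python
from datetime import date, timedelta

# SAS date values count days from this epoch.
SAS_EPOCH = date(1960, 1, 1)

def sas_days_to_date(days):
    """Convert a SAS day count (int or float) to a date; None stays None."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))

print(sas_days_to_date(20545.0))  # 2016-04-01
print(sas_days_to_date(None))
```

Wrapped in a Spark UDF, the same function could add readable date columns to the fact table.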

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

### Clearly state the rationale for the choice of tools and technologies for the project.
1. Pandas is used for initial exploration because it makes it quick to read the files and inspect the data before implementing the pipeline in Spark on the full datasets.
2. Spark is used because it runs complex, resource-intensive computations in memory, distributes them across a cluster reliably, and handles a wide variety of data formats. Spark SQL also lets the large input files be loaded into DataFrames and combined with standard SQL join operations to form additional tables.

### Propose how often the data should be updated and why.

The data should be updated weekly, because temperatures change and people travel into the country every day.

### Write a description of how you would approach the problem differently under the following scenarios:
1. The data was increased by 100x.

   Spark should run in cluster mode under a cluster manager such as Mesos or YARN. Monitoring will be necessary to keep track of the data being processed.
2. The data populates a dashboard that must be updated on a daily basis by 7am every day.

   The Spark jobs can be scheduled with Apache Airflow to run every night, or a couple of hours before 7 AM.
3. The database needed to be accessed by 100+ people.

   The parquet files can be made available through query tools such as Pig, Hive, or Impala so that 100+ people can access the data.
