# Project Title
### Data Engineering Capstone Project

#### Project Summary
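This project builds an ETL pipeline with Spark that combines I94 immigration data with US city demographics data into a star schema for analyzing immigration visits.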

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import datetime

import numpy as np
import pandas as pd

import pyspark.sql.functions as F
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import asc, col, countDistinct, desc, expr, udf
from pyspark.sql.types import IntegerType, StringType

In [None]:
# Project: Capstone Project.
# Scope of the Project:
Create an ETL pipeline using Spark. The pipeline uses immigration data from the US National Tourism and Trade Office and city demographics data from OpenDataSoft.

The immigration data has detailed information about people arriving in the US by air, sea, or land, for example arrival city, visa type, and arrival and departure dates.
The city demographics file has information about each city, including total population, state code, median age, and foreign-born count.

Description:
To complete the project, we load the data from local disk and process it into analytics tables using Spark.
If the data outgrows local disk, we would deploy the Spark process on an AWS cluster and load the data into S3.

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()
Immig_data =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
city_data=spark.read.csv('us-cities-demographics.csv',sep=";", inferSchema=True, header=True)

In [None]:
#Immig_data.write.mode('overwrite').parquet("sas_data")
Immmig_data=spark.read.parquet("sas_data")

In [11]:
Immmig_data.printSchema()
Immmig_data.show(5)

In [None]:
city_data.printSchema()
city_data.show(5)

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [None]:
#Exploring the data and identifying the data issues.
Immmig_data.count()

In [None]:
# If the distinct cicid count matches the total row count of the immigration data, cicid can be treated as unique with no duplicate values.
ccid_count = Immmig_data.select(countDistinct("cicid"))
ccid_count.show()

In [None]:
#checking below if we have any missing values for the required columns in Immigration data. 
Immmig_data.describe("i94addr","i94yr","i94mon","arrdate","visatype","i94mode").show()

In [None]:
#Dropping rows with null values
Immmig_data = Immmig_data.dropna(how = "any", subset = ["cicid", "i94addr","i94mode"])

In [None]:
#Dropped all the rows with no address as our analysis is only based on the state codes in US. 
Immmig_data.describe("cicid","i94addr","i94yr","i94mon","arrdate","visatype","i94mode").show()

In [None]:
#Checking if there are any Empty Values in Cicid column
Immmig_data.select("cicid").dropDuplicates().sort("cicid").show()

In [None]:
# Change the data types from double to int for all the numeric fields. This could also have been done by
# defining a schema for the immigration data and applying it when importing the data.
# Convert the SAS numeric dates (days since 1960-01-01) to actual dates.
Immmig_data = Immmig_data.withColumn('i94yr', expr("cast(i94yr  as int)")) \
.withColumn('i94mon', expr("cast(i94mon  as int)")) \
.withColumn('arrdate', expr("date_add('1960-01-01', cast(arrdate as int))")) \
.withColumn('depdate', expr("date_add('1960-01-01', cast(depdate as int))")) \
.withColumn('cicid', expr("cast(cicid  as int)")) \
.withColumn('i94mode', expr("cast(i94mode  as int)")) \
.withColumn('i94bir', expr("cast(i94bir  as int)")) \
.withColumn('i94visa', expr("cast(i94visa  as int)")) \
.withColumn('biryear', expr("cast(biryear  as int)"))
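
The date conversion in the cell above relies on SAS storing dates as the number of days since 1960-01-01. As a minimal plain-Python sketch of the same arithmetic (the helper name `sas_to_date` is an assumption for illustration, not part of the pipeline):

```python
from datetime import date, timedelta

# SAS stores dates as the number of days since 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days):
    """Convert a SAS numeric date (int, float, or None) to a datetime.date."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20545.0))  # 2016-04-01
```

This is handy for spot-checking a few converted values by hand against the raw file.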

In [None]:
#checking to see how data is distributed as per the i94addr
Immmig_data.groupby("i94addr").count().orderBy(desc("count")).show()

In [None]:
#Exploring City Data
city_data.count()

In [None]:
city_data.describe("City","State","Male Population","Female Population","State Code","Median Age").show()

In [None]:
#Dropping Null rows if there are any.
city_data = city_data.dropna(how = "any", subset = ["City", "State","State Code"])

In [None]:
#validating why we have duplicate values for each city and State
city_data.select("City", "State","State Code","Total Population","Race","Count").filter(city_data["City"].isin(["Lawrence"])).sort("Count").show()

In [None]:
#Renaming the columns to replace spaces and hyphens with underscores.
city_data = city_data.selectExpr("City as city","State as state","`Median Age` as Median_age",\
                                 "`Male Population` as Male_Population","`Female Population` as Female_Population",\
                                 "`Total Population` as Total_Population","`Number of Veterans` as Total_Veterans",\
                                 "`Foreign-born` as Foreign_born","`Average Household Size` as Average_Household_Size",\
                                 "`State Code` as State_Code","Race as race","count as count")

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Schema for Immigration Visit Analysis
Using the immigration and city demographics datasets, I created a star schema optimized for queries on immigration visits. It consists of the fact and dimension tables listed below.

The goal of the design is to organize the data so that fields and values are not duplicated, and to ensure data quality.
I followed a normalization process to distribute the data into individual entities.

Fact Table
Immig Visit - records in the immigration data associated with immigration visits 
  Immig_visit_id: long 
  visitor_id: integer 
  Date: date 
  month: integer 
  year: integer 
  i94mode: integer 
  i94visa: integer 
  Match_flag: string 
  i94State: string 
  
Dimension Tables
Immig Details - Visitors entering the country
  visitor_id: integer
  i94yr: integer 
  i94mon: integer
  arrival_date: date 
  i94mode: string 
  departure_date: date 
  Age: integer 
  i94visa: string 
  create_dt: string 
  visapost: string 
  birthyr: integer 
  gender: string 
  airline: string 
  visatype: string
 
State Details - states and cities from the city demographics data
  state_id: long 
  state: string 
  State_Code: string 
  city: string 
  
Date Table - Date of records in Immigration visits broken down into specific units
  Date: date 
  day: integer 
  week: integer 
  month: integer 
  year: integer 
  weekday: integer
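
To make the shape of the star schema concrete, here is a small sketch that uses Python's built-in `sqlite3` as a stand-in for Spark SQL. The tables are cut down to a few columns and all rows are made-up sample values; it only illustrates how the fact table joins to its dimensions on their keys.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table dim_immig_dtls (visitor_id integer primary key, gender text, visatype text);
    create table dim_date_table (date text primary key, week integer, month integer, year integer);
    create table fact_immig_visit (
        immig_visit_id integer primary key,
        visitor_id integer references dim_immig_dtls(visitor_id),
        date text references dim_date_table(date),
        i94mode integer,
        i94state text
    );
    -- Hypothetical sample rows, for illustration only.
    insert into dim_immig_dtls values (1, 'F', 'B2'), (2, 'M', 'WT');
    insert into dim_date_table values ('2016-04-01', 13, 4, 2016);
    insert into fact_immig_visit values
        (10, 1, '2016-04-01', 1, 'FL'),
        (11, 2, '2016-04-01', 1, 'NY');
""")

# Count distinct visitors per week and state by joining the fact table to its dimensions.
rows = conn.execute("""
    select d.week, f.i94state, count(distinct f.visitor_id) as visitors
    from fact_immig_visit f
    join dim_immig_dtls v on v.visitor_id = f.visitor_id
    join dim_date_table d on d.date = f.date
    group by d.week, f.i94state
    order by f.i94state
""").fetchall()
print(rows)  # [(13, 'FL', 1), (13, 'NY', 1)]
```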

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model
 * Process both immigration and city demographics data to create analytics tables
 * Create Immigration view to store data in staging table
 * Create City Data view to store data in staging table
 * Extract columns from Immigration view to create Immig_dtls table
 * Extract columns from City Data view to create State details table
 * Extract columns from Immigration view to create Date table
 * Extract columns from Immigration and City Data views to create Immigration visit fact table

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
#Processing both immigration and City data to create analytics tables
#Creating Immigration view to store data in staging table
#Creating City Data view to store data in staging table
Immmig_data.createOrReplaceTempView("stg_Immig_data")
city_data.createOrReplaceTempView("stg_city_data")

In [None]:
# extract columns to create Immig_dtls table
Immig_dtls = spark.sql("""
             select distinct
             a.cicid as visitor_id,
             a.i94yr,
             a.i94mon,
             a.arrdate as arrival_date,
             case when a.i94mode = 1 then 'AIR'
                  when a.i94mode = 2 then 'Sea'
                  when a.i94mode = 3 then 'Land' else 'other' end as i94mode,
             a.depdate as departure_date,
             a.i94bir as Age,
             case when a.i94visa = 1 then 'Business'
                  when a.i94visa = 2 then 'Pleasure'
                  when a.i94visa = 3 then 'Student' else 'other' end as i94visa,
             a.dtadfile as create_dt,
             a.visapost,
             a.biryear as birthyr,
             a.gender,
             a.airline,
             a.visatype
             from
             stg_Immig_data a
             where
             a.cicid is not null
          """)

In [None]:
Immig_dtls.createOrReplaceTempView("dim_Immig_dtls")

In [None]:
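# extract columns to create State_dtls table
# De-duplicate first, then assign the surrogate key: a key generated inside
# "select distinct" makes every row unique, so no duplicates would be removed.
State_dtls = spark.sql("""
             select
             monotonically_increasing_id() as state_id,
             s.state,
             s.State_Code,
             s.city
             from
             (select distinct a.state, a.State_Code, a.city
              from stg_city_data a
              where a.State_Code is not null) s
          """)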

In [None]:
State_dtls.createOrReplaceTempView("dim_State_dtls")
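
A note on surrogate keys: the order of de-duplication and key generation matters. The plain-Python sketch below uses made-up rows and hypothetical helper names to show that attaching a unique id before removing duplicates makes every row distinct, so nothing is removed. The same reasoning applies to a generated id inside a `select distinct`.

```python
# Hypothetical demographic rows: the source has one row per (city, race), so cities repeat.
rows = [
    ("Massachusetts", "MA", "Lawrence"),
    ("Massachusetts", "MA", "Lawrence"),
    ("Indiana", "IN", "Lawrence"),
]


def dedupe_then_number(rows):
    """Drop duplicate rows first, then attach a surrogate key to each survivor."""
    unique_rows = sorted(set(rows))
    return [(key,) + row for key, row in enumerate(unique_rows)]


def number_then_dedupe(rows):
    """Attach a unique key first; every row is now distinct, so nothing is dropped."""
    numbered = [(key,) + row for key, row in enumerate(rows)]
    return sorted(set(numbered))


print(len(dedupe_then_number(rows)))  # 2
print(len(number_then_dedupe(rows)))  # 3
```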

In [None]:
# extract columns to create Date_table 
Date_table = spark.sql("""
             select Distinct
             a.arrdate as Date,
             dayofmonth(a.arrdate) as day,
             weekofyear(a.arrdate) as week,
             month(a.arrdate) as month,
             year(a.arrdate) as year,
             dayofweek(a.arrdate) as weekday
             from
             stg_Immig_data  a 
             where
             a.arrdate is not null
          """)

In [None]:
Date_table.createOrReplaceTempView("dim_Date_table")

In [None]:
# Join against distinct state codes so each visit matches at most one row,
# de-duplicate, and only then assign the surrogate key.
Immig_visit = spark.sql("""
             select
             monotonically_increasing_id() as Immig_visit_id,
             v.*
             from (
                 select distinct
                 a.cicid as visitor_id,
                 a.arrdate as Date,
                 month(a.arrdate) as month,
                 year(a.arrdate) as year,
                 a.i94mode,
                 a.i94visa,
                 a.matflag as Match_flag,
                 case when b.State_Code is not null then a.i94addr
                      else '99' end as i94State
                 from
                 stg_Immig_data a left join
                 (select distinct State_Code from stg_city_data) b
                 on a.i94addr = b.State_Code
                 where
                 a.cicid is not null
             ) v
          """)

In [None]:
Immig_visit.createOrReplaceTempView("Fact_Immig_visit")
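
The `i94State` column in the fact table keeps `i94addr` when it matches a known state code and falls back to `'99'` otherwise. A minimal plain-Python sketch of that rule (the function name and the sample set of codes are assumptions for illustration):

```python
UNKNOWN_STATE = "99"


def resolve_state(i94addr, valid_state_codes):
    """Return i94addr when it is a known state code, otherwise the fallback code '99'."""
    return i94addr if i94addr in valid_state_codes else UNKNOWN_STATE


valid_state_codes = {"FL", "NY", "CA"}  # hypothetical subset of State_Code values
print([resolve_state(code, valid_state_codes) for code in ["FL", "XX", None]])
# ['FL', '99', '99']
```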

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks
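
The checks listed above can be written as small functions that raise an error on failure, so that a failed check stops the pipeline instead of relying on someone reading the printed output. The sketch below works on plain lists of dictionaries and uses hypothetical function names. With Spark DataFrames, the corresponding numbers would come from calls such as `df.count()` and `df.select(key).distinct().count()`.

```python
def check_min_rows(rows, table_name, minimum=1):
    """Completeness check: fail when a table has fewer rows than expected."""
    if len(rows) < minimum:
        raise ValueError(f"{table_name}: expected at least {minimum} rows, found {len(rows)}")
    return len(rows)


def check_unique_key(rows, key, table_name):
    """Integrity check: fail when the key column contains duplicate values."""
    values = [row[key] for row in rows]
    duplicates = len(values) - len(set(values))
    if duplicates:
        raise ValueError(f"{table_name}: {duplicates} duplicate value(s) in {key}")
    return True


def check_no_nulls(rows, columns, table_name):
    """Integrity check: fail when any of the given columns contains None."""
    for column in columns:
        missing = sum(1 for row in rows if row.get(column) is None)
        if missing:
            raise ValueError(f"{table_name}: {missing} null value(s) in {column}")
    return True


# Made-up sample rows standing in for the fact table.
sample = [
    {"visitor_id": 1, "i94State": "FL"},
    {"visitor_id": 2, "i94State": "99"},
]
print(check_min_rows(sample, "Fact_Immig_visit"))                    # 2
print(check_unique_key(sample, "visitor_id", "Fact_Immig_visit"))    # True
print(check_no_nulls(sample, ["visitor_id", "i94State"], "Fact_Immig_visit"))  # True
```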

In [None]:
#Immigration Source/Analytics table count check to ensure completeness
print(Immmig_data.count())
print(Immig_dtls.count())

In [None]:
#City Demographic Source/Analytics table count check to ensure completeness
print(city_data.count())
State_dtls.select(countDistinct("state_id")).show()

In [None]:
#making sure analytic tables are created as per Integrity Constraints by running below commands
Immig_visit.printSchema()

In [None]:
Date_table.printSchema()

In [None]:
#Running unit tests to make sure that data loaded properly. 
Immig_dtls.show(5)

In [None]:
Immig_visit.show(5)

In [None]:
Date_table.show(5)

In [None]:
State_dtls.show(5)

In [None]:
#Run below query to make sure all the joins work
spark.sql("""
             select
             count(Distinct a.visitor_id),
             b.i94mode,
             c.week,
             d.state
             from
             Fact_Immig_visit  a inner join 
             dim_Immig_dtls b on a.visitor_id = b.visitor_id inner join
             dim_Date_table c on a.date = c.date inner join 
             dim_State_dtls d on d.state_code = a.i94State
             where
             a.i94mode = 1
             group by 
             b.i94mode,c.week,d.state
          """).show(10)

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

In [None]:
#I have created separate Data Dictionary file saved as Data_Dictionary_Capstone_Project.csv

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

In [None]:
#Rationale for the choice of tools and Technologies for the project
 * Because the immigration data is large, I used Spark to process it.
 * Spark processes data in memory whenever possible before writing results to disk, which speeds up processing.
 * Because I processed immigration data only for April, I did not use S3 (data lake) to store the data.
 * Had I processed all the data files for 2016, I would first have stored the data in S3, processed it with Spark, and written the analytics tables back to S3.
 * Using S3 also incurs some cost, which I wanted to avoid.
 * I used PySpark SQL to run the transformations.
 * Because I only dealt with April data, I could also have processed it with Python and pandas.

In [None]:
#Propose how often the data should be updated and why.
 * The data should be processed daily as a batch job, because visitors arrive through immigration every day.
 * According to the file, records are also added to the immigration data daily.
 * Daily updates keep the data current and make it possible to find daily patterns if required.


In [None]:
#The data was increased by 100x
There are a couple of ways to handle this:
 1. Move the data to cloud storage (S3, Azure, GCP), as traditional storage systems become expensive at this scale.
 2. Add nodes to the cluster to distribute the data for better performance.
 3. Improve the data pipelines by partitioning the data to avoid data skew.
 4. Filter the data to include only the required observations.
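
Before choosing a partitioning column, it helps to measure how unevenly the rows are spread across its values. The plain-Python sketch below uses a hypothetical helper, `skew_ratio`, that compares the largest group with the average group size. A ratio close to 1 means the key splits the data evenly, while a large ratio suggests one partition would carry most of the work.

```python
from collections import Counter


def skew_ratio(keys):
    """Return the largest group size divided by the mean group size for a candidate key."""
    counts = Counter(keys)
    if not counts:
        raise ValueError("no keys given")
    mean_size = sum(counts.values()) / len(counts)
    return max(counts.values()) / mean_size


# Made-up sample of state codes: three rows for FL, one for NY.
print(skew_ratio(["FL", "FL", "FL", "NY"]))  # 1.5
```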


In [None]:
#The data populates a dashboard that must be updated on a daily basis by 7am every day
I would use Apache Airflow to schedule a data pipeline that updates the data daily by 6:30 am, leaving a buffer before the 7 am deadline.

In [None]:
#The database needed to be accessed by 100+ people
1. Move the data to cloud storage (S3, Azure, GCP), as these services automatically scale to high request rates.
 For example, with S3 we can increase read or write performance by parallelizing requests.
        Also combine Amazon S3 (storage) and Amazon EC2 (compute) in the same AWS Region
        (https://docs.aws.amazon.com/AmazonS3/latest/dev/optimizing-performance.html)