# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project has been created to capture immigrant data arriving at the US entries. The datwarehouse has ETL processing
with Spark and Final data will be stored in parquet form on disk.
This would enable the end user to read data as required by using processing power of Spark as the final data is stored in a Fact and Dimension creating a Star Schema.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import configparser
from datetime import datetime
import os
import re
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import monotonically_increasing_id
from RawDataProcess import RawDataProcess
from DataCleanse import DataCleanse
from DataTransform import DataTransform
from DataLoader import DataLoader
from DataValidation import DataValidation

### Step 1: Scope the Project and Gather Data

#### Scope 
As part of this project, we would look at the immigration data in USA and how Temperature,Race affect the decision for immigrants. Data set used are i94 immigration data coupled with temperature for US cities and demographic data that is available to general public. The output of this project would have Fact and Dimension table that will enable to get the required data in an efficient and confirmed way.
Datawarehouse will reside in parquet files that have been created using python ETL scripts and Spark processing power.


#### Describe and Gather Data 
***U.S. City Demographic Data (demog):*** Extracted from OpenSoft and includes data by city, state, age, population, veteran status and race.

***I94 Immigration Data (sas_data):*** Extracted from the US National Tourism and Trade Office and includes details on incoming immigrants and their ports of entry.

***Airport Code Table (airport):*** Extracted from datahub.io and includes airport codes and corresponding cities.

***Countries (countries):*** Extracted from I94_SAS_Labels_Descriptions.SAS with country to code mapping

***Visas (visa):*** Extracted from I94_SAS_Labels_Descriptions.SAS with visa code to type

***Inmigrant Entry Mode (mode):*** Extracted from I94_SAS_Labels_Descriptions.SAS with id maped to Entry mode 

***Port(Port):*** Extracted from I94_SAS_Labels_Descriptions.SAS with Port mapped to country

#### Source Data location 

In [2]:
paths = {
    "demographics" : "./Raw_Data/us-cities-demographics.csv",
    "airports" :  "./Raw_Data/airport-codes_csv.csv",
    "sas_data" : "./sas_data",
    "us_states" : "./Raw_Data/us_states.csv",
    "cities" : "./Raw_Data/cities.csv",
    "countries" : "./Raw_Data/countries.csv",
    "visa" : "./Raw_Data/visa.csv",
    "ports" : "./Raw_Data/port_raw.csv",
    "mode" : "./Raw_Data/mode.csv",
}

In [3]:
def create_spark_session():
    """Creates SparkSession       
    Creates SparkSession and returns it. If SparkSession is already created it returns
    the currently running SparkSession.    
    Input:
        None
    Returns:
        SparkSession
    """
    spark = SparkSession.builder\
                .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
                .enableHiveSupport()\
                .getOrCreate()
    return spark
spark = create_spark_session()

In [42]:
#creating Clean data for the port data provided in txt file
re_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')
i94port_valid = {}
with open('Raw_Data/portdata_raw.txt') as f:
     for line in f:
         match = re_obj.search(line)
         i94port_valid[match[1]]=[match[2]]

In [4]:
# Read in the data here

rawdataprocess = RawDataProcess(spark, paths)

dfdemog = rawdataprocess.get_cities_demographics_raw()
dfairport=rawdataprocess.get_airports_raw()
dfsas_data = rawdataprocess.get_inmigration_raw()
dfcountries = rawdataprocess.get_countries_raw()
dfvisa = rawdataprocess.get_visa_raw()
dfmode = rawdataprocess.get_mode_raw()
dfports = rawdataprocess.get_ports_raw()
dfusstates = rawdataprocess.get_usstates_raw()



In [10]:
#### Review Data in Raw form

In [6]:
dfdemog.show(2)

+-------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
|         City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|              Race|Count|
+-------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
|Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|Hispanic or Latino|25924|
|       Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|             White|58723|
+-------------+-------------+----------+---------------+-----------------+-----------

In [6]:
dfairport.show(2)


+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
only showing top 2 rows



In [13]:
dfusstates.show(2)

+-------+----------+
|  State|State_Code|
+-------+----------+
|Alabama|        AL|
| Alaska|        AK|
+-------+----------+
only showing top 2 rows



In [7]:
dfsas_data.show(2)


+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

In [8]:
dfcountries.show(2)


+----+--------------------+
|code|        country_name|
+----+--------------------+
| 582|MEXICO Air Sea, a...|
| 236|         AFGHANISTAN|
+----+--------------------+
only showing top 2 rows



In [9]:
dfvisa.show(2)


+-------+---------+
|visa_id|visa_type|
+-------+---------+
|      1| Business|
|      2| Pleasure|
+-------+---------+
only showing top 2 rows



In [10]:
dfmode.show(2)


+------+----------+
|cod_id| mode_name|
+------+----------+
|   1.0|       Air|
|   2.0|       Sea|
+------+----------+
only showing top 2 rows



In [11]:
dfports.show(2)
#dsplit=dfports[" airport_name"].str.split(",", n = 1, expand = True)
#dsplit = pd.DataFrame(df[" airport_name"].str.split(',',1).tolist(), columns = ['a','b'])

+----+-------------+
|code| airport_name|
+----+-------------+
| ALC|    ALCAN, AK|
| ANC|ANCHORAGE, AK|
+----+-------------+
only showing top 2 rows



### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
*Clean countries dataset to get the port code and state code for only USA

*Clean demograpihc data for converting string to double for double values and correct column name

*Clean ports dataset to get the port code and state code for only USA

*Clean airports dataset filtering only US airports and discarding data that is not an airport("large_airport", "medium_airport", "small_airport"). Extract iso regions and cast fields as required.

*Clean the immigration dataset.Standardize date. Mark data that is marked as type invalid in coutry code or ports code with a new column to identif why data set is missing

All the data cleansing function has been done with help of a class called DataCleanse.py

In [7]:
#create a tableview for all the data to help with cleaning in next steps.
dfdemog.createOrReplaceTempView("City")
dfairport.createOrReplaceTempView("Airport")
dfsas_data.createOrReplaceTempView("Immigration")
dfcountries.createOrReplaceTempView("Countries")
dfvisa.createOrReplaceTempView("Visa")
dfmode.createOrReplaceTempView("Mode")
dfports.createOrReplaceTempView("Ports")

In [8]:
# Performing cleaning tasks here
from DataCleanse import DataCleanse
datacleansed=DataCleanse(spark)

cl_dfairport=datacleansed.get_airports_cleansed(dfairport)
cl_dfairport.show(3)


+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+----------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|state_code|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+----------+
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|        KS|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|        AK|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     US-AL|     Harvest|    00AL|     null|      00AL|-86.7703018188476...|        AL|
+-----+---------

In [9]:
cl_dfport=datacleansed.get_ports_cleansed(dfports)
cl_dfport.show(3)


+---------+--------------------+---------------+
|port_code|        airport_name|port_state_code|
+---------+--------------------+---------------+
|      ALC|           ALCAN, AK|             AK|
|      ANC|       ANCHORAGE, AK|             AK|
|      BAR|BAKER AAF - BAKER...|             AK|
+---------+--------------------+---------------+
only showing top 3 rows



In [10]:
cl_dfcountries=datacleansed.get_countries_cleansed(dfcountries)
cl_dfcountries.show(5)


+------------+--------------------+--------------+
|country_code|        country_name|country_status|
+------------+--------------------+--------------+
|         582|MEXICO Air Sea, a...|         VALID|
|         236|         AFGHANISTAN|         VALID|
|         101|             ALBANIA|         VALID|
|         316|             ALGERIA|         VALID|
|         102|             ANDORRA|         VALID|
+------------+--------------------+--------------+
only showing top 5 rows



In [11]:
cl_dfmode=datacleansed.get_mode_cleansed(dfmode)
cl_dfmode.show(3)


+-------+---------+
|mode_id|mode_name|
+-------+---------+
|      1|      Air|
|      2|      Sea|
|      3|     Land|
+-------+---------+
only showing top 3 rows



In [12]:

cl_dfairport=datacleansed.get_airports_cleansed(dfairport)
cl_dfairport.show(2)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+----------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|state_code|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+----------+
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|        KS|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|        AK|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+----------+
only showing top

In [13]:
cl_dfimmigration=datacleansed.get_Immigration_cleansed(dfsas_data)
cl_dfimmigration.show(3)

+-------+----+-----+--------------+----------------+-------+-------+----------+---+----------+-------+-------------+--------------+------+-------+---------+--------+----------+-------+
| cic_id|year|month|org_cntry_code|org_country_name|port_id|visa_id|birth_year|age|state_code|mode_id|arrrival_date|departure_date|gender|airline|flight_no|visatype|occupation|counter|
+-------+----+-----+--------------+----------------+-------+-------+----------+---+----------+-------+-------------+--------------+------+-------+---------+--------+----------+-------+
|5748517|2016|    4|           245|             438|   null|      1|      1976| 40|        CA|      1|   2016-04-30|    2016-05-08|     F|     QF|    00011|      B1|      null|    1.0|
|5748518|2016|    4|           245|             438|   null|      1|      1984| 32|        NV|      1|   2016-04-30|    2016-05-17|     F|     VA|    00007|      B1|      null|    1.0|
|5748519|2016|    4|           245|             438|   null|      1|      1

In [14]:
cl_dfdemographic=datacleansed.get_demo_cleansed(dfdemog)
cl_dfdemographic.show(3)

+-------------+-------------+----------+----------+---------------+-----------------+----------------+------------------+------------+--------------+------------------+-----+
|         City|        State|State_code|Median_age|Male_population|Female_population|Total_population|number_of_veterans|foreign_born|avg_hshld_size|              Race|count|
+-------------+-------------+----------+----------+---------------+-----------------+----------------+------------------+------------+--------------+------------------+-----+
|Silver Spring|     Maryland|        MD|      33.8|        40601.0|          41862.0|         82463.0|              1562|       30908|           2.6|Hispanic or Latino|25924|
|       Quincy|Massachusetts|        MA|      41.0|        44129.0|          49500.0|         93629.0|              4147|       32935|          2.39|             White|58723|
|       Hoover|      Alabama|        AL|      38.5|        38040.0|          46799.0|         84839.0|              4819|    

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

##### we have chosen the Star Schema with fact and dimension to generate our warehouse. Details related to the dimension and fact tables are as below

##### Star Schema
Dimension Tables:

* dim_demographics
City,State,State_code,Median_age,Male_population,Female_population,Total_population,number_of_veterans,foreign_born,avg_hshld_size, Race,count ,male_ratio,female_ratio

* dim_airports:
ident, type, name, elevation_ft, continent, iso_country, iso_region, municipality, gps_code, iata_code, local_code, coordinates,state_code

* dim_countries:
country_code,country_name,country_status

* dim_get_visa:
visa_id,visa_type

* dim_get_mode:
mode_id,mode_name

* dim_us_states:
State,State_Code

Fact Table:

* immigration_fact_table
cic_id,year,month,org_cntry_code,org_country_name,port_id,visa_id,birth_year,age,state_code,mode_id,arrrival_date,departure_date,gender,airline,flight_no,visatype,occupation,counter,

#### 3.2 Mapping Out Data Pipelines
* There are two steps:

** Tranform data:

    1. Transform demographics dataset to inlcude ratio for male and female population
    2. Transform inmigration dataset on order to get arrival date in different columns (year, month, day) for partitioning the dataset.
    
** Generate Model (Star Schema):

    1. Create all dimensions in parquet.
    2. Create fact table in parquet particioned by year, month, day of th arrival date.
    3. Insert in fact table only items with dimension keys right. For integrity and consistency.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [15]:
# Performing transformation tasks here
#from DataTransform import DataTransform
datatransform=DataTransform(spark)

tr_dfdemographic=datatransform.get_demo_transform(cl_dfdemographic)
tr_dfdemographic.show(3)

+-------------+-------------+----------+----------+---------------+-----------------+----------------+------------------+------------+--------------+------------------+-----+-------------------+------------------+
|         City|        State|State_code|Median_age|Male_population|Female_population|Total_population|number_of_veterans|foreign_born|avg_hshld_size|              Race|count|         male_ratio|      female_ratio|
+-------------+-------------+----------+----------+---------------+-----------------+----------------+------------------+------------+--------------+------------------+-----+-------------------+------------------+
|Silver Spring|     Maryland|        MD|      33.8|        40601.0|          41862.0|         82463.0|              1562|       30908|           2.6|Hispanic or Latino|25924|0.49235414670822064|0.5076458532917794|
|       Quincy|Massachusetts|        MA|      41.0|        44129.0|          49500.0|         93629.0|              4147|       32935|          

In [16]:
paths_write = {
    "demographics" : "./model/demographics.parquet",
    "airports" :  "./model/airports.parquet",
    "ports" : "./model/ports.parquet",
    "countries" : "./model/countries.parquet",
    "visa" : "./model/visa.parquet",
    "mode" : "./model/mode.parquet",
    "facts" : "./model/facts_inmigration.parquet"
}

In [None]:
from DataLoaderc import DataLoader
dataloader = DataLoader(spark, paths_write)
dataloader.FinalWriter(cl_dfimmigration, cl_dfdemographic, cl_dfairport, dfports, dfcountries, dfvisa, cl_dfmode)

+--------+
|count(1)|
+--------+
|       0|
+--------+



#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [18]:
# Perform quality checks here
from DataValidation import DataValidation
datavalidation=DataValidation(spark, paths_write)

facts=datavalidation.get_facts()
dim_demographics, dim_airports, dim_ports, dim_countries, dim_get_visa, dim_get_mode = datavalidation.get_dimensions()

In [36]:
# Write code here
facts.createOrReplaceTempView("immigration")
dim_airports.createOrReplaceTempView("airport")
dim_demographics.createOrReplaceTempView("demo")
dim_ports.createOrReplaceTempView("ports")
dim_countries.createOrReplaceTempView("countries")
dim_get_mode.createOrReplaceTempView("mode")
dim_get_visa.createOrReplaceTempView("visa")
sss=spark.sql("""
select * from  immigration f join demo dd
on f.state_code = dd.state_code
join visa on f.visa_id = visa.visa_id 
join mode on f.mode_id = mode.mode_id
""")
sss.show(3)

+-------+--------------+----------------+-------+-------+----------+---+----------+-------+-------------+--------------+------+-------+---------+--------+----------+-------+----+-----+-----------+----------+----------+----------+---------------+-----------------+----------------+------------------+------------+--------------+--------------------+------+-------+---------+-------+---------+
| cic_id|org_cntry_code|org_country_name|port_id|visa_id|birth_year|age|state_code|mode_id|arrrival_date|departure_date|gender|airline|flight_no|visatype|occupation|counter|year|month|       City|     State|State_code|Median_age|Male_population|Female_population|Total_population|number_of_veterans|foreign_born|avg_hshld_size|                Race| count|visa_id|visa_type|mode_id|mode_name|
+-------+--------------+----------------+-------+-------+----------+---+----------+-------+-------------+--------------+------+-------+---------+--------+----------+-------+----+-----+-----------+----------+---------

In [19]:
#Validate all dimensions have data
datavalidation.exists_rows(dim_demographics)

True

In [32]:
datavalidation.exists_rows(dim_airports)


True

In [35]:
datavalidation.exists_rows(dim_ports)


True

In [36]:
datavalidation.exists_rows(dim_countries)


True

In [37]:
datavalidation.exists_rows(dim_get_visa)


True

In [38]:
datavalidation.exists_rows(dim_get_mode)

True

In [39]:
datavalidation.exists_rows(facts)

True

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

file created - DataDictionary.md


#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.
 
 
### Rationale
* For the project the scripting language of choice is python as there are are number of libraries available to perform various functions. the data processing is done using Spark as spark can process a huge volume of data and has huge number of library available to support.
* The final datawarehouse has been persisted  in parquet files as they can scale upto petabytes of data without any major issues

### Dataupdation
* propose that the data be updated everyday as we have partioned by month and year and should be easily scalable. and can be traversed without issues. we can use APache airflow to perform this activity.

### Different solution
* if the data was increased by 100x Spark should be still ale to handle it.
* Use Apache airflow  as it has effective SLA, email notificaiton and monitoring capability to help for daily SLA based requirement
* Use Hive and spark sql template views for more than 100 users