# Project Title
### Data Engineering Capstone Project

#### Project Summary
The objective of this project is to build an ETL pipeline that provides insightful statistics on the available demographics, using top-10 aggregations over the most important dimensions such as airline, country of origin, destination, and age group. The resulting tables could also provide insights on:
 1. The period when most people travel to the US, and the most popular state.
 2. The most popular country of origin.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

**Importing all libraries**

In [1]:
import pandas as pd
import configparser
import os
import datetime as dt
from datetime import timedelta, datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
from pyspark.sql import SQLContext
from pyspark.sql.types import StructField, StructType, StringType, LongType, IntegerType
from pyspark.sql.functions import isnan, when, count, col, udf, dayofmonth, dayofweek, month, year, weekofyear,to_date
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import *

import utils

**Creating Spark session**

In [2]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
Design and develop an ETL pipeline that fetches data from different datasets and sets up a data warehouse with fact and dimension tables.

To achieve this, the steps below are followed:
1. Load the data into dataframes using Spark.
2. Explore and clean the I94 immigration (April 2016), airport-codes and us-cities-demographics datasets.
3. Create fact and dimension tables.
4. Fetch code and country data available in I94_SAS_Labels_Descriptions and load into a dimension table.

This project is developed in this notebook together with a utility file (`utils`), which contains common functions used across datasets and tables.


#### Describe and Gather Data

1. **I94 Immigration Data** | This data comes from the US National Tourism and Trade Office. The dataset contains details of all international visitor arrivals in the year 2016, including type of visa, mode of transportation, age groups, states visited, and the top ports of entry.
2. **I94_SAS_Labels_Descriptions** | This file contains the code for each country and code for states in the US.

3. **U.S. City Demographic Data** | This dataset contains the demographic details of the US cities.

4. **Airport-codes** | This dataset contains details about airport codes across the world.

### Step 2: Explore and Assess the Data

***Load immigration data***

This section loads the immigration data that is available. Only the US immigration data for April 2016 is loaded below.

**Reading immigration data for month of April 2016**

In [3]:
df_immig= spark.read.format('com.github.saurfang.sas.spark')\
                .load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [4]:
df_immig.count()

3096313

In [5]:
df_immig.limit(5).toPandas()

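| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 | | | | ... | U | | 1979.0 | 10282016 | | | | 1897628000.0 | | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL | | ... | Y | | 1991.0 | D/S | M | | | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... | | M | 1961.0 | 09302016 | M | | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 1988.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 2012.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |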


In [8]:
df_immig =df_immig.dropDuplicates()
df_immig.count()

3096313

In [9]:
df_immig.summary("count").toPandas()

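| | summary | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | count | 3096313 | 3096313 | 3096313 | 3096313 | 3096313 | 3096313 | 3096313 | 3096074 | 2943721 | ... | 392 | 2957884 | 3095511 | 3095836 | 2682044 | 113708 | 3012686 | 3096313 | 3076764 | 3096313 |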


***Load airport codes***

To view the dataset for airport codes

In [3]:
df_airportcodes=spark.read.csv('airport-codes_csv.csv',header=True, inferSchema=True)

In [4]:
df_airportcodes.limit(5).toPandas()

| | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 00A | heliport | Total Rf Heliport | 11 | | US | US-PA | Bensalem | 00A | | 00A | -74.93360137939453, 40.07080078125 |
| 1 | 00AA | small_airport | Aero B Ranch Airport | 3435 | | US | US-KS | Leoti | 00AA | | 00AA | -101.473911, 38.704022 |
| 2 | 00AK | small_airport | Lowell Field | 450 | | US | US-AK | Anchor Point | 00AK | | 00AK | -151.695999146, 59.94919968 |
| 3 | 00AL | small_airport | Epps Airpark | 820 | | US | US-AL | Harvest | 00AL | | 00AL | -86.77030181884766, 34.86479949951172 |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | 237 | | US | US-AR | Newport | | | | -91.254898, 35.6087 |


In [12]:
df_airportcodes.summary("count").toPandas()

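| | summary | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | count | 55075 | 55075 | 55075 | 48069 | 55075 | 55075 | 55075 | 49399 | 41030 | 9189 | 28686 | 55075 |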


In [13]:
df_airportcodes.select("iso_country").distinct().show(5)

+-----------+
|iso_country|
+-----------+
|         DZ|
|         LT|
|         MM|
|         CI|
|         TC|
+-----------+
only showing top 5 rows



***Load US city demographics dataset***

To view data on US demographics

In [4]:
df_usdemo=spark.read.csv('us-cities-demographics.csv',sep=';',header=True, inferSchema=True)

In [5]:
df_usdemo.limit(5).toPandas()

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601 | 41862 | 82463 | 1562 | 30908 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129 | 49500 | 93629 | 4147 | 32935 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040 | 46799 | 84839 | 4819 | 8229 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127 | 87105 | 175232 | 5821 | 33878 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040 | 143873 | 281913 | 5829 | 86253 | 2.73 | NJ | White | 76402 |


In [16]:
df_usdemo.summary("count").toPandas()

| | summary | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | count | 2891 | 2891 | 2891 | 2888 | 2888 | 2891 | 2878 | 2878 | 2875 | 2891 | 2891 | 2891 |


***Load Country codes***

To read codes for each country

In [17]:
with open("I94_SAS_Labels_Descriptions.SAS") as f:
    read_lines = f.readlines()
code_country = {}
for countries in read_lines[10:298]:
    values = countries.split('=')
    code, country = values[0].strip(), values[1].strip().strip("'")
    code_country[code] = country
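
The fixed slice `read_lines[10:298]` depends on the exact line layout of the labels file. As a minimal sketch of a more tolerant alternative, the lines could be matched by pattern instead of by position. This assumes each mapping line looks like `   236 =  'AFGHANISTAN'`; the name `parse_code_labels` and the sample lines are illustrative and not part of the project.

```python
import re

# Matches lines such as:   236 =  'AFGHANISTAN'
# (assumed layout; adjust the pattern if the real file differs)
LABEL_LINE = re.compile(r"^\s*(\d+)\s*=\s*'(.*?)'\s*;?\s*$")

def parse_code_labels(lines):
    """Return {code: label} for every line of the form  <digits> = '<label>'."""
    mapping = {}
    for line in lines:
        match = LABEL_LINE.match(line)
        if match:
            code, label = match.groups()
            mapping[code] = label.strip()
    return mapping

# Illustrative input; the codes and names are taken from the table shown below
sample = [
    "/* a comment line that should be skipped */\n",
    "   236 =  'AFGHANISTAN'\n",
    "   101 =  'ALBANIA'\n",
    "   316 =  'ALGERIA'\n",
]
print(parse_code_labels(sample))
```

Other sections of the labels file may use the same `number = 'label'` layout, so the caller would still need to pass in only the lines of the country section.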

In [18]:
countryColumns = ["Code","Country"]
df_code_country_dim=spark.createDataFrame(data=list(code_country.items()),schema=countryColumns)
df_code_country_dim.limit(5).toPandas()

| | Code | Country |
|---|---|---|
| 0 | 236 | AFGHANISTAN |
| 1 | 101 | ALBANIA |
| 2 | 316 | ALGERIA |
| 3 | 102 | ANDORRA |
| 4 | 324 | ANGOLA |


## Explore data

Explore the datasets for columns with null value

### *Explore **Immigration** data*

In [19]:
utils.get_null_count(df_immig).T

| Column | Null count |
|---|---|
| cicid | 0 |
| i94yr | 0 |
| i94mon | 0 |
| i94cit | 0 |
| i94res | 0 |
| i94port | 0 |
| arrdate | 0 |
| i94mode | 239 |
| i94addr | 152592 |
| depdate | 142457 |


#### *Cleaning Steps - **Immigration** data*

In [20]:
# Dropping columns that are mostly null or not needed for the data model
drop_cols=['i94yr','i94mon','dtadfile','visapost','occup',
           'count','entdepd','entdepu','entdepa','matflag','insnum','admnum']
df_immig=df_immig.drop(*drop_cols)
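
To check which columns actually exceed a 50% null share, the non-null counts from the `summary("count")` output above can be compared with the total row count. The sketch below uses only the counts that are visible in that output (columns hidden behind `...` are not covered), and the helper name `null_fraction` is illustrative.

```python
TOTAL_ROWS = 3096313  # df_immig.count() from above

# Non-null counts copied from the summary("count") output above
non_null = {
    "i94mode": 3096074,
    "i94addr": 2943721,
    "entdepu": 392,
    "matflag": 2957884,
    "gender": 2682044,
    "insnum": 113708,
    "airline": 3012686,
}

def null_fraction(count, total=TOTAL_ROWS):
    """Share of rows in which the column is null."""
    return 1 - count / total

mostly_null = sorted(c for c, n in non_null.items() if null_fraction(n) > 0.5)
print(mostly_null)
for column in mostly_null:
    print(f"{column}: {null_fraction(non_null[column]):.1%} null")
```

Among the visible columns, only `entdepu` and `insnum` cross the 50% threshold; columns such as `i94yr`, `i94mon` and `admnum` are fully populated, so their removal is a modelling choice rather than a null-driven one.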

In [21]:
df_immig.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)



In [22]:
df_immig=df_immig.dropDuplicates(['cicid'])
df_immig.count()

3096313

In [23]:
df_immig=df_immig.dropna(how='all',subset=['cicid'])
df_immig.count()

3096313

> From the above we can determine that there are no duplicates and no null values in the "cicid" column of the immigration dataset.

### *Explore **Airport Codes** data*

In [24]:
df_airportcodes.count()

55075

In [25]:
utils.get_null_count(df_airportcodes).T

| Column | Null count |
|---|---|
| ident | 0 |
| type | 0 |
| name | 0 |
| elevation_ft | 7006 |
| continent | 0 |
| iso_country | 0 |
| iso_region | 0 |
| municipality | 5676 |
| gps_code | 14045 |
| iata_code | 45886 |


#### *Cleaning Steps - **Airport codes** data*

In [26]:
# dropping columns that have missing data or are not needed
code_cols=['iata_code','gps_code','municipality','elevation_ft','type',
          'continent','local_code','coordinates']
df_airportcodes=df_airportcodes.drop(*code_cols)
df_airportcodes.printSchema()

root
 |-- ident: string (nullable = true)
 |-- name: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)



In [27]:
df_airportcodes=df_airportcodes.dropna(how='all',subset=['iso_region','iso_country'])
df_airportcodes.count()

55075

> From the above we can determine that there are no null values in the "iso_region" and "iso_country" columns of the airport codes dataset.

### *Explore **US Demographics** data*

In [28]:
df_usdemo.count()

2891

In [29]:
utils.get_null_count(df_usdemo).T

| Column | Null count |
|---|---|
| City | 0 |
| State | 0 |
| Median Age | 0 |
| Male Population | 3 |
| Female Population | 3 |
| Total Population | 0 |
| Number of Veterans | 13 |
| Foreign-born | 13 |
| Average Household Size | 16 |
| State Code | 0 |


#### *Cleaning Steps - **US Demographics** data*

In [30]:
df_usdemo.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



In [31]:
df_usdemo=df_usdemo.dropna(how='all',subset=['State Code'])
df_usdemo.count()

2891

> From the above we can determine that there are no null values in the "State Code" column of the US Demographics dataset.

### *Explore **country & code** data*

In [32]:
df_code_country_dim.count()

288

In [33]:
df_code_country_dim.dropDuplicates().count()

288

In [34]:
utils.get_null_count(df_code_country_dim).T

| Column | Null count |
|---|---|
| Code | 0 |
| Country | 0 |


> No cleaning is required for this data frame.

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
We are using a star schema model for this project. A star schema makes the data easy to query and convenient for business reporting.

* The star schema data model is included in this project folder as "Data Model_Capstone Project".


#### 3.2 Mapping Out Data Pipelines
1. Load the immigration dataset from S3 buckets. We have chosen the dataset for April 2016 in this project; the same pipeline can be scaled to the data for the whole year.
2. All other datasets are loaded from the project folder but could also be loaded from S3 as a source.
3. Clean the I94 Immigration data and create immigration fact table
4. Create the travel dimension table, which contains details of the immigrants' travel. Data for the travel table is extracted from the immigration dataset.
5. Extract country dimension table
6. Clean and create US demographics dimension table
7. Extract data for country code and the corresponding country
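
Scaling step 1 from April to the whole year could start from a list of monthly input paths. The sketch below assumes that the other monthly files follow the same `i94_<mon>16_sub.sas7bdat` naming as the April file; that naming, and the helper `monthly_sas_paths`, are assumptions and would need to be checked against the actual data folder.

```python
MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]

def monthly_sas_paths(base_dir, year_suffix="16"):
    """Build one path per month, following the i94_<mon><yy>_sub.sas7bdat pattern."""
    return [f"{base_dir}/i94_{mon}{year_suffix}_sub.sas7bdat" for mon in MONTHS]

paths = monthly_sas_paths("../../data/18-83510-I94-Data-2016")
for path in paths[:4]:
    print(path)
```

Each path could then be read with the same `spark.read.format('com.github.saurfang.sas.spark').load(path)` call used for April. Whether all months share the April schema is not established here, so the columns would have to be aligned before combining the monthly dataframes.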

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

***Create the immigration fact table***

In [3]:
destination_path="output_tables/"

In [51]:
def immigration_fact_table(df,dest_data):
    
   
    date_format = "%Y-%m-%d"
    get_datetime = udf(lambda x: x if x is None else (timedelta(days=int(float(x))) + datetime(1960, 1, 1)).strftime(date_format))
                       
    immig_df = df.withColumnRenamed('cicid','id') \
                 .withColumnRenamed('i94res','immig_residence_country_code') \
                 .withColumnRenamed('i94cit','immig_birth_country_code') \
                 .withColumnRenamed('i94port','port_of_entry') \
                 .withColumnRenamed('i94mode','mode_of_transportation') \
                 .withColumnRenamed('i94addr','arrival_state_code') \
                 .withColumnRenamed('i94bir','age') \
                 .withColumnRenamed('i94visa','visa_code') \
                 .withColumnRenamed('biryear','birth_year')
    
    immig_df=immig_df.withColumn("arrival_date",get_datetime(df.arrdate)) \
                      .withColumn("departure_date",get_datetime(df.depdate))
              
    immig_df = immig_df.withColumn("arrival_date",immig_df["arrival_date"].cast(DateType())) \
                       .withColumn("departure_date",immig_df["departure_date"].cast(DateType()))
    immig_df = immig_df.withColumn("date_until",to_date(df['dtaddto'], "MMddyyyy"))
        
 
    # writing the fact table to parquet files
    immig_df.write.mode("overwrite").parquet(path=dest_data + 'immigration_fact_table')
    
    return immig_df

In [52]:
immigration_fact_table(df_immig, destination_path)

DataFrame[id: double, immig_birth_country_code: double, immig_residence_country_code: double, port_of_entry: string, arrdate: double, mode_of_transportation: double, arrival_state_code: string, depdate: double, age: double, visa_code: double, birth_year: double, dtaddto: string, gender: string, airline: string, fltno: string, visatype: string, arrival_date: date, departure_date: date, date_until: date]
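
The date handling inside `immigration_fact_table` can be checked outside Spark. `arrdate` and `depdate` are SAS dates, meaning day counts since 1960-01-01, while `dtaddto` is an `MMddyyyy` string that may also hold non-date values such as `D/S`. The plain-Python sketch below mirrors that logic; the helper names are illustrative, and returning `None` for unparseable strings is an assumption about the desired behaviour.

```python
from datetime import date, datetime, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS counts dates as days since this day

def sas_days_to_date(days):
    """Convert a SAS day count (float, int or None) to a date."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))

def parse_date_until(value):
    """Parse an MMddyyyy string; return None for values such as 'D/S'."""
    try:
        return datetime.strptime(value, "%m%d%Y").date()
    except (TypeError, ValueError):
        return None

print(sas_days_to_date(20573.0))     # arrdate of the first sample row
print(sas_days_to_date(20545.0))     # arrdate of the third sample row
print(parse_date_until("10282016"))  # dtaddto of the first sample row
print(parse_date_until("D/S"))       # dtaddto of the second sample row
```

For the sample rows shown earlier, this gives arrival dates of 2016-04-29 and 2016-04-01, which fall inside April 2016 as expected.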

***Create airport code dimension table***

In [43]:
def airport_code_dim_table(df,dest_data):
    airport_df = df.filter(col("iso_country") == "US")
    airport_df = airport_df.withColumnRenamed('iso_country','country')\
                           .withColumnRenamed('iso_region','state_code')
    
    airport_df.write.parquet(dest_data + "airport_dim_table", mode="overwrite")
    return airport_df

In [44]:
airport_code_dim_table(df_airportcodes,destination_path)

DataFrame[ident: string, name: string, country: string, state_code: string]
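
Note that `state_code` in this table still carries the `iso_region` format seen in the sample rows (`US-PA`, `US-KS`), whereas `arrival_state_code` in the fact table holds bare codes such as `AL` or `MI`. If the two tables are to be joined on state, the prefix would need to be removed first. A minimal sketch of that normalisation in plain Python, with the hypothetical helper `region_to_state_code`:

```python
def region_to_state_code(iso_region):
    """Turn an ISO region such as 'US-PA' into the bare state code 'PA'."""
    if iso_region is None:
        return None
    # partition() splits on the first hyphen only
    _country, sep, state = iso_region.partition("-")
    # Values without a hyphen are returned unchanged
    return state if sep else iso_region

for region in ["US-PA", "US-KS", "US-AK"]:
    print(region, "->", region_to_state_code(region))
```

In Spark the same step could be applied as a UDF or with the built-in string functions before writing the table; which one fits best is left open here.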

***Create US Demographic dimension table***

In [45]:
def us_demo_dim_table(df,dest_data):
    us_demo_dim_df = df.select(col("City"),col("State"),col("State Code"),col("Total Population"))
    
    us_demo_dim_df =us_demo_dim_df.withColumnRenamed('State Code','state_code') \
                                  .withColumnRenamed('Total Population','total_population')
    
    us_demo_dim_df.write.parquet(dest_data + "us_demographic_dim_table", mode="overwrite")
    
    return us_demo_dim_df

In [46]:
us_demo_dim_table(df_usdemo,destination_path)

DataFrame[City: string, State: string, state_code: string, total_population: int]

***Create Travel dimension table***

In [47]:
def travel_dim_table(df,dest_data):
    travel_dim_df = df.select(col("cicid"),col("visatype"),col("airline"),col("fltno"))
    
    travel_dim_df = travel_dim_df.withColumnRenamed('cicid','id')\
                                 .withColumnRenamed('visatype','visa_type')\
                                 .withColumnRenamed('airline','airline_code')\
                                 .withColumnRenamed('fltno', 'flight_number')
    
    travel_dim_df.write.parquet(dest_data + "travel_dim_table", mode="overwrite")
    
    return travel_dim_df                                    

In [48]:
travel_dim_table(df_immig,destination_path)

DataFrame[id: double, visa_type: string, airline_code: string, flight_number: string]

***Create Country code dimension table***

In [49]:
def country_dim_table(df,dest_data):
    country_dim_df = df
    
    country_dim_df.write.parquet(dest_data + "country_code_dim_table", mode="overwrite")
    
    return country_dim_df

In [50]:
country_dim_table(df_code_country_dim,destination_path)

DataFrame[Code: string, Country: string]

#### 4.2 Data Quality Checks
1. The data quality checks are performed to ensure the pipeline ran as expected and there are no nulls

In [5]:
# Perform quality checks here
utils.data_quality_check(destination_path,spark)

us_demographic_dim_table
Data quality check passed: Table us_demographic_dim_table has a total of 2891 rows.
country_code_dim_table
Data quality check passed: Table country_code_dim_table has a total of 288 rows.
travel_dim_table
Data quality check passed: Table travel_dim_table has a total of 3096313 rows.
immigration_fact_table
Data quality check passed: Table immigration_fact_table has a total of 3096313 rows.
airport_dim_table
Data quality check passed: Table airport_dim_table has a total of 22757 rows.
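
The implementation of `utils.data_quality_check` is not shown in this notebook. As a rough sketch of the kind of logic such a check might contain, the function below takes row counts per table and fails if any table is empty. The name `check_row_counts` and the choice to raise a `ValueError` are assumptions; in the pipeline, the counts would come from reading each parquet folder and calling `.count()`.

```python
def check_row_counts(row_counts):
    """Return one message per table; raise if any table is empty.

    row_counts maps a table name to its number of rows.
    """
    messages = []
    for table, rows in row_counts.items():
        if rows == 0:
            raise ValueError(f"Data quality check failed: Table {table} has no rows.")
        messages.append(
            f"Data quality check passed: Table {table} has a total of {rows} rows."
        )
    return messages

# Row counts taken from the output above
counts = {"country_code_dim_table": 288, "airport_dim_table": 22757}
for message in check_row_counts(counts):
    print(message)
```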


2. Data quality check to ensure there are no duplicate IDs in immigration table

In [7]:
col_outputfolder = {
    'id':"immigration_fact_table"   
}

In [6]:
utils.data_quality_duplicate_check(destination_path,col_outputfolder,spark)

Data quality check passed: Table immigration_fact_table has no duplicates in column id
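
The duplicate check boils down to asking whether any value of the key column occurs more than once. The sketch below shows that idea on a small list of IDs; `duplicated_values` is an illustrative helper and not the code in `utils`.

```python
from collections import Counter

def duplicated_values(values):
    """Return the values that occur more than once, in first-seen order."""
    counts = Counter(values)
    return [value for value, n in counts.items() if n > 1]

ids = [6.0, 7.0, 15.0, 16.0, 17.0]    # cicid values from the sample rows
print(duplicated_values(ids))          # no duplicates in the sample
print(duplicated_values(ids + [7.0]))  # 7.0 now appears twice
```

On a Spark dataframe, one way to express the same check is to compare `df.count()` with `df.select("id").distinct().count()`.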


#### 4.3 Data dictionary 

**Immigration Fact Table**

* id - Unique record ID
* immig_birth_country_code - 3 digit code for immigrant country of birth
* immig_residence_country_code - 3 digit code for immigrant country of residence
* port_of_entry - Port of admission
* arrival_date - Arrival Date in the USA
* mode_of_transportation - Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)
* arrival_state_code - USA State of arrival
* departure_date - Departure Date from the USA
* age - Age of Respondent in Years
* visa_code - Visa codes collapsed into three categories
* birth_year - 4 digit year of birth
* date_until - Date allowed to stay until
* gender - gender

**travel_dim**

* airline_code - Airline used to arrive in U.S.
* flight_number - Flight number of Airline used to arrive in U.S.
* visa_type - Class of admission legally admitting the non-immigrant to temporarily stay in U.S.
* id - Unique record ID

**airport_dim**

* ident - Airport identifier
* name - Name of airport.
* state_code - Region code (from iso_region) of the state where the airport is located, e.g. US-PA
* country - Country code (from iso_country) of the country where the airport is located

**us_demographics_dim**

* City - City Name
* State - US State where city is located
* state_code - US state code
* total_population - total population

**Country_code_dim**

* Code - Country code
* Country - Name of country

### Step 5: Complete Project Write Up
* The choice of tools and technologies for the project
  * We used a star schema as the data model. A star schema makes the data easy to query and convenient for business reporting.
  * Why Spark?
    * It can handle large amounts of data and supports various file formats.
    * It is also easy to use.
* Propose how often the data should be updated and why.
   * For the insights we want to provide, we would update the data monthly, so that we can keep track of travel activities throughout the year.

* Write a description of how you would approach the problem differently under the following scenarios:
   * The data was increased by 100x.
        * Spark can handle large amounts of data, but as the data grows we could consider using AWS EMR.
   * The data populates a dashboard that must be updated on a daily basis by 7am every day.
        * We could use Apache Airflow to schedule and run data pipelines on a daily basis.
   * The database needed to be accessed by 100+ people.
        * Amazon Redshift would be a good choice for handling a large number of users. Redshift can easily handle 500 concurrent connections and 50 concurrent queries.

*The code below illustrates how the tables can be used to gather simple insights, here the countries of origin from which most immigrants travel to the United States.*

In [3]:
df_immg_read = spark.read.parquet("output_tables/immigration_fact_table")

In [5]:
df_immg_read.createOrReplaceTempView("immigration_table")

In [8]:
df_country_read=spark.read.parquet("output_tables/country_code_dim_table")

In [16]:
df_country_read.createOrReplaceTempView("Country_code_table")

In [22]:
q="SELECT COUNT(a.id) as No_of_immigrants, b.Country as country FROM immigration_table AS a \
            JOIN Country_code_table as b on a.immig_residence_country_code=b.Code \
            GROUP BY(country) ORDER BY (No_of_immigrants) desc"

In [23]:
sqlDF = spark.sql(q)

In [24]:
sqlDF.show()

+----------------+--------------+
|No_of_immigrants|       country|
+----------------+--------------+
|          368421|UNITED KINGDOM|
|          249167|         JAPAN|
|          185609|    CHINA, PRC|
|          185339|        FRANCE|
|          156613|       GERMANY|
|          136312|   SOUTH KOREA|
|          134907|        BRAZIL|
|          112407|     AUSTRALIA|
|          107193|         INDIA|
|           75128|    ARGENTINA |
|           74619|   NETHERLANDS|
|           65782|         ITALY|
|           54330|      COLOMBIA|
|           49138|         SPAIN|
|           47969|   SWITZERLAND|
|           45063|        SWEDEN|
|           43435|        ISRAEL|
|           42495|       ECUADOR|
|           40979|     VENEZUELA|
|           29921|        TAIWAN|
+----------------+--------------+
only showing top 20 rows
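
Two details of this query are worth spelling out. The join compares a numeric column (`immig_residence_country_code`, a double) with a string column (`Code`), so it depends on an implicit cast. Also, `show()` prints 20 rows, while the project summary talks about top-10 aggregations. The sketch below makes both explicit with a cast and a `LIMIT 10`. It runs on SQLite with a few invented toy rows so that it is self-contained; the table and column names match the notebook, but the rows are not real data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE immigration_table (id REAL, immig_residence_country_code REAL);
    CREATE TABLE Country_code_table (Code TEXT, Country TEXT);
    -- Toy rows, invented purely for illustration
    INSERT INTO immigration_table VALUES (1, 101), (2, 101), (3, 316);
    INSERT INTO Country_code_table VALUES ('101', 'ALBANIA'), ('316', 'ALGERIA');
""")

query = """
    SELECT COUNT(a.id) AS No_of_immigrants, b.Country AS country
    FROM immigration_table AS a
    JOIN Country_code_table AS b
      ON CAST(a.immig_residence_country_code AS INTEGER) = CAST(b.Code AS INTEGER)
    GROUP BY b.Country
    ORDER BY No_of_immigrants DESC
    LIMIT 10
"""
rows = conn.execute(query).fetchall()
print(rows)
```

The same `CAST` and `LIMIT` clauses are standard SQL and should carry over to the Spark SQL query above, although that has not been run here.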

