# Data Engineer Nanodegree Capstone Project

#### Project Summary
In this project we study U.S. immigration data by designing an OLAP data model and building an ETL pipeline that loads the data into a defined star schema.
The design rests on two considerations:
- The star schema is a data model optimized for OLAP queries.
- With this much data, there is wide scope for analysis and for extracting insights.

We also perform a brief exploratory data analysis of the collected data.


The project follows these steps:
* 1: Scope the project and gather data
* 2: Explore and assess the data
* 3: Define the data model
* 4: Run the ETL pipeline to model the data
* 5: Conclusion, reflection, and future work

In [1]:
# Do all imports and installs here
import pandas as pd
import pyspark
import seaborn as sns
import matplotlib.pyplot as plt

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,lit, upper

In [3]:
spark=SparkSession.builder.appName('DEND_capstone_project').getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
The scope of this project is the immigration data for the United States. More specifically, we are interested in the following phenomena:

- the effects of temperature on the volume of travellers
- the seasonality of travel
- the connection between the volume of travel and the demographics of various cities

#### Describe and Gather Data 
To accomplish this study, we will be using the following datasets:

- I94 Immigration Data: This data comes from the US National Tourism and Trade Office and includes the contents of the I-94 form filled in on entry to the United States. A data dictionary is included in the workspace.

- World Temperature Data: This dataset comes from Kaggle and includes the temperatures of various cities in the world from 1743 to 2013.

- U.S. City Demographic Data: This data comes from OpenSoft. It contains information about the demographics of all US cities and census-designated places with a population greater or equal to 65,000 and comes from the US Census Bureau's 2015 American Community Survey.


### Step 2: Explore and Assess the Data
#### Explore the Data 
- Use pandas and Spark for exploratory data analysis to get an overview of these data sets
- Split the data sets into dimension tables and rename columns for clarity
- Use PySpark on one of the SAS data sets to test the ETL pipeline logic

In [4]:
#reading immigration data
df_immi = spark.read.format("csv").load("data/immigration_data_sample.csv", header=True)
df_immi = df_immi.drop("_c0")
df_immi.toPandas().head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,...,,M,1955.0,7202016,F,,JL,56582674633.0,00782,WT
1,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,...,,M,1990.0,10222016,M,,*GA,94361995930.0,XBLNG,B2
2,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,...,,M,1940.0,7052016,M,,LH,55780468433.0,00464,WT
3,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,...,,M,1991.0,10272016,M,,QR,94789696030.0,00739,B2
4,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,...,,M,1997.0,7042016,F,,,42322572633.0,LAND,WT


In [5]:
#select the target columns and rename them with more descriptive names
fact_immigration=df_immi.select('cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr', 'arrdate', 'depdate', 'i94mode', 'i94visa')
fact_immigration=fact_immigration.withColumnRenamed(
    "cicid","cic_id").withColumnRenamed(
    "i94yr","year").withColumnRenamed(
    "i94mon","month").withColumnRenamed(
    "i94port","city_code").withColumnRenamed(
    "i94addr","state_code").withColumnRenamed(
    "arrdate","arrive_date").withColumnRenamed(
    "depdate","departure_date").withColumnRenamed(
    "i94mode","mode").withColumnRenamed(
    "i94visa","visa")
fact_immigration=fact_immigration.withColumn("country",lit("United States"))
fact_immigration.toPandas().head()

Unnamed: 0,cic_id,year,month,city_code,state_code,arrive_date,departure_date,mode,visa,country
0,4084316.0,2016.0,4.0,HHW,HI,20566.0,20573.0,1.0,2.0,United States
1,4422636.0,2016.0,4.0,MCA,TX,20567.0,20568.0,1.0,2.0,United States
2,1195600.0,2016.0,4.0,OGG,FL,20551.0,20571.0,1.0,2.0,United States
3,5291768.0,2016.0,4.0,LOS,CA,20572.0,20581.0,1.0,2.0,United States
4,985523.0,2016.0,4.0,CHM,NY,20550.0,20553.0,3.0,2.0,United States
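
The `arrive_date` and `departure_date` values above (for example `20566.0`) are still raw SAS date numbers, while the stored `fact_immigration` schema printed in Step 4 lists both columns as `date`. A minimal sketch of such a conversion, assuming the values count days since the SAS epoch of 1960-01-01; the helper name `sas_to_date` is hypothetical and not part of the original notebook:

```python
from datetime import date, timedelta
from typing import Optional

SAS_EPOCH = date(1960, 1, 1)  # SAS stores dates as day counts from this day


def sas_to_date(days: Optional[float]) -> Optional[date]:
    """Convert a SAS numeric date to a datetime.date; missing values stay None."""
    if days is None or days != days:  # `days != days` is True only for NaN
        return None
    return SAS_EPOCH + timedelta(days=int(days))


print(sas_to_date(20566.0))  # arrdate of the first sample row -> 2016-04-22
```

The result falls in April 2016, which agrees with the `year` and `month` columns of the same row. In Spark, the same arithmetic could be expressed with `date_add` on a literal 1960-01-01 date instead of a Python function.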


In [6]:
#select the target columns and rename them with more descriptive names
dim_immi_personal=df_immi.select('cicid', 'i94cit', 'i94res', 'biryear', 'gender', 'insnum')
dim_immi_personal=dim_immi_personal.withColumnRenamed(
    "cicid","cic_id").withColumnRenamed(
    "i94cit","citizen_country").withColumnRenamed(
    "i94res","residence_country").withColumnRenamed(
    "biryear","birth_year").withColumnRenamed(
    "insnum","ins_num")  # "gender" already has its final name
dim_immi_personal.toPandas().head()

Unnamed: 0,cic_id,citizen_country,residence_country,birth_year,gender,ins_num
0,4084316.0,209.0,209.0,1955.0,F,
1,4422636.0,582.0,582.0,1990.0,M,
2,1195600.0,148.0,112.0,1940.0,M,
3,5291768.0,297.0,297.0,1991.0,M,
4,985523.0,111.0,111.0,1997.0,F,


In [7]:
#select the target columns and rename them with more descriptive names
dim_immi_airline=df_immi.select('cicid', 'airline', 'admnum', 'fltno', 'visatype')
dim_immi_airline=dim_immi_airline.withColumnRenamed(
    "cicid","cic_id").withColumnRenamed(
    "admnum","admin_num").withColumnRenamed(
    "fltno","flight_number").withColumnRenamed(
    "visatype","visa_type")  # "airline" already has its final name
dim_immi_airline.toPandas().head()

Unnamed: 0,cic_id,airline,admin_num,flight_number,visa_type
0,4084316.0,JL,56582674633.0,00782,WT
1,4422636.0,*GA,94361995930.0,XBLNG,B2
2,1195600.0,LH,55780468433.0,00464,WT
3,5291768.0,QR,94789696030.0,00739,B2
4,985523.0,,42322572633.0,LAND,WT


### Explore demography dataset

In [8]:
#read cities demographics data
df_demog = spark.read.format("csv").load("data/us-cities-demographics.csv", header=True, delimiter=';')
df_demog.toPandas().head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


In [9]:
#select the target columns and rename them with more descriptive names
dim_city_population=df_demog.select('City', 'State', 'Male Population', 'Female Population', 'Number of Veterans', 'Foreign-born', 'Race')
dim_city_population=dim_city_population.withColumnRenamed(
    "City","city").withColumnRenamed(
    "State","state").withColumnRenamed(
    "Male Population","male_pop").withColumnRenamed(
    "Female Population","female_pop").withColumnRenamed(
    "Number of Veterans","num_vetarans").withColumnRenamed(
    "Foreign-born","foreign_born").withColumnRenamed(
    "Race","race")
dim_city_population.toPandas().head()

Unnamed: 0,city,state,male_pop,female_pop,num_vetarans,foreign_born,race
0,Silver Spring,Maryland,40601,41862,1562,30908,Hispanic or Latino
1,Quincy,Massachusetts,44129,49500,4147,32935,White
2,Hoover,Alabama,38040,46799,4819,8229,Asian
3,Rancho Cucamonga,California,88127,87105,5821,33878,Black or African-American
4,Newark,New Jersey,138040,143873,5829,86253,White


In [10]:
#select the target columns and rename them with more descriptive names
dim_city_statistics=df_demog.select('City', 'State', 'Median Age', 'Average Household Size')
dim_city_statistics=dim_city_statistics.withColumnRenamed(
    "City","city").withColumnRenamed(
    "State","state").withColumnRenamed(
    "Median Age","median_age").withColumnRenamed(
    "Average Household Size","avg_household_size")
dim_city_statistics.toPandas().head()

Unnamed: 0,city,state,median_age,avg_household_size
0,Silver Spring,Maryland,33.8,2.6
1,Quincy,Massachusetts,41.0,2.39
2,Hoover,Alabama,38.5,2.58
3,Rancho Cucamonga,California,34.5,3.18
4,Newark,New Jersey,34.6,2.73
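
The demographics file holds one row per city and race, so selecting only city-level columns repeats each city several times. The record counts reported in Step 4 (596 rows for the statistics table versus 2891 for the population table) suggest that the final pipeline removes such repeats. A minimal pandas sketch of that deduplication, using a small hand-made frame rather than the real data:

```python
import pandas as pd

# Toy rows mimicking the demographics file: one row per (city, race).
rows = pd.DataFrame({
    "city": ["Newark", "Newark", "Quincy"],
    "state": ["New Jersey", "New Jersey", "Massachusetts"],
    "median_age": [34.6, 34.6, 41.0],
    "avg_household_size": [2.73, 2.73, 2.39],
    "race": ["White", "Asian", "White"],
})

# Keep only the city-level columns, then collapse identical rows.
city_cols = ["city", "state", "median_age", "avg_household_size"]
stats = rows[city_cols].drop_duplicates().reset_index(drop=True)

print(len(rows), "->", len(stats))  # 3 -> 2
```

On a Spark DataFrame the equivalent call would be `dropDuplicates()` on the selected columns.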


### Explore temperatures dataset

In [11]:
#read temperature data by city
df_temp = pd.read_csv('../../data2/GlobalLandTemperaturesByCity.csv')
df_temp.head(5)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [12]:
#keep only United States temperatures, select the target columns, and rename them with more descriptive names
df_temp_usa = df_temp[df_temp['Country'] == 'United States']
df_temp_usa = df_temp_usa[['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City', 'Country']]
df_temp_usa.rename(columns={"AverageTemperature":"avg_temp",
                           "AverageTemperatureUncertainty":"avg_temp_uncertnty",
                           "City":"city",
                           "Country":"country"}, inplace=True)
df_temp_usa.head(5)

Unnamed: 0,dt,avg_temp,avg_temp_uncertnty,city,country
47555,1820-01-01,2.101,3.217,Abilene,United States
47556,1820-02-01,6.926,2.853,Abilene,United States
47557,1820-03-01,10.767,2.395,Abilene,United States
47558,1820-04-01,17.989,2.202,Abilene,United States
47559,1820-05-01,21.809,2.036,Abilene,United States


In [13]:
# parse the date column and derive year and month from it
df_temp_usa['dt'] = pd.to_datetime(df_temp_usa['dt'])
df_temp_usa['year'] = df_temp_usa['dt'].dt.year
df_temp_usa['month'] = df_temp_usa['dt'].dt.month
df_temp_usa.head()

Unnamed: 0,dt,avg_temp,avg_temp_uncertnty,city,country,year,month
47555,1820-01-01,2.101,3.217,Abilene,United States,1820,1
47556,1820-02-01,6.926,2.853,Abilene,United States,1820,2
47557,1820-03-01,10.767,2.395,Abilene,United States,1820,3
47558,1820-04-01,17.989,2.202,Abilene,United States,1820,4
47559,1820-05-01,21.809,2.036,Abilene,United States,1820,5


In [14]:
# filter USA temperature data for 2016 (the source data ends in 2013, so this returns no rows)
df_temp_usa_2016 = df_temp_usa[df_temp_usa['year'] == 2016]
df_temp_usa_2016.head(5)

Unnamed: 0,dt,avg_temp,avg_temp_uncertnty,city,country,year,month
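
The empty result above follows from the dataset's coverage: as noted in Step 1, the temperature file ends in 2013, so no row has `year == 2016`. One possible workaround, sketched below on made-up numbers, is to check the available range first and fall back to a per-city, per-month average over the most recent years. The fallback rule and the names `monthly_climatology` and `last_n_years` are assumptions for illustration, not something the original notebook does.

```python
import pandas as pd

# Made-up sample in the same shape as df_temp_usa.
temps = pd.DataFrame({
    "city": ["Abilene"] * 4,
    "year": [2012, 2012, 2013, 2013],
    "month": [1, 2, 1, 2],
    "avg_temp": [8.0, 10.0, 6.0, 12.0],
})


def monthly_climatology(df: pd.DataFrame, last_n_years: int = 2) -> pd.DataFrame:
    """Average temperature per city and month over the most recent years available."""
    cutoff = df["year"].max() - last_n_years + 1
    recent = df[df["year"] >= cutoff]
    return recent.groupby(["city", "month"], as_index=False)["avg_temp"].mean()


target_year = 2016
if target_year > temps["year"].max():
    # No observations for the target year: use the monthly averages instead.
    dim_temperature = monthly_climatology(temps)
else:
    dim_temperature = temps[temps["year"] == target_year]

print(dim_temperature)
```

With monthly averages, the temperature table can be joined to the immigration data on city and month only, at the cost of no longer reflecting the actual weather of 2016.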


### Process data

In [15]:
#run once then read from written data
#sas_df=pd.read_sas('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
#sas_df.to_parquet("data/sas_data", index=None)

In [16]:
#read parquet data
sas_df=spark.read.parquet("data/sas_data")

### Cleaning Steps

- Parse the I94_SAS_Labels_Descriptions.SAS file to get the auxiliary dimension tables country_code, city_code, and state_code
- Transform city and state in the demography data to upper case to match the city_code and state_code tables

#### 2. Parse description file to get auxiliary dimension tables - country_code, city_code, state_code

In [17]:
#read labels descriptions line by line
with open("data/I94_SAS_Labels_Descriptions.SAS") as f:
    contents = f.readlines()
    
# extract country code
country_code = {}
for countries in contents[10:298]:
    vals = countries.split('=')
    code, country = vals[0].strip(), vals[1].strip().strip("'")
    country_code[code] = country
country_code_df = pd.DataFrame(list(country_code.items()), columns=['code', 'country'])
country_code_df.head(5)

Unnamed: 0,code,country
0,236,AFGHANISTAN
1,101,ALBANIA
2,316,ALGERIA
3,102,ANDORRA
4,324,ANGOLA


In [18]:
#extract city code
city_code = {}
for cities in contents[303:962]:
    vals = cities.split('=')
    code, city = vals[0].strip("\t").strip().strip("'"), vals[1].strip('\t').strip().strip("''") #split each description line into code and city, since the lines follow a fixed pattern
    city_code[code] = city

city_code_df = pd.DataFrame(list(city_code.items()), columns=['code', 'city'])
city_code_df.head(5)

Unnamed: 0,code,city
0,ANC,"ANCHORAGE, AK"
1,BAR,"BAKER AAF - BAKER ISLAND, AK"
2,DAC,"DALTONS CACHE, AK"
3,PIZ,"DEW STATION PT LAY DEW, AK"
4,DTH,"DUTCH HARBOR, AK"


In [19]:
#extract state code
state_code = {}
for states in contents[982:1036]:
    vals = states.split('=')
    code, state = vals[0].strip('\t').strip("'"), vals[1].strip().strip("'") #split each description line into code and state, since the lines follow a fixed pattern
    state_code[code] = state
    
state_code_df = pd.DataFrame(list(state_code.items()), columns=['code', 'state'])
state_code_df.head(5)

Unnamed: 0,code,state
0,AK,ALASKA
1,AZ,ARIZONA
2,AR,ARKANSAS
3,CA,CALIFORNIA
4,CO,COLORADO
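
The fixed slices used above (`contents[10:298]`, `contents[303:962]`, `contents[982:1036]`) depend on exact line positions in the labels file. A more position-independent variant could match each `code = 'label'` line with a regular expression. The sketch below assumes the line shapes implied by the parsed output; the sample lines and the helper name `parse_label_lines` are illustrative only.

```python
import re
from typing import Dict, Iterable

# Matches lines such as   236 =  'AFGHANISTAN'   or   'AK'='ALASKA'
LABEL_LINE = re.compile(r"^\s*'?([^'=]+?)'?\s*=\s*'([^']*)'")


def parse_label_lines(lines: Iterable[str]) -> Dict[str, str]:
    """Return {code: label} for every line that looks like code = 'label'."""
    pairs = {}
    for line in lines:
        match = LABEL_LINE.match(line)
        if match:
            code, label = match.group(1).strip(), match.group(2).strip()
            pairs[code] = label
    return pairs


sample = [
    "   236 =  'AFGHANISTAN'\n",
    "\t'ANC'\t=\t'ANCHORAGE, AK         '\n",
    "\t'AK'='ALASKA'\n",
    "/* a comment line that should be ignored */\n",
]
print(parse_label_lines(sample))
```

Because the function collects every matching line into one dictionary, it would be applied to the lines of one section (countries, ports, or states) at a time.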


#### 3. Transform city and state in the dimension table to upper case to match the city_code and state_code tables

In [20]:
#upper-case the city and state columns; median age and average household size are left unchanged
dim_city_statistics=dim_city_statistics.withColumn('city', upper(dim_city_statistics.city)).withColumn('state', upper(dim_city_statistics.state))
dim_city_statistics.toPandas().head(5)

Unnamed: 0,city,state,median_age,avg_household_size
0,SILVER SPRING,MARYLAND,33.8,2.6
1,QUINCY,MASSACHUSETTS,41.0,2.39
2,HOOVER,ALABAMA,38.5,2.58
3,RANCHO CUCAMONGA,CALIFORNIA,34.5,3.18
4,NEWARK,NEW JERSEY,34.6,2.73
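
Upper-casing alone may not be enough for a join: the `city_code` labels shown earlier combine the city and a state abbreviation in one string (for example `ANCHORAGE, AK`), while the demography table keeps the city name and the full state name in separate columns. A minimal sketch of splitting such a label, assuming the state abbreviation always follows the last comma; `split_port_label` is a hypothetical helper:

```python
from typing import Optional, Tuple


def split_port_label(label: str) -> Tuple[str, Optional[str]]:
    """Split 'CITY, ST' into ('CITY', 'ST'); labels without a comma get state None."""
    city, sep, state = label.rpartition(",")
    if not sep:  # no comma found, so the whole label is treated as the city
        return label.strip(), None
    return city.strip(), state.strip()


print(split_port_label("ANCHORAGE, AK"))
print(split_port_label("BAKER AAF - BAKER ISLAND, AK"))
```

The resulting abbreviation can then be matched against the `state_code` table, and the city part against the upper-cased `city` column.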


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Since this data warehouse is intended for OLAP and BI applications, we model these data sets with a star schema.

- Star Schema
![data model](assets/data_model.png)

#### 3.2 Mapping Out Data Pipelines
- Assume all data sets are stored in S3 buckets as below
    - ```[Source_S3_Bucket]/immigration/18-83510-I94-Data-2016/*.sas7bdat```
    - ```[Source_S3_Bucket]/I94_SAS_Labels_Descriptions.SAS```
    - ```[Source_S3_Bucket]/temperature/GlobalLandTemperaturesByCity.csv```
    - ```[Source_S3_Bucket]/demography/us-cities-demographics.csv```
- Apply the cleaning steps from Step 2 to the data sets
- Transform immigration data to 1 fact table and 2 dimension tables, fact table will be partitioned by state
- Parsing label description file to get auxiliary tables
- Transform temperature data to dimension table
- Split demography data to 2 dimension tables
- Store these tables back to target S3 bucket
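
The partitioning and storage steps of this list could look roughly like the sketch below. It assumes a Spark DataFrame with a `state_code` column and a destination bucket URI; the helper names `table_path` and `write_fact_table` and the example bucket are hypothetical.

```python
def table_path(bucket: str, table: str) -> str:
    """Build the output location for one table under the destination bucket."""
    return f"{bucket.rstrip('/')}/{table}"


def write_fact_table(fact_df, dest_bucket: str) -> None:
    """Write the fact table as Parquet, one sub-directory per state_code value.

    `fact_df` is expected to be a pyspark.sql.DataFrame.
    """
    (fact_df.write
            .mode("overwrite")
            .partitionBy("state_code")
            .parquet(table_path(dest_bucket, "fact_immigration")))


print(table_path("s3a://example-dest-bucket/", "fact_immigration"))
```

Partition values are stored in the directory names rather than in the data files, which would be consistent with `state_code` appearing as the last column of the `fact_immigration` schema printed in Step 4.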

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
The data processing and the data model were created with Spark in the previous parts of this notebook.

#### 4.2 Data Quality Checks

The data quality checks include:

1. Data schema of every dimensional table matches data model
2. No empty table after running ETL data pipeline

In [21]:
import os
import configparser
from pathlib import Path
from pyspark.sql import SparkSession

#### AWS configurations

In [22]:
#set AWS configurations parsed from .cfg files
config = configparser.ConfigParser()
config.read('capstone.cfg', encoding='utf-8-sig')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
SOURCE_S3_BUCKET = config['S3']['SOURCE_S3_BUCKET']
DEST_S3_BUCKET = config['S3']['DEST_S3_BUCKET']
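
The layout of `capstone.cfg` is not shown in the notebook. Judging from the keys read above, it would need an `AWS` and an `S3` section; the sketch below parses such a file from a string, with placeholder values that are purely illustrative.

```python
import configparser

# Placeholder contents for capstone.cfg; section and key names follow the cell above.
SAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = <your-access-key-id>
AWS_SECRET_ACCESS_KEY = <your-secret-access-key>

[S3]
SOURCE_S3_BUCKET = s3a://example-source-bucket/
DEST_S3_BUCKET = s3a://example-dest-bucket/
"""

sample_config = configparser.ConfigParser()
sample_config.read_string(SAMPLE_CFG)
print(sample_config["S3"]["SOURCE_S3_BUCKET"])
```

Since the real file contains credentials, it is usually kept out of version control.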

In [None]:
#create the PySpark session with the hadoop-aws package for S3 access
spark = SparkSession.builder.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0").enableHiveSupport().getOrCreate()

### 1. Table schemas match the defined data model
- Expected quality check result: after modeling, each table should include at least these columns plus some foreign keys

In [26]:
# check the expected data to be in data model to be ready for comparisons with quality check result
print('- city_code =>', list(city_code_df.columns))
print('- dim_city_statistics =>', list(dim_city_statistics.columns))
print('- fact_immigration =>', list(fact_immigration.columns))
print('- dim_city_population =>', list(dim_city_population.columns))
print('- country_code =>', list(country_code_df.columns))
print('- state_code =>', list(state_code_df.columns))
print('- dim_immi_airline =>', list(dim_immi_airline.columns))
print('- df_temp_usa_2016 =>', list(df_temp_usa_2016.columns))
print('- dim_immi_personal =>', list(dim_immi_personal.columns))

- city_code => ['code', 'city']
- dim_city_statistics => ['city', 'state', 'median_age', 'avg_household_size']
- fact_immigration => ['cic_id', 'year', 'month', 'city_code', 'state_code', 'arrive_date', 'departure_date', 'mode', 'visa', 'country']
- dim_city_population => ['city', 'state', 'male_pop', 'female_pop', 'num_vetarans', 'foreign_born', 'race']
- country_code => ['code', 'country']
- state_code => ['code', 'state']
- dim_immi_airline => ['cic_id', 'airline', 'admin_num', 'flight_number', 'visa_type']
- df_temp_usa_2016 => ['dt', 'avg_temp', 'avg_temp_uncertnty', 'city', 'country', 'year', 'month']
- dim_immi_personal => ['cic_id', 'citizen_country', 'residence_country', 'birth_year', 'gender', 'ins_num']


In [4]:
#get s3 bucket
s3_bucket = Path(SOURCE_S3_BUCKET)

In [5]:
#apply data check for loaded model
for file_dir in s3_bucket.iterdir():
    if file_dir.is_dir():
        path = str(file_dir)
        df = spark.read.parquet(path)
        print("Table: " + path.split('/')[-1])
        df.printSchema()  # prints the schema; the method itself returns None

Table: city_code
root
 |-- code: string (nullable = true)
 |-- city: string (nullable = true)

Table: dim_demog_statistics
root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: string (nullable = true)
 |-- avg_household_size: string (nullable = true)
 |-- demog_stat_id: long (nullable = true)

Table: fact_immigration
root
 |-- cic_id: double (nullable = true)
 |-- year: double (nullable = true)
 |-- month: double (nullable = true)
 |-- city_code: string (nullable = true)
 |-- arrive_date: date (nullable = true)
 |-- departure_date: date (nullable = true)
 |-- mode: double (nullable = true)
 |-- visa: double (nullable = true)
 |-- immigration_id: long (nullable = true)
 |-- country: string (nullable = true)
 |-- state_code: string (nullable = true)

Table: dim_demog_population
root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- male_population: string (nullable = true)
 |-- female_population: string (nullable = tru

We consider the check valid, since the loaded tables hold the columns defined in the data model.
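
The comparison of printed schemas is done by eye. It could also be automated by listing the columns each table must contain and reporting any that are missing. The sketch below is one way to do that; the `EXPECTED_COLUMNS` mapping and the helper `missing_columns` are assumptions, filled here with the `dim_demog_statistics` columns from the printed schema.

```python
from typing import Dict, List

# Minimum columns per table; extra columns such as surrogate keys are allowed.
EXPECTED_COLUMNS: Dict[str, List[str]] = {
    "dim_demog_statistics": ["city", "state", "median_age", "avg_household_size"],
}


def missing_columns(table: str, actual: List[str]) -> List[str]:
    """Return the expected columns that are absent from the loaded table."""
    return [c for c in EXPECTED_COLUMNS[table] if c not in actual]


loaded = ["city", "state", "median_age", "avg_household_size", "demog_stat_id"]
print(missing_columns("dim_demog_statistics", loaded))  # [] means the check passes
```

Inside the loop over the bucket, `df.columns` would supply the `actual` list for each table, and a non-empty result could raise an error in the same way as the empty-table check.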

### 2. No empty tables after running the ETL data pipeline

In [6]:
# count the records in every table and fail if any table is empty
for file_dir in s3_bucket.iterdir():
    if file_dir.is_dir():
        path = str(file_dir)
        df = spark.read.parquet(path)
        record_num = df.count()
        if record_num <= 0:
            raise ValueError("This table is empty!")
        else:
            print("Table: " + path.split('/')[-1] + f" is not empty: total {record_num} records.")

Table: city_code is not empty: total 659 records.
Table: dim_demog_statistics is not empty: total 596 records.
Table: fact_immigration is not empty: total 3096313 records.
Table: dim_demog_population is not empty: total 2891 records.
Table: country_code is not empty: total 288 records.
Table: state_code is not empty: total 54 records.
Table: dim_immi_airline is not empty: total 3096313 records.
Table: dim_temperature is not empty: total 687004 records.
Table: dim_immi_personal is not empty: total 3096313 records.
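
A non-empty table can still contain missing or repeated keys. As a possible extension, the sketch below counts both for a key column over plain Python rows; in the pipeline the same idea would be expressed with Spark filters and aggregations. The helper name `key_quality` and the sample rows are hypothetical.

```python
from collections import Counter
from typing import Dict, Iterable, Tuple


def key_quality(rows: Iterable[Dict], key: str) -> Tuple[int, int]:
    """Return (rows with a missing key, distinct key values that occur more than once)."""
    rows = list(rows)
    null_count = sum(1 for row in rows if row.get(key) is None)
    counts = Counter(row[key] for row in rows if row.get(key) is not None)
    duplicate_keys = sum(1 for n in counts.values() if n > 1)
    return null_count, duplicate_keys


sample_rows = [
    {"cic_id": 4084316.0},
    {"cic_id": 4422636.0},
    {"cic_id": 4422636.0},  # repeated key
    {"cic_id": None},       # missing key
]
print(key_quality(sample_rows, "cic_id"))  # (1, 1)
```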


#### 4.3 Data dictionary 
![](assets/data_dictionary/9.png)
![](assets/data_dictionary/8.png)
![](assets/data_dictionary/7.png)
![](assets/data_dictionary/6.png)
![](assets/data_dictionary/5.png)
![](assets/data_dictionary/4.png)
![](assets/data_dictionary/3.png)
![](assets/data_dictionary/2.png)
![](assets/data_dictionary/1.png)

### 4.4 Results Example and Data Usage
- We can use the data model to get insights by running SQL queries on the modeled tables. The example below counts how many immigrants arrived per country in January 2016.

```
SELECT c.country, COUNT(*) AS count FROM fact_immigration i
INNER JOIN country_code c ON i.country = c.country_id
WHERE i.year=2016 AND i.month=1
GROUP BY c.country
ORDER BY count DESC
LIMIT 10
```

![](assets/test_query.png)
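
To try the shape of this query without a cluster, the sketch below runs an equivalent statement against two tiny in-memory SQLite tables. The table contents are invented, and the column names follow the query rather than the exact stored schema, so treat it as an illustration of the join and aggregation only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE country_code (country_id INTEGER, country TEXT);
    CREATE TABLE fact_immigration (cic_id INTEGER, country INTEGER, year INTEGER, month INTEGER);
    INSERT INTO country_code VALUES (582, 'MEXICO'), (209, 'JAPAN');
    INSERT INTO fact_immigration VALUES
        (1, 582, 2016, 1), (2, 582, 2016, 1), (3, 209, 2016, 1), (4, 209, 2016, 4);
""")

# Count January 2016 arrivals per country, most frequent first.
top_countries = conn.execute("""
    SELECT c.country, COUNT(*) AS arrivals
    FROM fact_immigration i
    INNER JOIN country_code c ON i.country = c.country_id
    WHERE i.year = 2016 AND i.month = 1
    GROUP BY c.country
    ORDER BY arrivals DESC
    LIMIT 10
""").fetchall()
print(top_countries)  # [('MEXICO', 2), ('JAPAN', 1)]
```

Grouping by the selected column and naming the aggregate keeps the `ORDER BY` clause unambiguous.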


### Step 5: Conclusion and Reflection

The target star schema has been created successfully and passed the quality checks. The data model was built with Python, pandas, and Apache Spark, and the schema is optimized to support OLAP queries. Although the model was created from a portion of the data, it is expected to work when all of the data is concatenated and loaded with Spark. With a larger dataset or a large number of simultaneous queries, however, a larger cluster or Amazon EMR would be needed as the data scales up.

#### Tools and Technologies usage:

Python, Pandas, and Spark enabled us to read, clean, ingest, process, load, and store the data with ease.

Spark was chosen over Hadoop for its greater flexibility in wrangling data and because it runs well on a local machine and scales efficiently to a larger cluster of nodes.

Pandas dataframes are easy to explore and analyze. If we load a portion of the data from the Spark session into a pandas dataframe, we can use its many built-in functions to build a proof of concept on a small scale, then test the same logic on a larger scale with Spark dataframes and user-defined functions.

We could use more technologies in future work. We might add Apache Airflow DAGs to automate fact table updates and data quality checks. Cloud-based storage and compute instances could be used to keep the database always available.

#### Data Model:

The star schema was designed with OLAP queries in mind. The ETL process, and subsequently the data quality checks, ensured that the dimension tables are normalized. This normalization makes analytical queries more intuitive, less complex, and computationally inexpensive when aggregating on any of the dimensions. Because the dimension tables are relatively small, with a sufficiently large number of unique primary keys, the joins are also intuitive and less taxing on the database.

#### Future Work and Optimizations
- **What if the data were increased by 100x?** In this case, we would deploy more worker nodes for Spark, with the data distributed between them. If this does not suffice as the data scales up, we could use Amazon EMR, but it is more costly than adding more nodes to the cluster.

- **How would you run this pipeline every day by 7 am?** In this case, we can design Apache Airflow DAGs that execute the ETL pipeline as well as the data quality checks, scheduled to run every day at 7 am, and then check the logs to understand what happened in each run.

- **How would you make the database accessible to 100+ people?** In this case, we might consider creating additional tables that are optimized for the most frequent queries.