# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project builds a data warehouse that integrates immigration data and demographic data into a single-source-of-truth database covering a wider range of information.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

```python
# Do all imports and installs here
import pandas as pd
import warnings

warnings.filterwarnings("ignore")
```

### Step 1: Scope the Project and Gather Data

#### Scope 
This project integrates I94 immigration data, world temperature data, and US demographic data to set up a data warehouse with fact and dimension tables.

#### Describe and Gather Data 
__I94 Immigration Data (SAS)__:
The data contains international visitor arrival statistics by world region and selected countries (including the top 20), visa type, mode of transportation, age group, state visited (first intended address only), and the top ports of entry (for selected countries).

__World Temperature Data (csv)__:
This dataset is from Kaggle and contains monthly average land temperature data for cities around the world.

__U.S. City Demographic Data (csv)__:
This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000.

### Step 2: Explore and Assess the Data
#### Explore the Data 
1. Use pandas for exploratory data analysis to get an overview of these data sets
2. Split the data sets into dimensional tables and rename columns for readability
3. Use PySpark on one of the SAS data sets to test the ETL pipeline logic
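
As a small aid for the exploratory step, the helper below summarizes missing values per column. It is a sketch and not part of the original notebook. `missing_value_summary` is a hypothetical name, and the input is a toy frame shaped like the immigration sample (the missing `gender` value is made up).

```python
import pandas as pd


def missing_value_summary(df: pd.DataFrame) -> pd.DataFrame:
    """Count and percentage of missing values per column, most-missing first.

    Hypothetical helper for exploratory analysis; not part of the original notebook.
    """
    missing = df.isna().sum()
    summary = pd.DataFrame({
        "missing": missing,
        "missing_pct": (missing / len(df) * 100).round(2),
    })
    return summary.sort_values("missing", ascending=False)


# Toy frame shaped like the immigration sample; the missing gender value is made up
sample = pd.DataFrame({
    "cicid": [4084316.0, 4422636.0, 1195600.0, 5291768.0],
    "gender": ["F", "M", None, "M"],
    "insnum": [None, None, None, None],
})
summary = missing_value_summary(sample)
print(summary)
```

Columns that are almost entirely empty (such as `insnum` in the samples below) show up first. This makes it easier to decide which columns are worth carrying into the dimension tables.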

```python
# Read in the data here
df_immi = pd.read_csv("immigration_data_sample.csv")
df_temp = pd.read_csv("../../data2/GlobalLandTemperaturesByCity.csv")
df_graphic = pd.read_csv('us-cities-demographics.csv', delimiter=';')
```

```python
df_immi.head(5)
```

|   | Unnamed: 0 | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2027561 | 4084316.0 | 2016.0 | 4.0 | 209.0 | 209.0 | HHW | 20566.0 | 1.0 | HI | ... |  | M | 1955.0 | 7202016 | F |  | JL | 56582670000.0 | 00782 | WT |
| 1 | 2171295 | 4422636.0 | 2016.0 | 4.0 | 582.0 | 582.0 | MCA | 20567.0 | 1.0 | TX | ... |  | M | 1990.0 | 10222016 | M |  | \*GA | 94362000000.0 | XBLNG | B2 |
| 2 | 589494 | 1195600.0 | 2016.0 | 4.0 | 148.0 | 112.0 | OGG | 20551.0 | 1.0 | FL | ... |  | M | 1940.0 | 7052016 | M |  | LH | 55780470000.0 | 00464 | WT |
| 3 | 2631158 | 5291768.0 | 2016.0 | 4.0 | 297.0 | 297.0 | LOS | 20572.0 | 1.0 | CA | ... |  | M | 1991.0 | 10272016 | M |  | QR | 94789700000.0 | 00739 | B2 |
| 4 | 3032257 | 985523.0 | 2016.0 | 4.0 | 111.0 | 111.0 | CHM | 20550.0 | 3.0 | NY | ... |  | M | 1997.0 | 7042016 | F |  |  | 42322570000.0 | LAND | WT |


```python
df_immi.columns
```

```
Index(['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')
```

```python
# .copy() makes an independent frame, so later column assignments don't trigger SettingWithCopyWarning
fact_immigration = df_immi[['cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr', 'arrdate', 'depdate', 'i94mode', 'i94visa']].copy()
fact_immigration.columns = ['cic_id', 'year', 'month', 'city_code', 'state_code', 'arrive_date', 'departure_date', 'mode', 'visa']
fact_immigration.head(5)
```

|   | cic_id | year | month | city_code | state_code | arrive_date | departure_date | mode | visa |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 4084316.0 | 2016.0 | 4.0 | HHW | HI | 20566.0 | 20573.0 | 1.0 | 2.0 |
| 1 | 4422636.0 | 2016.0 | 4.0 | MCA | TX | 20567.0 | 20568.0 | 1.0 | 2.0 |
| 2 | 1195600.0 | 2016.0 | 4.0 | OGG | FL | 20551.0 | 20571.0 | 1.0 | 2.0 |
| 3 | 5291768.0 | 2016.0 | 4.0 | LOS | CA | 20572.0 | 20581.0 | 1.0 | 2.0 |
| 4 | 985523.0 | 2016.0 | 4.0 | CHM | NY | 20550.0 | 20553.0 | 3.0 | 2.0 |


```python
fact_immigration['country'] = 'United States'
fact_immigration.head(5)
```

|   | cic_id | year | month | city_code | state_code | arrive_date | departure_date | mode | visa | country |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 4084316.0 | 2016.0 | 4.0 | HHW | HI | 20566.0 | 20573.0 | 1.0 | 2.0 | United States |
| 1 | 4422636.0 | 2016.0 | 4.0 | MCA | TX | 20567.0 | 20568.0 | 1.0 | 2.0 | United States |
| 2 | 1195600.0 | 2016.0 | 4.0 | OGG | FL | 20551.0 | 20571.0 | 1.0 | 2.0 | United States |
| 3 | 5291768.0 | 2016.0 | 4.0 | LOS | CA | 20572.0 | 20581.0 | 1.0 | 2.0 | United States |
| 4 | 985523.0 | 2016.0 | 4.0 | CHM | NY | 20550.0 | 20553.0 | 3.0 | 2.0 | United States |


```python
dim_immi_personal = df_immi[['cicid', 'i94cit', 'i94res', 'biryear', 'gender', 'insnum']]
# Assign a flat list: a nested list ([[...]]) would turn the columns into a MultiIndex
dim_immi_personal.columns = ['cic_id', 'citizen_country', 'residence_country', 'birth_year', 'gender', 'ins_num']
dim_immi_personal.head(5)
```

|   | cic_id | citizen_country | residence_country | birth_year | gender | ins_num |
|---|---|---|---|---|---|---|
| 0 | 4084316.0 | 209.0 | 209.0 | 1955.0 | F |  |
| 1 | 4422636.0 | 582.0 | 582.0 | 1990.0 | M |  |
| 2 | 1195600.0 | 148.0 | 112.0 | 1940.0 | M |  |
| 3 | 5291768.0 | 297.0 | 297.0 | 1991.0 | M |  |
| 4 | 985523.0 | 111.0 | 111.0 | 1997.0 | F |  |


```python
dim_immi_airline = df_immi[['cicid', 'airline', 'admnum', 'fltno', 'visatype']]
dim_immi_airline.columns = ['cic_id', 'airline', 'admin_num', 'flight_number', 'visa_type']
dim_immi_airline.head(5)
```

|   | cic_id | airline | admin_num | flight_number | visa_type |
|---|---|---|---|---|---|
| 0 | 4084316.0 | JL | 56582670000.0 | 00782 | WT |
| 1 | 4422636.0 | \*GA | 94362000000.0 | XBLNG | B2 |
| 2 | 1195600.0 | LH | 55780470000.0 | 00464 | WT |
| 3 | 5291768.0 | QR | 94789700000.0 | 00739 | B2 |
| 4 | 985523.0 |  | 42322570000.0 | LAND | WT |


```python
df_temp.head(5)
```

|   | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 |  |  | Århus | Denmark | 57.05N | 10.33E |


```python
# Keep only US cities; .copy() lets the next cell add columns without a SettingWithCopyWarning
df_temp_usa = df_temp[df_temp['Country'] == 'United States']
df_temp_usa = df_temp_usa[['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City', 'Country']].copy()
df_temp_usa.columns = ['dt', 'avg_temp', 'avg_temp_uncertnty', 'city', 'country']
df_temp_usa.head(5)
```

|   | dt | avg_temp | avg_temp_uncertnty | city | country |
|---|---|---|---|---|---|
| 47555 | 1820-01-01 | 2.101 | 3.217 | Abilene | United States |
| 47556 | 1820-02-01 | 6.926 | 2.853 | Abilene | United States |
| 47557 | 1820-03-01 | 10.767 | 2.395 | Abilene | United States |
| 47558 | 1820-04-01 | 17.989 | 2.202 | Abilene | United States |
| 47559 | 1820-05-01 | 21.809 | 2.036 | Abilene | United States |


```python
df_temp_usa['dt'] = pd.to_datetime(df_temp_usa['dt'])
# The .dt accessor extracts date parts without a Python-level lambda per row
df_temp_usa['year'] = df_temp_usa['dt'].dt.year
df_temp_usa['month'] = df_temp_usa['dt'].dt.month
df_temp_usa.head()
```

|   | dt | avg_temp | avg_temp_uncertnty | city | country | year | month |
|---|---|---|---|---|---|---|---|
| 47555 | 1820-01-01 | 2.101 | 3.217 | Abilene | United States | 1820 | 1 |
| 47556 | 1820-02-01 | 6.926 | 2.853 | Abilene | United States | 1820 | 2 |
| 47557 | 1820-03-01 | 10.767 | 2.395 | Abilene | United States | 1820 | 3 |
| 47558 | 1820-04-01 | 17.989 | 2.202 | Abilene | United States | 1820 | 4 |
| 47559 | 1820-05-01 | 21.809 | 2.036 | Abilene | United States | 1820 | 5 |


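```python
# The temperature data contains no records for 2016, so this filter returns an empty frame
df_temp_usa_2016 = df_temp_usa[df_temp_usa['year'] == 2016]
df_temp_usa_2016.head(5)
```

|   | dt | avg_temp | avg_temp_uncertnty | city | country | year | month |
|---|---|---|---|---|---|---|---|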


```python
df_graphic.head(5)
```

|   | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |


```python
dim_city_population = df_graphic[['City', 'State', 'Male Population', 'Female Population', 'Number of Veterans', 'Foreign-born', 'Race']]
dim_city_population.columns = ['city', 'state', 'male_pop', 'female_pop', 'num_vetarans', 'foreign_born', 'race']
dim_city_population.head(5)
```

|   | city | state | male_pop | female_pop | num_vetarans | foreign_born | race |
|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 40601.0 | 41862.0 | 1562.0 | 30908.0 | Hispanic or Latino |
| 1 | Quincy | Massachusetts | 44129.0 | 49500.0 | 4147.0 | 32935.0 | White |
| 2 | Hoover | Alabama | 38040.0 | 46799.0 | 4819.0 | 8229.0 | Asian |
| 3 | Rancho Cucamonga | California | 88127.0 | 87105.0 | 5821.0 | 33878.0 | Black or African-American |
| 4 | Newark | New Jersey | 138040.0 | 143873.0 | 5829.0 | 86253.0 | White |


```python
# Built from df_graphic (the demographics frame loaded above); .copy() allows the in-place upper-casing later
dim_city_statistics = df_graphic[['City', 'State', 'Median Age', 'Average Household Size']].copy()
dim_city_statistics.columns = ['city', 'state', 'median_age', 'avg_household_size']
dim_city_statistics.head(5)
```

|   | city | state | median_age | avg_household_size |
|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 2.6 |
| 1 | Quincy | Massachusetts | 41.0 | 2.39 |
| 2 | Hoover | Alabama | 38.5 | 2.58 |
| 3 | Rancho Cucamonga | California | 34.5 | 3.18 |
| 4 | Newark | New Jersey | 34.6 | 2.73 |


### Cleaning Steps
1. Transform arrive_date and departure_date in the immigration data from SAS date format (days since 1960-01-01) to pandas datetime format
2. Transform city and state in the demographic data to upper case to match the city_code and state_code tables

```python
def SAS_to_datetime(date):
    """Convert SAS date values (days since 1960-01-01) to pandas datetimes."""
    return pd.to_timedelta(date, unit='D') + pd.Timestamp('1960-1-1')
```

```python
fact_immigration['arrive_date'] = SAS_to_datetime(fact_immigration['arrive_date'])
fact_immigration['departure_date'] = SAS_to_datetime(fact_immigration['departure_date'])
fact_immigration.head(5)
```

|   | cic_id | year | month | city_code | state_code | arrive_date | departure_date | mode | visa | country |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 4084316.0 | 2016.0 | 4.0 | HHW | HI | 2016-04-22 | 2016-04-29 | 1.0 | 2.0 | United States |
| 1 | 4422636.0 | 2016.0 | 4.0 | MCA | TX | 2016-04-23 | 2016-04-24 | 1.0 | 2.0 | United States |
| 2 | 1195600.0 | 2016.0 | 4.0 | OGG | FL | 2016-04-07 | 2016-04-27 | 1.0 | 2.0 | United States |
| 3 | 5291768.0 | 2016.0 | 4.0 | LOS | CA | 2016-04-28 | 2016-05-07 | 1.0 | 2.0 | United States |
| 4 | 985523.0 | 2016.0 | 4.0 | CHM | NY | 2016-04-06 | 2016-04-09 | 3.0 | 2.0 | United States |
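
The same conversion can also be written with `pd.to_datetime` and its `origin` parameter, which counts `unit`-sized steps from a given epoch. This is a minimal sketch that assumes the SAS epoch of 1960-01-01, as `SAS_to_datetime` does. The function name `sas_days_to_datetime` is hypothetical. The inputs are `arrdate` values from the sample rows above, plus one missing value that becomes `NaT`.

```python
import pandas as pd

SAS_EPOCH = pd.Timestamp("1960-01-01")  # SAS dates count days from this epoch


def sas_days_to_datetime(days: pd.Series) -> pd.Series:
    """Convert SAS day counts to datetimes; missing values become NaT (hypothetical helper)."""
    return pd.to_datetime(days, unit="D", origin=SAS_EPOCH)


arrdate = pd.Series([20566.0, 20567.0, 20551.0, None])
converted = sas_days_to_datetime(arrdate)
print(converted)  # values: 2016-04-22, 2016-04-23, 2016-04-07, NaT
```

The first three results match the `arrive_date` values in the table above, which is a quick way to confirm the 1960 epoch is right.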


```python
dim_city_statistics['city'] = dim_city_statistics['city'].str.upper()
dim_city_statistics['state'] = dim_city_statistics['state'].str.upper()
dim_city_statistics.head(5)
```

|   | city | state | median_age | avg_household_size |
|---|---|---|---|---|
| 0 | SILVER SPRING | MARYLAND | 33.8 | 2.6 |
| 1 | QUINCY | MASSACHUSETTS | 41.0 | 2.39 |
| 2 | HOOVER | ALABAMA | 38.5 | 2.58 |
| 3 | RANCHO CUCAMONGA | CALIFORNIA | 34.5 | 3.18 |
| 4 | NEWARK | NEW JERSEY | 34.6 | 2.73 |


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Since this data warehouse is intended for OLAP and BI applications, the data sets are modeled as a __star schema__.

Data model diagram: https://ibb.co/jZJm6k4
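
In a star schema, analytical queries join the central fact table to its dimension tables on a shared key and then aggregate. The sketch below shows that pattern in pandas. It uses a few rows copied from the sample outputs in Step 2, keyed on `cic_id`. It is an illustration, not the warehouse's actual query layer.

```python
import pandas as pd

# A few rows copied from the fact_immigration and dim_immi_personal samples above
fact_immigration = pd.DataFrame({
    "cic_id": [4084316.0, 4422636.0, 1195600.0],
    "state_code": ["HI", "TX", "FL"],
    "visa": [2.0, 2.0, 2.0],
})
dim_immi_personal = pd.DataFrame({
    "cic_id": [4084316.0, 4422636.0, 1195600.0],
    "birth_year": [1955.0, 1990.0, 1940.0],
    "gender": ["F", "M", "M"],
})

# Fact-to-dimension join on the shared key; validate= raises if the dimension key is not unique
joined = fact_immigration.merge(dim_immi_personal, on="cic_id", how="left", validate="many_to_one")

# Aggregate fact rows by a dimension attribute
arrivals_by_gender = joined.groupby("gender").size()
print(arrivals_by_gender)
```

The `validate="many_to_one"` argument acts as a lightweight integrity check: each dimension key should identify exactly one row.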

#### 3.2 Mapping Out Data Pipelines
1. Data sets from different sources:
   * `sas_data/`
   * `GlobalLandTemperaturesByCity.csv`
   * `us-cities-demographics.csv`
2. Clean the data sets using the cleaning steps from Step 2
3. Transform the immigration data into 1 fact table and 2 dimension tables; the fact table is partitioned by state
4. Parse the label description file to build auxiliary tables
5. Transform the temperature data into a dimension table
6. Split the demographic data into 2 dimension tables
7. Write these tables to the target S3 bucket
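
Pipeline step 4 parses a label description file into lookup tables. That file is not shown here, so the parser below is a sketch built on an assumption about its format. It assumes SAS `value <name>` lines open each block, followed by mapping lines of the form `code = 'description'`, with codes optionally quoted. The name `parse_label_sections` and the sample text are hypothetical. A real file may need extra handling, for example comments or quotes inside descriptions.

```python
import re

# Assumed formats: a "value <name>" line opens a block; mapping lines look like
#    236 =  'AFGHANISTAN'      or      'ALC'  =  'ALCAN, AK   '
SECTION_RE = re.compile(r"^\s*value\s+\$?(\w+)", re.IGNORECASE)
MAPPING_RE = re.compile(r"^\s*'?(?P<code>[^'=]+?)'?\s*=\s*'(?P<desc>[^']*)'")


def parse_label_sections(text):
    """Return {block_name: {code: description}} from SAS-style value blocks (hypothetical helper)."""
    sections = {}
    current = None
    for line in text.splitlines():
        section = SECTION_RE.match(line)
        if section:
            current = section.group(1).lower()
            sections[current] = {}
            continue
        mapping = MAPPING_RE.match(line)
        if mapping and current is not None:
            code = mapping.group("code").strip()
            sections[current][code] = mapping.group("desc").strip()
    return sections


# Hypothetical sample in the assumed format
sample = """
value i94cntyl
   236 =  'AFGHANISTAN'
   582 =  'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'
;
value $i94prtl
   'ALC'  =  'ALCAN, AK             '
;
"""
labels = parse_label_sections(sample)
print(labels["i94prtl"]["ALC"])  # ALCAN, AK
```

Each block can then become a two-column lookup table, e.g. `pd.DataFrame(list(labels["i94prtl"].items()), columns=["code", "description"])`.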

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

Data processing and the data model are implemented in Spark; refer to `etl.py`.
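
Step 3 of the pipeline partitions the fact table by state. In Spark this is typically done with `df.write.partitionBy("state_code")`, which writes one `state_code=<value>/` subdirectory per state. The pandas sketch below only mimics that directory layout to make it concrete. The function name, file name and CSV format are assumptions, and `etl.py` may write a different format. The `cic_id` values are placeholders.

```python
import os
import tempfile

import pandas as pd


def write_partitioned(df: pd.DataFrame, root: str, partition_col: str) -> list:
    """Write one CSV per partition value under root/<col>=<value>/ (hypothetical helper)."""
    written = []
    for value, part in df.groupby(partition_col):
        part_dir = os.path.join(root, f"{partition_col}={value}")
        os.makedirs(part_dir, exist_ok=True)
        path = os.path.join(part_dir, "part-0000.csv")
        # The partition value lives in the directory name, so the column is dropped from the file
        part.drop(columns=partition_col).to_csv(path, index=False)
        written.append(path)
    return written


fact = pd.DataFrame({
    "cic_id": [1.0, 2.0, 3.0],            # placeholder keys
    "state_code": ["HI", "TX", "HI"],
})

with tempfile.TemporaryDirectory() as root:
    paths = write_partitioned(fact, root, "state_code")
    layout = sorted(os.path.relpath(p, root) for p in paths)
    hi_rows = len(pd.read_csv(os.path.join(root, "state_code=HI", "part-0000.csv")))

print(layout)
print(hi_rows)  # 2
```

Partitioning by state means a query filtered on one state only needs to read that state's directory.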

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

Refer to `check_quality.py`
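
The kinds of checks listed above can be written as small functions. The sketch below does not reproduce `check_quality.py`, which is not shown here. It is a hypothetical pandas version with four checks: an emptiness check, a unique-key check, a not-null check, and an optional row-count comparison against the source. `run_quality_checks` is an illustrative name.

```python
from typing import List, Optional

import pandas as pd


def run_quality_checks(df: pd.DataFrame, name: str, key: str,
                       required: List[str], expected_rows: Optional[int] = None) -> List[str]:
    """Return failure messages; an empty list means every check passed (hypothetical helper)."""
    failures = []
    if df.empty:
        failures.append(f"{name}: table is empty")
    # Integrity constraint: the key column must be unique
    if df[key].duplicated().any():
        failures.append(f"{name}: duplicate values in key column '{key}'")
    for col in required:
        n_null = int(df[col].isna().sum())
        if n_null:
            failures.append(f"{name}: {n_null} null value(s) in '{col}'")
    # Source/count check: the loaded table should have as many rows as the source
    if expected_rows is not None and len(df) != expected_rows:
        failures.append(f"{name}: expected {expected_rows} rows, found {len(df)}")
    return failures


good = pd.DataFrame({"cic_id": [4084316.0, 4422636.0], "state_code": ["HI", "TX"]})
bad = pd.DataFrame({"cic_id": [4084316.0, 4084316.0], "state_code": ["HI", None]})

good_failures = run_quality_checks(good, "fact_immigration", "cic_id", ["state_code"], expected_rows=2)
bad_failures = run_quality_checks(bad, "fact_immigration", "cic_id", ["state_code"], expected_rows=3)
print(good_failures)  # []
print("\n".join(bad_failures))
```

Returning messages instead of raising at the first failure reports every problem in one run. A pipeline can still fail the job when the list is non-empty.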

#### 4.3 Data dictionary 
Refer to https://ibb.co/hKYSVqc

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.

#### Clearly state the rationale for the choice of tools and technologies for the project.
1. Pandas for exploratory data analysis on the sample data sets
2. PySpark for processing the large data sets and transforming staging tables into dimensional tables

#### Propose how often the data should be updated and why.
1. Tables created from the immigration and temperature data sets should be updated monthly, since the raw data sets are organized by month.
2. Tables created from the demographic data set could be updated annually, since demographic data takes time to collect; updating it more frequently would be costly and could lead to wrong conclusions.
3. All tables should be updated in append-only mode.

#### The data was increased by 100x.
* If Spark running on a single server cannot process a data set 100x larger, we could move the data and processing to AWS EMR, a managed cluster platform for processing large data sets in the cloud.

#### The data populates a dashboard that must be updated on a daily basis by 7am every day.
* Apache Airflow could be used to build an ETL pipeline that updates the data and populates the report on a daily schedule, so the dashboard is refreshed by 7am. Airflow also integrates well with Python and AWS, and more tasks can be combined into the same workflow to deliver more powerful automation.

#### The database needed to be accessed by 100+ people.
* AWS Redshift can handle up to 500 connections. If this SSOT database needs to be accessed by 100+ people, we can move it to Redshift with confidence that it will handle the load. A cost/benefit analysis will be needed before implementing this cloud solution.

#### Validate data model

##### Airports could benefit from this data warehouse
##### Some questions that this data model can help answer:
1. Which state receives the most arrivals?
2. When do people travel most often?
3. Does the temperature of a city or state affect travel?

##### Example queries:
Top 5 states by number of arrivals: `select top 5 state_code, count(*) as arrivals from fact_immigration group by state_code order by arrivals desc`

Top 3 months by number of arrivals: `select top 3 extract(month from arrive_date) as month_arrive, count(*) as arrivals from fact_immigration group by month_arrive order by arrivals desc`
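
To sanity-check the logic of these queries before running them against the warehouse, the same aggregations can be reproduced in pandas. The sketch uses only the five sample rows from Step 2, so every state ties at one arrival and all arrivals fall in April. It is an illustration, not part of the pipeline.

```python
import pandas as pd

# state_code and arrive_date from the five sample rows shown in Step 2
fact_immigration = pd.DataFrame({
    "state_code": ["HI", "TX", "FL", "CA", "NY"],
    "arrive_date": pd.to_datetime(
        ["2016-04-22", "2016-04-23", "2016-04-07", "2016-04-28", "2016-04-06"]
    ),
})

# Top 5 states by number of arrivals (value_counts sorts counts descending, like ORDER BY arrivals DESC)
top_states = fact_immigration["state_code"].value_counts().head(5)

# Top 3 arrival months by number of arrivals
top_months = fact_immigration["arrive_date"].dt.month.value_counts().head(3)

print(top_states)
print(top_months)
```

Both queries must order by the count, not by the grouping column, to return the busiest states and months.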