# Data Engineering Capstone project: ETL walkthrough

This notebook walks through the steps described in the `README.MD` of this repository, showing the code behind the main stages of the ETL pipeline and the key design choices made along the way.

The walkthrough covers the 5 sections into which the project ETL is divided:

1. Create data model
2. Stage data
3. Copy data
4. Run DQ checks
5. Run data analyses

![title](img/pipeline.PNG)

This is supporting material and should be used together with the main contents of this repository:

- **`README.MD` file**: Contains all relevant project documentation
- **`/airflow` directory**: Contains the actual ETL pipeline code


In [1]:
import pandas as pd
from tqdm import tqdm
import numpy as np
import boto3, psycopg2
import dask.dataframe as dd

# Connect to the `dev` database on the Redshift cluster
conn_string = "dbname=dev " + \
              "port= '5439' " + \
              "user='awsuser' " +  \
              "password='P4ssw0rd' " + \
              "host=udacity-capstone.cw6miikzoxut.us-west-2.redshift.amazonaws.com"

con = psycopg2.connect(conn_string)
cur = con.cursor()


---
# 1. Create data model
In this step the schemas and tables are created - see `README.MD` for full reference.

They are created using predefined SQL statements, which can be found in `/airflow/plugins/helpers/sql_queries.py`.

---

# 2. Stage data

Data staging covers three types of data, each handled by its own task:
1. Monthly immigration data
2. Immigration dimensions
3. Temperature data

In this section we walk through each of these tasks.

### 2.1. Monthly immigration data

##### DESCRIPTION AND OPERATOR

This dataset contains the **monthly i94 records** of every person entering the United States in a given month.

Each monthly dataset is processed as part of the ETL, using the scheduled run time in Airflow as input. For detail on the specific process, please refer to `StageImmigrationDataOperator` defined in `/airflow/plugins/operators/stage_immigration_data.py`.

##### EXAMPLE

We take the monthly data for January 2016 as an example, where the raw data has been copied into parquet files:

In [2]:
data = pd.read_parquet("s3://ascfraguas-udacity-deng-capstone/raw/immigration-data/i94_jan16_sub.parquet")
data.head()


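|   | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 7.0 | 2016.0 | 1.0 | 101.0 | 101.0 | BOS | 20465.0 | 1.0 | MA |  | ... |  |  | 1996.0 | D/S | M |  | LH | 346608285.0 | 424 | F1 |
| 1 | 8.0 | 2016.0 | 1.0 | 101.0 | 101.0 | BOS | 20465.0 | 1.0 | MA |  | ... |  |  | 1996.0 | D/S | M |  | LH | 346627585.0 | 424 | F1 |
| 2 | 9.0 | 2016.0 | 1.0 | 101.0 | 101.0 | BOS | 20469.0 | 1.0 | CT | 20480.0 | ... |  | M | 1999.0 | 07152016 | F |  | AF | 381092385.0 | 338 | B2 |
| 3 | 10.0 | 2016.0 | 1.0 | 101.0 | 101.0 | BOS | 20469.0 | 1.0 | CT | 20499.0 | ... |  | M | 1971.0 | 07152016 | F |  | AF | 381087885.0 | 338 | B2 |
| 4 | 11.0 | 2016.0 | 1.0 | 101.0 | 101.0 | BOS | 20469.0 | 1.0 | CT | 20499.0 | ... |  | M | 2004.0 | 07152016 | M |  | AF | 381078685.0 | 338 | B2 |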


The following fields are relevant to our ETL process:

- RECORD

    - `admnum` --> Admission number, unique identifier
    - `i94yr` --> Identifies the year of the extraction, int
    - `i94mon` --> Identifies the month of the extraction, int

- PASSENGER

    - `i94cit` --> Origin of the flight (converted with **country_codes** table)
    - `i94res` --> Residency of passenger (converted with **country_codes** table)
    - `i94bir` --> Age of passenger in years, int
    - `gender` --> Gender of the passenger
    - `i94visa` --> Collapsed reason of arrival (converted with **trip_reason_codes** table)

- ARRIVAL

    - `arrdate` --> Date of arrival
    - `i94mode` --> Mode of arrival (converted with **entry_channel_codes** table)
    - `i94addr` --> State of address in the US (converted with **state_codes** table)

- DEPARTURE

    - `depdate` --> Date of departure from the US
    
##### DATA STAGING

The main preprocessing steps before staging, as defined in `StageImmigrationDataOperator`, include:

- Elimination of invalid and duplicate `admnum` values, since `admnum` is considered the primary key of this table
- Ensuring the data is correctly extracted by checking the fields `i94yr` and `i94mon`, which should match the values in the file name
- Data type casting
- Setting to NA invalid values in the fields `i94bir` and `gender`
- Creation of the fields `arrival_day`, `arrival_month` and `arrival_year` by manipulating the field `arrdate`
- Creation of the fields `departure_day`, `departure_month` and `departure_year` by manipulating the field `depdate`
- Creation of the field `length_of_stay`
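
The steps above can be sketched in pandas as follows. This is a minimal illustration and not the operator's actual code: the function name `stage_immigration`, the validity rules for age and gender, and the sample rows are assumptions. It also assumes that `arrdate` and `depdate` are SAS day counts (days since 1960-01-01) and that missing departure values are encoded as `-9999`, both of which are consistent with the example records in section 3.1.

```python
import pandas as pd

SAS_EPOCH = pd.Timestamp("1960-01-01")  # assumed origin of arrdate / depdate
MISSING = -9999  # sentinel seen in the example records of section 3.1


def stage_immigration(raw: pd.DataFrame, year: int, month: int) -> pd.DataFrame:
    """Illustrative version of the staging steps (hypothetical helper)."""
    df = raw.copy()

    # The extraction must match the year and month encoded in the file name.
    if not ((df["i94yr"] == year) & (df["i94mon"] == month)).all():
        raise ValueError("i94yr / i94mon do not match the requested period")

    # admnum is the primary key: drop missing and duplicated values, then cast.
    df = df.dropna(subset=["admnum"]).drop_duplicates(subset="admnum")
    df["admnum"] = df["admnum"].astype("int64")

    # Invalid ages and genders become NA (these validity rules are assumptions).
    df["i94bir"] = df["i94bir"].where(df["i94bir"].between(0, 120))
    df["gender"] = df["gender"].where(df["gender"].isin(["M", "F", "X"]))

    # Split the day counts into day / month / year columns.
    for prefix, col in (("arrival", "arrdate"), ("departure", "depdate")):
        dates = SAS_EPOCH + pd.to_timedelta(df[col], unit="D")
        for part in ("day", "month", "year"):
            values = getattr(dates.dt, part)
            df[f"{prefix}_{part}"] = values.fillna(MISSING).astype("int64")

    # Length of stay in days; missing departures keep the sentinel.
    stay = df["depdate"] - df["arrdate"]
    df["length_of_stay"] = stay.fillna(MISSING).astype("int64")
    return df


# Two sample rows; the i94bir values are illustrative.
raw = pd.DataFrame({
    "admnum": [346608285.0, 381092385.0],
    "i94yr": [2016.0, 2016.0],
    "i94mon": [1.0, 1.0],
    "i94bir": [20.0, 17.0],
    "gender": ["M", "F"],
    "arrdate": [20465.0, 20469.0],
    "depdate": [float("nan"), 20480.0],
})
staged = stage_immigration(raw, year=2016, month=1)
print(staged[["admnum", "arrival_day", "arrival_month", "arrival_year", "length_of_stay"]])
```

Deriving the date parts and the length of stay in one pass keeps the staged file aligned with the columns of `immigration.us_entries`, so the copy step needs no further transformation.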

The output of the staging step is the curated monthly data, in the format required by the `immigration.us_entries` table - see section 3 for example data from this table.

### 2.2. Immigration dimensions

##### DESCRIPTION AND OPERATOR

The immigration dimensions are a **set of mapping tables** that translate the numerical codes used in several fields of the immigration data into their string values.

These mappings are defined in the data dictionary for the raw immigration data - see the file `additional_resources/I94_SAS_Labels_Descriptions.SAS`.

There are 5 mappings in total:

- `country_codes`
- `port_codes`
- `entry_channel_codes`
- `state_codes`
- `trip_reason_codes`

For full detail on the process please refer to the class `StageImmigrationDimensionsOperator` defined in `/airflow/plugins/operators/stage_immigration_dimensions.py`.

##### EXAMPLE

As an example, the mapping for the `trip_reason_codes` table is shown below:


In [3]:
trip_reason_codes = {
    1: 'Business',
    2: 'Pleasure',
    3: 'Student'
}

##### DATA STAGING

These mappings are stored as CSV files in the staging area in S3, from which the dimension tables are created - see section 3 for detail on these dimension tables.
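
As a rough illustration of this step, a mapping can be serialized to CSV text with the standard library. The helper name `mapping_to_csv` is hypothetical, and the column names `code` and `trip_reason` are taken from the dimension table shown in section 3.2; the operator's real implementation may differ.

```python
import csv
import io

# Mapping copied from the example above.
trip_reason_codes = {1: "Business", 2: "Pleasure", 3: "Student"}


def mapping_to_csv(mapping: dict, value_column: str) -> str:
    """Serialize a code -> label mapping as CSV text with a header row."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(["code", value_column])
    for code, label in mapping.items():
        writer.writerow([code, label])
    return buffer.getvalue()


csv_text = mapping_to_csv(trip_reason_codes, "trip_reason")
print(csv_text)
```

The resulting text could then be written to the staging bucket, for example with the `put_object` method of a boto3 S3 client; the bucket and key to use are not shown in this notebook.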

### 2.3. Temperature data

##### DESCRIPTION AND OPERATOR

This dataset contains **historical series of temperature by city**. 

The ETL uses this data to respond to specific questions, as described in section 5.

For full detail on the process please refer to the class `StageTemperatureDataOperator` defined in `/airflow/plugins/operators/stage_temperature_data.py`.

##### EXAMPLE

The data comes as a CSV file, which is directly staged for copying into Redshift - see the file below:

In [4]:
data = pd.read_csv("s3://ascfraguas-udacity-deng-capstone/raw/temperatures-data/GlobalLandTemperaturesByCity.csv")
data.head()

|   | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 |  |  | Århus | Denmark | 57.05N | 10.33E |


##### DATA STAGING

The data is directly moved into the staging area for copying into Redshift, from which the table `temperature.full_temperature_data` is created - see section 3 for detail.

---

# 3. Copy data

The tasks in this stage populate the schemas in our Redshift database, which make up our data model.

Specifically, we have 4 different tasks:

1. Copying monthly immigration data
2. Copying immigration dimensions
3. Copying temperature data
4. Summarizing temperature data

### 3.1. Copying monthly immigration data

##### DESCRIPTION AND OPERATOR

In this task we copy the monthly immigration data into the `immigration.us_entries` table.

For detail on the process, please refer to `CopyDataOperator` defined in `/airflow/plugins/operators/copy_data.py`.
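
For orientation, the sketch below builds a Redshift `COPY` statement of the kind such an operator could issue. The notebook does not show the operator's SQL, so the helper name `build_copy_statement`, the CSV format, the S3 path and the IAM role are all assumptions used only for illustration.

```python
def build_copy_statement(table: str, s3_path: str, iam_role: str) -> str:
    """Return a Redshift COPY statement for a CSV file with a header row.

    Hypothetical helper: the staged file format and the authorization
    method used by the real operator are not shown in this notebook.
    """
    return (
        f"COPY {table}\n"
        f"FROM '{s3_path}'\n"
        f"IAM_ROLE '{iam_role}'\n"
        "FORMAT AS CSV\n"
        "IGNOREHEADER 1;"
    )


# Placeholder path and role, not the project's real values.
statement = build_copy_statement(
    table="immigration.us_entries",
    s3_path="s3://example-bucket/staging/immigration/2016-01.csv",
    iam_role="arn:aws:iam::123456789012:role/example-redshift-role",
)
print(statement)
```

Loading through `COPY` lets Redshift read the staged file from S3 in bulk, which is generally much faster than inserting rows one by one through the database connection.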

##### EXAMPLE

After the data is copied into Redshift we have the table described in the data model - see `README.MD`. These are some example records:

In [5]:
cur.execute("SELECT * FROM immigration.us_entries LIMIT 5")
colnames = [desc[0] for desc in cur.description]
table = pd.DataFrame(cur.fetchall(), columns=colnames)
table


|   | admnum | i94bir | gender | i94visa | i94cit | i94res | i94addr | i94mode | arrival_day | arrival_month | arrival_year | departure_day | departure_month | departure_year | length_of_stay |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 346608285 | 20.0 | M | 3.0 | 101.0 | 101.0 | MA | 1.0 | 12 | 1 | 2016 | -9999 | -9999 | -9999 | -9999 |
| 1 | 428561085 | 70.0 | F | 2.0 | 101.0 | 101.0 | MA | 1.0 | 27 | 1 | 2016 | -9999 | -9999 | -9999 | -9999 |
| 2 | 431605285 | 74.0 | M | 2.0 | 101.0 | 101.0 | NV | 1.0 | 28 | 1 | 2016 | -9999 | -9999 | -9999 | -9999 |
| 3 | 404385885 | 37.0 | M | 2.0 | 101.0 | 101.0 | NY | 1.0 | 21 | 1 | 2016 | 21 | 2 | 2016 | 31 |
| 4 | 358836685 | 51.0 | F | 2.0 | 101.0 | 101.0 | NY | 1.0 | 13 | 1 | 2016 | 28 | 3 | 2016 | 75 |


### 3.2. Copying immigration dimensions

##### DESCRIPTION AND OPERATOR

In this task we copy the immigration dimensions into the following tables:

- `immigration.country_codes`
- `immigration.port_codes`
- `immigration.entry_channel_codes`
- `immigration.state_codes`
- `immigration.trip_reason_codes`

For detail on the process, please refer to `CopyDimensionsOperator` defined in `/airflow/plugins/operators/copy_dimensions.py`.

##### EXAMPLE

After the data is copied into Redshift we have the dimension tables described in the data model - see `README.MD`. These are some example records:

In [6]:
# Use a separate name for the loop variable so it is not overwritten by the DataFrame
for table_name in ['immigration.country_codes', 'immigration.port_codes', 'immigration.entry_channel_codes',
                   'immigration.state_codes', 'immigration.trip_reason_codes']:
    print(table_name)
    cur.execute(f"SELECT * FROM {table_name} LIMIT 5")
    colnames = [desc[0] for desc in cur.description]
    table = pd.DataFrame(cur.fetchall(), columns=colnames)
    display(table)
    

immigration.country_codes


|   | code | country_name |
|---|---|---|
| 0 | 582 | MEXICO Air Sea, and Not Reported (I-94, no lan... |
| 1 | 236 | AFGHANISTAN |
| 2 | 101 | ALBANIA |
| 3 | 316 | ALGERIA |
| 4 | 102 | ANDORRA |


immigration.port_codes


|   | code | port_name |
|---|---|---|
| 0 | ALC | ALCAN, AK |
| 1 | ANC | ANCHORAGE, AK |
| 2 | BAR | BAKER AAF - BAKER ISLAND, AK |
| 3 | DAC | DALTONS CACHE, AK |
| 4 | PIZ | DEW STATION PT LAY DEW, AK |


immigration.entry_channel_codes


|   | code | entry_channel |
|---|---|---|
| 0 | 1 | Air |
| 1 | 2 | Sea |
| 2 | 3 | Land |
| 3 | 9 | Not reported |
| 4 | 1 | Air |


immigration.state_codes


|   | code | state_name |
|---|---|---|
| 0 | AL | ALABAMA |
| 1 | AK | ALASKA |
| 2 | AZ | ARIZONA |
| 3 | AR | ARKANSAS |
| 4 | CA | CALIFORNIA |


immigration.trip_reason_codes


|   | code | trip_reason |
|---|---|---|
| 0 | 1 | Business |
| 1 | 2 | Pleasure |
| 2 | 3 | Student |
| 3 | 1 | Business |
| 4 | 2 | Pleasure |


### 3.3. Copying temperature data

##### DESCRIPTION AND OPERATOR

In this task we copy the temperature data into the table `temperature.full_temperature_data`.

For detail on the process, please refer to `CopyDataOperator` defined in `/airflow/plugins/operators/copy_data.py`.

##### EXAMPLE

After the data is copied into Redshift we have the table described in the data model - see `README.MD`. These are some example records:

In [7]:
cur.execute("SELECT * FROM temperature.full_temperature_data LIMIT 5")
colnames = [desc[0] for desc in cur.description]
table = pd.DataFrame(cur.fetchall(), columns=colnames)
table


|   | dt | averagetemperature | averagetemperatureuncertainty | city | country | latitude | longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1833-02-01 | -1.171 | 2.533 | Baglan | Afghanistan | 36.17N | 69.61E |
| 1 | 1833-10-01 | 9.901 | 1.841 | Baglan | Afghanistan | 36.17N | 69.61E |
| 2 | 1834-06-01 | 20.767 | 2.258 | Baglan | Afghanistan | 36.17N | 69.61E |
| 3 | 1835-02-01 |  |  | Baglan | Afghanistan | 36.17N | 69.61E |
| 4 | 1835-10-01 | 9.496 | 1.755 | Baglan | Afghanistan | 36.17N | 69.61E |


### 3.4. Summarizing temperature data

##### DESCRIPTION AND OPERATOR

In this task we summarize the temperature data in `temperature.full_temperature_data`, creating a summary table `temperature.temp_summary`.

The task is executed with a predefined `PostgresOperator`, using predefined SQL code - check the definition of the task in the DAG `etl.py` for detail.
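
The aggregation itself runs as SQL in Redshift, and that SQL is not reproduced in this notebook. As a plain-Python stand-in, the sketch below shows the kind of per-country summary the output table suggests: a mean and a standard deviation per upper-cased country name. The function name `summarize_temperatures`, the use of the sample standard deviation, and the upper-casing step are assumptions.

```python
from collections import defaultdict
from statistics import mean, stdev


def summarize_temperatures(rows):
    """Aggregate (country, average_temperature) pairs per country.

    Returns {COUNTRY: (mean_temp, stddev_temp)}; stddev_temp is None when
    fewer than two readings are available.
    """
    by_country = defaultdict(list)
    for country, temp in rows:
        if temp is not None:  # skip gaps, as SQL aggregates skip NULLs
            by_country[country.upper()].append(temp)
    return {
        country: (mean(temps), stdev(temps) if len(temps) > 1 else None)
        for country, temps in by_country.items()
    }


# Readings taken from the example rows shown in sections 2.3 and 3.3.
rows = [
    ("Denmark", 6.068),
    ("Denmark", None),
    ("Afghanistan", -1.171),
    ("Afghanistan", 9.901),
    ("Afghanistan", 20.767),
    ("Afghanistan", None),
    ("Afghanistan", 9.496),
]
summary = summarize_temperatures(rows)
print(summary)
```

Precomputing this summary once keeps the monthly report in section 5 from re-scanning the full temperature history on every run.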

##### EXAMPLE

After the data is summarized, we have created the new table as described in the data model - see `README.MD`. These are some example records:

In [8]:
cur.execute("SELECT * FROM temperature.temp_summary LIMIT 5")
colnames = [desc[0] for desc in cur.description]
table = pd.DataFrame(cur.fetchall(), columns=colnames)
table


|   | country_name | mean_temp | stddev_temp |
|---|---|---|---|
| 0 | BELGIUM | 9.726197 | 6.026465 |
| 1 | OMAN | 25.565806 | 2.848364 |
| 2 | UNITED ARAB EMIRATES | 26.572681 | 5.636053 |
| 3 | COLOMBIA | 22.683264 | 3.494776 |
| 4 | LAOS | 24.107664 | 3.407831 |


---
# 4. Run DQ checks

This stage consists of a single task that performs a total of 8 data quality checks to verify that the data was transferred correctly into the Redshift tables - see the actual DAG for detail.

The operator used is `RunQualityCheckOperator`, defined in `/airflow/plugins/operators/run_quality_checks.py`. This custom operator accepts any SQL statement as input together with a success criterion, against which the results are tested.
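
The idea of pairing a SQL statement with a success criterion can be sketched as below. This is not the operator's code: the function `run_quality_check` is hypothetical, the criterion is modeled as a Python callable, and an in-memory SQLite database with made-up table contents stands in for Redshift so that the example runs on its own.

```python
import sqlite3


def run_quality_check(connection, sql, is_valid):
    """Run `sql`, take the first column of the first row, and test it."""
    cursor = connection.cursor()
    cursor.execute(sql)
    result = cursor.fetchone()[0]
    if not is_valid(result):
        raise ValueError(f"Data quality check failed: {sql!r} returned {result!r}")
    return result


# In-memory stand-in for a warehouse table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE us_entries (admnum INTEGER, gender TEXT)")
conn.executemany(
    "INSERT INTO us_entries VALUES (?, ?)",
    [(346608285, "M"), (428561085, "F")],
)

# Check 1: the table is not empty.
run_quality_check(conn, "SELECT COUNT(*) FROM us_entries", lambda n: n > 0)

# Check 2: the primary key has no NULL values.
run_quality_check(
    conn,
    "SELECT COUNT(*) FROM us_entries WHERE admnum IS NULL",
    lambda n: n == 0,
)
```

Because both the query and the criterion are passed in as arguments, new checks can be added without touching the checking logic itself.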

---
# 5. Run data analyses

As described in the project documentation, **the ultimate goal is to generate a series of reports delivered on a monthly basis**.

These reports are run using the `RunAnalysisOperator` defined in `/airflow/plugins/operators/run_analysis.py`.

A total of 4 reports are run, each linked to its own task. Each report is run from the corresponding SQL query in `/airflow/plugins/helpers/sql_queries.py` and delivers its output to a specific Redshift table created in the `outputs` schema.
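
The output tables in the examples that follow are named after the reported month, e.g. `outputs.jan2016_length_of_stay`. A naming helper along these lines could derive that name from the run date. The function `output_table_name` is hypothetical and the pattern is inferred from the table names in this notebook, not taken from the operator's code.

```python
from datetime import date

# Lower-case month abbreviations, spelled out to avoid locale-dependent formatting.
MONTH_ABBREVIATIONS = (
    "jan", "feb", "mar", "apr", "may", "jun",
    "jul", "aug", "sep", "oct", "nov", "dec",
)


def output_table_name(run_date: date, report: str) -> str:
    """Build an `outputs` table name such as outputs.jan2016_<report>."""
    prefix = f"{MONTH_ABBREVIATIONS[run_date.month - 1]}{run_date.year}"
    return f"outputs.{prefix}_{report}"


print(output_table_name(date(2016, 1, 1), "length_of_stay"))
```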

### 5.1. Mean age by gender and channel

This table yields the mean age by gender and channel of arrival into the US for the selected month. 

Example rows from the output table are shown below:

In [9]:
cur.execute("SELECT * FROM outputs.jan2016_demographics_by_channel LIMIT 5")
colnames = [desc[0] for desc in cur.description]
table = pd.DataFrame(cur.fetchall(), columns=colnames)
table


|   | entry_channel | gender | average_age |
|---|---|---|---|
| 0 | Air | X | 35.321632 |
| 1 | Sea |  | 39.190476 |
| 2 | Sea | F | 45.509144 |
| 3 | Not reported | M | 48.483686 |
| 4 | Land | M | 36.380411 |


### 5.2. Mean stay by country

This table yields the mean stay of people with both a reported arrival date and a reported departure date, segmented by country of residence, for all people arriving in the selected month.

Example rows from the output table are shown below:

In [10]:
cur.execute("SELECT * FROM outputs.jan2016_length_of_stay LIMIT 5")
colnames = [desc[0] for desc in cur.description]
table = pd.DataFrame(cur.fetchall(), columns=colnames)
table


|   | country_name | average_stay |
|---|---|---|
| 0 | SAN MARINO | 8 |
| 1 | BANGLADESH | 18 |
| 2 | MAURITIUS | 15 |
| 3 | BAHAMAS | 9 |
| 4 | LIBERIA | 31 |


### 5.3. Frequency by trip reason and state

This table shows the number of individuals traveling into the US by month, segmented by state of destination and reason for travel (type of visa).

Example rows from the output table are shown below:

In [11]:
cur.execute("SELECT * FROM outputs.jan2016_state_trip_reasons LIMIT 5")
colnames = [desc[0] for desc in cur.description]
table = pd.DataFrame(cur.fetchall(), columns=colnames)
table


|   | state_name | trip_reason | count |
|---|---|---|---|
| 0 | NEW MEXICO | Student | 973 |
| 1 | NEW HAMPSHIRE | Pleasure | 1219 |
| 2 | S. CAROLINA | Business | 3282 |
| 3 |  | Business | 137 |
| 4 |  | Business | 248 |


### 5.4. Frequency against mean historical temperature of the country

This table displays the frequency of travelers by country in the analyzed month, together with the historical mean temperature and the standard deviation of the series for the country.

Example rows from the output table are shown below:

In [12]:
cur.execute("SELECT * FROM outputs.jan2016_freqs_and_mean_temps LIMIT 5")
colnames = [desc[0] for desc in cur.description]
table = pd.DataFrame(cur.fetchall(), columns=colnames)
table


|   | country_name | visitor_count | mean_temp | stddev_temp |
|---|---|---|---|---|
| 0 | ARGENTINA | 73923 | 16.999216 | 5.866641 |
| 1 | INDIA | 93518 | 25.429224 | 5.130299 |
| 2 | MOROCCO | 2170 | 17.295399 | 5.062322 |
| 3 | LIBERIA | 207 | 25.673186 | 1.10519 |
| 4 | REUNION | 87 | 23.301403 | 1.880354 |


---
---
# ANNEX
### Using Dask as part of the ETL process

If the volume of data increased considerably (e.g. x100), the current structure of the ETL **could pose an issue for the pandas-based manipulation of immigration data** depicted in section 2.1 of this notebook.

In this case, the natural choice would be to migrate the pre-staging preprocessing from pandas to a more scalable framework, such as Spark or Dask.

[Dask](https://docs.dask.org/en/latest/) is a powerful framework for both workload parallelization and larger-than-memory data manipulation. It evaluates operations lazily, which makes it possible to work with datasets that do not fit in memory. Furthermore, it offers a user-friendly, pandas-like API that makes these datasets easy to manipulate.

For example, consider loading the data as below:


In [2]:
data = dd.read_parquet("s3://ascfraguas-udacity-deng-capstone/raw/immigration-data/i94_jan16_sub.parquet")
data.head()

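|   | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 7.0 | 2016.0 | 1.0 | 101.0 | 101.0 | BOS | 20465.0 | 1.0 | MA |  | ... |  |  | 1996.0 | D/S | M |  | LH | 346608285.0 | 424 | F1 |
| 1 | 8.0 | 2016.0 | 1.0 | 101.0 | 101.0 | BOS | 20465.0 | 1.0 | MA |  | ... |  |  | 1996.0 | D/S | M |  | LH | 346627585.0 | 424 | F1 |
| 2 | 9.0 | 2016.0 | 1.0 | 101.0 | 101.0 | BOS | 20469.0 | 1.0 | CT | 20480.0 | ... |  | M | 1999.0 | 07152016 | F |  | AF | 381092385.0 | 338 | B2 |
| 3 | 10.0 | 2016.0 | 1.0 | 101.0 | 101.0 | BOS | 20469.0 | 1.0 | CT | 20499.0 | ... |  | M | 1971.0 | 07152016 | F |  | AF | 381087885.0 | 338 | B2 |
| 4 | 11.0 | 2016.0 | 1.0 | 101.0 | 101.0 | BOS | 20469.0 | 1.0 | CT | 20499.0 | ... |  | M | 2004.0 | 07152016 | M |  | AF | 381078685.0 | 338 | B2 |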


In [3]:
display(type(data))

dask.dataframe.core.DataFrame

Dask would be a natural choice for data manipulation in this scenario, and migrating the related task would be straightforward.