# US National Tourism and Trade Office - Data Engineering
### Data Engineering 2020 -  Capstone Project

#### Author: Priscila De-Stefano

#### Project Summary

The main goal of this project is to organize data from the US National Tourism and Trade Office in order to create a data lake and a structured database.

The project is organized into the following sections:
* Project Scope
* Gather Data
* Data Explore
* Data Model
* Data Pipeline
* Project Write Up

### Project Scope

The main goal of this project is to organize United States immigration data in order to create a data lake and a structured database. These two environments could be used by third-party software to gather detailed information about the flow of people into and out of the United States in 2016.

Examples of data that could be retrieved from this environment:
* number of non-immigrants arriving in the US by state in 2016
* number of non-immigrants arriving in the US by city in 2016
* number of non-immigrants arriving in the US for Education purposes in 2016
* number of non-immigrants arriving in the US for Business purposes in 2016
* ranking of states that receive more students in the US
* ranking of airports that most receive people from a given country
* etc...
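
As an illustration, the first question and the state ranking for students could be answered with aggregate queries like the ones sketched below. This is only a sketch: an in-memory SQLite database stands in for Redshift, the table and column names (`immigrations`, `admission_state`, `visa_code`, `year`) follow the data model described later in this document, the rows are made up, and visa code 3 is taken to mean Student as in the visa dictionary.

```python
import sqlite3

# In-memory SQLite stands in for Redshift; the rows below are made-up examples.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE immigrations ("
    "cic_id INTEGER PRIMARY KEY, admission_state TEXT, visa_code INTEGER, year INTEGER)"
)
conn.executemany(
    "INSERT INTO immigrations VALUES (?, ?, ?, ?)",
    [(1, "CA", 2, 2016), (2, "CA", 3, 2016), (3, "NY", 1, 2016),
     (4, "NY", 3, 2016), (5, "NY", 3, 2016)],
)

# Number of arrivals by state in 2016.
arrivals_by_state = conn.execute(
    """
    SELECT admission_state, COUNT(*) AS arrivals
    FROM immigrations
    WHERE year = 2016
    GROUP BY admission_state
    ORDER BY arrivals DESC, admission_state
    """
).fetchall()

# Ranking of states by student arrivals (visa_code 3 = Student).
students_by_state = conn.execute(
    """
    SELECT admission_state, COUNT(*) AS students
    FROM immigrations
    WHERE visa_code = 3
    GROUP BY admission_state
    ORDER BY students DESC, admission_state
    """
).fetchall()

print(arrivals_by_state)
print(students_by_state)
```

The same `GROUP BY` pattern would apply to the other questions in the list by grouping on a different column, such as the entry port or the country.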


#### Data Lake - AWS S3

Data is loaded from the data sources into DataFrames. After the data is organized, verified and cleaned, it is saved in an AWS S3 environment.
The S3 bucket contains csv files and parquet files.

#### Database - AWS Redshift

The final database is implemented on AWS Redshift. Apache Airflow tasks load the data from the data lake and save it in the database.

### Gather Data

This project is based on three data sources:
* **Immigration Data**: This data comes from the US National Tourism and Trade Office and is the largest data set of this project. For the scope of this project submission, only data from April 2016 is used.
* **Airports data source**: This data comes from DataHub.com and describes basic information about airports around the world.
* **Immigration data dictionary**: This data source is provided by the Udacity team and is used to better describe and clean the data.

The steps below describe how data is being gathered and saved into Pandas Dataframes and a Spark Dataframe.

### IMPORTANT:
The s3fs package must be installed in this environment in order to run this notebook properly.
<br>
**command**: pip install s3fs

In [1]:
# Do all imports and installs here
import s3fs
import pandas as pd
import numpy as np
import os
import configparser
from pyspark.sql.types import IntegerType
from datetime import datetime, timedelta
from pyspark.sql import types as T
from pyspark.sql.functions import udf

In [2]:
# Get AWS credentials
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
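
The contents of `dl.cfg` are not shown in this notebook. Judging from the lookups above, it needs an `[AWS]` section with two keys. The sketch below parses an equivalent layout from a string; the values are placeholders, and the name `example_config` is used only to avoid overwriting `config`.

```python
import configparser

# Placeholder values only; real keys belong in a dl.cfg kept out of version control.
EXAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = YOUR_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY = YOUR_SECRET_ACCESS_KEY
"""

example_config = configparser.ConfigParser()
example_config.read_string(EXAMPLE_CFG)

# The same lookups used above succeed against this layout.
print(example_config['AWS']['AWS_ACCESS_KEY_ID'])
```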

In [3]:
# DATA SOURCE: US National Tourism and Trade Office.
df_immig_sample = pd.read_csv("immigration_data_sample.csv")

In [4]:
# DATA SOURCE: COUNTRY CODES FROM DATA DICTIONARY
df_country = pd.read_csv("country_code.csv",delimiter=";")

In [5]:
# DATA SOURCE: ENTRY PORT CODES FROM DATA DICTIONARY
df_port = pd.read_csv("entry_port_code.csv",delimiter=";")

In [6]:
# DATA SOURCE: TRAVEL MODE CODES FROM DATA DICTIONARY
df_mode = pd.read_csv("travel_mode.csv",delimiter=";")

In [7]:
# DATA SOURCE: STATE CODES FROM DATA DICTIONARY
df_state = pd.read_csv("state.csv",delimiter=";")

In [8]:
# DATA SOURCE: VISA CODES FROM DATA DICTIONARY
df_visa = pd.read_csv("visa.csv",delimiter=";")

In [10]:
# DATA SOURCE: Airport Codes from DataHub
df_airport = pd.read_csv("airport-codes_csv.csv")

In [11]:
# source for property names:
# https://hadoop.apache.org/docs/current3/hadoop-aws/tools/hadoop-aws/#Authentication_properties
from pyspark.sql import SparkSession
# build a Spark session with the SAS reader and the hadoop-aws (S3A) packages
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.2")
    .config("fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID'])
    .config("fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY'])
    .enableHiveSupport()
    .getOrCreate()
)
# read the April 2016 I94 immigration data (SAS format) into a Spark dataframe
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

#### Data Sample

In [12]:
df_immig_sample.head(5)

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [13]:
df_country.head(5)

Unnamed: 0,code,description
0,582,"'MEXICO Air Sea, and Not Reported (I-94, no ..."
1,236,'AFGHANISTAN'
2,101,'ALBANIA'
3,316,'ALGERIA'
4,102,'ANDORRA'


In [14]:
df_state.head(5)

Unnamed: 0,code,description
0,'AL','ALABAMA'
1,'AK','ALASKA'
2,'AZ','ARIZONA'
3,'AR','ARKANSAS'
4,'CA','CALIFORNIA'


In [15]:
df_port.head(5)

Unnamed: 0,code,description
0,'ALC'\t,"\t'ALCAN, AK '"
1,'ANC'\t,"\t'ANCHORAGE, AK '"
2,'BAR'\t,"\t'BAKER AAF - BAKER ISLAND, AK'"
3,'DAC'\t,"\t'DALTONS CACHE, AK '"
4,'PIZ'\t,"\t'DEW STATION PT LAY DEW, AK'"


In [16]:
df_mode.head(5)

Unnamed: 0,code,description
0,1,'Air'
1,2,'Sea'
2,3,'Land'
3,9,'Not reported'


In [17]:
df_airport.head(5)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [19]:
df_visa.head(5)

Unnamed: 0,code,description
0,1,Business
1,2,Pleasure
2,3,Student


### Data Explore
In this section data will be explored in order to find missing values, duplicate data, etc.
Also, some columns are renamed in order to improve the analysis.

#### Cleaning Steps

In [18]:
# remove \t and \n from data dictionary dataframes
df_country = df_country.replace('(\t|\n)','', regex=True)
df_mode = df_mode.replace('(\t|\n)','', regex=True)
df_state = df_state.replace('(\t|\n)','', regex=True)
df_visa = df_visa.replace('(\t|\n)','', regex=True)
df_port = df_port.replace('(\t|\n)','', regex=True)


# remove '' from data dictionary dataframes
df_country['description'] = df_country['description'].map(lambda x: x.lstrip(' \'').rstrip('\''))
df_port['code'] = df_port['code'].map(lambda x: x.lstrip(' \'').rstrip('\''))
df_port['description'] = df_port['description'].map(lambda x: x.lstrip(' \'').rstrip('\''))
df_mode['description'] = df_mode['description'].map(lambda x: x.lstrip(' \'').rstrip('\''))
df_state['description'] = df_state['description'].map(lambda x: x.lstrip(' \'').rstrip('\''))
df_visa['description'] = df_visa['description'].map(lambda x: x.lstrip(' \'').rstrip('\''))
df_state['code'] = df_state['code'].map(lambda x: x.lstrip(' \'').rstrip('\''))
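
To make the effect of the two cleaning passes concrete, the sketch below applies the same string operations in plain Python to two raw values copied from the `df_port.head(5)` output above. It assumes the `\t` shown there is a real tab character; `strip_quotes` is an illustrative helper, not part of the notebook.

```python
def strip_quotes(value):
    """Mirror the cleaning above: drop tabs/newlines, then the surrounding quotes."""
    value = value.replace("\t", "").replace("\n", "")
    return value.lstrip(" '").rstrip("'")

raw_code = "'ALC'\t"
raw_description = "\t'ALCAN, AK '"

clean_code = strip_quotes(raw_code)
clean_description = strip_quotes(raw_description)

print(repr(clean_code))                 # 'ALC'
print(repr(clean_description))          # 'ALCAN, AK ' (padding inside the quotes is kept)
print(repr(clean_description.strip()))  # 'ALCAN, AK'
```

Padding that sits inside the quotes survives the cleaning, so an extra `.strip()` may be worth considering if the descriptions are later used in joins or comparisons.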

In [20]:
# drop columns that won't be used during project
# (admnum is kept: it is renamed to adm_num in the next step and used in the data model)
columns = ['count', 'visapost','i94cit','biryear', 'insnum','dtaddto','dtadfile', 'matflag', 'entdepa', 
           'entdepd', 'entdepu', 'occup', 'airline', 'fltno']
df_spark = df_spark.drop(*columns)

In [21]:
# renaming columns
df_spark = df_spark.withColumnRenamed("cicid","cic_id") \
    .withColumnRenamed("i94yr","year") \
    .withColumnRenamed("i94mon","month") \
    .withColumnRenamed("i94res","country_code") \
    .withColumnRenamed("i94port","us_entry_port") \
    .withColumnRenamed("arrdate","arrival_date") \
    .withColumnRenamed("i94mode","travel_mode") \
    .withColumnRenamed("i94addr","us_state") \
    .withColumnRenamed("depdate","departure_date") \
    .withColumnRenamed("i94bir","age") \
    .withColumnRenamed("i94visa","visa_code") \
    .withColumnRenamed("visatype","visa_type") \
    .withColumnRenamed("admnum","adm_num")

df_spark.limit(1).toPandas()

Unnamed: 0,cic_id,year,month,country_code,us_entry_port,arrival_date,travel_mode,us_state,departure_date,age,visa_code,gender,adm_num,visa_type
0,6.0,2016.0,4.0,692.0,XXX,20573.0,,,,37.0,2.0,,1897628000.0,B2


In [22]:
# clean by column us_state: keep only rows whose us_state is a valid state code
# (rows with a missing us_state are dropped as well)
df_state_array = list(df_state['code'])
df_spark = df_spark.filter(df_spark.us_state.isin(df_state_array))

In [23]:
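def convert_datetime(x):
    """Convert a SAS date (days since 1960-01-01) to a datetime; return None if x is missing or invalid."""
    try:
        start = datetime(1960, 1, 1)
        return start + timedelta(days=int(x))
    except (TypeError, ValueError, OverflowError):
        return None

udf_datetime_from_sas = udf(convert_datetime, T.DateType())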
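
The conversion above relies on the SAS convention of storing dates as the number of days since 1 January 1960. As a quick sanity check, the sketch below applies the same arithmetic in plain Python (outside Spark) to the `arrdate` value 20566.0 from the first row of the data sample. `sas_days_to_date` is an illustrative name, not part of the notebook.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS counts dates as days since this day

def sas_days_to_date(days):
    """Convert a SAS day count to a date, or None when the value is missing or invalid."""
    try:
        return SAS_EPOCH + timedelta(days=int(days))
    except (TypeError, ValueError, OverflowError):
        return None

print(sas_days_to_date(20566.0))  # 2016-04-22
print(sas_days_to_date(None))     # None
```

The result falls in April 2016, which is consistent with the month covered by the input file.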

In [24]:
# cast numeric columns (read from SAS as doubles) to integers
# note: adm_num can exceed the 32-bit range of IntegerType; T.LongType() would preserve such values
int_columns = ["year", "month", "cic_id", "country_code", "arrival_date", "departure_date",
               "travel_mode", "age", "visa_code", "adm_num"]
for column in int_columns:
    df_spark = df_spark.withColumn(column, df_spark[column].cast(IntegerType()))

# convert SAS day counts to dates
df_spark = df_spark.withColumn("arrival_date",udf_datetime_from_sas("arrival_date"))
df_spark = df_spark.withColumn("departure_date",udf_datetime_from_sas("departure_date"))
df_spark.limit(1).toPandas()

Unnamed: 0,cic_id,year,month,country_code,us_entry_port,arrival_date,travel_mode,us_state,departure_date,age,visa_code,gender,adm_num,visa_type
0,7,2016,4,276,ATL,2016-04-07,1,AL,,25,3,M,2147483647,F1
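
The `adm_num` value in the output above, 2147483647, is exactly the largest 32-bit signed integer. This suggests that the original admission number did not fit in an `IntegerType` and was clamped by the cast. The check below compares that limit with the `admnum` of the first row in the data sample. A 64-bit type (`LongType` in Spark) would likely be a safer choice for this column.

```python
INT32_MAX = 2**31 - 1  # largest value a 32-bit signed integer (Spark IntegerType) can hold

sample_admnum = 56582670000  # admnum of the first row in the data sample above

print(INT32_MAX)                  # 2147483647
print(sample_admnum > INT32_MAX)  # True
```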


In [25]:
# convert text columns to integer in pandas dataframe
df_visa["code"] = pd.to_numeric(df_visa["code"])
df_mode["code"] = pd.to_numeric(df_mode["code"])

#### Remove Duplicated Data

In [26]:
rows_count = df_spark.count()
print("Number of immigration rows: {}".format(rows_count))

# removing duplicated rows
print("Removing duplicated rows")
df_spark = df_spark.drop_duplicates()
rows_count_current = df_spark.count()
print("Number of rows removed: {}".format(rows_count - rows_count_current))

Number of immigration rows: 2917199
Removing duplicated rows
Number of rows removed: 0


#### Saving Data
The cleaned data is saved as parquet files and CSV files.
All files are saved on AWS S3.

**S3 locations:**
<br>
**s3a://udacity-capstone-2020-3/parquets/capstone_final**
<br>
**s3a://udacity-capstone-2020-3/csv_files/**

**note**: A local copy of the parquet files can be found in the **/capstone** folder.

In [31]:
#write immigration data to parquet
output_data = "s3a://udacity-capstone-2020-3/parquets/"
df_spark.write.parquet(output_data + "capstone_final")

In [30]:
#write data to csv
output_data = "s3a://udacity-capstone-2020-3/"
fs = s3fs.S3FileSystem(key=os.environ['AWS_ACCESS_KEY_ID'], secret=os.environ['AWS_SECRET_ACCESS_KEY'])

# target file name -> dataframe to export
csv_outputs = {
    "countries.csv": df_country,
    "states.csv": df_state,
    "entry_ports.csv": df_port,
    "airports.csv": df_airport,
    "visa.csv": df_visa,
    "travel_modes.csv": df_mode,
}

# each file is written without header and index
for file_name, df in csv_outputs.items():
    bytes_to_write = df.to_csv(header=None, index=None).encode()
    with fs.open(output_data + "csv_files/" + file_name, 'wb') as f:
        f.write(bytes_to_write)

### Data Model
#### Conceptual Data Model and Data Dictionary

The final data model of this project is designed as a star schema, because it is easy to understand and it also provides good query performance.
<br>
The data model contains the tables below:

#### Staging tables:

![title](staging.png)


   * staging_i94
       * cic_id - ID from the data source table. It was kept as the primary key for the immigration table.
           * **Source**: Immigration Data Source
       * year - immigration year. 
           * **Source**: Immigration Data Source
       * month - immigration month. 
           * **Source**: Immigration Data Source
       * country_code - country of origin. 
           * **Source**: Immigration Data Source
       * us_entry_port - entry port to US. 
           * **Source**: Immigration Data Source
       * arrival_date - date of arrival in US. 
           * **Source**: Immigration Data Source
       * travel_mode - mode of travel: Air, sea, land or not reported. 
           * **Source**: Immigration Data Source
       * us_state - state of stay in the US. 
           * **Source**: Immigration Data Source
       * departure_date - date of departure from the US.
           * **Source**: Immigration Data Source
       * age - age of non-immigrant.
           * **Source**: Immigration Data Source
       * visa_code - visa category: Business, Pleasure or Student. 
           * **Source**: Immigration Data Source
       * gender - gender of non-immigrant. 
           * **Source**: Immigration Data Source
       * adm_num - admission number. 
           * **Source**: Immigration Data Source
       * visa_type - class of admission. 
           * **Source**: Immigration Data Source
### ====================================================               
   * staging_airports
       * ident - airport code
           * **Source**: Airports data source
       * airport_type - airport type: heliport, small_airport, closed, seaplane_base, balloonport, medium_airport, large_airport
           * **Source**: Airports data source
       * airport_name - name of the airport
           * **Source**: Airports data source
       * elevation_ft - airport elevation
           * **Source**: Airports data source
       * continent - continent where the airport is located
           * **Source**: Airports data source
       * iso_country - country where the airport is located
           * **Source**: Airports data source
       * iso_region - region (or state) where the airport is located
           * **Source**: Airports data source
       * municipality - city of the airport
           * **Source**: Airports data source
       * gps_code - GPS code
           * **Source**: Airports data source
       * iata_code - three-letter geocode designating many airports and metropolitan areas around the world, defined by the International Air Transport Association (IATA).
           * **Source**: Airports data source
       * local_code - local code
           * **Source**: Airports data source
       * coordinates - airport coordinates
           * **Source**: Airports data source
### ====================================================              
   * staging_countries
       * code - country code
           * **Source**: Immigration data dictionary
       * descrip - country name
           * **Source**: Immigration data dictionary    
### ====================================================             
   * staging_states
       * code - state code
            * **Source**: Immigration data dictionary
       * descrip - state name
            * **Source**: Immigration data dictionary
### ====================================================              
   * staging_ports
       * code - entry port code
           * **Source**: Immigration data dictionary       
       * descrip - entry port name
           * **Source**: Immigration data dictionary       
### ====================================================             
   * staging_visa
       * code - visa code
           * **Source**: Immigration data dictionary       
       * descrip - visa name
           * **Source**: Immigration data dictionary       
### ====================================================             
   * staging_travel_mode
       * code - travel mode code
           * **Source**: Immigration data dictionary       
       * descrip - travel mode name
           * **Source**: Immigration data dictionary       
       
#### Dimension and Fact tables

![title](dimension_fact.png)
   * immigrations - fact table
       * cic_id - ID from the data source table. It was kept as the primary key for the immigration table.
           * **Source**: Immigration Data Source
       * airport_id - airport code
           * **Source**: Airports data source
       * admission_id - admission number. 
           * **Source**: Immigration Data Source
       * arrival_date - date of arrival in US. 
           * **Source**: Immigration Data Source  
       * departure_date - date of departure from the US.
           * **Source**: Immigration Data Source   
       * travel_mode - mode of travel
           * **Source**: Immigration data dictionary          
       * year - immigration year. 
           * **Source**: Immigration Data Source
       * month - immigration month. 
           * **Source**: Immigration Data Source
       * country - country of origin. 
           * **Source**: Immigration Data Source
       * us_entry_port - entry port to US. 
           * **Source**: Immigration Data Source
       * gender - gender of non-immigrant. 
           * **Source**: Immigration Data Source
       * visa_code - visa category: Business, Pleasure or Student. 
           * **Source**: Immigration data dictionary 
       * visa_type - class of admission. 
           * **Source**: Immigration Data Source 
       * admission_state - state of stay in the US. 
           * **Source**: Immigration Data Source
       * age -  age of non-immigrant
           * **Source**: Immigration Data Source
### ====================================================  
   * airport - dimension table
       * airport_id - airport code
           * **Source**: Airports data source
       * airport_type - airport type
           * **Source**: Airports data source
       * airport_name - name of the airport
           * **Source**: Airports data source
       * elevation_ft - airport elevation
           * **Source**: Airports data source
       * continent - continent where the airport is located
           * **Source**: Airports data source
       * iso_country - country where the airport is located
           * **Source**: Airports data source
       * iso_region - region (or state) where the airport is located
           * **Source**: Airports data source
       * municipality - city of the airport
           * **Source**: Airports data source
       * gps_code - GPS code
           * **Source**: Airports data source
       * iata_code - three-letter geocode designating many airports and metropolitan areas around the world, defined by the International Air Transport Association (IATA).
           * **Source**: Airports data source
       * local_code - local code
           * **Source**: Airports data source
       * coordinates - airport coordinates
           * **Source**: Airports data source

### ====================================================  
   * time - dimension table

       * start_time - time as timestamp
            * **Source**: extracted from arrival_date and departure_date     
       * hour - hour
            * **Source**: calculated from start_time       
       * day - day of month
            * **Source**: calculated from start_time       
       * week - week number
            * **Source**: calculated from start_time       
       * month - month
            * **Source**: calculated from start_time       
       * year - year
            * **Source**: calculated from start_time       
       * weekday - day of week
            * **Source**: calculated from start_time       
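
As a rough illustration of how the calculated columns of the time dimension relate to `start_time`, the sketch below derives them in plain Python. The project's own derivation is part of the SQL under **/airflow** and is not shown here, so the week numbering (ISO weeks) and the weekday convention (Monday = 0) used below are assumptions. `time_dimension_row` is an illustrative name.

```python
from datetime import datetime

def time_dimension_row(start_time):
    """Build one row of the time dimension from a timestamp."""
    return {
        "start_time": start_time,
        "hour": start_time.hour,
        "day": start_time.day,
        "week": start_time.isocalendar()[1],  # ISO week number
        "month": start_time.month,
        "year": start_time.year,
        "weekday": start_time.weekday(),      # Monday = 0, Sunday = 6
    }

row = time_dimension_row(datetime(2016, 4, 22))
print(row)
```

Because `arrival_date` and `departure_date` are stored as dates, `hour` is always 0 for rows derived this way.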

#### Data Model creation

All tables were created using the script **create_tables.sql**, which is located under folder **/airflow**.

### Data Pipeline

The Data Pipeline was built with Apache Airflow, according to the diagram below:

![title](airflow1.png)

#### Data Pipeline Steps

The data pipeline follows the steps below:

1. Begin Execution
    * This step starts the data pipeline
2. Load staging tables
    * This step loads data into all staging tables using the COPY command
3. Load immigration fact table
    * This step loads data into the immigrations fact table using a SQL query
4. Load dimension tables
    * This step loads data into the dimension tables using SQL queries
5. Run data quality checks
    * This step checks data quality. More information is given in the next section
6. Stop Execution
    * This step ends the data pipeline
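
The staging step (step 2) could look roughly like the sketch below, which builds a Redshift `COPY` statement for one staging table. The statements actually used by the project live under **/airflow** and are not reproduced here. The helper name `build_copy_statement` and the IAM role ARN are hypothetical, and the paths use the `s3://` scheme that Redshift expects instead of the `s3a://` scheme used by Spark.

```python
def build_copy_statement(table, s3_path, iam_role, file_format="PARQUET"):
    """Return a Redshift COPY statement that loads one staging table from S3."""
    if file_format not in ("PARQUET", "CSV"):
        raise ValueError("unsupported file format: {}".format(file_format))
    return (
        "COPY {table}\n"
        "FROM '{s3_path}'\n"
        "IAM_ROLE '{iam_role}'\n"
        "FORMAT AS {file_format};"
    ).format(table=table, s3_path=s3_path, iam_role=iam_role, file_format=file_format)

# Hypothetical role ARN, for illustration only.
EXAMPLE_ROLE = "arn:aws:iam::123456789012:role/example-redshift-role"

copy_i94 = build_copy_statement(
    "staging_i94", "s3://udacity-capstone-2020-3/parquets/capstone_final", EXAMPLE_ROLE
)
copy_countries = build_copy_statement(
    "staging_countries", "s3://udacity-capstone-2020-3/csv_files/countries.csv",
    EXAMPLE_ROLE, file_format="CSV",
)

print(copy_i94)
print(copy_countries)
```

In an Airflow task, a statement like this would be executed against the Redshift connection once per staging table.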

#### Data Quality Checks
The data quality checks described below are performed during the data pipeline run:

 1. Count check
     * The sizes of the tables in the final data model are checked in order to identify potential problems
 2. Integrity constraints
     * All tables from the final data model have a unique key (primary key constraint) in order to ensure data integrity and avoid duplicated data.
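
The two checks could be sketched as follows. Amazon Redshift treats primary key constraints as informational and does not enforce them, so an explicit uniqueness check alongside the count check may be worthwhile. The sketch assumes a DB-API connection and uses an in-memory SQLite table with made-up rows in place of Redshift. `run_quality_checks` is an illustrative name, not the operator used in **/airflow**.

```python
import sqlite3

def run_quality_checks(connection, table, key_column):
    """Return (row_count, duplicate_keys); raise ValueError if either check fails."""
    cursor = connection.cursor()
    cursor.execute(
        "SELECT COUNT(*), COUNT(DISTINCT {key}) FROM {table}".format(key=key_column, table=table)
    )
    row_count, distinct_keys = cursor.fetchone()
    duplicate_keys = row_count - distinct_keys  # NULL keys are counted as duplicates
    if row_count == 0:
        raise ValueError("{} is empty".format(table))
    if duplicate_keys > 0:
        raise ValueError("{} has {} duplicated {} values".format(table, duplicate_keys, key_column))
    return row_count, duplicate_keys

# Demo on an in-memory SQLite table with made-up rows.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE immigrations (cic_id INTEGER, admission_state TEXT)")
conn.executemany("INSERT INTO immigrations VALUES (?, ?)", [(1, "CA"), (2, "NY")])

print(run_quality_checks(conn, "immigrations", "cic_id"))  # (2, 0)
```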

All data pipeline source code can be found in the **/airflow** folder.

### Project Write Up

#### Tools:

Python, Pandas, Spark and SQL were the main tools used in this project to read, clean and process data and to create tables.
Because the amount of data provided by the US National Tourism and Trade Office each month and year is constantly growing, the architecture designed for this project supports scalability through the use of AWS tools.

#### Data Update
Considering that data provided by US National Tourism and Trade Office is released once a month, the steps described in this project should run **monthly**.

#### Data Increase Scenario

This project already supports scalability through the use of AWS S3, AWS Redshift and Apache Airflow. In addition:
  * **If the data was increased by 100x:**
    * AWS S3 parquets could be partitioned by US state
    * AWS EMR could be used for parallel processing    
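
Partitioning the parquet output by state could be sketched as below, using the `us_state` column of the cleaned Spark dataframe. The function only wraps the Spark writer calls and needs a Spark DataFrame at run time. `write_partitioned_parquet` is an illustrative name, and any output path passed to it would be a new, hypothetical location in the bucket.

```python
def write_partitioned_parquet(df, output_path, partition_column="us_state"):
    """Write a Spark DataFrame as parquet, with one sub-folder per partition value."""
    df.write.partitionBy(partition_column).parquet(output_path)
```

Queries that filter on a single state can then read only the matching sub-folder instead of scanning the whole data set.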

#### Data availability Scenario

 * **If the data populates a dashboard that must be updated on a daily basis by 7am every day:**
   * The script could be changed to process only new data, instead of processing all input files every time it runs
   * Another database (mirror) could be created in order to ensure that data will be always available.
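
Processing only new data could start from a helper that works out which monthly files still need to be loaded. The sketch below assumes that every monthly file follows the naming pattern of the April 2016 file used in this notebook (`i94_apr16_sub.sas7bdat`); that pattern and both function names are assumptions made for illustration.

```python
MONTH_ABBREVIATIONS = ["jan", "feb", "mar", "apr", "may", "jun",
                       "jul", "aug", "sep", "oct", "nov", "dec"]

def i94_file_name(year, month):
    """Build the monthly SAS file name, assuming the April 2016 naming pattern holds."""
    return "i94_{}{:02d}_sub.sas7bdat".format(MONTH_ABBREVIATIONS[month - 1], year % 100)

def months_to_process(available, already_loaded):
    """Return the (year, month) pairs that have not been loaded yet, oldest first."""
    return sorted(set(available) - set(already_loaded))

pending = months_to_process([(2016, 4), (2016, 5)], already_loaded=[(2016, 4)])
print([i94_file_name(year, month) for year, month in pending])
```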

#### Data concurrency Scenario
   * **If the database needed to be accessed by 100+ people:**
     * The AWS Redshift cluster could be scaled (for example, by adding nodes or tuning workload management queues) in order to handle the additional concurrent queries.

### Summary

This project was implemented using tools and techniques learned in the Data Engineer Nanodegree.