# Data Warehouse: US Visitors
__Providing insights that support officials' decision-making on optimizing the experience of visitors to the US__
***
## Project overview
This is the final (capstone) project of the Udacity Data Engineering Nanodegree. Its aim is to give students a chance to demonstrate the knowledge gained throughout the courses of the program, and it forms a key part of the student's portfolio when pursuing data engineering career goals. Two options were available: use the data provided by the Udacity team, or define the data and the scope ourselves. I chose the data provided by Udacity to build the data warehouse. These data cover immigration to the United States and are provided by the US National Tourism and Trade Office. For more information, please visit https://travel.trade.gov/research/reports/i94/historical/2016.html

## Business Scenario
We work as Data Engineers at InsightData, a data-oriented tech firm specializing in data warehouse services. We provide dedicated assistance to enterprises with their data needs, and we create innovative strategic solutions that deliver optimized business results. Our full suite of services includes modernizing data warehousing infrastructure, improving the performance and usability of the data warehouse (DW) for its end users, reducing costs, and helping with data profiling and standardization, data acquisition, transformation and integration.

U.S. Customs and Border Protection hired us as contractors to uncover the insights hidden in their flood of data. Our goal is to model and build a new analytics solution on top of state-of-the-art technologies that unlocks the information in their data, so that they can provide the best possible experience to visitors to the US.

## Structure of the Project
Following the Udacity guide for this capstone, we structured the project documentation in the following steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

## Step 1: Scope the Project and Gather Data

_Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc._

### The Scope 
The main goal of our project is to deliver a cloud data warehouse that supports the analytics team in answering questions through tables and dashboards. Moreover, we aim to build a source-of-truth database, so that the US Government could expose our solution through a web API and let backend web services query the DW for information about international visitors to the US.

### The Data
For this project we used:
- the immigration dataset,
- the global temperature dataset,
- the US city demographics dataset, and
- the descriptions contained in the `I94_SAS_Labels_Descriptions.SAS` file.

### The Architecture
We built a cloud-based solution on top of __Amazon Web Services (AWS)__. First, we preprocessed all the provided datasets with __Apache Spark__ and stored the processed data in a staging area in an __AWS S3__ bucket. Next, we loaded the staged data into an __Amazon Redshift__ cluster using an __Apache Airflow__ pipeline that transfers the data and checks its quality, so that our customers can analyze it conveniently.

![Architecture](images/architecture.png)

The main information that users would like to extract from the data includes:

* Visitors by nationality.
* Visitors by airline.
* Visitors by origin.
* Correlations between destinations in the U.S. and the source country.
* Correlations between visitor demographics and the states visited.
* Correlations between immigration by source region and the temperature of the source region.
* Correlations between destinations in the U.S. and source climates.

***

## Step 2: Explore and Assess the Data

_To get familiar with the data provided by Udacity, we performed a thorough exploratory data analysis ([EDA](https://en.wikipedia.org/wiki/Exploratory_data_analysis)), investigating the usefulness of the data and the preprocessing steps needed to clean, organize and join the provided datasets into a meaningful data model._

In the following sections we briefly describe the provided datasets and summarize the main considerations behind our decisions about which data to use.

__Immigration Data__

For decades, U.S. immigration officers required every foreign visitor who lawfully entered the United States (e.g. foreign students, business visitors and tourists) to submit Form I-94 (Arrival/Departure Record). The I-94 was a small white paper form that foreign visitors received from the cabin crew on arrival flights and from U.S. Customs and Border Protection at the time of entry into the United States. Each form had a unique 11-digit identifying number and listed the port of entry, the date of entry into the United States, the traveler's immigration category and the expiration date of the immigration status. Its main purpose was to record the visitor's lawful admission to the United States.

The main dataset consists of one file for each month of 2016, available in the directory `../../data/18-83510-I94-Data-2016/` in the [SAS](https://www.sas.com/en_us/home.html) binary database storage format `sas7bdat`. Because the 12 files combined hold a massive amount of data (more than 40 million rows (40,790,529) and 28 columns), we used only April 2016 in our project, which contains approximately three million records (3,096,313).

```python
# Importing the libraries needed in this project
import os
import pandas as pd
from datetime import datetime
```

```python
immigration_fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
immigration = pd.read_sas(immigration_fname, 'sas7bdat', encoding="ISO-8859-1")
```

```python
immigration.head()
```

| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 | | | | ... | U | | 1979.0 | 10282016 | | | | 1897628000.0 | | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL | | ... | Y | | 1991.0 | D/S | M | | | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... | | M | 1961.0 | 09302016 | M | | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 1988.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 2012.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |


__Data Dictionary__: Here we describe the fields of the dataset. Some descriptions were not clear enough, so we had to make assumptions about their meaning.

| Column Name | Description |
| :--- | :--- |
| CICID* | Unique ID that identifies one record in the dataset |
| I94YR | 4 digit numeric year |
| I94MON | Numeric month |
| I94CIT | 3-digit code of the source country for immigration (country of birth) |
| I94RES | 3-digit code of the source country for immigration (country of residence) |
| I94PORT | Port of admission |
| ARRDATE | Arrival date in the USA |
| I94MODE | Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported) |
| I94ADDR | State of arrival |
| DEPDATE | Departure date |
| I94BIR | Age of Respondent in Years |
| I94VISA | Visa codes collapsed into three categories: (1 = Business; 2 = Pleasure; 3 = Student) |
| COUNT | Used for summary statistics |
| DTADFILE | Character Date Field |
| VISAPOST | Department of State where the visa was issued |
| OCCUP | Occupation that will be performed in U.S. |
| ENTDEPA | Arrival Flag. Whether admitted or paroled into the US |
| ENTDEPD | Departure Flag. Whether departed, lost visa, or deceased |
| ENTDEPU | Update Flag. Update of visa, either apprehended, overstayed, or updated to PR |
| MATFLAG | Match flag |
| BIRYEAR | 4 digit year of birth |
| DTADDTO | Character date field: date until which the visitor is admitted to stay in the US |
| GENDER | Gender |
| INSNUM | INS number |
| AIRLINE | Airline used to arrive in U.S. |
| ADMNUM | Admission number, should be unique and not nullable |
| FLTNO | Flight number of Airline used to arrive in U.S. |
| VISATYPE | Class of admission legally admitting the non-immigrant to temporarily stay in U.S. |

The immigration dataset will be our fact table, meaning it sits at the center of the star schema of our data warehouse.

__Global Temperature Data__

The three most cited land and ocean temperature datasets available to the public are NOAA’s MLOST, NASA’s GISTEMP and the UK’s HadCRUT.

Berkeley Earth, an organization affiliated with Lawrence Berkeley National Laboratory, has recently repackaged the data from a newer compilation and merged it all together. They combined 1.6 billion temperature reports from 16 pre-existing archives. The data are packaged so that users can slice them into interesting subsets (for instance, by country). Their methodology allows weather observations from shorter time series to be included, so fewer observations need to be excluded. Finally, they publish the source code and data for all the transformations they applied.

The original dataset can be downloaded from [Kaggle](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data). Although several files are available, in our capstone project we used only `GlobalLandTemperaturesByCity`.

```python
temperature_fname = '../../data2/GlobalLandTemperaturesByCity.csv'
global_temperature = pd.read_csv(temperature_fname)
```

```python
global_temperature.head()
```

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 | | | Århus | Denmark | 57.05N | 10.33E |


__Data Dictionary__

| Column Name | Description |
| :--- | :--- |
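| dt | Date in format YYYY-MM-DD (one record per month) |
| AverageTemperature | Average temperature of the city in the given month, in °C |
| AverageTemperatureUncertainty | 95% confidence interval around the average temperature |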
| City | City Name |
| Country | Country Name |
| Latitude | Latitude |
| Longitude | Longitude |

As the sample shows, the dataset covers a long period of the world's temperature, from 1743 to 2013. However, the immigration dataset from the US National Tourism and Trade Office covers only 2016, so most of the detailed temperature history is not needed. We therefore aggregated this dataset by country, computing the average temperature, and joined the resulting reduced table with the `lookup/I94CIT_I94RES.csv` lookup table, which we extracted from `I94_SAS_Labels_Descriptions.SAS`. The result of this preprocessing is the COUNTRY dimension of our model.

```python
global_temperature = global_temperature.groupby(["Country"]).agg(
    {"AverageTemperature": "mean", "Latitude": "first", "Longitude": "first"}).reset_index()
```

```python
global_temperature.head()
```

| | Country | AverageTemperature | Latitude | Longitude |
| --- | --- | --- | --- | --- |
| 0 | Afghanistan | 13.816497 | 36.17N | 69.61E |
| 1 | Albania | 15.525828 | 40.99N | 19.17E |
| 2 | Algeria | 17.763206 | 36.17N | 3.98E |
| 3 | Angola | 21.759716 | 12.05S | 13.15E |
| 4 | Argentina | 16.999216 | 39.38S | 62.43W |
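
The cells above do not show the join with the lookup table. The following is a minimal pandas sketch of that step. It assumes the lookup CSV has the hypothetical columns `code` and `country`, with upper-case country names as in the SAS labels file, and it matches the aggregated temperatures on an upper-cased country name. The lookup rows are illustrative only.

```python
import pandas as pd

# Illustrative rows standing in for lookup/I94CIT_I94RES.csv (column names are assumptions).
lookup = pd.DataFrame({"code": [236, 101], "country": ["AFGHANISTAN", "ALBANIA"]})

# Country-level averages in the shape produced by the groupby above.
temperature = pd.DataFrame({
    "Country": ["Afghanistan", "Albania", "Algeria"],
    "AverageTemperature": [13.816497, 15.525828, 17.763206],
    "Latitude": ["36.17N", "40.99N", "36.17N"],
    "Longitude": ["69.61E", "19.17E", "3.98E"],
})


def build_country_dimension(temperature, lookup):
    """Join country temperatures to the I94 country codes on an upper-cased name."""
    temp = temperature.assign(key=temperature["Country"].str.upper())
    merged = lookup.merge(temp, left_on="country", right_on="key", how="inner")
    merged = merged.rename(columns={"code": "Code", "AverageTemperature": "Temperature"})
    # Keep only the columns of the COUNTRY table described in the data dictionary.
    return merged[["Code", "Country", "Temperature", "Latitude", "Longitude"]]


country = build_country_dimension(temperature, lookup)
print(country)
```

An inner join silently drops countries whose names are spelled differently in the two sources. In this example, Algeria is dropped because it has no lookup row. Counting unmatched names before relying on the result is therefore worthwhile.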


__Airports Data__

The airport codes may refer either to the [ICAO](https://en.wikipedia.org/wiki/ICAO_airport_code) airport code, a four-letter code used by ATC systems and for airports that do not have an IATA airport code (from Wikipedia), or to the [IATA](https://en.wikipedia.org/wiki/IATA_airport_code) airport code, a three-letter code used in passenger tickets, reservations and baggage-handling systems.

We downloaded the airport codes from around the world from the open-source site http://ourairports.com/data/, which collects this data from different sources.

The list of all the airport codes is saved in `airport-codes.csv`, and every attribute is described in the datapackage description. The columns of the dataset contain the attributes and codes (IATA, and local codes where they exist) that are relevant to identifying an airport.
The original file is available at http://ourairports.com/data/airports.csv.

```python
airport = pd.read_csv("./sample_data/airport-codes_csv.csv")
```

```python
airport.head()
```

| | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 00A | heliport | Total Rf Heliport | 11.0 | | US | US-PA | Bensalem | 00A | | 00A | -74.93360137939453, 40.07080078125 |
| 1 | 00AA | small_airport | Aero B Ranch Airport | 3435.0 | | US | US-KS | Leoti | 00AA | | 00AA | -101.473911, 38.704022 |
| 2 | 00AK | small_airport | Lowell Field | 450.0 | | US | US-AK | Anchor Point | 00AK | | 00AK | -151.695999146, 59.94919968 |
| 3 | 00AL | small_airport | Epps Airpark | 820.0 | | US | US-AL | Harvest | 00AL | | 00AL | -86.77030181884766, 34.86479949951172 |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | 237.0 | | US | US-AR | Newport | | | | -91.254898, 35.6087 |


__Data Dictionary__

| Column Name | Description |
| :--- | :--- |
| ident | Unique identifier |
| type | Type of the airport |
| name | Airport Name |
| elevation_ft | Elevation of the airport in feet |
| continent | Continent |
| iso_country | ISO code of the country of the airport |
| iso_region | ISO code for the region of the airport |
| municipality | City where the airport is located |
| gps_code | GPS code of the airport |
| iata_code | IATA code of the airport |
| local_code | Local code of the airport |
| coordinates | GPS coordinates of the airport |

The airport dataset proved to be a poor source for analysis because we could not join it to the main immigration table, so we decided not to use it in our model. None of its codes (ident, iata_code, gps_code or local_code) consistently matched the columns of the immigration fact table, so we found no valid key shared by both tables.
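
A join-key check like the one behind this decision can be expressed as the share of distinct immigration keys that also appear in an airport column. The sketch below is a hypothetical stand-alone helper, not part of `helper.etl`, and it is demonstrated only on the five sample rows printed above. A low share means the column cannot serve as a reliable foreign key.

```python
def key_overlap(fact_keys, dim_keys):
    """Share of distinct, non-empty fact keys that also occur among the dimension keys."""
    # Only non-empty strings count as keys; this skips None and pandas NaN values.
    fact = {k for k in fact_keys if isinstance(k, str) and k}
    dim = {k for k in dim_keys if isinstance(k, str) and k}
    if not fact:
        return 0.0
    return len(fact & dim) / len(fact)


# i94port values and airport local_code values from the sample rows above.
i94port_sample = ["XXX", "ATL", "WAS", "NYC", "NYC"]
local_code_sample = ["00A", "00AA", "00AK", "00AL", None]
print(key_overlap(i94port_sample, local_code_sample))  # 0.0
```

On the full tables, the same function can be applied to each candidate column (`ident`, `iata_code`, `gps_code`, `local_code`) after converting it with `.tolist()`.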

__U.S. City Demographic Data__

This dataset contains demographic information about all census-designated US cities with a population greater than or equal to 65,000. The data come from the US Census Bureau's 2015 American Community Survey.

```python
us_demographics = pd.read_csv("./sample_data/us-cities-demographics.csv", sep=";")
```

```python
us_demographics.head()
```

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |


__Data Dictionary__

| Column Name | Description |
| :--- | :--- |
| City | Name of the city |
| State | US state of the city |
| Median Age | The median of the age of the population |
| Male Population | Number of the male population |
| Female Population | Number of the female population |
| Total Population | Number of the total population |
| Number of Veterans | Number of veterans living in the city |
| Foreign-born | Number of residents of the city who were not born in the United States |
| Average Household Size | Average number of people per household in the city |
| State Code | Code of the state of the city |
| Race | Race class |
| Count | Number of individuals of each race |

The main source of the STATE dimension table in our data model is the `US Cities Demographics` dataset. We aggregated the dataset by state and pivoted the `Race` and `Count` columns, so that every distinct value of `Race` becomes its own column. The result is a complete table of statistics that summarizes each US state.

## Step 3: Define the Data Model

_In this section of the documentation we explain the main process of our ETL pipeline: how we extract, transform and load the data from the various datasets. Only three of the datasets provided by Udacity, plus the label descriptions file, were used in this project, namely:_
- _immigration,_
- _temperatures,_
- _demographics,_
- _the descriptions from the label descriptions file `I94_SAS_Labels_Descriptions.SAS`._

#### 3.1 Conceptual Data Model
_Map out the conceptual data model and explain why you chose that model_

The center of our data model is the immigration dataset. The main goal of this project is to analyze visitors to the U.S. from around the world, and the immigration dataset clearly contains the facts we want to analyze, so we focused mostly on it during the modeling phase. The DATE dimension table was also constructed from the immigration dataset. We extracted all the distinct values of the arrival and departure dates (columns `arrdate` and `depdate`) and derived several attributes for each date: day, month, year, week of year and day of week.
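
The sketch below builds the DATE dimension in plain Python. It assumes the dates have already been converted to `YYYY-MM-DD` strings. The pipeline itself uses Spark's date functions (e.g. `weekofyear` and `dayofweek`, which are imported in the ETL notebook cell). This stand-alone version only mimics their conventions: weeks follow ISO 8601 numbering, and `dayofweek` runs from 1 = Sunday to 7 = Saturday.

```python
from datetime import date


def build_date_dimension(iso_dates):
    """Build one DATE row per distinct, non-empty YYYY-MM-DD string."""
    rows = []
    for value in sorted({d for d in iso_dates if d}):
        d = date.fromisoformat(value)
        rows.append({
            "date": value,
            "day": d.day,
            "month": d.month,
            "year": d.year,
            "weekofyear": d.isocalendar()[1],     # ISO 8601 week number
            "dayofweek": d.isoweekday() % 7 + 1,  # 1 = Sunday ... 7 = Saturday
        })
    return rows


# Arrival and departure dates pooled together; duplicates and missing values are dropped.
dates = build_date_dimension(["2016-04-01", "2016-04-23", "2016-04-01", None])
print(dates[0])
```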


The STATE dimension table results from aggregating the demographics dataset by the `State` column. First, we aggregated Female Population, Male Population, Total Population, Median Age, Number of Veterans and Foreign-born by `City` using the `first` function, since these values are duplicated across the different rows (one per race) of the same city.
Next, we grouped the resulting rows by `State` and applied the `sum` function to the numeric columns to obtain the totals for each U.S. state. We used the `pivot` function of the `pyspark` package to transform the `Race` column into a separate column for each distinct value. The final structure consists of the following columns:
- BlackOrAfricanAmerican 
- White 
- ForeignBorn 
- AmericanIndianAndAlaskaNative
- HispanicOrLatino
- Asian
- NumberVeterans
- FemalePopulation
- MalePopulation
- TotalPopulation

for each of the states of the U.S.
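
The following sketch shows the aggregation described above with pandas instead of Spark. Our pipeline uses `pyspark`'s `pivot`, so this is an equivalent under that substitution, not the pipeline code. The toy rows are made up for illustration. The rename mapping to the final column names is an assumption based on the column list above and the race values seen in the dataset. Median Age is left out because it does not appear in the final column list.

```python
import pandas as pd

POP_COLS = ["Male Population", "Female Population", "Total Population",
            "Number of Veterans", "Foreign-born"]

# Assumed mapping from the source column/race names to the STATE table names.
RENAME = {
    "Male Population": "MalePopulation", "Female Population": "FemalePopulation",
    "Total Population": "TotalPopulation", "Number of Veterans": "NumberVeterans",
    "Foreign-born": "ForeignBorn", "Black or African-American": "BlackOrAfricanAmerican",
    "Hispanic or Latino": "HispanicOrLatino",
    "American Indian and Alaska Native": "AmericanIndianAndAlaskaNative",
}


def build_state_dimension(demo):
    # 1) Population figures repeat on every race row of a city: keep one value per city.
    per_city = demo.groupby(["State Code", "State", "City"], as_index=False)[POP_COLS].first()
    # 2) Sum the per-city figures into state totals.
    per_state = per_city.groupby(["State Code", "State"])[POP_COLS].sum()
    # 3) Pivot Race into one column per race value, summing Count over the cities.
    races = demo.pivot_table(index=["State Code", "State"], columns="Race",
                             values="Count", aggfunc="sum", fill_value=0)
    return per_state.join(races).reset_index().rename(columns=RENAME)


# Toy input: two cities in MA and one in NJ (numbers are made up).
demo = pd.DataFrame({
    "City": ["Alpha", "Alpha", "Beta", "Gamma", "Gamma"],
    "State": ["Massachusetts"] * 3 + ["New Jersey"] * 2,
    "State Code": ["MA"] * 3 + ["NJ"] * 2,
    "Male Population": [48, 48, 24, 44, 44],
    "Female Population": [52, 52, 26, 46, 46],
    "Total Population": [100, 100, 50, 90, 90],
    "Number of Veterans": [5, 5, 2, 4, 4],
    "Foreign-born": [20, 20, 10, 15, 15],
    "Race": ["White", "Asian", "White", "White", "Asian"],
    "Count": [60, 30, 40, 70, 10],
})
states = build_state_dimension(demo)
print(states)
```

Taking `first` per city before summing is what prevents a city's population from being counted once per race row.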

The COUNTRY dimension completes our star schema data model. We combined `GlobalLandTemperaturesByCity` with the code descriptions found in `I94_SAS_Labels_Descriptions.SAS` for the columns `i94cit` and `i94res`.

First, we used `I94_SAS_Labels_Descriptions.SAS` to extract the key-value pairs and saved them as CSV files in the `lookup` directory. Then we aggregated the temperature dataset first by `City` and then by `Country`. Finally, we joined the two intermediate results to construct the COUNTRY table.
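
The extraction of key-value pairs can be sketched with a regular expression. This stand-alone version makes three assumptions about `I94_SAS_Labels_Descriptions.SAS`:

- Each lookup is a block that starts with a `value <name>` line (for example `value i94cntyl` for the country codes) and ends with `;`.
- Entries look like `236 =  'AFGHANISTAN'` or `'AL'='ALABAMA'`.
- Labels contain no single quotes, since labels with an embedded quote would not be matched.

The `code,label` CSV header is also hypothetical and not necessarily the layout of the files in `lookup`.

```python
import csv
import io
import re

# Assumed entry shapes: "   236 =  'AFGHANISTAN'" (numeric codes) and "\t'AL'='ALABAMA'" (quoted codes).
PAIR = re.compile(r"^\s*'?(?P<code>[^'=\s]+)'?\s*=\s*'(?P<label>[^']*)'")


def extract_lookup(sas_text, value_name):
    """Return (code, label) pairs from the block that starts with 'value <value_name>'."""
    pairs, inside = [], False
    for line in sas_text.splitlines():
        stripped = line.strip()
        if stripped.lower().startswith("value "):
            # A new block starts; only collect entries from the requested one.
            inside = stripped.split()[1].lower() == value_name.lower()
            continue
        if inside:
            match = PAIR.match(line)
            if match:
                pairs.append((match.group("code"), match.group("label").strip()))
            if stripped.endswith(";"):  # a semicolon closes the block
                inside = False
    return pairs


def to_csv(pairs):
    """Serialize the pairs as CSV text with a hypothetical code,label header."""
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(["code", "label"])
    writer.writerows(pairs)
    return buffer.getvalue()


sample = """value i94cntyl
   582 =  'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'
   236 =  'AFGHANISTAN'
;
value $i94addrl
\t'AL'='ALABAMA'
\t'AK'='ALASKA';
"""
print(to_csv(extract_lookup(sample, "i94cntyl")))
```

The returned text could then be written to a file such as `lookup/I94CIT_I94RES.csv`.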



#### 3.2 Mapping Out Data Pipelines
_List the steps necessary to pipeline the data into the chosen data model_

The `helper.etl` package contains the main functions used to preprocess the datasets, including helper functions to load, clean, select, transform and save the resulting datasets. Our main tool was the open-source framework Apache Spark, which provides an easy-to-use interface for programming entire clusters with data parallelism and fault tolerance.

Here we present only the general steps of the ETL, since all the preprocessing logic is located in the `helper.etl` package.

This notebook is used for documentation purposes only. The actual ETL pipeline runs on the cloud Spark platform [Amazon EMR](https://aws.amazon.com/emr/?nc1=h_ls), which executes the main functions of the `etl` package. All the functions are documented with docstrings alongside the code in the `helper/etl.py` file.

#### Immigration and Date datasets
The preprocessing of our main dataset, immigration, starts by loading the data from the SAS file. We then generate the processed dataframes and store them in an Amazon S3 bucket. In summary, our process consists of the following tasks:
* We loaded the immigration SAS file into a Spark dataframe, including only the columns identified as useful in the EDA phase. Specifically, we discarded the following columns: 'admnum', 'biryear', 'count', 'dtaddto', 'dtadfile', 'entdepa', 'entdepd', 'entdepu', 'insnum', 'matflag', 'occup', 'visapost';
* We cast the columns that hold integer values to an integer type, since Spark loaded them as doubles or strings;
* The dates are stored in the immigration dataframe in SAS date format, i.e. as the number of days since January 1, 1960. We converted them to the string date format YYYY-MM-DD;
* We dropped the columns "visapost", "occup", "entdepu" and "insnum", as they were missing a large amount of values;
* We created the `stay` column as the difference in days between the departure (`depdate`) and arrival (`arrdate`) dates of the US visitors. This makes it easier for our users to analyze the average length of stay and where visitors tend to stay longer;
* We created the DATE dataframe from the date columns `arrdate` and `depdate`;
* We saved the processed `immigration` and `date` dataframes in Parquet format to the Amazon S3 bucket.
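
The SAS-date conversion and the `stay` calculation can be sketched in plain Python. The notebook imports a `convert_SAS_date` helper from `helper.etl`, but its implementation is not shown here. This sketch is therefore an independent version based only on the definition above (days counted from January 1, 1960). Missing values (`None` or NaN) stay missing.

```python
import math
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from this date


def is_missing(value):
    return value is None or (isinstance(value, float) and math.isnan(value))


def sas_to_iso(sas_days):
    """Convert a SAS date (days since 1960-01-01) to a YYYY-MM-DD string."""
    if is_missing(sas_days):
        return None
    return (SAS_EPOCH + timedelta(days=int(sas_days))).isoformat()


def stay_in_days(arrdate, depdate):
    """Length of stay in days; None when either date is missing."""
    if is_missing(arrdate) or is_missing(depdate):
        return None
    return int(depdate) - int(arrdate)


# Values taken from the sample rows shown in Step 2.
print(sas_to_iso(20545.0))             # 2016-04-01
print(stay_in_days(20545.0, 20567.0))  # 22
```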

```python
# Import the libraries and the functions of the ETL helper package
import os
import re
import configparser
import logging
from datetime import timedelta, datetime
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import udf, col, when, lower, isnull, year, month, dayofmonth, hour, weekofyear, dayofweek, date_format, to_date
from pyspark.sql.types import StructField, StructType, IntegerType, DoubleType
from pyspark.sql.types import *
from helper.etl import create_spark_session, process_immigration_data, \
    process_temperature_data, process_airport_data, \
    process_demographics_data, process_states_data, process_countries_data
from helper.etl import convert_SAS_date, load_data, save_data
```

```python
# The configuration file "dl.cfg" includes the AWS access key ID and secret access key
config = configparser.ConfigParser()
config.read('./helper/dl.cfg')

# Expose the AWS credentials and the Java/Spark/Hadoop locations as environment variables
os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
```



```python
spark = create_spark_session()
```


```python
# Run the ETL for the immigration dataset: this generates the immigration and date tables
# and saves them to the S3 locations given in the output_path parameters.
immigration = process_immigration_data(
    spark,
    input_path='../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat',
    # output_path="s3a://udacity-data-engineer-capstone/immigration.parquet",
    output_path=None,
    date_output_path="s3a://udacity-data-engineer-capstone/date.parquet",
    input_format="com.github.saurfang.sas.spark",
    columns=['i94addr', 'i94mon', 'cicid', 'i94visa', 'i94res', 'arrdate', 'i94yr', 'depdate',
             'airline', 'fltno', 'i94mode', 'i94port', 'visatype', 'gender',
             'i94cit', 'i94bir'],
    debug_size=1000, partitionBy=["i94yr", "i94mon"], saved_columns='*', header=False)
```



#### Countries dataset
To generate the country dataset, we first need to load the global temperature dataset as well as the I94CIT_I94RES lookup table. Afterwards, we store the processed dataframe in an Amazon S3 bucket. In summary, we performed the following tasks:
* We loaded the global temperature CSV file and the I94CIT_I94RES lookup table;
* We aggregated the temperature dataset by country and renamed the new columns;
* We joined the two resulting datasets;
* We saved the joined dataset to the staging area in the Amazon S3 bucket.

```python
countries = process_countries_data(spark, input_path="../../data2/GlobalLandTemperaturesByCity.csv",
                                   output_path="s3a://udacity-data-engineer-capstone/country.parquet",
                                   input_format="csv", columns='*', debug_size=None,
                                   header=True)
```

#### States dataset
To generate the states dataset, we need to load the demographics dataset to obtain the state codes, as well as the I94ADDR lookup table. Afterwards, we store the processed dataframe in an Amazon S3 bucket. In summary, we performed these steps:
* We ran the `process_demographics_data` function to obtain the state codes;
* We loaded the I94ADDR lookup table;
* We aggregated the demographics dataset by state and renamed the new columns;
* We joined the two datasets;
* We saved the resulting dataset to the staging area in the Amazon S3 bucket.

```python
demographics = process_demographics_data(spark, input_path="./sample_data/us-cities-demographics.csv",
                                         output_path="s3a://udacity-data-engineer-capstone/demographics.parquet",
                                         input_format="csv", columns='*',
                                         debug_size=None, partitionBy=["State Code"],
                                         header=True, sep=";")
```

```python
cols = ['TotalPopulation', 'FemalePopulation', 'MalePopulation', 'NumberVeterans', 'ForeignBorn',
        'AmericanIndianAndAlaskaNative', 'Asian', 'BlackOrAfricanAmerican', 'HispanicOrLatino', 'White']
# Sum every demographic column per state
states_codes = demographics.groupby(["State Code", "State"]).agg(dict(zip(cols, len(cols) * ["sum"])))
states = process_states_data(spark, input_path="./lookup/lookup/I94ADDR.csv",
                             output_path="s3a://udacity-data-engineer-capstone/states.parquet",
                             input_format="csv", columns=cols,
                             debug_size=None, partitionBy=["State Code"], states_codes=states_codes,
                             header=True, sep=";")
```

## Step 4: Run Pipelines to Model the Data
#### 4.1 Create the data model
_Build the data pipelines to create the data model._

The ETL pipeline is divided into two stages:
1) In the first stage, we used Spark to extract, transform and load the provided datasets into the S3 bucket in AWS (staging area).
2) In the second stage, we used [Apache Airflow](https://airflow.apache.org/) to build a DAG that extracts the data from the S3 buckets and loads it into tables with the same names in Amazon Redshift. Finally, we performed data quality checks to ensure completeness.



<img src="images/architecture.png" alt="architecture" width="800"/>

Here you can see the second stage, which we built using Apache Airflow.

<img src="images/dag.PNG" alt="dag"  width="800"/>

The `airflow` folder contains the code of the Airflow pipeline. The `dags` folder contains the code of the DAG `udacity_capstone.py`, and the folder `plugins/operators` contains the three custom operators we built: `create_tables.py`, `stage_redshift.py` and `data_quality.py`.

The custom operator `CreateTablesOperator` creates the tables in Amazon Redshift. The custom operator `StageToRedshiftOperator` loads the data in [Parquet](https://parquet.apache.org/) format from the S3 buckets and inserts the contents of the data files into the created tables in Amazon Redshift.
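
The core of a stage-to-Redshift step is a `COPY` statement. The helper below is a hedged sketch that builds a Parquet `COPY`; the actual `StageToRedshiftOperator` code is in `plugins/operators/stage_redshift.py` and may differ. It assumes authorization through an IAM role, whose ARN here is a placeholder. It also rewrites the `s3a://` scheme used by Spark into the `s3://` scheme that `COPY` expects.

```python
def build_copy_sql(table, s3_path, iam_role_arn):
    """Return a Redshift COPY statement loading the Parquet files under s3_path into table."""
    # Spark writes through the Hadoop s3a:// connector; COPY expects s3:// URIs.
    if s3_path.startswith("s3a://"):
        s3_path = "s3://" + s3_path[len("s3a://"):]
    return (
        f"COPY {table}\n"
        f"FROM '{s3_path}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        "FORMAT AS PARQUET;"
    )


# Placeholder role ARN; replace it with the role attached to the Redshift cluster.
print(build_copy_sql("states",
                     "s3a://udacity-data-engineer-capstone/states.parquet",
                     "arn:aws:iam::123456789012:role/example-redshift-role"))
```

Columns used in `partitionBy` (such as `i94yr` and `i94mon`) are stored in the directory names rather than inside the Parquet files, so `COPY` may not load them. It is worth checking this against the Redshift `COPY` documentation before relying on those columns.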
#### 4.2 Data Quality Checks

After the loading steps complete, we run data quality checks with the custom operator `DataQualityOperator` to verify that the loads were successful.
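
A typical check of this kind verifies that every loaded table contains rows. The sketch below is a hypothetical stand-alone function, not the code of `DataQualityOperator`. It works with any DB-API connection (inside Airflow this would be the Redshift connection) and is demonstrated with an in-memory SQLite database. Table names are interpolated into the SQL, so they must come from trusted configuration.

```python
import sqlite3


def check_tables_not_empty(conn, tables):
    """Return {table: row_count}; raise ValueError if any table is empty."""
    counts = {}
    cur = conn.cursor()
    for table in tables:
        cur.execute(f"SELECT COUNT(*) FROM {table}")
        (count,) = cur.fetchone()
        if count == 0:
            raise ValueError(f"Data quality check failed: table {table} is empty")
        counts[table] = count
    return counts


# Demo on SQLite: one populated table and one empty table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE states (code TEXT)")
conn.execute("INSERT INTO states VALUES ('MA'), ('NJ')")
conn.execute("CREATE TABLE country (code INTEGER)")
print(check_tables_not_empty(conn, ["states"]))  # {'states': 2}
```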

#### 4.3 Data dictionary 
_Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file._


__Table Immigration__

| Column Name | Description |
| :--- | :--- |
| CICID | Primary Key |
| I94CIT | 3-digit code of the country where the visitor was born. This is a FK to the COUNTRY dimension table |
| I94RES | 3-digit code of the country where the visitor resides. This is a FK to the COUNTRY dimension table |
| ARRDATE | Arrival date in the USA. This is a FK to the DATE dimension table |
| I94MODE | Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported) |
| I94ADDR | State of arrival. This is a FK to the STATE dimension table |
| DEPDATE | Departure date from the USA. This is a FK to the DATE dimension table |
| I94BIR | Age of Respondent in Years |
| I94VISA | Visa codes collapsed into three categories: (1 = Business; 2 = Pleasure; 3 = Student) |
| BIRYEAR | 4 digit year of birth |
| GENDER | Gender |
| AIRLINE | Airline used to arrive in U.S. |
| FLTNO | Flight number of Airline used to arrive in U.S. |
| VISATYPE | Class of admission legally admitting the non-immigrant to temporarily stay in U.S. |
| STAY | Number of days in the US |


__Table STATE__

| Column Name | Description |
| :--- | :--- |
| Code | Primary Key. This is the code of the State as in I94ADDR lookup table |
| State | Name of the state |
| BlackOrAfricanAmerican | Number of residents of the race Black Or African American |
| White | Number of residents of the race White |
| ForeignBorn | Number of residents who were born outside the United States |
| AmericanIndianAndAlaskaNative | Number of residents of the race American Indian And Alaska Native |
| HispanicOrLatino | Number of residents of the race Hispanic Or Latino |
| Asian | Number of residents of the race Asian |
| NumberVeterans | Number of residents who are military veterans |
| FemalePopulation | Number of female population |
| MalePopulation | Number of male population |
| TotalPopulation | Total population |


__Table COUNTRY__

| Column Name | Description |
| :--- | :--- |
| Code | Country Code. This is the PK. |
| Country | Country Name |
| Temperature | Average temperature of the country between 1743 and 2013 |
| Latitude | GPS Latitude |
| Longitude | GPS Longitude |


__Table DATE__

| Column Name | Description |
| :--- | :--- |
| date | Date in the format YYYY-MM-DD. This is the PK. |
| day | Two digit day |
| month | Two digit month |
| year | Four digit for the year |
| weekofyear | The week of the year |
| dayofweek | The day of the week |
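
The following small sketch shows how the star schema answers one of the questions listed in Step 1 (visitors by nationality). It joins the fact table to the COUNTRY dimension on `i94cit`. As a stand-in for Redshift, it uses an in-memory SQLite database with a few made-up rows and lower-case column names. The `SELECT` itself is plain SQL that should also run on Redshift.

```python
import sqlite3


def visitors_by_nationality(conn):
    """Count visitors per country of birth by joining the fact table to COUNTRY."""
    cur = conn.cursor()
    cur.execute("""
        SELECT c.country, COUNT(*) AS visitors
        FROM immigration AS i
        JOIN country AS c ON c.code = i.i94cit
        GROUP BY c.country
        ORDER BY visitors DESC, c.country
    """)
    return cur.fetchall()


# Minimal stand-in tables with made-up rows.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE country (code INTEGER PRIMARY KEY, country TEXT, temperature REAL);
    CREATE TABLE immigration (cicid INTEGER PRIMARY KEY, i94cit INTEGER, i94res INTEGER, stay INTEGER);
    INSERT INTO country VALUES (101, 'Albania', 15.5), (236, 'Afghanistan', 13.8);
    INSERT INTO immigration VALUES (1, 101, 101, 22), (2, 101, 101, 10), (3, 236, 236, 5);
""")
print(visitors_by_nationality(conn))  # [('Albania', 2), ('Afghanistan', 1)]
```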

## Step 5: Complete Project Write Up
__Clearly state the rationale for the choice of tools and technologies for the project.__

The whole project and our solution run on cloud computing technology, specifically AWS. Since AWS provides a reliable, easy-to-use, scalable and low-cost infrastructure platform in the cloud, choosing it for a new solution is straightforward. The cost of every service we used was reasonable and followed "pay as you go" pricing, which helps us start small and scale our solution as we grow.

In particular, we chose the following services for these reasons:

__S3:__ Provides an easy-to-use, scalable and low cost staging area with optimum availability, security, and performance;

__Spark:__ Spark is a powerful framework for big data processing, with built-in modules for streaming, SQL queries, graph processing and machine learning. It provides an interface for programming entire clusters with implicit data parallelism and fault tolerance;

__EMR:__ EMR is a cloud big data platform that lets team members process huge amounts of data quickly and cost-effectively at scale using Spark. EMR is secure, elastic, low-cost and user-friendly, which makes it ideal for our DW project;

__Redshift:__ Redshift provides a parallel, column-oriented data warehouse that is easy to scale, which made it an easy choice.


__Propose how often the data should be updated and why__

The data model will be updated monthly, since we receive one file per month.
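
With monthly updates, each run needs to pick the file for its month. The helper below is hypothetical and assumes that the other monthly files follow the same naming pattern as `i94_apr16_sub.sas7bdat`. It derives the file name from the month being processed, for instance from the logical date of a monthly Airflow run.

```python
from datetime import date

# Fixed English abbreviations, so the result does not depend on the system locale.
MONTH_ABBR = ("jan", "feb", "mar", "apr", "may", "jun",
              "jul", "aug", "sep", "oct", "nov", "dec")


def i94_filename(month_start):
    """File name of the monthly I-94 extract, e.g. i94_apr16_sub.sas7bdat."""
    abbr = MONTH_ABBR[month_start.month - 1]
    return f"i94_{abbr}{month_start.year % 100:02d}_sub.sas7bdat"


print(i94_filename(date(2016, 4, 1)))  # i94_apr16_sub.sas7bdat
```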


__Write a description of how you would approach the problem differently under the following scenarios:__

* The data was increased by 100x:

Our whole solution is cloud-based, specifically on AWS, so scaling the pipeline should not create any issues.
By increasing the number of nodes in the EMR cluster we can handle more data, and Amazon Redshift is a data warehouse that can expand to exabyte scale;
 
* The data populates a dashboard that must be updated on a daily basis by 7am every day.

We can set the `schedule_interval` of the Airflow DAG to daily (e.g. `@daily`, which runs at midnight) so that the run happens overnight and the data is available by 7am.
 
* The database needed to be accessed by 100+ people.

Amazon Redshift has a useful feature, `elastic resize`, which lets us add or remove nodes in a cluster within minutes.
We can easily obtain extra storage and better performance according to our needs and the demands of our workloads, and we can easily reduce costs during periods when demand is relatively low.