
## Overview
The purpose of this data engineering capstone project is to give students a chance to combine what they have learned throughout the program. The project will be an important part of a learner's portfolio and will help them achieve data engineering-related career goals. We could choose to complete the project provided by the Udacity team or define the scope and data ourselves. I took the first approach and built the data warehouse (DW) on the U.S. immigration data provided by Udacity.

## Scenario
We are a business consulting firm specialized in data warehouse services: we assist enterprises with navigating their data needs and create strategic operational solutions that deliver tangible business results. We modernize corporations' data warehousing infrastructure by improving performance and ease of use for end users, enhancing functionality and decreasing total cost of ownership, while making real-time decision making possible. In total, our full suite of services includes helping enterprises with data profiling, data standardization, data acquisition, data transformation and integration.

We aim to model and create a brand new analytics solution on top of the state-of-the-art technologies available, to enable our customers to unleash insights from data and then provide better experiences for people coming to the US.

## Structure of the Project
Following the Udacity guide for this project, we structured this documentation with steps below:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

## Step 1: Scope the Project and Gather Data

_Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc._

### The Scope 
The main deliverable of our work here will be a data warehouse in the cloud that will support answering questions through analytics tables and dashboards. Additionally, as we developed a general source-of-truth database, the Government of the US could open the solution through a web API so backend web services could query the warehouse for information relating to international visitors.

### The Data
For this work we have used the immigration, the global temperature and demographics datasets as well as the descriptions contained in the `I94_SAS_Labels_Descriptions.SAS` file.

### The Architecture
The whole solution is cloud based, built on top of __Amazon Web Services (AWS)__. First, all the datasets were preprocessed with __Apache Spark__ and stored in a staging area in an __AWS S3__ bucket. Then, we loaded them into an __Amazon Redshift__ cluster using an __Apache Airflow__ pipeline that transfers the data and checks its quality, to finally provide our customers with a data mart for convenient analysis.

![Architecture](images/architecture.png)
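
The quality checks that the pipeline runs are not listed in this document. As a rough sketch, assuming two common checks (the table is not empty and its key column has no nulls), the logic could look like the following. The function name `check_table_quality` and the sample rows are hypothetical; a real Airflow task would run equivalent SQL against Redshift.

```python
def check_table_quality(rows, key_column):
    """Return a list of problems found in `rows` (a list of dicts)."""
    problems = []
    # Check 1: the load produced at least one row
    if not rows:
        problems.append("table is empty")
    # Check 2: the key column is never null
    null_keys = sum(1 for row in rows if row.get(key_column) is None)
    if null_keys:
        problems.append(f"{null_keys} row(s) have a null {key_column}")
    return problems


# Hypothetical sample: the second row is missing its key
sample_rows = [
    {"cicid": 5748517, "i94port": "LOS"},
    {"cicid": None, "i94port": "HHW"},
]
print(check_table_quality(sample_rows, "cicid"))
```

An empty list means that both checks passed; any returned message would be a reason to fail the pipeline run.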

The main information and questions a user may want to extract from the data mart would be:

* Visitors by nationality.
* Visitors by origin.
* Visitors by airline.
* Correlations between destination in the U.S and the source country.
* Correlations between destination in the U.S and source climates.
* Correlations between immigration by source region, and the source region temperature.
* Correlations between visitor demographics, and states visited.
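
Most of these questions reduce to grouping the fact table by one or two columns and counting. The sketch below illustrates the idea in plain Python on a handful of toy records shaped like the immigration rows (residence-country code and destination state); in the warehouse itself this would presumably be a SQL `GROUP BY`, which is an assumption and not something shown in this document.

```python
from collections import Counter

# Toy records: (i94res residence-country code, i94addr destination state)
visits = [
    (438, "CA"), (438, "NV"), (438, "WA"),
    (464, "HI"), (464, "HI"), (464, "FL"),
]

# Visitors by origin: count rows per residence-country code
by_residence = Counter(res for res, _ in visits)

# Origin/destination pairs: count rows per (country code, state)
by_route = Counter(visits)

print(by_residence.most_common())
print(by_route.most_common(1))
```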

***

## Step 2: Explore and Assess the Data

_To familiarize ourselves with the data provided by Udacity we have done an exhaustive exploratory data analysis ([EDA](https://en.wikipedia.org/wiki/Exploratory_data_analysis)) checking what data would be useful and what preprocessing steps we should take in order to clean, organize and join the various datasets in a meaningful data model._

In the following sections we briefly describe the datasets provided and give a summarized idea on the reasons we took into consideration when deciding what data to use.

__Immigration Data__

For decades, U.S. immigration officers issued the I-94 Form (Arrival/Departure Record) to foreign visitors (e.g., business visitors, tourists and foreign students) who lawfully entered the United States. The I-94 was a small white paper form that a foreign visitor received from cabin crews on arrival flights and from U.S. Customs and Border Protection at the time of entry into the United States. It listed the traveler's immigration category, port of entry, date of entry into the United States and status expiration date, and had a unique 11-digit identifying number assigned to it. Its purpose was to record the traveler's lawful admission to the United States.

This is the main dataset. There is one file for each month of 2016, available in the directory `../../data/18-83510-I94-Data-2016/` in the [SAS](https://www.sas.com/en_us/home.html) binary database storage format `sas7bdat`. Combined, the 12 datasets have more than 40 million rows (40,790,529) and 28 columns. For most of the work we used only April 2016, which has more than three million records (3,096,313).

In [1]:
# Importing the libraries needed in this project
import os
import pandas as pd
from datetime import datetime

In [2]:
import os
from pyspark.sql import SparkSession

# Set the environment before the session is created so that Spark picks it up
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"

# "spark.jars.packages" is the option that accepts Maven coordinates
spark = SparkSession.builder\
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:3.0.0-s_2.12")\
    .enableHiveSupport().getOrCreate()

# The April 2016 sample is stored as parquet, Spark's default load format
df_spark = spark.read.load('./sas_data')
print("Completed, ran successfully")

Completed, ran successfully


In [3]:
immigration = spark.read.parquet("sas_data")
print(immigration.count())
immigration.limit(10).toPandas()

3096313


Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,...,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,...,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,...,,M,1987.0,10292016,M,,DL,94956410000.0,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1987.0,10292016,F,,DL,94956450000.0,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1988.0,10292016,M,,DL,94956390000.0,40,B1
5,5748522.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20579.0,...,,M,1959.0,10292016,M,,NZ,94981800000.0,10,B2
6,5748523.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20586.0,...,,M,1950.0,10292016,F,,NZ,94979690000.0,10,B2
7,5748524.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20586.0,...,,M,1975.0,10292016,F,,NZ,94979750000.0,10,B2
8,5748525.0,2016.0,4.0,245.0,464.0,HOU,20574.0,1.0,FL,20581.0,...,,M,1989.0,10292016,M,,NZ,94973250000.0,28,B2
9,5748526.0,2016.0,4.0,245.0,464.0,LOS,20574.0,1.0,CA,20581.0,...,,M,1990.0,10292016,F,,NZ,95013550000.0,2,B2


In [4]:
immigration.head()

Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1')

__Data Dictionary__: Below is the description for various fields of the dataset. 

| Column Name | Description |
| :--- | :--- |
| CICID* | ID that uniquely identifies one record in the dataset |
| I94YR | 4-digit year |
| I94MON | Numeric month |
| I94CIT | 3-digit code of the source country for immigration (country of birth) |
| I94RES | 3-digit code of the source country for immigration (country of residence) |
| I94PORT | Port admitted through |
| ARRDATE | Arrival date in the USA |
| I94MODE | Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported) |
| I94ADDR | State of arrival |
| DEPDATE | Departure date |
| I94BIR | Age of Respondent in Years |
| I94VISA | Visa codes collapsed into three categories: (1 = Business; 2 = Pleasure; 3 = Student) |
| COUNT | Used for summary statistics |
| DTADFILE | Character Date Field |
| VISAPOST | Department of State post where the visa was issued |
| OCCUP | Occupation that will be performed in U.S. |
| ENTDEPA | Arrival Flag. Whether admitted or paroled into the US |
| ENTDEPD | Departure Flag. Whether departed, lost visa, or deceased |
| ENTDEPU | Update Flag. Update of visa, either apprehended, overstayed, or updated to PR |
| MATFLAG | Match flag |
| BIRYEAR | 4 digit year of birth |
| DTADDTO | Character date field: date to which the visitor was admitted to the U.S. (allowed to stay until) |
| GENDER | Gender |
| INSNUM | INS number |
| AIRLINE | Airline used to arrive in U.S. |
| ADMNUM | Admission number, should be unique and not nullable |
| FLTNO | Flight number of Airline used to arrive in U.S. |
| VISATYPE | Class of admission legally admitting the non-immigrant to temporarily stay in U.S. |

The immigration dataset holds our facts, so it will sit at the center of the star schema of our data warehouse.

__Global Temperature Data__

* World Temperature Data: This dataset came from Kaggle and can be found [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).

Several files are available in the original [Kaggle](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data) dataset, but in this capstone project we use only `GlobalLandTemperaturesByCity`.

In [5]:
temperature_fname = '../../data2/GlobalLandTemperaturesByCity.csv'
world_temperature = pd.read_csv(temperature_fname)

In [6]:
world_temperature.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


__Data Dictionary__

| Column Name | Description |
| :--- | :--- |
| dt | Date in format YYYY-MM-DD |
| AverageTemperature | Average temperature of the city on a given date |
| City | City Name |
| Country | Country Name |
| Latitude | Latitude |
| Longitude | Longitude |

The immigration dataset only contains data from the US National Tourism Office for the year 2016, whereas the temperature records go back as far as 1743, so the vast majority of the temperature data is not suitable for a date-level join. We decided to aggregate this dataset by country, averaging the temperatures, and to join the reduced table with the `lookup\I94CIT_I94RES.csv` lookup table (extracted from `I94_SAS_Labels_Descriptions.SAS`), which results in the COUNTRY dimension of our model.


In [7]:
world_temperature = world_temperature.groupby(["Country"]).agg({"AverageTemperature": "mean", 
                                                                        "Latitude": "first", "Longitude": "first"}).reset_index()

In [8]:
world_temperature.head()

Unnamed: 0,Country,AverageTemperature,Latitude,Longitude
0,Afghanistan,13.816497,36.17N,69.61E
1,Albania,15.525828,40.99N,19.17E
2,Algeria,17.763206,36.17N,3.98E
3,Angola,21.759716,12.05S,13.15E
4,Argentina,16.999216,39.38S,62.43W
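
The join with the country lookup table is not shown in this notebook. As a minimal sketch of the idea in plain Python, assuming that the lookup holds `(code, name)` pairs with upper-case names: the codes and the `EXAMPLELAND` entry below are placeholders, while the two averages are taken from the output above.

```python
# Average temperature per country (values from the aggregated table)
avg_temperature = {"Afghanistan": 13.816497, "Albania": 15.525828}

# Hypothetical lookup rows: (code, country name). The codes are placeholders.
lookup = [(1, "AFGHANISTAN"), (2, "ALBANIA"), (3, "EXAMPLELAND")]

# Normalize names to lower case so that "AFGHANISTAN" matches "Afghanistan"
temperature_by_name = {name.lower(): temp for name, temp in avg_temperature.items()}

# Left join: keep every code, with None when no temperature is available
country = [
    {
        "code": code,
        "country": name,
        "temperature": temperature_by_name.get(name.lower()),
    }
    for code, name in lookup
]

for row in country:
    print(row)
```

Keeping unmatched codes (a left join) means that every code in the fact table still finds a row in the dimension, even when no temperature is known for that country.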


__Airports Data__

`airport-codes.csv` contains the list of all airport codes; the attributes are identified in the datapackage description. Some of the columns contain attributes identifying airport locations, and others contain codes (IATA, local if they exist) that are relevant to the identification of an airport.
The original source URL is http://ourairports.com/data/airports.csv (stored in archive/data.csv).

In [9]:
airport = pd.read_csv("airport-codes_csv.csv")

In [10]:
airport.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


__Data Dictionary__

| Column Name | Description |
| :--- | :--- |
| ident | Unique identifier |
| type | Type of the airport |
| name | Airport Name |
| elevation_ft | Altitude of the airport |
| continent | Continent |
| iso_country | ISO code of the country of the airport |
| iso_region | ISO code for the region of the airport |
| municipality | City where the airport is located |
| gps_code | GPS code of the airport |
| iata_code | IATA code of the airport |
| local_code | Local code of the airport |
| coordinates | GPS coordinates of the airport |

We are not using the airport dataset in our model; we will stick to the global temperature data. The airport data did not prove to be a good source for analysis because we were not able to join it to the main immigration table: there is no valid and consistent key in both tables that would let us cross them. None of the codes (ident, gps_code, iata_code or local_code) seemed to match the columns in the immigration fact table.
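
One way to quantify such a mismatch is to measure how many distinct keys of the fact table also exist in the candidate dimension. The sketch below is an illustration of that check and not the analysis that was actually run; `match_rate` is a hypothetical helper and the code lists are toy values.

```python
def match_rate(fact_keys, dimension_keys):
    """Fraction of distinct fact keys that also appear in the dimension."""
    # Ignore missing values (None or empty strings) on both sides
    fact = {key for key in fact_keys if key}
    dim = {key for key in dimension_keys if key}
    if not fact:
        return 0.0
    return len(fact & dim) / len(fact)


# Toy values: port codes as in the immigration sample, illustrative airport codes
i94port = ["LOS", "HHW", "HOU", "LOS"]
iata_code = ["HOU", "JFK", None]

print(match_rate(i94port, iata_code))
```

A rate close to 1.0 would indicate a usable join key, while a low rate suggests that the two columns use different coding schemes.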

__U.S. City Demographic Data__

This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. This data comes from the US Census Bureau's 2015 American Community Survey.

In [11]:
us_cities_demographics = pd.read_csv("us-cities-demographics.csv", sep=";")

In [12]:
us_cities_demographics.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


__Data Dictionary__

| Column Name | Description |
| :--- | :--- |
| City | Name of the city |
| State | US state of the city |
| Median Age | The median of the age of the population |
| Male Population | Number of the male population |
| Female Population | Number of the female population |
| Total Population | Number of the total population |
| Number of Veterans | Number of veterans living in the city |
| Foreign-born | Number of residents of the city who were not born in the United States |
| Average Household Size | Average number of people per household in the city |
| State Code | Code of the state of the city |
| Race | Race class |
| Count | Number of individuals of each race |

The `US Cities Demographics` dataset is the source of the STATE dimension in our data model. We aggregated the dataset by state and pivoted the `Race` and `Count` columns so that each distinct value of `Race` becomes a column. That way we create a complete table of statistics that summarizes the information for every US state.
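
To make the pivot concrete, here is a small plain-Python sketch of the same idea that runs without a Spark session. The first five rows reuse values from the sample above; the last row is made up to show that counts for the same state and race are summed.

```python
from collections import defaultdict

# (State, Race, Count)
rows = [
    ("Maryland", "Hispanic or Latino", 25924),
    ("Massachusetts", "White", 58723),
    ("Alabama", "Asian", 4759),
    ("California", "Black or African-American", 24437),
    ("New Jersey", "White", 76402),
    ("Maryland", "Hispanic or Latino", 100),  # made-up row for illustration
]

# Sum the counts per (state, race)
pivot = defaultdict(lambda: defaultdict(int))
for state, race, count in rows:
    pivot[state][race] += count

# Every distinct race becomes a column; missing combinations are filled with 0
races = sorted({race for _, race, _ in rows})
table = {
    state: {race: counts.get(race, 0) for race in races}
    for state, counts in pivot.items()
}

print(table["Maryland"])
```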

## Step 3: Define the Data Model

_In this section of the documentation we detail the process of extracting, transforming and loading the data from the various datasets. As we mentioned before, we are using 3 of the 4 data sources provided by the Udacity team: immigration, temperatures and demographics. We also extract descriptions from the labels description file `I94_SAS_Labels_Descriptions.SAS`._

#### 3.1 Conceptual Data Model
_Map out the conceptual data model and explain why you chose that model_

The data model consists of the tables `immigration`, `us_cities_demographics`, `airport_codes`, `world_temperature`, `i94cit_res`, `i94port`, `i94mode`, `i94addr` and `i94visa`.

The immigration dataset is the origin of the center of our model. As it represents the facts we want to analyse (U.S. visitors from around the world), it was transformed into the fact table IMMIGRATION, as represented in the schema below. We gave this data most of the focus during our modeling phase. The immigration dataset is also the data source for the DATE dimension table. We extracted all the distinct values of the columns `arrdate` and `depdate` and applied various functions to store a number of attributes of each date in the table: day, month, year, week of year and day of week.
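
The derivation of the DATE attributes can be sketched in plain Python as follows. This is only an illustration: the function name `date_dimension_row` is hypothetical, and the sketch uses ISO numbering for the day of week (1 = Monday), whereas Spark's `dayofweek` function counts from Sunday.

```python
from datetime import date


def date_dimension_row(day):
    """Build one DATE dimension row from a datetime.date."""
    _, iso_week, iso_weekday = day.isocalendar()
    return {
        "date": day.isoformat(),
        "day": day.day,
        "month": day.month,
        "year": day.year,
        "week_of_year": iso_week,
        "day_of_week": iso_weekday,  # ISO numbering: 1 = Monday ... 7 = Sunday
    }


# Arrival and departure dates, with a duplicate that must collapse to one row
dates = [date(2016, 4, 30), date(2016, 4, 30), date(2016, 5, 8)]
date_dim = [date_dimension_row(d) for d in sorted(set(dates))]

for row in date_dim:
    print(row)
```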

![Star-Schema](images/star-schema.PNG)

The STATE dimension table is the result of aggregating the demographics dataset by the `State` column. Median Age, Male Population, Female Population, Total Population, Number of Veterans and Foreign-born were first aggregated by `City` using the `first` function, since they are repeated across the different rows of the same city. Then, we grouped the resulting rows by `State`, applying the `sum` function to the numeric columns to get a consolidated total for each U.S. state. We also needed to transform the column `Race` so that its different values become different columns. We achieved this by using the pivot function of the `pyspark` package. As a result we reached a final structure with the columns (BlackOrAfricanAmerican, White, ForeignBorn, AmericanIndianAndAlaskaNative, HispanicOrLatino, Asian, NumberVeterans, FemalePopulation, MalePopulation, TotalPopulation) for each of the states of the U.S.
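
The reason for the two-step aggregation is easy to miss: city-level figures such as the total population are repeated on every race row, so summing them directly would count each city several times. The plain-Python sketch below illustrates this with the populations of Silver Spring and Rockville from the outputs in this document; the second Silver Spring row is illustrative.

```python
# (City, State, Race, Total Population)
rows = [
    ("Silver Spring", "Maryland", "Hispanic or Latino", 82463),
    ("Silver Spring", "Maryland", "White", 82463),  # same city, repeated total
    ("Rockville", "Maryland", "White", 66998),
]

# Step 1: keep the first total seen for each city
city_totals = {}
for city, state, _race, total_population in rows:
    city_totals.setdefault((city, state), total_population)

# Step 2: sum the per-city totals for each state
state_totals = {}
for (_city, state), total in city_totals.items():
    state_totals[state] = state_totals.get(state, 0) + total

# For comparison: summing every row counts Silver Spring twice
naive_total = sum(row[3] for row in rows)

print(state_totals)
print(naive_total)
```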

The COUNTRY dimension completes our star schema model. To get to the structure we see in the figure above, we combined `GlobalLandTemperaturesByCity` with the code descriptions found in the file `I94_SAS_Labels_Descriptions.SAS` for the columns `i94cit` and `i94res`, shown in the image below.
First, we extracted the key-value pairs from `I94_SAS_Labels_Descriptions.SAS` and saved them as CSV files in the `lookup` directory. Next, we aggregated the temperature dataset by `City` and then by `Country`. Finally, we joined the two intermediate results to form the COUNTRY table.

![i94cit](images/i94cit.PNG)
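
The extraction of key-value pairs can be sketched with a regular expression. The example below is an assumption about how such a parser might look, not the code in `helper.etl`; it is applied to the `I94MODE` block of the labels file, and the pattern may need adjusting for other blocks.

```python
import re

# Matches lines such as:  1 = 'Air'   or   'AL' = 'ALABAMA'
PAIR = re.compile(r"""^\s*'?(?P<key>[^'=\s]+)'?\s*=\s*'(?P<value>[^']*)'""")


def parse_label_block(text):
    """Return a dict with the code/description pairs found in `text`."""
    pairs = {}
    for line in text.splitlines():
        match = PAIR.match(line)
        if match:
            pairs[match.group("key")] = match.group("value").strip()
    return pairs


sample = """
/* I94MODE - There are missing values as well as not reported (9) */
    1 = 'Air'
    2 = 'Sea'
    3 = 'Land'
    9 = 'Not reported' ;
"""
print(parse_label_block(sample))
```

The keys are returned as strings, so they would need a cast before being joined with the integer codes of the fact table.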

#### 3.2 Mapping Out Data Pipelines
_List the steps necessary to pipeline the data into the chosen data model_


To accomplish all the tasks related to the preprocessing of the datasets, we developed a number of functions in a package we called `helper.etl`. There you will find different helper functions to load, select, clean, transform and store the resulting datasets in a convenient way. The open-source framework Apache Spark was the main tool in this journey. Spark provides an interface for programming entire clusters with implicit data parallelism and fault tolerance.

We concentrated all the preprocessing logic there in order to show only the general steps of the ETL here. This notebook is for documentation purposes only, whereas the actual ETL run takes place in Spark on the cloud-native big data platform [Amazon EMR](https://aws.amazon.com/emr/?nc1=h_ls) through the execution of the main function of the `etl` package.

In [1]:
import pandas as pd
import os, re
import configparser
from datetime import timedelta, datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, when, lower, isnull, year, month, dayofmonth, hour, weekofyear, dayofweek, date_format, to_date
from pyspark.sql.types import StructField, StructType, IntegerType, DoubleType, LongType

In [2]:
# The AWS key id and password are configured in a configuration file "dl.cfg"
config = configparser.ConfigParser()
config.read('dl.cfg')
# Read the AWS credentials and store them in environment variables
os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
# OUTPUT = config['ETL']['OUTPUT_DATA']

In [3]:
import os
from pyspark.sql import SparkSession

# Set the environment before the session is created so that Spark picks it up
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"

# "spark.jars.packages" is the option that accepts Maven coordinates
spark = SparkSession.builder\
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:3.0.0-s_2.12")\
    .enableHiveSupport().getOrCreate()

# The April 2016 sample is stored as parquet, Spark's default load format
df_spark = spark.read.load('./sas_data')
print("Completed, ran successfully")

Completed, ran successfully


In [4]:
# Read US Cities Demo dataset file
demographics=spark.read.csv("us-cities-demographics.csv", sep=';', header=True)

In [5]:
demographics.count()

2891

In [6]:
# Print Schema to verify that all the columns are in "string" format
demographics.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [7]:
def cast_type(df, cols):
    """
    Convert the types of the columns according to the configuration supplied in the cols dictionary in the format {"column_name": type}
    
    Args:
        df (:obj:`SparkDataFrame`): Spark dataframe to be processed. 
            Represents the entry point to programming Spark with the Dataset and DataFrame API.
        cols (:obj:`dict`): Dictionary in the format of {"column_name": type} indicating what columns and types they should be converted to
    """
    for k,v in cols.items():
        if k in df.columns:
            df = df.withColumn(k, df[k].cast(v))
    return df

In [8]:
# Convert numeric columns to the proper types: Integer and Double
int_cols = ['Count', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', 'Foreign-born']
float_cols = ['Median Age', 'Average Household Size']
demographics = cast_type(demographics, dict(zip(int_cols, len(int_cols)*[IntegerType()])))
demographics = cast_type(demographics, dict(zip(float_cols, len(float_cols)*[DoubleType()])))

In [9]:
# demographics.printSchema()
    
first_agg = {"Median Age": "first", "Male Population": "first", "Female Population": "first", 
            "Total Population": "first", "Number of Veterans": "first", "Foreign-born": "first", "Average Household Size": "first"}

# First aggregation - City
agg_df = demographics.groupby(["City", "State", "State Code"]).agg(first_agg)

# Pivot Table to transform values of the column Race to different columns
piv_df = demographics.groupBy(["City", "State", "State Code"]).pivot("Race").sum("Count")

In [10]:
agg_df.head()

Row(City='Rockville', State='Maryland', State Code='MD', first(Total Population)=66998, first(Female Population)=35793, first(Median Age)=38.1, first(Number of Veterans)=1990, first(Foreign-born)=25047, first(Male Population)=31205, first(Average Household Size)=2.6)

In [11]:
piv_df.head()

Row(City='Delray Beach', State='Florida', State Code='FL', American Indian and Alaska Native=None, Asian=1696, Black or African-American=21138, Hispanic or Latino=6397, White=40980)

In [12]:
# Rename column names removing the spaces to avoid problems when saving to disk (we got errors when trying to save column names with spaces)
demographics = agg_df.join(other=piv_df, on=["City", "State", "State Code"], how="inner")\
    .withColumnRenamed('State Code', 'StateCode')\
    .withColumnRenamed('first(Total Population)', 'TotalPopulation')\
    .withColumnRenamed('first(Female Population)', 'FemalePopulation')\
    .withColumnRenamed('first(Male Population)', 'MalePopulation')\
    .withColumnRenamed('first(Median Age)', 'MedianAge')\
    .withColumnRenamed('first(Number of Veterans)', 'NumberVeterans')\
    .withColumnRenamed('first(Foreign-born)', 'ForeignBorn')\
    .withColumnRenamed('first(Average Household Size)', 'AverageHouseholdSize')\
    .withColumnRenamed('Hispanic or Latino', 'HispanicOrLatino')\
    .withColumnRenamed('Black or African-American', 'BlackOrAfricanAmerican')\
    .withColumnRenamed('American Indian and Alaska Native', 'AmericanIndianAndAlaskaNative')

In [13]:
demographics.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- StateCode: string (nullable = true)
 |-- TotalPopulation: integer (nullable = true)
 |-- FemalePopulation: integer (nullable = true)
 |-- MedianAge: double (nullable = true)
 |-- NumberVeterans: integer (nullable = true)
 |-- ForeignBorn: integer (nullable = true)
 |-- MalePopulation: integer (nullable = true)
 |-- AverageHouseholdSize: double (nullable = true)
 |-- AmericanIndianAndAlaskaNative: long (nullable = true)
 |-- Asian: long (nullable = true)
 |-- BlackOrAfricanAmerican: long (nullable = true)
 |-- HispanicOrLatino: long (nullable = true)
 |-- White: long (nullable = true)



In [14]:
numeric_cols = ['TotalPopulation', 'FemalePopulation', 'MedianAge', 'NumberVeterans', 'ForeignBorn', 'MalePopulation', 
                'AverageHouseholdSize', 'AmericanIndianAndAlaskaNative', 'Asian', 'BlackOrAfricanAmerican', 'HispanicOrLatino', 'White']

# Fill the null values with 0
demographics = demographics.fillna(0, numeric_cols)

In [15]:
demographics.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- StateCode: string (nullable = true)
 |-- TotalPopulation: integer (nullable = true)
 |-- FemalePopulation: integer (nullable = true)
 |-- MedianAge: double (nullable = false)
 |-- NumberVeterans: integer (nullable = true)
 |-- ForeignBorn: integer (nullable = true)
 |-- MalePopulation: integer (nullable = true)
 |-- AverageHouseholdSize: double (nullable = false)
 |-- AmericanIndianAndAlaskaNative: long (nullable = true)
 |-- Asian: long (nullable = true)
 |-- BlackOrAfricanAmerican: long (nullable = true)
 |-- HispanicOrLatino: long (nullable = true)
 |-- White: long (nullable = true)



In [16]:
# Now write (and overwrite) transformed `demographics` dataset onto parquet file
# demographics.write.mode('overwrite').parquet("s3a://udacitycapstone-us-immigration/us_cities_demographics.parquet")
demographics.write.mode('overwrite').parquet("us_cities_demographics.parquet")

In [17]:

# Read i94 immigration dataset
immigration=spark.read.parquet("sas_data")

In [18]:
immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)

In [19]:
int_cols = ['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 
        'arrdate', 'i94mode', 'i94bir', 'i94visa', 'count', 'biryear', 'dtadfile', 'depdate']
    
date_cols = ['arrdate', 'depdate']
    
high_null = ["visapost", "occup", "entdepu", "insnum"]
not_useful_cols = ["count", "entdepa", "entdepd", "matflag", "dtaddto", "biryear", "admnum"]

In [20]:
# Convert columns read as string/double to integer
immigration = cast_type(immigration, dict(zip(int_cols, len(int_cols)*[IntegerType()])))

In [21]:
# The date format string preferred to our work here: YYYY-MM-DD
date_format = "%Y-%m-%d"
convert_sas_udf = udf(lambda x: x if x is None else (timedelta(days=x) + datetime(1960, 1, 1)).strftime(date_format))

In [22]:
def convert_sas_date(df, cols):
    """
    Convert dates in the SAS datatype to a date in a string format YYYY-MM-DD
    
    Args:
        df (:obj:`SparkDataFrame`): Spark dataframe to be processed. 
            Represents the entry point to programming Spark with the Dataset and DataFrame API.
        cols (:obj:`list`): List of columns in the SAS date format to be convert
    """
    for c in [c for c in cols if c in df.columns]:
        df = df.withColumn(c, convert_sas_udf(df[c]))
    return df
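
SAS stores dates as the number of days since 1960-01-01, which is why the UDF above adds a `timedelta` to that epoch. The same conversion can be checked outside Spark with a few lines of plain Python; `sas_to_iso` is a hypothetical name used only for this sketch, and the two inputs are the `arrdate` and `depdate` of the first sample row.

```python
from datetime import date, timedelta

# SAS counts days from this epoch
SAS_EPOCH = date(1960, 1, 1)


def sas_to_iso(sas_days):
    """Convert a SAS day count to a YYYY-MM-DD string (None stays None)."""
    if sas_days is None:
        return None
    return (SAS_EPOCH + timedelta(days=int(sas_days))).isoformat()


print(sas_to_iso(20574.0))  # 2016-04-30
print(sas_to_iso(20582.0))  # 2016-05-08
```

The first result agrees with the `dtadfile` value `20160430` of the same row, which is a useful sanity check for the conversion.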

In [23]:
# Convert SAS date to a meaningful string date in the format of YYYY-MM-DD
immigration = convert_sas_date(immigration, date_cols)
    
# Drop high null columns and not useful columns
immigration = immigration.drop(*high_null)
immigration = immigration.drop(*not_useful_cols)

In [24]:
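def date_diff(date1, date2):
    '''
    Calculates the difference in days between two dates (date2 - date1).
    Returns None when either date is missing.
    '''
    # Either column may be null, for example when there is no departure record
    if date1 is None or date2 is None:
        return None
    else:
        a = datetime.strptime(date1, date_format)
        b = datetime.strptime(date2, date_format)
        delta = b - a
        return delta.days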

In [25]:
date_diff_udf = udf(date_diff)

In [26]:
# Create a new column to store the length of the visitor's stay in the US
immigration = immigration.withColumn('stay', date_diff_udf(immigration.arrdate, immigration.depdate))
immigration = cast_type(immigration, {'stay': IntegerType()})

In [27]:
immigration.printSchema()

root
 |-- cicid: integer (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: string (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- dtadfile: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)
 |-- stay: integer (nullable = true)



In [28]:
immigration.write.mode("overwrite").parquet('immigration.parquet')

In [29]:
# Start processing I94_SAS_Labels_Descriptions.SAS to create the master i94 code dimensions:

'''
/* I94MODE - There are missing values as well as not reported (9) */
	1 = 'Air'
	2 = 'Sea'
	3 = 'Land'
	9 = 'Not reported' ;
'''
# Create i94mode list
i94mode_data =[[1,'Air'],[2,'Sea'],[3,'Land'],[9,'Not reported']]

# Convert to spark dataframe
i94mode=spark.createDataFrame(i94mode_data)

In [30]:
i94mode.printSchema()

root
 |-- _1: long (nullable = true)
 |-- _2: string (nullable = true)



In [31]:
def etl_demographics_data(spark, input_path="us-cities-demographics.csv", output_path="out/demographics.parquet", 
                         input_format = "csv", columns='*',
                          load_size = None, partitionBy = ["State Code"], header=True, sep=";", **options):
    """
    Reads the demographics dataset indicated in input_path, performs the ETL process and saves the result in the location indicated by the output_path parameter.
    
    Args:
        spark (:obj:`SparkSession`): Spark session. 
            Represents the entry point to programming Spark with the Dataset and DataFrame API.
        input_path (:obj:`str`): Directory where to find the input files.
        output_path (:obj:`str`): Directory where to save the demographics output files.
        input_format (:obj:`str`): Type of the input files. Default to "csv" (comma-separated value).
        columns (:obj:`list`): List of the columns names to read in. Useful when only some columns are useful.
        load_size (int): Number of rows to read for debug purposes.
        partitionBy (:obj:`list`): Files will be saved in partitions using the columns of this list.
        header: (bool): Uses the first line as names of columns. If None is set, it uses the default value, false.
        options: All other string options.
    """
    # Loads the demographics dataframe using Spark
    demographics = read_data(spark, input_path=input_path, input_format=input_format, 
                            columns=columns, debug_size = load_size, header=header, sep=sep, **options)
    
    # Convert numeric columns to the proper types: Integer and Double
    int_cols = ['Count', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', 'Foreign-born']
    float_cols = ['Median Age', 'Average Household Size']
    demographics = cast_type(demographics, dict(zip(int_cols, len(int_cols)*[IntegerType()])))
    demographics = cast_type(demographics, dict(zip(float_cols, len(float_cols)*[DoubleType()])))
    
    first_agg = {"Median Age": "first", "Male Population": "first", "Female Population": "first", 
                 "Total Population": "first", "Number of Veterans": "first", "Foreign-born": "first", "Average Household Size": "first"}
    # First aggregation - City
    agg_df = demographics.groupby(["City", "State", "State Code"]).agg(first_agg)
    # Pivot Table to transform values of the column Race to different columns
    piv_df = demographics.groupBy(["City", "State", "State Code"]).pivot("Race").sum("Count")
    
    # Rename column names removing the spaces to avoid problems when saving to disk (we got errors when trying to save column names with spaces)
    demographics = agg_df.join(other=piv_df, on=["City", "State", "State Code"], how="inner")\
    .withColumnRenamed('first(Total Population)', 'TotalPopulation')\
    .withColumnRenamed('first(Female Population)', 'FemalePopulation')\
    .withColumnRenamed('first(Male Population)', 'MalePopulation')\
    .withColumnRenamed('first(Median Age)', 'MedianAge')\
    .withColumnRenamed('first(Number of Veterans)', 'NumberVeterans')\
    .withColumnRenamed('first(Foreign-born)', 'ForeignBorn')\
    .withColumnRenamed('first(Average Household Size)', 'AverageHouseholdSize')\
    .withColumnRenamed('Hispanic or Latino', 'HispanicOrLatino')\
    .withColumnRenamed('Black or African-American', 'BlackOrAfricanAmerican')\
    .withColumnRenamed('American Indian and Alaska Native', 'AmericanIndianAndAlaskaNative')
    
    numeric_cols = ['TotalPopulation', 'FemalePopulation', 'MedianAge', 'NumberVeterans', 'ForeignBorn', 'MalePopulation', 'AverageHouseholdSize',
                    'AmericanIndianAndAlaskaNative', 'Asian', 'BlackOrAfricanAmerican', 'HispanicOrLatino', 'White']
    # Fill the null values with 0
    demographics = demographics.fillna(0, numeric_cols)
    
    # Save the demographics dataset to the output_path
    if output_path is not None:
        save(df=demographics, output_path=output_path, partitionBy = partitionBy)
    
    return demographics

def etl_states_data(spark, output_path="out/state.parquet"):
    """
    Builds the states dataset from the demographics dataset and the I94ADDR lookup table, performs the ETL process and saves the result to the path given by output_path.
    
    Args:
        spark (:obj:`SparkSession`): Spark session. 
            Represents the entry point to programming Spark with the Dataset and DataFrame API.
        output_path (:obj:`str`): Directory where to save the state output files.
    """
    # Population columns that are summed when aggregating cities into states
    cols = ['TotalPopulation', 'FemalePopulation', 'MalePopulation', 'NumberVeterans', 'ForeignBorn', 
            'AmericanIndianAndAlaskaNative', 'Asian', 'BlackOrAfricanAmerican', 'HispanicOrLatino', 'White']
    # Loads the demographics dataframe using Spark
    demographics = etl_demographics_data(spark, output_path=None)
    # Aggregates the dataset by State
    states = demographics.groupby(["State Code", "State"]).agg(dict(zip(cols, len(cols)*["sum"])))
    # Loads the lookup table I94ADDR
    addr = read_data(spark, input_path="lookup/I94ADDR.csv", input_format="csv", columns="*", header=True)\
    .withColumnRenamed('State', 'State Original')
    
    # Join the two datasets
    addr = addr.join(states, states["State Code"] == addr.Code, "left")
    addr = addr.withColumn("State", when(isnull(addr["State"]), capitalize_udf(addr['State Original'])).otherwise(addr["State"]))
    addr = addr.drop('State Original', 'State Code')
    
    cols = ['sum(BlackOrAfricanAmerican)', 'sum(White)', 'sum(AmericanIndianAndAlaskaNative)',
            'sum(HispanicOrLatino)', 'sum(Asian)', 'sum(NumberVeterans)', 'sum(ForeignBorn)', 'sum(FemalePopulation)', 
            'sum(MalePopulation)', 'sum(TotalPopulation)']
    
    # Rename the columns to modify default names returned when Spark aggregates the values of the columns.
    # For example: column 'sum(MalePopulation)' becomes 'MalePopulation'
    mapping = dict(zip(cols, [re.search(r'\((.*?)\)', c).group(1) for c in cols]))
    addr = rename_columns(addr, mapping)
    
    # Save the resulting dataset to the output_path
    if output_path is not None:
        save(df=addr, output_path=output_path)
    return addr
    
def etl_countries_data(spark, input_path="../../data2/GlobalLandTemperaturesByCity.csv", output_path="out/country.parquet", 
                         input_format = "csv", columns = '*', load_size = None, header=True, **options):
    """
    Reads the global temperatures dataset indicated in input_path and transforms it to generate the country dataframe. Performs the ETL process and saves the result to the path given by output_path.
    
    Args:
        spark (:obj:`SparkSession`): Spark session. 
            Represents the entry point to programming Spark with the Dataset and DataFrame API.
        input_path (:obj:`str`): Directory where to find the input files.
        output_path (:obj:`str`): Directory where to save the country output files.
        input_format (:obj:`str`): Type of the input files. Default to "csv" (comma-separated value).
        columns (:obj:`list`): List of the columns names to read in. Useful when only some columns are useful.
        load_size (int): Number of rows to read for debug purposes.
        header: (bool): Uses the first line as names of columns. If None is set, it uses the default value, false.
        options: All other string options.
    """
    # Loads the global temperatures dataframe using Spark
    countries = read_data(spark, input_path=input_path, input_format=input_format, 
                            columns=columns, debug_size = load_size, header=header, **options)
    # Aggregates the dataset by Country and rename the name of new columns
    countries = countries.groupby(["Country"]).agg({"AverageTemperature": "avg", "Latitude": "first", "Longitude": "first"})\
    .withColumnRenamed('avg(AverageTemperature)', 'Temperature')\
    .withColumnRenamed('first(Latitude)', 'Latitude')\
    .withColumnRenamed('first(Longitude)', 'Longitude')
    
    # Rename specific country names to match the I94CIT_I94RES lookup table when joining them
    change_countries = [("Country", "Congo (Democratic Republic Of The)", "Congo"), ("Country", "Côte D'Ivoire", "Ivory Coast")]
    countries = change_field_value_condition(countries, change_countries)
    countries = countries.withColumn('Country_Lower', lower(countries.Country))
    
    # Rename specific country names in the lookup table to match the temperatures dataset when joining them
    change_res = [("I94CTRY", "BOSNIA-HERZEGOVINA", "BOSNIA AND HERZEGOVINA"), 
                  ("I94CTRY", "INVALID: CANADA", "CANADA"),
                  ("I94CTRY", "CHINA, PRC", "CHINA"),
                  ("I94CTRY", "GUINEA-BISSAU", "GUINEA BISSAU"),
                  ("I94CTRY", "INVALID: PUERTO RICO", "PUERTO RICO"),
                  ("I94CTRY", "INVALID: UNITED STATES", "UNITED STATES")]
    
    # Loads the lookup table I94CIT_I94RES
    res = read_data(spark, input_path="lookup/I94CIT_I94RES.csv", input_format=input_format, columns="*",
                          debug_size = load_size, header=header, **options)
    res = cast_type(res, {"Code": IntegerType()})
    res = change_field_value_condition(res, change_res)
    res = res.withColumn('Country_Lower', lower(res.I94CTRY))
    # Join the two datasets to create the country dimension table
    res = res.join(countries, res.Country_Lower == countries.Country_Lower, how="left")
    res = res.withColumn("Country", when(isnull(res["Country"]), capitalize_udf(res.I94CTRY)).otherwise(res["Country"]))   
    res = res.drop("I94CTRY", "Country_Lower")
    
    # Save the resulting dataset to the output_path
    if output_path is not None:
        save(df=res, output_path=output_path)
    return res

def cast_type(df, cols):
    """
    Convert the types of the columns according to the configuration supplied in the cols dictionary in the format {"column_name": type}
    
    Args:
        df (:obj:`SparkDataFrame`): Spark dataframe to be processed.
        cols (:obj:`dict`): Dictionary in the format of {"column_name": type} indicating what columns and types they should be converted to
    """
    for k,v in cols.items():
        if k in df.columns:
            df = df.withColumn(k, df[k].cast(v))
    return df

def convert_sas_date(df, cols):
    """
    Convert dates in the SAS datatype to a date in a string format YYYY-MM-DD
    
    Args:
        df (:obj:`SparkDataFrame`): Spark dataframe to be processed.
        cols (:obj:`list`): List of columns in the SAS date format to be converted
    """
    for c in [c for c in cols if c in df.columns]:
        df = df.withColumn(c, convert_sas_udf(df[c]))
    return df

def change_field_value_condition(df, change_list):
    '''
    Helper function used to rename column values based on condition.
    
    Args:
        df (:obj:`SparkDataFrame`): Spark dataframe to be processed.
        change_list (:obj: `list`): List of tuples in the format (field, old value, new value)
    '''
    for field, old, new in change_list:
        df = df.withColumn(field, when(df[field] == old, new).otherwise(df[field]))
    return df

def rename_columns(df, mapping):
    '''
    Rename the columns of the dataset based on the mapping dictionary
    
    Args:
        df (:obj:`SparkDataFrame`): Spark dataframe to be processed.
        mapping (:obj: `dict`): Mapping dictionary in the format {old_name: new_name}
    '''
    df = df.select([col(c).alias(mapping.get(c, c)) for c in df.columns])
    return df

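def date_diff(date1, date2):
    '''
    Calculates the difference in days between two dates.
    Returns None when either date is missing.
    '''
    # A missing date (for example, no departure recorded) would make strptime fail
    if date1 is None or date2 is None:
        return None
    a = datetime.strptime(date1, date_format)
    b = datetime.strptime(date2, date_format)
    return (b - a).days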

# User defined functions using Spark udf wrapper function to convert SAS dates into string dates in the format YYYY-MM-DD, to capitalize the first letters of the string and to calculate the difference between two dates in days.
convert_sas_udf = udf(lambda x: x if x is None else (timedelta(days=x) + datetime(1960, 1, 1)).strftime(date_format))
capitalize_udf = udf(lambda x: x if x is None else x.title())
date_diff_udf = udf(date_diff)

In [32]:
def create_spark_session():
    """
    This function creates a session with Spark, the entry point to programming Spark with the Dataset and DataFrame API.
    """
    spark = SparkSession.builder.config("spark.jars.packages",
                                        "saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.0")\
    .enableHiveSupport().getOrCreate()
    return spark

def read_data(spark, input_path, input_format = "csv", columns = '*', debug_size = None, **options):
    """
    Loads data from a data source using the pyspark module and returns it as a spark 'DataFrame'.
    
    Args:
        spark (:obj:`SparkSession`): Spark session. 
            Represents the entry point to programming Spark with the Dataset and DataFrame API.
        input_path (:obj:`str`): Directory where to find the input files.
        input_format (:obj:`str`): Optional string for format of the data source. Default to 'csv'.
        columns (:obj:`list`): List of columns of the dataframe to return. Default to "*", which means 'all columns'.
        debug_size (int): Define the number of rows to read for debug purposes. The default value None means 'all rows'.
        options: All other string options.
    """
    df = spark.read.load(input_path, format=input_format, **options).select(columns)
    # Limit the number of rows only when a debug size is requested
    if debug_size is not None:
        df = df.limit(debug_size)
    return df

def save(df, output_path, mode = "overwrite", output_format = "parquet", columns = '*', partitionBy=None, **options):
    """
    Saves the contents of the DataFrame to a data source.
    The data source is specified by the format and a set of options. If format is not specified, 'parquet' will be used.
    
    Args:
        df (:obj:`DataFrame`): Spark DataFrame.
        output_path (:obj:`str`): The path in a Hadoop supported file system where the DataFrame contents will be saved.
        mode (:obj:`str`): Specifies the behavior of the save operation when data already exists. Default to 'overwrite'.
        output_format (:obj:`str`): Optional string for format of the data source to be saved. Default to 'parquet'.
        columns (:obj:`list`): List of columns of the dataframe to save. Default to "*", which means 'all columns'.
        partitionBy (:obj:`list`): Names of partitioning columns. The default value None means 'no partitions'.
        options: All other string options.
    """

    df.select(columns).write.save(output_path, mode= mode, format=output_format, partitionBy = partitionBy, **options)

In [33]:
def etl_immigration_data(spark, input_path="immigration_data_sample.csv", output_path="out/immigration.parquet",
                         date_output_path="out/date.parquet",
                         input_format = "csv", columns = ['i94addr', 'i94mon','cicid','i94visa','i94res','arrdate','i94yr','depdate',
                                                          'airline', 'fltno', 'i94mode', 'i94port', 'visatype', 'gender', 
                                                          'i94cit', 'i94bir'], 
                         load_size = None, partitionBy = ["i94yr", "i94mon"], columns_to_save='*', header=True, **options):
    """
    Reads the immigration dataset indicated in input_path, performs the ETL process and saves the result to the path given by output_path.
    
    Args:
        spark (:obj:`SparkSession`): Spark session. 
            Represents the entry point to programming Spark with the Dataset and DataFrame API.
        input_path (:obj:`str`): Directory where to find the input files.
        output_path (:obj:`str`): Directory where to save immigration output files.
        date_output_path (:obj:`str`): Directory where to save date output files.
        input_format (:obj:`str`): Type of the input files. Default to "csv" (comma-separated value).
        columns (:obj:`list`): List of the columns names to read in. Useful when only some columns are useful.
        load_size (int): Number of rows to read for debug purposes.
        partitionBy (:obj:`list`): Files will be saved in partitions using the columns of this list.
        columns_to_save (:obj:`list`): Define what columns will be saved.
        header: (bool): Uses the first line as names of columns. If None is set, it uses the default value, false.
        options: All other string options.
    """
    
    # Loads the immigration dataframe using Spark
    # We discard the columns ['admnum', 'biryear', 'count', 'dtaddto', 'dtadfile', 'entdepa', 'entdepd', 'entdepu', 'insnum', 'matflag', 'occup', 'visapost'] as they seemed not to be very useful for our goals.
    # Some of them were very unclear of what they really represent.
    immigration = read_data(spark, input_path=input_path, input_format=input_format, 
                            columns=columns, debug_size = load_size, header=header, **options)
    
    int_cols = ['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 
        'arrdate', 'i94mode', 'i94bir', 'i94visa', 'count', 'biryear', 'dtadfile', 'depdate']
    
    date_cols = ['arrdate', 'depdate']
    
    high_null = ["visapost", "occup", "entdepu", "insnum"]
    not_useful_cols = ["count", "entdepa", "entdepd", "matflag", "dtaddto", "biryear", "admnum"]
    
    # Convert columns read as string/double to integer
    immigration = cast_type(immigration, dict(zip(int_cols, len(int_cols)*[IntegerType()])))
    
    # Convert SAS date to a meaningful string date in the format of YYYY-MM-DD
    immigration = convert_sas_date(immigration, date_cols)
    
    # Drop high null columns and not useful columns
    immigration = immigration.drop(*high_null)
    immigration = immigration.drop(*not_useful_cols)
    
    # Create a new column to store the length of the visitor's stay in the US
    immigration = immigration.withColumn('stay', date_diff_udf(immigration.arrdate, immigration.depdate))
    immigration = cast_type(immigration, {'stay': IntegerType()})
    
    # Generate DATE dataframe and save it to the output_path indicated as parameter of the function
    if date_output_path is not None:
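        arrdate = immigration.select('arrdate').distinct()
        depdate = immigration.select('depdate').distinct()
        # union() keeps duplicates, so deduplicate again: the same day can be both an arrival and a departure date
        dates = arrdate.union(depdate).distinct()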
        dates = dates.withColumn("date", to_date(dates.arrdate, date_format))
        dates = dates.withColumn("year", year(dates.date))
        dates = dates.withColumn("month", month(dates.date))
        dates = dates.withColumn("day", dayofmonth(dates.date))
        dates = dates.withColumn("weekofyear", weekofyear(dates.date))
        dates = dates.withColumn("dayofweek", dayofweek(dates.date))
        dates = dates.drop("date").withColumnRenamed('arrdate', 'date')
        save(df=dates.select("date", "year", "month", "day", "weekofyear", "dayofweek"), output_path=date_output_path)
    
    # Save the processed immigration dataset to the output_path
    if output_path is not None:
        save(df=immigration.select(columns_to_save), output_path=output_path, partitionBy = partitionBy)
    return immigration

In [34]:
# Perform ETL process for the Immigration dataset generating immigration and date tables and save them in the S3 bucket indicated in the output_path parameters.
immigration = etl_immigration_data(spark, input_path='../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat',
                                     output_path="s3a://data-engineer-capstone/immigration.parquet",
                                     date_output_path="s3a://data-engineer-capstone/date.parquet",
                                     input_format = "com.github.saurfang.sas.spark", 
                                     load_size=1000, partitionBy=None, 
                                     columns_to_save = '*')
# Perform ETL process for the Country table. Generating the Country table and saving it in the S3 bucket indicated in the output_path parameter.
countries = e.etl_countries_data(spark, output_path=e.OUTPUT + "country.parquet")
# Perform ETL process for the State table. Generating the State table and saving it in the S3 bucket indicated in the output_path parameter.
states = e.etl_states_data(spark, output_path=e.OUTPUT + "state.parquet")

#### Countries dataset
The generation of the countries dataset starts by loading the global temperature dataset along with the I94CIT_I94RES lookup table and ends by storing the processed dataframe in a bucket in Amazon S3. The following tasks are performed:
* Load the CSV file of the global temperatures and the I94CIT_I94RES lookup table;
* Aggregate the temperatures dataset by country and rename the new columns;
* Join the two datasets;
* Save the resulting dataset to the staging area in Amazon S3.
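
The join step can be illustrated without Spark. The sketch below is a plain-Python approximation, not the project code: the helper `build_country_rows` and the sample rows are hypothetical. It shows how lookup entries are matched to the aggregated temperatures on the lower-cased country name, falling back to the title-cased lookup name when no temperature data exists.

```python
def build_country_rows(lookup, temperatures):
    """Left-join lookup rows with per-country temperatures.

    lookup: list of (code, I94 country name) tuples, names in upper case.
    temperatures: dict mapping country name -> average temperature.
    """
    # Index the temperature data by lower-cased name, as the ETL join does
    by_lower = {name.lower(): (name, temp) for name, temp in temperatures.items()}
    rows = []
    for code, i94_name in lookup:
        match = by_lower.get(i94_name.lower())
        if match is None:
            # No temperature data: keep the row and title-case the lookup name
            rows.append({"Code": code, "Country": i94_name.title(), "Temperature": None})
        else:
            name, temp = match
            rows.append({"Code": code, "Country": name, "Temperature": temp})
    return rows


# Illustrative values only; they are not taken from the real datasets
sample_lookup = [(1, "BRAZIL"), (2, "ATLANTIS")]
sample_temperatures = {"Brazil": 25.0}
country_rows = build_country_rows(sample_lookup, sample_temperatures)
```

Because the join is a left join from the lookup table, every country code keeps a row in the dimension even when the temperature dataset does not cover that country.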

#### States dataset
The generation of the states dataset starts by loading the demographics dataset along with the I94ADDR lookup table and ends by storing the processed dataframe in a bucket in Amazon S3. In summary, the following tasks are performed:
* Load the CSV file of the demographics and the I94ADDR lookup table;
* Aggregate the demographics dataset by state and rename the new columns;
* Join the two datasets;
* Save the resulting dataset to the staging area in Amazon S3.

<img src="images/etl_state.png" alt="etl_state" width="400"/>

The parquet files are saved in an S3 bucket in AWS and are used to load the tables of the same name in Amazon Redshift. We create the schema by running the SQL script found in `sql/create_tables.sql`. From there, the model is ready to be explored by the customers, for example through the query editor in Redshift.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
_Build the data pipelines to create the data model._

The pipeline is divided into two stages. In the first, we use Spark to extract, transform and store the provided datasets in the AWS S3 staging area. In the second, we use [Apache Airflow](https://airflow.apache.org/) to build a DAG that extracts the data from S3 and loads it into tables of the same name in Amazon Redshift. As a final step, we run a row-count check on the loaded tables to ensure completeness.

<img src="images/architecture.png" alt="architecture" width="800"/>

Below we show the pipeline of the second stage we developed using Apache Airflow.

<img src="images/dag.PNG" alt="dag" width="800"/>

The code to build the Airflow pipeline is located in the folder `airflow`. There you will find the code of the DAG itself (file `capstone.py` inside folder `dags`) as well as the two custom operators built for this capstone project in folder `plugins/operators`: `stage_redshift.py` and `data_quality.py`.

The custom operator `StageToRedshiftOperator` was designed to load data in [parquet](https://parquet.apache.org/) format from S3 buckets in AWS and insert the content into a table in Amazon Redshift. The operator is customizable through input parameters to work with different buckets and tables, so our DAG uses it to load both the fact and the dimension tables into Redshift.
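
For illustration, the statement such an operator issues could be assembled as shown below. This is a sketch and not the code in `stage_redshift.py`: the helper `build_copy_sql`, the bucket, the key and the IAM role ARN are placeholders, and it assumes the load relies on Redshift's `COPY ... FORMAT AS PARQUET` with an IAM role for authorization.

```python
def build_copy_sql(table, bucket, key, iam_role):
    """Return a Redshift COPY statement that loads parquet files from S3."""
    return (
        f"COPY {table} "
        f"FROM 's3://{bucket}/{key}' "
        f"IAM_ROLE '{iam_role}' "
        "FORMAT AS PARQUET;"
    )


# Placeholder values for illustration only
copy_sql = build_copy_sql(
    table="immigration",
    bucket="example-bucket",
    key="immigration.parquet",
    iam_role="arn:aws:iam::123456789012:role/example-redshift-role",
)
print(copy_sql)
```

Keeping the table, bucket and key as parameters is what would let one operator class serve the fact table and all three dimension tables.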

#### 4.2 Data Quality Checks

First, we load the `IMMIGRATION` fact table through the step `Immigration_Fact_Table`, followed by the steps that load the dimension tables `STATE`, `DATE` and `COUNTRY`: `State_Dimension_Table`, `Date_Dimension_Table` and `Country_Dimension_Table`, respectively. Every table has a primary key constraint that uniquely identifies its records, and the fact table has foreign keys declaring that its values are present in the dimension tables. Note that Redshift treats primary and foreign keys as informational and does not enforce them, so the ETL itself has to keep these relationships valid.

After completing the loading process, we perform a data quality check through the step `Data_Quality_Checks` to make sure the load succeeded. This check runs a row count on every table of the model to verify that each one was actually loaded.
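
A row-count check of this kind can be sketched as follows. This is an assumption-laden illustration rather than the code in `data_quality.py`: the function `check_tables_not_empty` is hypothetical, and an in-memory SQLite database stands in for Redshift so that the example runs anywhere.

```python
import sqlite3


def check_tables_not_empty(conn, tables):
    """Raise ValueError if any table has no rows; otherwise return the counts."""
    counts = {}
    for table in tables:
        # Table names come from a fixed list in the pipeline, not from user input
        count = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
        if count < 1:
            raise ValueError(f"Data quality check failed: {table} is empty")
        counts[table] = count
    return counts


# In-memory SQLite stands in for Redshift in this sketch
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE state (code TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE country (code INTEGER PRIMARY KEY)")
conn.execute("INSERT INTO state VALUES ('CA')")

counts = check_tables_not_empty(conn, ["state"])
```

Raising an exception on an empty table is what makes the corresponding Airflow task fail, so an incomplete load is visible in the DAG run instead of passing silently.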

In [13]:
import numpy as np
import pandas as pd
df_immig_sample = pd.read_csv('immigration_data_sample.csv')

In [14]:
df_immig_sample.columns

Index(['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')

In [15]:
pd.set_option('display.max_columns', 50)
df_immig_sample.head(10)

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,1.0,20160407,,,G,O,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,1.0,20160428,DOH,,G,O,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,19.0,2.0,1.0,20160406,,,Z,K,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT
5,721257,1481650.0,2016.0,4.0,577.0,577.0,ATL,20552.0,1.0,GA,20606.0,51.0,2.0,1.0,20160408,,,T,N,,M,1965.0,10072016,M,,DL,736852600.0,910,B2
6,1072780,2197173.0,2016.0,4.0,245.0,245.0,SFR,20556.0,1.0,CA,20635.0,48.0,2.0,1.0,20160412,,,T,O,,M,1968.0,10112016,F,,CX,786312200.0,870,B2
7,112205,232708.0,2016.0,4.0,113.0,135.0,NYC,20546.0,1.0,NY,20554.0,33.0,2.0,1.0,20160402,,,G,O,,M,1983.0,6302016,F,,BA,55474490000.0,00117,WT
8,2577162,5227851.0,2016.0,4.0,131.0,131.0,CHI,20572.0,1.0,IL,20575.0,39.0,2.0,1.0,20160428,,,O,O,,M,1977.0,7262016,,,LX,59413420000.0,00008,WT
9,10930,13213.0,2016.0,4.0,116.0,116.0,LOS,20545.0,1.0,CA,20553.0,35.0,2.0,1.0,20160401,,,O,O,,M,1981.0,6292016,,,AA,55449790000.0,00109,WT


#### 4.3 Data dictionary 
_Create a data dictionary for our data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file._


__Table IMMIGRATION__

| Column Name | Description |
| :--- | :--- |
| CICID | Primary Key |
| I94YR | Year |
| I94MON | Month |
| I94CIT | 3-digit code of the country where the visitor was born. This is a FK to the COUNTRY dimension table |
| I94RES | 3-digit code of the country where the visitor resides. This is a FK to the COUNTRY dimension table |
| ARRDATE | Arrival date in the USA. This is a FK to the DATE dimension table |
| I94MODE | Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported) |
| I94ADDR | State of arrival. This is a FK to the STATE dimension table |
| DEPDATE | Departure date from the USA. This is a FK to the DATE dimension table |
| I94BIR | Age of Respondent in Years |
| I94VISA | Visa codes collapsed into three categories: (1 = Business; 2 = Pleasure; 3 = Student) |
| BIRYEAR | 4 digit year of birth |
| GENDER | Gender |
| AIRLINE | Airline used to arrive in U.S. |
| FLTNO | Flight number of Airline used to arrive in U.S. |
| VISATYPE | Class of admission legally admitting the non-immigrant to temporarily stay in U.S. |
| STAY | Number of days in the US |
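
The `ARRDATE`, `DEPDATE` and `STAY` columns are derived from SAS numeric dates, which count days since 1960-01-01. The sketch below reproduces that derivation in plain Python for the first row of the sample data shown earlier. The names `sas_to_iso` and `stay_days` are illustrative, and `DATE_FORMAT` is assumed to have the same value as the notebook's `date_format` variable.

```python
from datetime import datetime, timedelta

DATE_FORMAT = "%Y-%m-%d"  # assumed value of the notebook's date_format
SAS_EPOCH = datetime(1960, 1, 1)


def sas_to_iso(sas_days):
    """Convert a SAS numeric date (days since 1960-01-01) to YYYY-MM-DD."""
    if sas_days is None:
        return None
    return (SAS_EPOCH + timedelta(days=int(sas_days))).strftime(DATE_FORMAT)


def stay_days(arrival, departure):
    """Days between two YYYY-MM-DD strings, or None if either is missing."""
    if arrival is None or departure is None:
        return None
    delta = datetime.strptime(departure, DATE_FORMAT) - datetime.strptime(arrival, DATE_FORMAT)
    return delta.days


# First row of the sample data: arrdate=20566.0, depdate=20573.0
arrival = sas_to_iso(20566.0)
departure = sas_to_iso(20573.0)
stay = stay_days(arrival, departure)
```

The converted arrival date agrees with the `dtadfile` value of that row (20160422), which is a quick sanity check on the epoch.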


__Table STATE__

| Column Name | Description |
| :--- | :--- |
| Code | Primary Key. This is the code of the State as in I94ADDR lookup table |
| State | Name of the state |
| BlackOrAfricanAmerican | Number of residents of the race Black Or African American |
| White | Number of residents of the race White |
| ForeignBorn | Number of residents born outside the United States |
| AmericanIndianAndAlaskaNative | Number of residents of the race American Indian And Alaska Native |
| HispanicOrLatino | Number of residents of the race Hispanic Or Latino |
| Asian | Number of residents of the race Asian |
| NumberVeterans | Number of residents that are war veterans |
| FemalePopulation | Female population |
| MalePopulation | Male population |
| TotalPopulation | Total population |


__Table COUNTRY__

| Column Name | Description |
| :--- | :--- |
| Code | Country Code. This is the PK. |
| Country | Country Name |
| Temperature | Average temperature of the country between 1743 and 2013 |
| Latitude | GPS Latitude |
| Longitude | GPS Longitude |


__Table DATE__

| Column Name | Description |
| :--- | :--- |
| date | Date in the format YYYY-MM-DD. This is the PK. |
| day | Two digit day |
| month | Two digit month |
| year | Four-digit year |
| weekofyear | The week of the year |
| dayofweek | The day of the week |
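
To make the numbering conventions of these columns concrete, the sketch below builds one `DATE` row in plain Python. It assumes the columns follow the semantics of the Spark functions used in the ETL: `weekofyear` returns the ISO 8601 week number and `dayofweek` counts from 1 (Sunday) to 7 (Saturday). The helper name `date_dimension_row` is hypothetical.

```python
from datetime import date


def date_dimension_row(iso_date):
    """Build one DATE dimension row from a YYYY-MM-DD string."""
    d = date.fromisoformat(iso_date)
    return {
        "date": iso_date,
        "year": d.year,
        "month": d.month,
        "day": d.day,
        # ISO 8601 week number, the convention used by Spark's weekofyear
        "weekofyear": d.isocalendar()[1],
        # Spark's dayofweek counts 1 = Sunday ... 7 = Saturday
        "dayofweek": d.isoweekday() % 7 + 1,
    }


row = date_dimension_row("2016-04-22")
```

Here 2016-04-22 is a Friday, so `dayofweek` is 6 under the Sunday-first convention.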

### Step 5: Complete Project Write Up
__Clearly state the rationale for the choice of tools and technologies for the project.__

The solution is implemented on cloud computing technology, AWS in particular. Cloud computing provides a low-cost, scalable and highly reliable infrastructure platform, which makes it a natural choice for a new solution like this one. Every service we use (S3, EMR, Redshift) has a reasonable cost and ‘pay as you go’ pricing, so we can start small and scale as the solution grows, with no up-front costs involved.

In particular, we chose the following services for these reasons:

__S3:__ Provides relatively cheap, easy-to-use storage with scalability, high availability, security and performance, which suits a staging area like the one in our solution.

__EMR:__ A cloud-native big data platform that allows teams to process vast amounts of data quickly and cost-effectively at scale using Spark. EMR is easy to use, secure, elastic and low-cost, which fits our project.

__Redshift:__ A natural choice since the whole solution is based in the AWS cloud. Redshift provides a massively parallel, column-oriented data warehouse that is easy to scale. The main analytical tools have native interfaces to load data from Redshift.

__Spark:__ A widely used framework for big data processing, with built-in modules for streaming, SQL, machine learning and graph processing. Spark provides an interface for programming entire clusters with implicit data parallelism and fault tolerance. Most of our team are Python programmers, and Spark has a very convenient Python API.


__Propose how often the data should be updated and why__

Since we receive one file per month it seems reasonable to update the model monthly.

__Write a description of how you would approach the problem differently under the following scenarios:__

* The data was increased by 100x.

Scaling the whole pipeline shouldn't be a problem. Since the solution runs on the Amazon cloud, which is easily scalable, the only thing we would need to do is increase the number of nodes of the EMR cluster to handle more data. Amazon Redshift, being a data warehouse designed to scale, could also be resized to hold the larger volume.
 
* The data populates a dashboard that must be updated on a daily basis by 7am every day.

The running interval of the Airflow DAG could be changed to daily and scheduled to run overnight to make the data available by 7am.
 
* The database needed to be accessed by 100+ people.
 
With Redshift we can make use of the feature "elastic resize" that enables us to add or remove nodes in an Amazon Redshift cluster in minutes. This further increases the agility to get better performance and more storage for demanding workloads, and to reduce cost during periods of low demand.