# Project 6 - Data Engineering Capstone Project
## Immigration, demographic and travel analysis

### Overview
The purpose of the data engineering capstone project is to give you a chance to combine what you've learned throughout the program. This project will be an important part of your portfolio that will help you achieve your data engineering-related career goals.

### Instructions

#### Step 1: Scope the Project and Gather Data
Since the scope of the project will be highly dependent on the data, these two things happen simultaneously. In this step, you’ll:

* Identify and gather the data you'll be using for your project (at least two sources and more than 1 million rows). See Project Resources for ideas of what data you can use.

* Explain what end use cases you'd like to prepare the data for (e.g., analytics table, app back-end, source-of-truth database, etc.)

#### Step 2: Explore and Assess the Data
* Explore the data to identify data quality issues, like missing values, duplicate data, etc.

* Document steps necessary to clean the data

#### Step 3: Define the Data Model
* Map out the conceptual data model and explain why you chose that model

* List the steps necessary to pipeline the data into the chosen data model

#### Step 4: Run ETL to Model the Data
* Create the data pipelines and the data model

* Include a data dictionary

* Run data quality checks to ensure the pipeline ran as expected

    * Integrity constraints on the relational database (e.g., unique key, data type, etc.)

    * Unit tests for the scripts to ensure they are doing the right thing

    * Source/count checks to ensure completeness

#### Step 5: Complete Project Write Up
* What's the goal? What queries will you want to run? How would Spark or Airflow be incorporated? Why did you choose the model you chose?

* Clearly state the rationale for the choice of tools and technologies for the project.

* Document the steps of the process.

* Propose how often the data should be updated and why.

* Post your write-up and final data model in a GitHub repo.

* Include a description of how you would approach the problem differently under the following scenarios:

    * If the data was increased by 100x.

    * If the pipelines were run on a daily basis by 7am.

    * If the database needed to be accessed by 100+ people.

#### Rubric
In the [Project Rubric](https://review.udacity.com/#!/rubrics/2497/view), you'll see more detail about the requirements. Use the rubric to assess your own project before you submit to Udacity for review. As with other projects, Udacity reviewers will use this rubric to assess your project and provide feedback. If your project does not meet specifications, you can make changes and resubmit.

```python
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, date_add, to_date, expr
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, dayofweek
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
```

### Step 1: Scope the Project and Gather Data

#### Scope 
This project combines immigration, weather, demographic and airport data in order to derive insights.
We work as data engineers at a travel agency that wants to know, for example, which US states people from different
countries and age groups travel to, what type of weather they prefer, and the demographics of each destination. With this information, the agency
hopes to get a better idea of which travel packages to offer to which customers, in order to maximize sales.

#### Datasets
The following datasets are used:

- __I94 Immigration Data__: This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace. [This](https://travel.trade.gov/research/reports/i94/historical/2016.html) is where the data comes from. There's a sample file so you can take a look at the data in csv format before reading it all in. You do not have to use the entire dataset, just use what you need to accomplish the goal you set at the beginning of the project.
- __World Temperature Data__: This dataset came from Kaggle. You can read more about it [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).
- __U.S. City Demographic Data__: This data comes from OpenDataSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).
- __Airport Code Table__: This is a simple table of airport codes and corresponding cities. It comes from [here](https://datahub.io/core/airport-codes#data).

### Step 2: Explore and Assess the Data
#### Explore the Data 

pandas was used to explore the data because it makes inspecting and visualizing data easy. At this stage we are only exploring and cleaning the data, not running heavy transformations, so pandas is an appropriate tool.

#### Cleaning Steps

For each of the datasets used, we check the columns for missing values, drop unnecessary columns and duplicates, and split columns when necessary to make data access easier.
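The missing-value and duplicate checks described above can be illustrated without pandas. The following is a minimal standard-library sketch on a hypothetical three-row CSV. It is not the notebook's code, which relies on `DataFrame.dropna` and `DataFrame.drop_duplicates`. `profile_rows` is a name introduced here for illustration.

```python
import csv
import io
from collections import Counter

def profile_rows(rows):
    """Count empty values per column and fully duplicated rows (illustrative helper)."""
    missing = Counter()
    seen = Counter()
    for row in rows:
        for column, value in row.items():
            # csv.DictReader yields "" for empty fields and None for absent ones
            if value is None or value.strip() == "":
                missing[column] += 1
        seen[tuple(sorted(row.items()))] += 1
    # Every extra copy of a row beyond the first counts as one duplicate
    duplicates = sum(count - 1 for count in seen.values() if count > 1)
    return dict(missing), duplicates

# Hypothetical sample: one missing median_age and one duplicated row
sample = io.StringIO(
    "city,state,median_age\n"
    "Quincy,Massachusetts,41.0\n"
    "Hoover,Alabama,\n"
    "Quincy,Massachusetts,41.0\n"
)
missing, duplicates = profile_rows(csv.DictReader(sample))
print(missing)     # {'median_age': 1}
print(duplicates)  # 1
```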

##### Cleaning state weather data

```python
# Exploring and cleaning state weather data
state_weather = pd.read_csv('data/temperature_by_state.csv', sep=',')

# Dropping NA entries
state_weather.dropna(inplace=True)

# Dropping columns that will not be used
state_weather.drop(['dt', 'AverageTemperatureUncertainty'], axis=1, inplace=True)

# Selecting only US states and dropping the Country column
state_weather = state_weather[state_weather['Country'] == 'United States']
state_weather = state_weather.drop(['Country'], axis=1)

# Renaming columns
state_weather_columns = {
    'AverageTemperature': 'average_temperature',
    'State': 'state'
}
state_weather.rename(columns=state_weather_columns, inplace=True)

# Grouping by state and computing the mean temperature
state_weather = state_weather.groupby(['state'])['average_temperature'].mean().reset_index()

state_weather.head()
```

| | state | average_temperature |
|---|---|---|
| 0 | Alabama | 17.066138 |
| 1 | Alaska | -4.890738 |
| 2 | Arizona | 15.381526 |
| 3 | Arkansas | 15.573963 |
| 4 | California | 14.327677 |


##### Cleaning country weather data

```python
# Exploring and cleaning country weather data
country_weather = pd.read_csv('data/temperature_by_country.csv', sep=',')

# Dropping NA entries
country_weather.dropna(inplace=True)

# Dropping columns that will not be used
country_weather.drop(['dt', 'AverageTemperatureUncertainty'], axis=1, inplace=True)

# Renaming columns
country_weather_columns = {
    'AverageTemperature': 'average_temperature',
    'Country': 'country'
}
country_weather.rename(columns=country_weather_columns, inplace=True)

# Grouping by country and computing the mean temperature
country_weather = country_weather.groupby(['country'])['average_temperature'].mean().reset_index()

country_weather.head()
```

| | country | average_temperature |
|---|---|---|
| 0 | Afghanistan | 14.045007 |
| 1 | Africa | 24.074203 |
| 2 | Albania | 12.610646 |
| 3 | Algeria | 22.985112 |
| 4 | American Samoa | 26.611965 |


##### Cleaning airport data

```python
# Exploring and cleaning airport data
airports = pd.read_csv('data/airport-codes_csv.csv', sep=',', keep_default_na=False, na_values=[''])

# Checking that ident is present in all rows
print(airports['ident'].count() == airports.shape[0])

# Selecting only US airports (copy to avoid SettingWithCopyWarning)
airports = airports[airports['iso_country'] == 'US'].copy()

# Splitting coordinates ("longitude, latitude") into two float columns
airports[['longitude', 'latitude']] = (
    airports['coordinates'].str.split(',', n=1, expand=True).astype(float)
)

# Splitting iso_region (e.g. "US-FL") and keeping the state part
airports['state'] = airports['iso_region'].str.split('-', n=1).str[1]

# Dropping airports without an IATA code
airports.dropna(subset=['iata_code'], inplace=True)

# Checking whether IATA codes are unique
print(airports['iata_code'].is_unique)

# Dropping columns that will not be used
unused_columns = ['ident', 'type', 'elevation_ft', 'iso_region', 'gps_code', 'local_code', 'coordinates']
airports.drop(unused_columns, axis=1, inplace=True)

# Dropping duplicates and resetting the index
airports.drop_duplicates(inplace=True)
airports.reset_index(drop=True, inplace=True)

# Renaming the iso_country and municipality columns
airports.rename(columns={'iso_country': 'country', 'municipality': 'city'}, inplace=True)

airports.head()
```

| | name | continent | country | city | iata_code | longitude | latitude | state |
|---|---|---|---|---|---|---|---|---|
| 0 | Ocean Reef Club Airport | | US | Key Largo | OCA | -80.274803 | 25.325399 | FL |
| 1 | Pilot Station Airport | | US | Pilot Station | PQS | -162.899994 | 61.934601 | AK |
| 2 | Crested Butte Airpark | | US | Crested Butte | CSE | -106.928341 | 38.851918 | CO |
| 3 | LBJ Ranch Airport | | US | Johnson City | JCY | -98.622498 | 30.251801 | TX |
| 4 | Metropolitan Airport | | US | Palmer | PMX | -72.311401 | 42.223301 | MA |
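The two string splits above can be checked in isolation. This sketch assumes, as the output suggests, that `coordinates` holds `"longitude, latitude"` and `iso_region` holds values such as `"US-FL"`. `parse_airport_fields` is a hypothetical helper and is not part of the notebook.

```python
def parse_airport_fields(coordinates, iso_region):
    """Parse one airport row's coordinates and region (illustrative helper)."""
    # Assumed format: "longitude, latitude"; float() tolerates the space after the comma
    longitude, latitude = (float(part) for part in coordinates.split(",", 1))
    # iso_region looks like "US-FL": keep the part after the first hyphen
    state = iso_region.split("-", 1)[1]
    return longitude, latitude, state

print(parse_airport_fields("-80.274803, 25.325399", "US-FL"))
# (-80.274803, 25.325399, 'FL')
```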


##### Cleaning cities demographics data

```python
# Exploring and cleaning demographics data
cities_demographics = pd.read_csv('data/us-cities-demographics.csv', sep=';')

# Dropping NA and duplicates
cities_demographics.dropna(inplace=True)
cities_demographics.drop_duplicates(inplace=True)

# Renaming columns for convention
demographics_columns = {
    'City': 'city',
    'State': 'state',
    'Median Age': 'median_age',
    'Male Population': 'male_population',
    'Female Population': 'female_population',
    'Total Population': 'total_population',
    'Number of Veterans': 'number_of_veterans',
    'Foreign-born': 'foreign_born',
    'Average Household Size': 'average_household_size',
    'State Code': 'state_code',
    'Race': 'race',
    'Count': 'count'
}
cities_demographics.rename(columns=demographics_columns, inplace=True)

# Dropping unused columns
unused_columns = ['number_of_veterans', 'average_household_size']
cities_demographics.drop(unused_columns, axis=1, inplace=True)

# Dropping duplicates that may appear after removing columns
cities_demographics.drop_duplicates(inplace=True)

# Casting column types
for column in ['male_population', 'female_population', 'foreign_born']:
    cities_demographics[column] = cities_demographics[column].astype(int)

cities_demographics.race.unique()
cities_demographics.head()
```

| | city | state | median_age | male_population | female_population | total_population | foreign_born | state_code | race | count |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601 | 41862 | 82463 | 30908 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129 | 49500 | 93629 | 32935 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040 | 46799 | 84839 | 8229 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127 | 87105 | 175232 | 33878 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040 | 143873 | 281913 | 86253 | NJ | White | 76402 |


##### Cleaning immigration data

```python
# fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
immigration_data = pd.read_csv('immigration_data_sample.csv', sep=',')

# Dropping unused columns by position; the remaining columns are renamed below
unused_columns = [0, 1, 13, 14, 15, 16, 17, 18, 19, 20, 22, 24, 25, 26, 27]
immigration_data.drop(immigration_data.columns[unused_columns], axis=1, inplace=True)

# Renaming columns
immigration_columns = {
    "i94yr": "year",
    "i94mon": "month",
    "i94cit": "citizenship_country",
    "i94res": "residence_country",
    "i94port": "airport_code",
    "arrdate": "arrival_date",
    "i94mode": "travel_mode",
    "i94addr": "state",
    "depdate": "departure_date",
    "i94visa": "visa_purpose",
    "i94bir": "passenger_age",
    "biryear": "passenger_birthyear",
    "visatype": "visa_type"
}
immigration_data.rename(columns=immigration_columns, inplace=True)

# Dropping NA and duplicates
immigration_data.dropna(inplace=True)
immigration_data.drop_duplicates(inplace=True)

# Casting columns to the correct types
int_columns = ['citizenship_country', 'residence_country', 'year', 'month',
               'passenger_age', 'visa_purpose', 'passenger_birthyear']
immigration_data[int_columns] = immigration_data[int_columns].astype(int)

# Converting SAS dates (days since 1960-01-01) to datetimes
sas_epoch = pd.to_datetime('1960-01-01')
immigration_data['arrival_date'] = sas_epoch + pd.to_timedelta(immigration_data['arrival_date'], unit='d')
immigration_data['departure_date'] = sas_epoch + pd.to_timedelta(immigration_data['departure_date'], unit='d')

# Dropping the year and month columns since they can be derived from the arrival date
immigration_data.drop(['year', 'month'], axis=1, inplace=True)

immigration_data.head()
```

| | citizenship_country | residence_country | airport_code | arrival_date | travel_mode | state | departure_date | passenger_age | visa_purpose | passenger_birthyear | gender | visa_type |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 209 | 209 | HHW | 2016-04-22 | 1.0 | HI | 2016-04-29 | 61 | 2 | 1955 | F | WT |
| 1 | 582 | 582 | MCA | 2016-04-23 | 1.0 | TX | 2016-04-24 | 26 | 2 | 1990 | M | B2 |
| 2 | 148 | 112 | OGG | 2016-04-07 | 1.0 | FL | 2016-04-27 | 76 | 2 | 1940 | M | WT |
| 3 | 297 | 297 | LOS | 2016-04-28 | 1.0 | CA | 2016-05-07 | 25 | 2 | 1991 | M | B2 |
| 4 | 111 | 111 | CHM | 2016-04-06 | 3.0 | NY | 2016-04-09 | 19 | 2 | 1997 | F | WT |
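The I94 `arrdate` and `depdate` fields are SAS dates, meaning day counts since 1960-01-01. That is why the code above adds a day offset to that epoch. The following is a standard-library sketch of the same conversion. `sas_to_date` is a hypothetical helper.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS dates count days since this date

def sas_to_date(sas_days):
    """Convert a SAS day count (often read as a float) to a date (illustrative helper)."""
    # SAS dates are whole days, so truncating the float loses nothing
    return SAS_EPOCH + timedelta(days=int(sas_days))

print(sas_to_date(20545.0))  # 2016-04-01
```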


##### Cleaning country data

```python
country_data = pd.read_csv('data/country_codes.csv', sep='=')

# Removing single quotes from country names (literal match, not a regex)
country_data['country_name'] = country_data['country_name'].str.replace("'", '', regex=False)

# Trimming whitespace
country_data['country_name'] = country_data['country_name'].str.strip()

# Capitalizing only the first letter of each word
country_data['country_name'] = country_data['country_name'].str.title()

country_data.head()
```

| | country_code | country_name |
|---|---|---|
| 0 | 582 | Mexico |
| 1 | 236 | Afghanistan |
| 2 | 101 | Albania |
| 3 | 316 | Algeria |
| 4 | 102 | Andorra |
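Step 4 of the instructions asks for unit tests for the scripts. As an illustration, the country-name cleaning above can be written as a plain function and tested. This matters because `countries_sql` joins on the cleaned name, so the cleaned value must match the weather dataset's spelling exactly. `clean_label` and `test_clean_label` are hypothetical names. The function mirrors the pandas string steps rather than replacing them.

```python
def clean_label(raw):
    """Drop single quotes, trim whitespace and title-case, like the pandas steps above."""
    return raw.replace("'", "").strip().title()

def test_clean_label():
    assert clean_label("  'MEXICO' ") == "Mexico"
    assert clean_label("'AFGHANISTAN'") == "Afghanistan"
    # title() capitalizes every word, including short ones such as "and",
    # which can break a join against a dataset that spells them in lowercase
    assert clean_label("'BOSNIA AND HERZEGOVINA'") == "Bosnia And Herzegovina"

test_clean_label()
print("clean_label tests passed")
```

Saved in a `test_*.py` file, a function like `test_clean_label` would also be picked up automatically by a test runner such as pytest.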


##### Cleaning state data

```python
state_data = pd.read_csv('data/state_codes.csv', sep='=')

# Removing single quotes and surrounding whitespace from state codes and names
for column in ['state_code', 'state_name']:
    state_data[column] = state_data[column].str.replace("'", '', regex=False).str.strip()

# Capitalizing only the first letter of each word in state names
state_data['state_name'] = state_data['state_name'].str.title()

state_data.head()
```

| | state_code | state_name |
|---|---|---|
| 0 | AL | Alabama |
| 1 | AK | Alaska |
| 2 | AZ | Arizona |
| 3 | AR | Arkansas |
| 4 | CA | California |


##### Loading additional mappers

```python
visa_purposes = pd.read_csv('data/visa_purposes.csv', sep='=')
travel_modes = pd.read_csv('data/travel_modes.csv', sep='=')

visa_purposes.head()
```

| | visa_code | visa_purpose |
|---|---|---|
| 0 | 1 | Business |
| 1 | 2 | Pleasure |
| 2 | 3 | Student |


### Step 3: Define the Data Model

No reliable match was found between the airport and immigration datasets. The IATA codes in the airport data do not correspond to the airport codes (`i94port`) in the immigration data, so there is no fail-safe way of linking the two sources, and the airport data is not used. State could serve as a join key, but it is too broad to identify an airport.
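One way to make this finding measurable is to compute the share of distinct immigration port codes that also appear among the airport IATA codes. This sketch uses hypothetical codes rather than the real datasets, and `match_rate` is an illustrative name. What rate would count as "fail-safe" remains a judgment call.

```python
def match_rate(source_codes, reference_codes):
    """Fraction of distinct source codes that also appear in the reference codes."""
    source = set(source_codes)
    if not source:
        return 0.0
    return len(source & set(reference_codes)) / len(source)

# Hypothetical codes: only "BBB" appears in both lists
port_codes = ["AAA", "BBB", "CCC", "DDD", "AAA"]
iata_codes = ["BBB", "XXX"]
print(match_rate(port_codes, iata_codes))  # 0.25
```

In the notebook, the inputs would be the distinct `i94port` values of the immigration data and the `iata_code` column of the airport data.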

#### 3.1 Conceptual Data Model
A star schema is used because it makes it easy to write analytical queries and aggregations that answer business questions.
Dimension tables are created for countries, states and times.
The fact table is based on the immigration dataset. A picture of the schema follows.

![](db_diagram.png)
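The star schema can also be sketched as DDL. This in-memory SQLite version uses only a subset of the columns from the data dictionary. The column types, primary keys and foreign keys are assumptions for illustration, since the notebook writes Parquet files, which carry no declared constraints. SQLite only enforces foreign keys after `PRAGMA foreign_keys = ON`, so here they are declarative. The sample rows are hypothetical.

```python
import sqlite3

# Subset of the data dictionary's columns; types and keys are assumptions
SCHEMA = """
CREATE TABLE states (
    state_code TEXT PRIMARY KEY,
    state TEXT,
    average_temperature REAL
);
CREATE TABLE countries (
    country_code INTEGER PRIMARY KEY,
    country TEXT,
    average_temperature REAL
);
CREATE TABLE times (
    date TEXT PRIMARY KEY,
    day INTEGER, week INTEGER, month INTEGER, year INTEGER, weekday INTEGER
);
CREATE TABLE immigrations (
    citizenship_country INTEGER REFERENCES countries(country_code),
    residence_country INTEGER REFERENCES countries(country_code),
    state TEXT REFERENCES states(state_code),
    arrival_date TEXT REFERENCES times(date),
    departure_date TEXT REFERENCES times(date),
    visa_purpose TEXT,
    passenger_age INTEGER
);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.executemany("INSERT INTO states VALUES (?, ?, ?)",
                 [("FL", "Florida", 21.5), ("NY", "New York", 7.2)])
conn.execute("INSERT INTO countries VALUES (1, 'Country A', 20.0)")
conn.executemany(
    "INSERT INTO immigrations VALUES (?, ?, ?, ?, ?, ?, ?)",
    [(1, 1, "FL", "2016-04-01", "2016-04-10", "Pleasure", 30),
     (1, 1, "FL", "2016-04-02", "2016-04-09", "Pleasure", 45),
     (1, 1, "NY", "2016-04-03", "2016-04-08", "Business", 50)],
)

# A typical star-schema query: join the fact table to two dimensions and aggregate
rows = conn.execute("""
    SELECT s.state, COUNT(*) AS arrivals, s.average_temperature
    FROM immigrations AS i
    JOIN countries AS c ON c.country_code = i.residence_country
    JOIN states AS s ON s.state_code = i.state
    WHERE c.country = 'Country A'
    GROUP BY s.state, s.average_temperature
    ORDER BY arrivals DESC
""").fetchall()
print(rows)  # [('Florida', 2, 21.5), ('New York', 1, 7.2)]
```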
#### 3.2 Mapping Out Data Pipelines

Spark is used to aggregate the data and build the data model. We move away from pandas at this point because we are no longer exploring subsets of the data. The full immigration dataset is large, and Spark lets us parallelize processing and scale out if required.

The state table will contain demographics data that will be obtained by aggregating the cities demographics dataset. It will also contain weather information.

The country table will contain weather information and country codes obtained from the immigration dataset.

The times table will be built based on dates contained in the immigration data.

To create the fact table, we use the immigration data together with the mappings for state, country, visa purpose and travel mode codes.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

##### Creating the Spark session

```python
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)
```

##### Importing and cleaning the full immigration data with Spark

```python
immigration_data_spark = spark.read.format('com.github.saurfang.sas.spark') \
    .load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
# immigration_data_spark = spark.read.csv('immigration_data_sample.csv')

# Dropping unnecessary columns (i94port is dropped because the airport data is not used)
drop_columns = ['cicid', 'i94yr', 'i94mon', 'i94port', 'entdepu', 'matflag', 'airline', 'admnum',
                'fltno', 'occup', 'entdepg', 'insnum', 'dtaddto', 'entdepd', 'count', 'dtadfile',
                'entdepa', 'visapost']
immigration_data_spark = immigration_data_spark.drop(*drop_columns)

# Renaming columns
rename_columns = {
    'i94cit': 'citizenship_country',
    'i94res': 'residence_country',
    'arrdate': 'arrival_date',
    'i94mode': 'travel_mode',
    'i94addr': 'state',
    'depdate': 'departure_date',
    'i94visa': 'visa_purpose',
    'i94bir': 'passenger_age',
    'biryear': 'passenger_birthyear',
    'visatype': 'visa_type',
}
for old_name, new_name in rename_columns.items():
    immigration_data_spark = immigration_data_spark.withColumnRenamed(old_name, new_name)

# Cleaning data
immigration_data_spark = immigration_data_spark.dropna().distinct()

# Converting SAS dates (days since 1960-01-01) to dates; date_add expects an integer day offset
immigration_data_spark = immigration_data_spark.withColumn(
    'arrival_date', expr("date_add(to_date('1960-01-01'), CAST(arrival_date AS INT))"))
immigration_data_spark = immigration_data_spark.withColumn(
    'departure_date', expr("date_add(to_date('1960-01-01'), CAST(departure_date AS INT))"))

# Casting data to the correct types
int_columns = ['citizenship_country', 'residence_country', 'travel_mode',
               'passenger_age', 'visa_purpose', 'passenger_birthyear']
for column in int_columns:
    immigration_data_spark = immigration_data_spark.withColumn(
        column, immigration_data_spark[column].cast('int'))

immigration_data_spark.limit(10).toPandas()
```

| | citizenship_country | residence_country | arrival_date | travel_mode | state | departure_date | passenger_age | visa_purpose | passenger_birthyear | gender | visa_type |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 103 | 103 | 2016-04-01 | 1 | IN | 2016-04-07 | 42 | 1 | 1974 | F | WB |
| 1 | 103 | 103 | 2016-04-01 | 1 | PR | 2016-04-16 | 60 | 2 | 1956 | M | WT |
| 2 | 103 | 112 | 2016-04-01 | 1 | FL | 2016-04-15 | 52 | 2 | 1964 | M | WT |
| 3 | 104 | 104 | 2016-04-01 | 1 | NJ | 2016-04-08 | 49 | 2 | 1967 | M | WT |
| 4 | 104 | 104 | 2016-04-01 | 1 | NY | 2016-04-10 | 40 | 2 | 1976 | M | WT |
| 5 | 104 | 104 | 2016-04-01 | 1 | NY | 2016-04-17 | 62 | 2 | 1954 | F | WT |
| 6 | 104 | 104 | 2016-04-01 | 1 | CA | 2016-04-29 | 71 | 2 | 1945 | M | WT |
| 7 | 104 | 104 | 2016-04-01 | 1 | NV | 2016-04-09 | 38 | 2 | 1978 | M | WT |
| 8 | 104 | 104 | 2016-04-01 | 1 | FL | 2016-04-10 | 24 | 2 | 1992 | F | WT |
| 9 | 104 | 104 | 2016-04-01 | 1 | FL | 2016-04-08 | 47 | 2 | 1969 | F | WT |


##### Converting the cleaned pandas DataFrames to Spark DataFrames and registering them as views

```python
# Converting the cleaned pandas DataFrames to Spark DataFrames
state_weather_spark = spark.createDataFrame(state_weather)
country_weather_spark = spark.createDataFrame(country_weather)
state_data_spark = spark.createDataFrame(state_data)
country_data_spark = spark.createDataFrame(country_data)
cities_demographics_spark = spark.createDataFrame(cities_demographics)
travel_modes_spark = spark.createDataFrame(travel_modes)
visa_purposes_spark = spark.createDataFrame(visa_purposes)

# Registering every DataFrame as a temporary view so it can be queried with spark.sql
state_weather_spark.createOrReplaceTempView("state_weather")
country_weather_spark.createOrReplaceTempView("country_weather")
state_data_spark.createOrReplaceTempView("state_data")
country_data_spark.createOrReplaceTempView("country_data")
cities_demographics_spark.createOrReplaceTempView("cities_demographics")
immigration_data_spark.createOrReplaceTempView("immigration_data")
travel_modes_spark.createOrReplaceTempView("travel_modes")
visa_purposes_spark.createOrReplaceTempView("visa_purposes")
```

##### Creating dimension tables

```python
# Each city appears once per race (up to 5 race categories), and every one of those rows
# repeats the city's population figures, so the population sums are divided by 5.
# This undercounts cities that have fewer than 5 race rows.
states_sql = """
    SELECT first(state_code) AS state_code,
        first(sw.state) AS state,
        first(average_temperature) AS average_temperature,
        CAST(SUM(male_population) / 5 AS int) AS male_population,
        CAST(SUM(female_population) / 5 AS int) AS female_population,
        CAST(SUM(total_population) / 5 AS int) AS total_population,
        CAST(SUM(foreign_born) / 5 AS int) AS foreign_born,
        SUM(CASE WHEN race = 'Hispanic or Latino' THEN count ELSE 0 END) AS hispanic_or_latino,
        SUM(CASE WHEN race = 'White' THEN count ELSE 0 END) AS white,
        SUM(CASE WHEN race = 'Asian' THEN count ELSE 0 END) AS asian,
        SUM(CASE WHEN race = 'Black or African-American' THEN count ELSE 0 END) AS black_or_african_american,
        SUM(CASE WHEN race = 'American Indian and Alaska Native' THEN count ELSE 0 END) AS american_indian_and_alaska_native
    FROM state_weather AS sw
    JOIN cities_demographics AS cd ON sw.state = cd.state
    GROUP BY sw.state
"""

states_table = spark.sql(states_sql)
states_table.limit(10).toPandas()
```

| | state_code | state | average_temperature | male_population | female_population | total_population | foreign_born | hispanic_or_latino | white | asian | black_or_african_american | american_indian_and_alaska_native |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | UT | Utah | 8.478841 | 517350 | 506585 | 1023935 | 130362 | 201695 | 889798 | 48801 | 21893 | 18746 |
| 1 | HI | Hawaii | 22.439283 | 176807 | 175959 | 352766 | 101312 | 24586 | 110508 | 240978 | 11781 | 5592 |
| 2 | MN | Minnesota | 4.472812 | 695760 | 713072 | 1408833 | 213977 | 103229 | 1050239 | 151544 | 216731 | 25242 |
| 3 | OH | Ohio | 10.320152 | 1170650 | 1248659 | 2419310 | 174778 | 164686 | 1491755 | 86006 | 877366 | 31808 |
| 4 | AR | Arkansas | 15.573963 | 280144 | 296433 | 576577 | 61550 | 77813 | 384733 | 22062 | 149608 | 9381 |
| 5 | OR | Oregon | 8.165825 | 707443 | 729066 | 1436509 | 185753 | 201498 | 1235819 | 117279 | 72150 | 38597 |
| 6 | TX | Texas | 18.107234 | 6972438 | 7138331 | 14110770 | 2899610 | 6311431 | 10508923 | 924552 | 2130242 | 154497 |
| 7 | ND | North Dakota | 4.169715 | 95235 | 94255 | 189490 | 11492 | 5234 | 172068 | 5576 | 8177 | 7142 |
| 8 | PA | Pennsylvania | 9.110052 | 1102940 | 1197619 | 2300560 | 287987 | 387081 | 1245618 | 159308 | 851200 | 31849 |
| 9 | CT | Connecticut | 9.02008 | 424687 | 446332 | 871019 | 222850 | 309992 | 505674 | 48311 | 231822 | 10729 |
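The `/ 5` in `states_sql` assumes that every city contributes exactly five race rows, each repeating the city's population figures. A more defensive alternative first reduces the data to one row per city and then sums per state. It is sketched here with SQLite on hypothetical rows. Spark SQL also supports `WITH` clauses, so a similar CTE could be adapted, but this is not the notebook's query. The race counts do not need this treatment because each race row carries its own count.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE cities_demographics (city TEXT, state TEXT, race TEXT, total_population INTEGER)")
conn.executemany(
    "INSERT INTO cities_demographics VALUES (?, ?, ?, ?)",
    [
        # Hypothetical data: City A has rows for two races, City B for one
        ("City A", "State X", "White", 1000),
        ("City A", "State X", "Asian", 1000),
        ("City B", "State X", "White", 500),
    ],
)

# Approach used in states_sql: raw sum divided by 5 (SQLite integer division here)
naive = conn.execute(
    "SELECT state, SUM(total_population) / 5 FROM cities_demographics GROUP BY state"
).fetchall()

deduplicated = conn.execute("""
    WITH city_population AS (
        -- one row per city: population columns repeat on every race row
        SELECT city, state, MAX(total_population) AS total_population
        FROM cities_demographics
        GROUP BY city, state
    )
    SELECT state, SUM(total_population) AS total_population
    FROM city_population
    GROUP BY state
""").fetchall()

print(naive)         # [('State X', 500)]
print(deduplicated)  # [('State X', 1500)]
```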


```python
countries_sql = """
    SELECT country_code,
        country_name AS country,
        average_temperature
    FROM country_data AS cd
    JOIN country_weather AS cw ON cd.country_name = cw.country
"""

countries_table = spark.sql(countries_sql)
countries_table.limit(10).toPandas()
```

| | country_code | country | average_temperature |
|---|---|---|---|
| 0 | 384 | Chad | 27.120466 |
| 1 | 529 | Anguilla | 26.610492 |
| 2 | 693 | Paraguay | 23.237968 |
| 3 | 158 | Russia | -5.521882 |
| 4 | 216 | Yemen | 26.253597 |
| 5 | 391 | Senegal | 27.967375 |
| 6 | 130 | Sweden | 2.386332 |
| 7 | 414 | Kiribati | 26.736865 |
| 8 | 603 | Guyana | 25.93092 |
| 9 | 243 | Burma | 23.706197 |


```python
times_sql = """
    SELECT DISTINCT date,
        dayofmonth(date) AS day,
        weekofyear(date) AS week,
        month(date) AS month,
        year(date) AS year,
        dayofweek(date) AS weekday
    FROM
        (
            SELECT arrival_date AS date FROM immigration_data
                UNION
            SELECT departure_date AS date FROM immigration_data
        ) AS dates
"""

times_table = spark.sql(times_sql)
times_table.limit(10).toPandas()
```

| | date | day | week | month | year | weekday |
|---|---|---|---|---|---|---|
| 0 | 2016-04-25 | 25 | 17 | 4 | 2016 | 2 |
| 1 | 2016-07-26 | 26 | 30 | 7 | 2016 | 3 |
| 2 | 2016-05-03 | 3 | 18 | 5 | 2016 | 3 |
| 3 | 2016-08-31 | 31 | 35 | 8 | 2016 | 4 |
| 4 | 2016-08-15 | 15 | 33 | 8 | 2016 | 2 |
| 5 | 2016-07-17 | 17 | 28 | 7 | 2016 | 1 |
| 6 | 2016-07-03 | 3 | 26 | 7 | 2016 | 1 |
| 7 | 2016-08-23 | 23 | 34 | 8 | 2016 | 3 |
| 8 | 2016-05-26 | 26 | 21 | 5 | 2016 | 5 |
| 9 | 2016-06-02 | 2 | 22 | 6 | 2016 | 5 |
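The `week` and `weekday` conventions are easy to misread. Spark's `weekofyear` returns the ISO 8601 week of the year, and `dayofweek` returns 1 for Sunday through 7 for Saturday. The standard-library sketch below reproduces those conventions and matches rows 0 and 5 of the output above. `time_attributes` is a hypothetical helper.

```python
from datetime import date

def time_attributes(d):
    """Derive the times-table columns for one date (illustrative helper)."""
    iso_week = d.isocalendar()[1]      # ISO 8601 week number, like Spark's weekofyear
    weekday = d.isoweekday() % 7 + 1   # 1 = Sunday ... 7 = Saturday, like Spark's dayofweek
    return {"date": d.isoformat(), "day": d.day, "week": iso_week,
            "month": d.month, "year": d.year, "weekday": weekday}

print(time_attributes(date(2016, 4, 25)))
# {'date': '2016-04-25', 'day': 25, 'week': 17, 'month': 4, 'year': 2016, 'weekday': 2}
```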


##### Creating the fact table

```python
immigrations_sql = """
    SELECT cd.country_code AS citizenship_country,
        cdd.country_code AS residence_country,
        id.arrival_date,
        tm.travel_mode_method AS travel_mode,
        id.state,
        id.departure_date,
        id.passenger_age,
        vp.visa_purpose AS visa_purpose,
        id.passenger_birthyear,
        id.gender,
        id.visa_type
    FROM immigration_data AS id
    JOIN country_data AS cd ON cd.country_code = id.citizenship_country
    JOIN country_data AS cdd ON cdd.country_code = id.residence_country
    JOIN state_data AS sd ON sd.state_code = id.state
    JOIN visa_purposes AS vp ON vp.visa_code = id.visa_purpose
    JOIN travel_modes AS tm ON tm.travel_mode_code = id.travel_mode
"""

immigrations_table = spark.sql(immigrations_sql)
immigrations_table.limit(10).toPandas()
```

| | citizenship_country | residence_country | arrival_date | travel_mode_method | state | departure_date | passenger_age | visa_purpose | passenger_birthyear | gender | visa_type |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 514 | 514 | 2016-04-08 | Not reported | VA | 2016-08-12 | 46 | Business | 1970 | F | B1 |
| 1 | 112 | 112 | 2016-04-17 | Not reported | MI | 2016-04-24 | 39 | Business | 1977 | M | WB |
| 2 | 268 | 268 | 2016-04-17 | Not reported | CA | 2016-05-27 | 52 | Business | 1964 | F | E1 |
| 3 | 117 | 117 | 2016-04-26 | Not reported | IL | 2016-05-04 | 35 | Business | 1981 | F | WB |
| 4 | 111 | 111 | 2016-04-03 | Not reported | WA | 2016-06-10 | 50 | Business | 1966 | M | WB |
| 5 | 260 | 260 | 2016-04-22 | Not reported | AL | 2016-05-19 | 40 | Business | 1976 | M | B1 |
| 6 | 260 | 260 | 2016-04-22 | Not reported | AL | 2016-04-23 | 55 | Business | 1961 | M | B1 |
| 7 | 260 | 260 | 2016-04-22 | Not reported | AL | 2016-04-28 | 36 | Business | 1980 | M | B1 |
| 8 | 260 | 260 | 2016-04-22 | Not reported | AL | 2016-04-28 | 39 | Business | 1977 | M | B1 |
| 9 | 260 | 260 | 2016-04-22 | Not reported | AL | 2016-07-21 | 41 | Business | 1975 | M | B1 |
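`immigrations_sql` uses inner joins. Any immigration row whose state, country, visa or travel-mode code is missing from a mapping table is therefore silently dropped. A source/count check compares row counts before and after the joins, and an anti-join lists the unmatched codes. This SQLite sketch uses hypothetical rows and only the state mapping. The same two queries could be run with `spark.sql` against the registered views.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE immigration_data (state TEXT, visa_purpose INTEGER);
    CREATE TABLE state_data (state_code TEXT, state_name TEXT);
    -- hypothetical rows: 'XX' has no entry in state_data
    INSERT INTO immigration_data VALUES ('FL', 2), ('NY', 1), ('XX', 2);
    INSERT INTO state_data VALUES ('FL', 'Florida'), ('NY', 'New York');
""")

source_rows = conn.execute("SELECT COUNT(*) FROM immigration_data").fetchone()[0]
fact_rows = conn.execute("""
    SELECT COUNT(*) FROM immigration_data AS id
    JOIN state_data AS sd ON sd.state_code = id.state
""").fetchone()[0]

# Anti-join: codes present in the source but missing from the mapping table
unmatched = conn.execute("""
    SELECT id.state, COUNT(*) FROM immigration_data AS id
    LEFT JOIN state_data AS sd ON sd.state_code = id.state
    WHERE sd.state_code IS NULL
    GROUP BY id.state
""").fetchall()

print(source_rows, fact_rows)  # 3 2
print(unmatched)               # [('XX', 1)]
```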


##### Persisting the tables

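```python
# Writing the tables to Parquet, overwriting any output from a previous run
states_table.write.mode("overwrite").parquet("./tables/states")
countries_table.write.mode("overwrite").parquet("./tables/countries")
times_table.write.mode("overwrite").parquet("./tables/times")
immigrations_table.write.mode("overwrite").parquet("./tables/immigrations")
```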

#### 4.2 Data Quality Checks
To check that the dimension tables were created correctly, we verify that no row in any of them has a null primary key. This should never happen if the pipeline executed correctly.
 
Run Quality Checks

```python
# Registering the created tables in Spark
states_table.createOrReplaceTempView("states")
countries_table.createOrReplaceTempView("countries")
times_table.createOrReplaceTempView("times")
immigrations_table.createOrReplaceTempView("immigrations")

# Perform quality checks here: no dimension table may contain a null primary key.
# Comparing with "== NULL" always yields NULL in SQL, so IS NULL must be used.
spark.sql("SELECT COUNT(*) FROM states WHERE state_code IS NULL").show()  # Should be 0
spark.sql("SELECT COUNT(*) FROM countries WHERE country_code IS NULL").show()  # Should be 0
spark.sql("SELECT COUNT(*) FROM times WHERE date IS NULL").show()  # Should be 0
```

```
+--------+
|count(1)|
+--------+
|       0|
+--------+

+--------+
|count(1)|
+--------+
|       0|
+--------+

+--------+
|count(1)|
+--------+
|       0|
+--------+
```
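The `= NULL` pitfall is easy to reproduce. Under SQL's three-valued logic, comparing anything with NULL yields NULL rather than true, so `WHERE key = NULL` never matches, and a check written that way always reports 0. This SQLite sketch uses a hypothetical table and rows to contrast the two forms. It then wraps empty-table, null-key and duplicate-key checks in a helper. `check_primary_key` is an illustrative name. The table and key names are interpolated into the SQL, so they must come from trusted code, not user input.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE states (state_code TEXT, state TEXT)")
conn.executemany("INSERT INTO states VALUES (?, ?)",
                 [("FL", "Florida"), (None, "Unknown"), ("FL", "Florida again")])

# "= NULL" is never true, so this always counts 0
eq_null = conn.execute("SELECT COUNT(*) FROM states WHERE state_code = NULL").fetchone()[0]
# IS NULL is the correct test
is_null = conn.execute("SELECT COUNT(*) FROM states WHERE state_code IS NULL").fetchone()[0]

def check_primary_key(conn, table, key):
    """Return a list of problems found for a primary-key column (empty list = pass)."""
    problems = []
    total = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    if total == 0:
        problems.append("table is empty")
    nulls = conn.execute(f"SELECT COUNT(*) FROM {table} WHERE {key} IS NULL").fetchone()[0]
    if nulls:
        problems.append(f"{nulls} null key(s)")
    # COUNT(DISTINCT ...) ignores NULLs, so compare against the non-null rows
    distinct = conn.execute(f"SELECT COUNT(DISTINCT {key}) FROM {table}").fetchone()[0]
    if distinct != total - nulls:
        problems.append(f"{total - nulls - distinct} duplicate key(s)")
    return problems

print(eq_null, is_null)  # 0 1
print(check_primary_key(conn, "states", "state_code"))
# ['1 null key(s)', '1 duplicate key(s)']
```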



#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

##### States table

This table represents US states.

Column | Meaning
--- | --- 
state_code | State abbreviation.
state | State name.
average_temperature | Average temperature of the state in °C, computed as the mean of all monthly average temperatures recorded for the state in the weather dataset.
male_population | Male population of the state, obtained by aggregating the values of the state's cities in the demographics dataset.
female_population | Female population of the state, obtained by aggregating the values of the state's cities in the demographics dataset.
total_population | Total population of the state, obtained by aggregating the values of the state's cities in the demographics dataset.
foreign_born | Number of foreign-born residents of the state, obtained by aggregating the values of the state's cities in the demographics dataset.
hispanic_or_latino | Number of Hispanic or Latino residents of the state, obtained by aggregating city-level counts.
white | Number of White residents of the state, obtained by aggregating city-level counts.
asian | Number of Asian residents of the state, obtained by aggregating city-level counts.
black_or_african_american | Number of Black or African American residents of the state, obtained by aggregating city-level counts.
american_indian_and_alaska_native | Number of American Indian and Alaska Native residents of the state, obtained by aggregating city-level counts.

##### Countries table

This table represents countries and their average temperatures.

Column | Meaning
--- | --- 
country_code | I94 code representing the country.
country | Country name.
average_temperature | Average temperature of the country in °C, computed as the mean of all monthly average temperatures recorded for the country in the weather dataset.

##### Times table

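This table represents the arrival and departure dates found in the immigration data.

Column | Meaning
--- | --- 
date | A date in the format YYYY-MM-DD.
day | Day of the month.
week | Week of the year (ISO 8601 week number, as returned by Spark's `weekofyear`).
month | Month of the year.
year | Year.
weekday | Day of the week, from 1 (Sunday) to 7 (Saturday), as returned by Spark's `dayofweek`.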

##### Immigrations table

This is the fact table. It contains information about arrivals to and departures from the US.

Column | Meaning
--- | --- 
citizenship_country | I94 code of the immigrant's country of citizenship.
residence_country | I94 code of the immigrant's country of residence.
state | US state of the immigrant's address during the stay (I94 field `i94addr`). References state_code in the states table.
arrival_date | Date on which the immigrant arrived. References date in the times table.
departure_date | Date on which the immigrant departed. References date in the times table.
travel_mode | Travel mode of the immigrant.
visa_purpose | Purpose of the immigrant's visa, e.g. Pleasure.
passenger_age | Passenger age.
passenger_birthyear | Passenger birth year.
gender | Passenger gender.
visa_type | Visa type, e.g. WB.

#### Example questions and queries to answer them

##### Top 5 states where residents of Brazil arrive, with each state's average temperature

```python
spark.sql("""
SELECT COUNT(*) AS count, s.state, first(s.average_temperature) AS average_temperature
FROM immigrations AS i
JOIN countries AS c ON c.country_code = i.residence_country
JOIN states AS s ON s.state_code = i.state
WHERE c.country = 'Brazil'
GROUP BY s.state
ORDER BY count DESC
LIMIT 5
""").show()
```

```
+-----+----------+-------------------+
|count|     state|average_temperature|
+-----+----------+-------------------+
|38969|   Florida| 21.501561590688663|
|14055|  New York| 7.1836184674575305|
| 9206|California| 14.327677288821446|
| 5637|    Nevada|  9.814444803695169|
| 3042|     Texas|   18.1072339784946|
+-----+----------+-------------------+
```



### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
    * The data was increased by 100x.
    * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    * The database needed to be accessed by 100+ people.

##### Rationale for tool choice

Pandas was used to explore and clean the data because it is easy to use, and the datasets cleaned with it were not very big, which made it a good fit. Spark was used to clean the full immigration dataset and build the data model because it is powerful and easy to scale if needed.

##### When should data be updated

For our purposes, the data should be updated quarterly. This keeps processing costs low while still letting the travel agency get insights and decide what to sell and to whom.

##### What to do if data was increased by 100x

Since the pipeline already uses Spark, we could move it to an Amazon EMR cluster on AWS and let the cluster scale as needed.

##### The data populates a dashboard on a daily basis

We could use Apache Airflow to schedule the pipeline as a daily run that starts early enough to finish before 7am.

##### The database needed to be accessed by 100+ people

We could migrate to Amazon Redshift and scale in the cloud. Alternatively, we could analyze how the data is usually queried and create pre-aggregated tables in a store such as Cassandra.