# Modeling U.S. arrivals
### Udacity Data Engineering Nanodegree Capstone

#### Project Summary
This project aims to model non-citizen arrivals to the United States by month, visa type, and country of residency.

The project follows the following steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [2]:
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import upper, col
from pyspark.sql.functions import date_format
import datetime
import pandas as pd

In [3]:
spark = SparkSession \
    .builder \
    .config("spark.jars.repositories", "https://repos.spark-packages.org/")\
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")\
    .appName("Capstone") \
    .getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
This is the pilot for a large, ground-truth dataset that links U.S. arrivals to the temperatures in the visitors' countries of residence. This dataset, if leveraged in the right way, could indicate climate change-induced travel. The entire year 2016 of aggregated U.S. arrivals is available, as well as 2002-2013 of aggregated temperature data per country. For the latter dataset, 10 full years are used to provide more recent relevant data.

#### Describe and Gather Data 
For the modeling in this project, I used the following datasets:

I94 immigration data for non-citizen arrivals to the United States (source: [US National Tourism and Trade Office](https://www.trade.gov/national-travel-and-tourism-office)). All non-citizens arriving in the U.S. must return an i94 form, and information of interest from this dataset includes the airport of arrival, visa type, and country of residence. This dataset's documentation provided the country information in countries.csv.

World temperature data (source: [Kaggle](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)). Provides average surface temperature data per country from the 1700s through 2013.

Airport codes ([source](https://datahub.io/core/airport-codes#data)). Reports airport codes and the airport's official name and location.

ISO country codes ([source](https://datahub.io/core/country-list#data)). Reports a country's name and its two-letter ISO code.

##### Writing i94 dataset parquet files

In [4]:
import os
os.listdir('../../data/18-83510-I94-Data-2016/')

['i94_apr16_sub.sas7bdat',
 'i94_sep16_sub.sas7bdat',
 'i94_nov16_sub.sas7bdat',
 'i94_mar16_sub.sas7bdat',
 'i94_jun16_sub.sas7bdat',
 'i94_aug16_sub.sas7bdat',
 'i94_may16_sub.sas7bdat',
 'i94_jan16_sub.sas7bdat',
 'i94_oct16_sub.sas7bdat',
 'i94_jul16_sub.sas7bdat',
 'i94_feb16_sub.sas7bdat',
 'i94_dec16_sub.sas7bdat']

In [None]:
# read in all immigration data, filter to columns wanted, append together
immigration_first = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
immigration_df = immigration_first.select(['i94yr', 'i94mon', 'i94res','i94port','i94visa','cicid'])

for filename in os.listdir('../../data/18-83510-I94-Data-2016/')[1:]:
    df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/' + filename)
    filtered_df = df.select(['i94yr', 'i94mon', 'i94res','i94port','i94visa','cicid'])
    
    immigration_df = immigration_df.union(filtered_df)

In [None]:
#write to parquet
immigration_df.write.parquet('sas_full_dataset')

##### Read in written parquet

In [5]:
# read in immigration parquet
immigration_df = spark.read.parquet('sas_full_dataset')
immigration_df.printSchema()

root
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- cicid: double (nullable = true)



In [6]:
immigration_df.take(5)

[Row(i94yr=2016.0, i94mon=8.0, i94res=696.0, i94port='LOS', i94visa=2.0, cicid=6320527.0),
 Row(i94yr=2016.0, i94mon=8.0, i94res=112.0, i94port='SFR', i94visa=1.0, cicid=6320528.0),
 Row(i94yr=2016.0, i94mon=8.0, i94res=135.0, i94port='ORL', i94visa=2.0, cicid=6320529.0),
 Row(i94yr=2016.0, i94mon=8.0, i94res=135.0, i94port='MIA', i94visa=2.0, cicid=6320530.0),
 Row(i94yr=2016.0, i94mon=8.0, i94res=112.0, i94port='SEA', i94visa=2.0, cicid=6320531.0)]

In [7]:
# read in world temperature data -- measurements listed in C
temp = spark.read.option("header",True).option("inferSchema",True).csv('../../data2/GlobalLandTemperaturesByCity.csv')
temp.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [8]:
temp.take(5)

[Row(dt=datetime.datetime(1743, 11, 1, 0, 0), AverageTemperature=6.068, AverageTemperatureUncertainty=1.7369999999999999, City='Århus', Country='Denmark', Latitude='57.05N', Longitude='10.33E'),
 Row(dt=datetime.datetime(1743, 12, 1, 0, 0), AverageTemperature=None, AverageTemperatureUncertainty=None, City='Århus', Country='Denmark', Latitude='57.05N', Longitude='10.33E'),
 Row(dt=datetime.datetime(1744, 1, 1, 0, 0), AverageTemperature=None, AverageTemperatureUncertainty=None, City='Århus', Country='Denmark', Latitude='57.05N', Longitude='10.33E'),
 Row(dt=datetime.datetime(1744, 2, 1, 0, 0), AverageTemperature=None, AverageTemperatureUncertainty=None, City='Århus', Country='Denmark', Latitude='57.05N', Longitude='10.33E'),
 Row(dt=datetime.datetime(1744, 3, 1, 0, 0), AverageTemperature=None, AverageTemperatureUncertainty=None, City='Århus', Country='Denmark', Latitude='57.05N', Longitude='10.33E')]

In [9]:
# read in airports data and iso country codes
codes = spark.read.option("header",True).option("inferSchema",True).csv('airport-codes_csv.csv')

iso_codes = spark.read.option("header",True).option("inferSchema",True).csv('iso_2alpha.csv')

In [10]:
codes.printSchema()
codes.take(5)

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



[Row(ident='00A', type='heliport', name='Total Rf Heliport', elevation_ft=11, continent='NA', iso_country='US', iso_region='US-PA', municipality='Bensalem', gps_code='00A', iata_code=None, local_code='00A', coordinates='-74.93360137939453, 40.07080078125'),
 Row(ident='00AA', type='small_airport', name='Aero B Ranch Airport', elevation_ft=3435, continent='NA', iso_country='US', iso_region='US-KS', municipality='Leoti', gps_code='00AA', iata_code=None, local_code='00AA', coordinates='-101.473911, 38.704022'),
 Row(ident='00AK', type='small_airport', name='Lowell Field', elevation_ft=450, continent='NA', iso_country='US', iso_region='US-AK', municipality='Anchor Point', gps_code='00AK', iata_code=None, local_code='00AK', coordinates='-151.695999146, 59.94919968'),
 Row(ident='00AL', type='small_airport', name='Epps Airpark', elevation_ft=820, continent='NA', iso_country='US', iso_region='US-AL', municipality='Harvest', gps_code='00AL', iata_code=None, local_code='00AL', coordinates='-86.

In [11]:
iso_codes.printSchema()
iso_codes.take(5)

root
 |-- Name: string (nullable = true)
 |-- Code: string (nullable = true)



[Row(Name='Afghanistan', Code='AF'),
 Row(Name='Åland Islands', Code='AX'),
 Row(Name='Albania', Code='AL'),
 Row(Name='Algeria', Code='DZ'),
 Row(Name='American Samoa', Code='AS')]

In [12]:
# read in country codes assembled from i94 documentation (I94_SAS_Labels_Descriptions.SAS)
countries_df = spark.read.option("header",True).option("inferSchema",True).csv('countries.csv')
countries_df = countries_df.select(['country','code'])
countries_df.printSchema()

root
 |-- country: string (nullable = true)
 |-- code: integer (nullable = true)



In [13]:
countries_df.take(5)

[Row(country='MEXICO Air Sea, and Not Reported (I-94, no land arrivals)', code=582),
 Row(country='AFGHANISTAN', code=236),
 Row(country='ALBANIA', code=101),
 Row(country='ALGERIA', code=316),
 Row(country='ANDORRA', code=102)]

### Step 2: Explore and Assess the Data
#### Explore the Data 
Exploration and experimentation with the data can be found in exploration.ipynb.

#### Cleaning Steps
1. Fix country names in temperature and countries data
2. Fix country names in iso code data

In [14]:
# temperature data

# fix country values so temp, country json, and iso codes match as much as possible
# need replacement dictionaries
temp_dict = {
            'BOSNIA AND HERZEGOVINA':'BOSNIA-HERZEGOVINA',
            'CHINA': 'CHINA, PRC',
            "CÔTE D'IVOIRE": 'IVORY COAST',
            'GUINEA BISSAU': 'GUINEA-BISSAU',
            'CONGO (DEMOCRATIC REPUBLIC OF THE)': 'CONGO',
            'BURMA': 'MYANMAR'
}
temp_cleaning = temp.select("*", upper(col('Country'))).drop('Country')
temp_cleaning = temp_cleaning.na.replace(temp_dict, subset='upper(Country)').withColumnRenamed('upper(Country)', 'country')

countries_dict = {
    'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)': 'MEXICO',
    'INVALID: ': '',
    'BURMA': 'MYANMAR'
}
countries_cleaning = countries_df.na.replace(countries_dict, subset='country')

# Note: Cambodia does not have a known code and visitor data is not being well tracked. Requires further investigation.

iso_dict = {
    'CURAÇAO': 'CURACAO',
    'IRAN, ISLAMIC REPUBLIC OF': 'IRAN',
    'VIRGIN ISLANDS, BRITISH': 'BRITISH VIRGIN ISLANDS',
    'TANZANIA, UNITED REPUBLIC OF': 'TANZANIA',
    'MACEDONIA, THE FORMER YUGOSLAV REPUBLIC OF': 'MACEDONIA',
    'TAIWAN, PROVINCE OF CHINA': 'TAIWAN',
    'CONGO, THE DEMOCRATIC REPUBLIC OF THE': 'CONGO',
    'VENEZUELA, BOLIVARIAN REPUBLIC OF': 'VENEZUELA',
    "KOREA, DEMOCRATIC PEOPLE'S REPUBLIC OF": 'NORTH KOREA',
    'FALKLAND ISLANDS (MALVINAS)': 'FALKLAND ISLANDS',
    'VIET NAM': 'VIETNAM',
    'BRUNEI DARUSSALAM': 'BRUNEI',
    'BOLIVIA, PLURINATIONAL STATE OF': 'BOLIVIA',
    'MICRONESIA, FEDERATED STATES OF': 'MICRONESIA, FED. STATES OF',
    'VIRGIN ISLANDS, U.S.': 'U.S. VIRGIN ISLANDS',
    'MOLDOVA, REPUBLIC OF': 'MOLDOVA',
    'KOREA, REPUBLIC OF': 'SOUTH KOREA',
    'PALESTINE, STATE OF': 'PALESTINE',
    'SAINT HELENA, ASCENSION AND TRISTAN DA CUNHA': 'ST. HELENA',
    'BOSNIA AND HERZEGOVINA': 'BOSNIA-HERZEGOVINA',
    'SAMOA': 'WESTERN SAMOA',
    "CÔTE D'IVOIRE": 'IVORY COAST',
    'CHINA': 'CHINA, PRC',
    'HOLY SEE (VATICAN CITY STATE)': 'HOLY SEE/VATICAN',
    'FAROE ISLANDS': 'FAROE ISLANDS (PART OF DENMARK)',
    'SAINT LUCIA': 'ST. LUCIA',
    "LAO PEOPLE'S DEMOCRATIC REPUBLIC": 'LAOS',
    'MACAO': 'MACAU',
    'TIMOR-LESTE': 'EAST TIMOR',
    'SAINT MARTIN (FRENCH PART)': 'SAINT MARTIN',
    'SINT MAARTEN (DUTCH PART)': 'SAINT MAARTEN',
    'SAINT KITTS AND NEVIS': 'ST. KITTS-NEVIS',
    'BONAIRE, SINT EUSTATIUS AND SABA': 'BONAIRE, ST EUSTATIUS, SABA',
    'COCOS (KEELING) ISLANDS': 'COCOS ISLANDS',
    'SYRIAN ARAB REPUBLIC': 'SYRIA',
    'RUSSIAN FEDERATION': 'RUSSIA',
    'WALLIS AND FUTUNA': 'WALLIS AND FUTUNA ISLANDS',
    'NORTHERN MARIANA ISLANDS': 'MARIANA ISLANDS, NORTHERN',
    'ANTIGUA AND BARBUDA': 'ANTIGUA-BARBUDA',
    'HEARD ISLAND AND MCDONALD ISLANDS': 'HEARD AND MCDONALD IS.',
    'SAINT BARTHÉLEMY': 'SAINT BARTHELEMY',
    'MAYOTTE': 'MAYOTTE (AFRICA - FRENCH)',
    'RÉUNION': 'REUNION',
    'BOUVET ISLAND': 'BOUVET ISLAND (ANTARCTICA/NORWAY TERR.)',
    'SAINT VINCENT AND THE GRENADINES': 'ST. VINCENT-GRENADINES',
    'SAINT PIERRE AND MIQUELON': 'ST. PIERRE AND MIQUELON',
    'UNITED STATES MINOR OUTLYING ISLANDS': 'MINOR OUTLYING ISLANDS - USA',
    'SOUTH SUDAN': 'REPUBLIC OF SOUTH SUDAN',
    'BRITISH INDIAN OCEAN TERRITORY': 'INDIAN OCEAN TERRITORY'
}

iso_cleaning = iso_codes.select("*", upper(col('Name')))
iso_cleaning = iso_cleaning.replace(iso_dict, subset='Name') \
    .withColumnRenamed('upper(Name)', 'country') \
    .withColumnRenamed('Code', '2acode')

In [15]:
iso_cleaning.show()

+-------------------+------+-------------------+
|               Name|2acode|            country|
+-------------------+------+-------------------+
|        Afghanistan|    AF|        AFGHANISTAN|
|      Åland Islands|    AX|      ÅLAND ISLANDS|
|            Albania|    AL|            ALBANIA|
|            Algeria|    DZ|            ALGERIA|
|     American Samoa|    AS|     AMERICAN SAMOA|
|            Andorra|    AD|            ANDORRA|
|             Angola|    AO|             ANGOLA|
|           Anguilla|    AI|           ANGUILLA|
|         Antarctica|    AQ|         ANTARCTICA|
|Antigua and Barbuda|    AG|ANTIGUA AND BARBUDA|
|          Argentina|    AR|          ARGENTINA|
|            Armenia|    AM|            ARMENIA|
|              Aruba|    AW|              ARUBA|
|          Australia|    AU|          AUSTRALIA|
|            Austria|    AT|            AUSTRIA|
|         Azerbaijan|    AZ|         AZERBAIJAN|
|            Bahamas|    BS|            BAHAMAS|
|            Bahrain

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

The model for this project is a relatively normalized relational database, where redundancies have been eliminated and information that wasn't useful to the kinds of questions I wanted to ask was not used. Multiple joins are often necessary to answer questions with this data, and thus I am not intending this data to function in a business environment, where queries and decisions may need to happen fast. Instead, this database serves as a ground truth dataset, a snapshot in time, which I intended to be used for personal projects or academic research. However, this database may be converted into a snowflake schema focused around immigration arrivals as a fact table that grows over time should the need arise.

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

After the loading and cleaning steps outlined above, the arrivals data must be aggregated by our variables of interest (i.e. country of residency, month, year, port of entry, and visa category) and each country available should have the temperatures of its potentially many cities aggregated so there is just 1 mean temperature per country per month. Data that is not aggregated in this way is less useful for the questions I'd like to help answer, mostly about how climate change may influence travel over time. All tables should, at the end of this pipeline, have a country code listed in them, and they should match throughout the dataset so joins are possible.

Once our aggregations are complete and joins are set up, our Spark dataframes should be converted to Pandas dataframes and their columns' data types specified so the data is more easily inserted into the Postgres database. This Postgres database needs to start from scratch -- if a database already exists, it and its tables are deleted so that clean table creations and inserts can happen.

After our inserts, the database is committed, and we can check that everything ran as expected by comparing the outcomes of `SELECT COUNT(*)` statements to the values provided by our Pandas dataframes.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [16]:
# immigration data

# group by columns of interest
immigration_grouped = immigration_df.groupBy(['i94yr', 'i94mon', 'i94res','i94port','i94visa']).count()
immigration_grouped.printSchema()

# reorder columns for a cleaner table
immigration_grouped = immigration_grouped.select(['i94res', 'i94visa', 'count', 'i94yr', 'i94mon', 'i94port']) \
    .withColumnRenamed('i94res', 'country_id') \
    .withColumnRenamed('i94visa', 'visa_type') \
    .withColumnRenamed('i94yr', 'year') \
    .withColumnRenamed('i94mon', 'month') \
    .withColumnRenamed('i94port', 'port')
immigration_grouped.printSchema()

root
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: long (nullable = false)

root
 |-- country_id: double (nullable = true)
 |-- visa_type: double (nullable = true)
 |-- count: long (nullable = false)
 |-- year: double (nullable = true)
 |-- month: double (nullable = true)
 |-- port: string (nullable = true)



In [17]:
immigration_grouped.take(5)

[Row(country_id=110.0, visa_type=2.0, count=1, year=2016.0, month=11.0, port='MRC'),
 Row(country_id=213.0, visa_type=1.0, count=849, year=2016.0, month=8.0, port='SEA'),
 Row(country_id=266.0, visa_type=1.0, count=302, year=2016.0, month=8.0, port='LOS'),
 Row(country_id=273.0, visa_type=1.0, count=154, year=2016.0, month=8.0, port='CHI'),
 Row(country_id=113.0, visa_type=1.0, count=2, year=2016.0, month=8.0, port='CIN')]

In [18]:
# temperature data

# convert timestamp from type object to datetime for easy year/month extraction
temp_cleaning = temp_cleaning.withColumn('month', date_format('dt', 'M'))
temp_cleaning = temp_cleaning.withColumn('year', date_format('dt', 'y'))

# find avg temp per country per month
temp_avg = temp_cleaning.select(['AverageTemperature', 'country', 'year', 'month']) \
    .groupBy(['country', 'year', 'month']).mean()
temp_avg.show()

+-------+----+-----+-----------------------+
|country|year|month|avg(AverageTemperature)|
+-------+----+-----+-----------------------+
|DENMARK|1749|    7|                   null|
|DENMARK|1783|    5|                12.3845|
|DENMARK|1791|   10|                  7.784|
|DENMARK|1803|   10|      7.717499999999999|
|DENMARK|1822|    3|                 5.2275|
|DENMARK|1828|   12|                 1.5605|
|DENMARK|1829|    6|     15.796499999999998|
|DENMARK|1840|    3|     0.4474999999999999|
|DENMARK|1846|    1|                 0.5315|
|DENMARK|1890|   12|                 -2.749|
|DENMARK|1904|    7|     16.805500000000002|
|DENMARK|1909|    2|                -1.4415|
|DENMARK|1919|    9|     13.520499999999998|
|DENMARK|1935|    8|     16.619999999999997|
|DENMARK|1946|   12|    0.32899999999999996|
|DENMARK|1989|    5|                12.3165|
|DENMARK|2010|    9|     13.213999999999999|
| TURKEY|1746|    3|                   null|
| TURKEY|1755|    3|      6.710619047619048|
| TURKEY|1

In [19]:
# drop null avg temps, not useful
temp_avg = temp_avg.na.drop(subset='avg(AverageTemperature)')

# calculate F for Americans
temp_avg = temp_avg.withColumn('AverageTemperatureF', (col('avg(AverageTemperature)') * 9/5) + 32) \
    .withColumnRenamed('avg(AverageTemperature)', 'AverageTemperature')

# get country codes for temperature data table insertion & filtering
temp_avg_ccode = temp_avg.join(countries_cleaning, 'country')

# drop extra US code (407 wanted)
temp_avg_ccode = temp_avg_ccode.filter(temp_avg_ccode.code != 583)

# limit to the last 10 years of the dataset for relevance (2012 is max full year)
temp_avg_ccode = temp_avg_ccode.filter(temp_avg_ccode.year >= 2002)

# filter and reorder columns for cleaner table
temp_avg_ccode = temp_avg_ccode.select(['code', 'year', 'month', 'AverageTemperature', 'AverageTemperatureF']) \
        .withColumnRenamed('AverageTemperature', 'avg_temp') \
        .withColumnRenamed('AverageTemperature', 'avg_tempF') 

In [20]:
temp_avg_ccode.show()

+----+----+-----+------------------+-------------------+
|code|year|month|          avg_temp|AverageTemperatureF|
+----+----+-----+------------------+-------------------+
| 108|2010|    9|13.213999999999999| 55.785199999999996|
| 264|2005|    5|16.885254901960785| 62.393458823529414|
| 264|2007|    7|25.741058823529414|  78.33390588235295|
| 264|2013|    8|24.869686274509803|  76.76543529411765|
| 155|2006|   10| 6.699666666666667|            44.0594|
| 155|2012|    7|           23.6534|           74.57612|
| 129|2002|    9|          19.85918|          67.746524|
| 129|2002|   12|          10.79008|          51.422144|
| 112|2007|    3|6.6928888888888896| 44.047200000000004|
| 112|2007|   10|  8.76053086419753|  47.76895555555556|
| 112|2009|    8|19.184518518518516|  66.53213333333333|
| 388|2006|    6|26.399307692307694|  79.51875384615386|
| 388|2009|   10|26.565846153846152|  79.81852307692307|
| 135|2008|    1| 6.502838235294119| 43.705108823529415|
| 135|2010|    2|2.776926470588

In [21]:
# airport data

# slice to columns of interest, to establish where each airport is
codes_filtered = codes.select(['ident', 'municipality', 'iso_country', 'iso_region']) \
    .withColumnRenamed('iso_country', '2acode')

# merge with iso_codes and countries df to get country name and standard id across the dataset
codes_iso = codes_filtered.join(iso_cleaning, on='2acode')

codes_countries = codes_iso.join(countries_cleaning, on='country')

# drop extra US code (407 wanted)
codes_countries = codes_countries.filter(codes_countries.code != 583)

# filter and reorder columns for clean table
codes_countries = codes_countries.select(['ident', 'municipality', 'code', 'iso_region']) \
    .withColumnRenamed('ident', 'port') \
    .withColumnRenamed('code', 'country_id') \
    .withColumnRenamed('iso_region', 'region')
codes_countries.show()

# We're losing some data with countries info that doesn't entirely overlap

+-------+----------------+----------+------+
|   port|    municipality|country_id|region|
+-------+----------------+----------+------+
|    03N|   Utirik Island|       472|MH-UTI|
|    AAD|           Adado|       397| SO-GA|
|    ABE|        Cheshire|       135|GB-ENG|
|    ABL|       Hampshire|       135|GB-ENG|
|    ABP| Atkamba Mission|       441|PG-WPD|
|    ABW|            Abau|       441|PG-CPM|
|AD-0001|      La Massana|       102| AD-04|
| AD-ALV|Andorra La Vella|       102| AD-07|
|    ADC|       Andekombe|       441|PG-EHG|
|    ADJ|       Caithness|       135|GB-SCT|
|    ADV|        El Daein|       350| SD-DE|
|AE-0001|           Batha|       261| SA-04|
|AE-0002|           Dubai|       296| AE-DU|
|AE-0003|            null|       296| AE-DU|
|AE-0004|            null|       296| AE-DU|
|AE-0005|            null|       296| AE-UQ|
|AE-0006|       Jebel Ali|       296| AE-DU|
|AE-0007|            null|       296| AE-DU|
|AE-0008|            null|       296| AE-DU|
|AE-0009| 

In [22]:
codes_countries.printSchema()

root
 |-- port: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- country_id: integer (nullable = true)
 |-- region: string (nullable = true)



In [23]:
# countries data

# reorder columns for clean table
countries_cleaning = countries_cleaning.select(['code', 'country'])

In [24]:
countries_cleaning.printSchema()

root
 |-- code: integer (nullable = true)
 |-- country: string (nullable = true)



#### Convert tables to pandas for Postgres insertion

In [25]:
immigration_grouped.count()

224941

In [26]:
temp_avg_ccode.count()

21572

In [27]:
immigration_grouped_p = immigration_grouped.toPandas()

In [28]:
temp_avg_ccode_p = temp_avg_ccode.toPandas()

In [29]:
codes_countries_p = codes_countries.toPandas()
countries_cleaning_p = countries_cleaning.toPandas()

In [30]:
# convert columns to proper types
immigration_grouped_p['country_id'] = immigration_grouped_p['country_id'].astype('int32')
immigration_grouped_p['visa_type'] = immigration_grouped_p['visa_type'].astype('int32')
immigration_grouped_p['count'] = immigration_grouped_p['count'].astype('int32')
immigration_grouped_p['year'] = immigration_grouped_p['year'].astype('int32')
immigration_grouped_p['month'] = immigration_grouped_p['month'].astype('int32')

immigration_grouped_p.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 224941 entries, 0 to 224940
Data columns (total 6 columns):
country_id    224941 non-null int32
visa_type     224941 non-null int32
count         224941 non-null int32
year          224941 non-null int32
month         224941 non-null int32
port          224941 non-null object
dtypes: int32(5), object(1)
memory usage: 6.0+ MB


In [31]:
immigration_grouped_p.head()

Unnamed: 0,country_id,visa_type,count,year,month,port
0,110,2,1,2016,11,MRC
1,213,1,849,2016,8,SEA
2,266,1,302,2016,8,LOS
3,273,1,154,2016,8,CHI
4,113,1,2,2016,8,CIN


In [32]:
temp_avg_ccode_p['year'] = temp_avg_ccode_p['year'].astype('int32')
temp_avg_ccode_p['month'] = temp_avg_ccode_p['month'].astype('int32')

temp_avg_ccode_p.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 21572 entries, 0 to 21571
Data columns (total 5 columns):
code                   21572 non-null int32
year                   21572 non-null int32
month                  21572 non-null int32
avg_temp               21572 non-null float64
AverageTemperatureF    21572 non-null float64
dtypes: float64(2), int32(3)
memory usage: 589.9 KB


In [33]:
temp_avg_ccode_p.head()

Unnamed: 0,code,year,month,avg_temp,AverageTemperatureF
0,108,2010,9,13.214,55.7852
1,264,2005,5,16.885255,62.393459
2,264,2007,7,25.741059,78.333906
3,264,2013,8,24.869686,76.765435
4,155,2006,10,6.699667,44.0594


In [34]:
codes_countries_p.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 24566 entries, 0 to 24565
Data columns (total 4 columns):
port            24566 non-null object
municipality    20500 non-null object
country_id      24566 non-null int32
region          24566 non-null object
dtypes: int32(1), object(3)
memory usage: 671.8+ KB


In [35]:
codes_countries_p.head()

Unnamed: 0,port,municipality,country_id,region
0,03N,Utirik Island,472,MH-UTI
1,AAD,Adado,397,SO-GA
2,ABE,Cheshire,135,GB-ENG
3,ABL,Hampshire,135,GB-ENG
4,ABP,Atkamba Mission,441,PG-WPD


In [36]:
countries_cleaning_p.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 289 entries, 0 to 288
Data columns (total 2 columns):
code       289 non-null int32
country    289 non-null object
dtypes: int32(1), object(1)
memory usage: 3.5+ KB


In [37]:
countries_cleaning_p.head()

Unnamed: 0,code,country
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


#### Delete/create/insert into database (preliminary)

In [38]:
import psycopg2
from sql_queries import create_table_queries, drop_table_queries

In [39]:
# connect to default database
conn = psycopg2.connect("host=127.0.0.1 dbname=studentdb user=student password=student")
conn.set_session(autocommit=True)
cur = conn.cursor()

# create capstone database with UTF8 encoding based on fresh template0
cur.execute("DROP DATABASE IF EXISTS capstonedb")
cur.execute("CREATE DATABASE capstonedb WITH ENCODING 'utf8' TEMPLATE template0")

# close connection to default database
conn.close()    

# connect to capstone database
conn = psycopg2.connect("host=127.0.0.1 dbname=capstonedb user=student password=student")
cur = conn.cursor()

In [40]:
# DROP TABLES
arrivals_drop = "DROP TABLE IF EXISTS arrivals;"
airports_drop = "DROP TABLE IF EXISTS airports;"
countries_drop = "DROP TABLE IF EXISTS countries;"
temp_drop = "DROP TABLE IF EXISTS temp;"

drop_table_queries = [arrivals_drop, airports_drop, countries_drop, temp_drop]

for query in drop_table_queries:
    cur.execute(query)
    conn.commit()

In [41]:
# CREATE TABLES
arrivals_create = ("""CREATE TABLE IF NOT EXISTS arrivals (
arrival_id serial PRIMARY KEY,
country_id int,
visa_type int,
count int,
year int NOT NULL,
month int NOT NULL,
port varchar
);
""")

airports_create = ("""CREATE TABLE IF NOT EXISTS airports (
port varchar PRIMARY KEY,
municipality varchar,
country_id int,
region varchar
);
""")

countries_create = ("""CREATE TABLE IF NOT EXISTS countries (
country_id int PRIMARY KEY,
country_name varchar
);
""")

temp_create = ("""CREATE TABLE IF NOT EXISTS temp (
temp_id serial PRIMARY KEY,
country_id int,
year int NOT NULL,
month int NOT NULL,
avg_temp float,
avg_tempF float
);
""")

create_table_queries = [arrivals_create, airports_create, countries_create, temp_create]

for query in create_table_queries:
    cur.execute(query)
    conn.commit()

In [42]:
# INSERT RECORDS
arrivals_insert = """INSERT INTO arrivals 
(
country_id,
visa_type,
count,
month,
year,
port
) \
VALUES (%s, %s, %s, %s, %s, %s)
"""

airports_insert = """INSERT INTO airports 
(
port,
municipality,
country_id,
region
) \
VALUES (%s, %s, %s, %s)
"""

countries_insert = """INSERT INTO countries 
(
country_id,
country_name
) \
VALUES (%s, %s)
"""

temp_insert = """INSERT INTO temp 
(
country_id,
year,
month,
avg_temp,
avg_tempF
) \
VALUES (%s, %s, %s, %s, %s)
"""

In [43]:
# arrivals

for i, row in immigration_grouped_p.iterrows():
    cur.execute(arrivals_insert, list(row))

In [44]:
# airports

for i, row in codes_countries_p.iterrows():
    cur.execute(airports_insert, list(row))

In [45]:
# countries

for i, row in countries_cleaning_p.iterrows():
    cur.execute(countries_insert, list(row))

In [46]:
# temp

for i, row in temp_avg_ccode_p.iterrows():
    cur.execute(temp_insert, list(row))

In [47]:
conn.commit()
conn.close()

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [48]:
import psycopg2

# if not already connected, connect to capstone database
conn = psycopg2.connect("host=127.0.0.1 dbname=capstonedb user=student password=student")
cur = conn.cursor()

In [49]:
# arrivals should have 224941 lines and should look like the source df

cur.execute("""SELECT COUNT(*) FROM arrivals;""")
print(cur.fetchone()[0])

cur.execute("""SELECT * FROM arrivals LIMIT 5;""")
print(cur.fetchall())

224941
[(1, 110, 2, 1, 11, 2016, 'MRC'), (2, 213, 1, 849, 8, 2016, 'SEA'), (3, 266, 1, 302, 8, 2016, 'LOS'), (4, 273, 1, 154, 8, 2016, 'CHI'), (5, 113, 1, 2, 8, 2016, 'CIN')]


In [50]:
# airports should have 24566 lines
cur.execute("""SELECT COUNT(*) FROM airports;""")
print(cur.fetchone()[0])

cur.execute("""SELECT * FROM airports LIMIT 5;""")
print(cur.fetchall())

24566
[('03N', 'Utirik Island', 472, 'MH-UTI'), ('AAD', 'Adado', 397, 'SO-GA'), ('ABE', 'Cheshire', 135, 'GB-ENG'), ('ABL', 'Hampshire', 135, 'GB-ENG'), ('ABP', 'Atkamba Mission', 441, 'PG-WPD')]


In [51]:
# countries should have 289 lines
cur.execute("""SELECT COUNT(*) FROM countries;""")
print(cur.fetchone()[0])

cur.execute("""SELECT * FROM countries LIMIT 5;""")
print(cur.fetchall())

289
[(582, 'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'), (236, 'AFGHANISTAN'), (101, 'ALBANIA'), (316, 'ALGERIA'), (102, 'ANDORRA')]


In [52]:
# temp should have 21572 lines
cur.execute("""SELECT COUNT(*) FROM temp;""")
print(cur.fetchone()[0])

cur.execute("""SELECT * FROM temp LIMIT 5;""")
print(cur.fetchall())

21572
[(1, 108, 2010, 9, 13.214, 55.7852), (2, 264, 2005, 5, 16.8852549019608, 62.3934588235294), (3, 264, 2007, 7, 25.7410588235294, 78.333905882353), (4, 264, 2013, 8, 24.8696862745098, 76.7654352941177), (5, 155, 2006, 10, 6.69966666666667, 44.0594)]


In [53]:
# close the connection when done
conn.close()

#### 4.3 Data dictionary 
The data dictionary is provided at data_dictionary.tsv, and includes the variable name, the source, and description.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

#### Tool choice

This is meant to be an infrequently-updated final dataset used for research purposes, and can be run fairly quickly with the use of Spark. The aggregated and clean dataframes resulting from the Spark pipeline are converted into Pandas dataframes and are then loaded into a Postgres database to allow for joins between the different tables. This setup allows the database to be most useful to people asking different questions of the data.

This notebook format is more accessible to a wider range of individuals who might use the data, and can give them some familiarity with the dataset and how it works before using it themselves.

#### Data updating

Arrivals data should be added to on a monthly basis -- this dataset is meant to be aggregated on a full months' worth of data and any incomplete months would not be comparable to the rest of the data. Temperature data may also be useful if added on a month-by-month basis per country, to track arrivals over time as they relate to the temperature at the traveler's residence.

##### Alternate scenarios

If the data was increased by 100x: The data reading and output from these pipelines could no longer be stored in memory even on a parallelized single machine. All Spark dataframe work would need to be be run with a cluster, such as AWS EMR.

If the pipelines were run on a daily basis by 7am: Airflow scheduling is called for in a use case such as this. An SLA may be provided to the DAG so its end time is monitored and does not pass 7am. Airflow may be combined with AWS EMR and the Apache Livy RESTful interface to more easily run Spark jobs in parallel.

If the database needed to be accessed by 100+ people: This is a perfect use case for the database being hosted on cloud-managed services such as AWS Redshift, which can allow the pool of connections and queries to be distributed among multiple servers and their CPUs. If the number of connections grows, whatever hardware is being used may be scaled up (more CPUs) or out (more servers) without investing in on-premises setup and maintenance.