# US Visitor Demographics by State
### Data Engineering Capstone Project

#### Project Summary
The goal of this project was to build an ETL pipeline that joins I94 immigration data with U.S. city demographics, then enriches the data with historical temperature data and airport codes. This aggregation lets us see whether either of the following affects immigration patterns:

 * Warmer or cooler temperatures.
 * The demographics of the local population.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Project Write Up

# Quick start
1. Edit 'dwh.cfg' and add your AWS key, AWS secret, and S3 bucket name.
2. Run 'stage_to_s3.py' to upload the data to your S3 bucket.
3. Run 'IaC.ipynb' to create your RedShift cluster.
4. Run 'etl.py' to stage and ingest the data to RedShift.
![etl_py_success](./images/etl_py_success.png)
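
The notebook reads the sections AWS, CLUSTER, IAM_ROLE and S3 from 'dwh.cfg'. As a minimal sketch, a skeleton file with placeholder values could be generated with `configparser`. The placeholder strings and the `write_config_template` helper are hypothetical; the CLUSTER and IAM_ROLE values presumably come from the cluster and role created in step 3.

```python
import configparser

# Sections and keys read by the notebook (all values are placeholders)
TEMPLATE = {
    "AWS": {"KEY": "<your-aws-access-key-id>",
            "SECRET": "<your-aws-secret-access-key>"},
    "CLUSTER": {
        "HOST": "<redshift-endpoint>",
        "DB_NAME": "<database-name>",
        "DB_USER": "<database-user>",
        "DB_PASSWORD": "<database-password>",
        "DB_PORT": "5439",  # RedShift's default port
    },
    "IAM_ROLE": {"ARN": "<iam-role-arn>"},
    "S3": {"BUCKET": "<bucket-name>"},
}

def write_config_template(path):
    """Write a dwh.cfg skeleton with placeholder values (hypothetical helper)."""
    config = configparser.ConfigParser()
    config.read_dict(TEMPLATE)
    with open(path, "w") as f:
        config.write(f)
    return config
```

`configparser` option names are case-insensitive, so `config.get("IAM_ROLE", "arn")` in the notebook still finds the `ARN` key written here.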

```python
# Imports
import configparser
import os

import boto3
import pandas as pd
import psycopg2
from pyspark.sql import SparkSession
from sql_queries import (drop_table_queries,
                         create_table_queries,
                         copy_table_queries,
                         insert_table_queries,
                         staging_checks,
                         insert_checks)

# Read credentials and cluster settings from dwh.cfg
config = configparser.ConfigParser()
with open('dwh.cfg') as cfg_file:
    config.read_file(cfg_file)

# Export AWS credentials as environment variables (boto3 reads them from here)
os.environ["AWS_ACCESS_KEY_ID"] = config['AWS']['KEY']
os.environ["AWS_SECRET_ACCESS_KEY"] = config['AWS']['SECRET']

# RedShift connection settings and IAM role ARN
HOST = config.get("CLUSTER", "HOST")
DB_NAME = config.get("CLUSTER", "DB_NAME")
DB_USER = config.get("CLUSTER", "DB_USER")
DB_PASSWORD = config.get("CLUSTER", "DB_PASSWORD")
DB_PORT = config.get("CLUSTER", "DB_PORT")
ARN_IAM_ROLE = config.get("IAM_ROLE", "arn")

# Use the same region as the RedShift cluster
s3 = boto3.resource('s3', region_name="us-west-2")

s3_bucket = config.get("S3", "BUCKET")
# Spark writes to S3 through Hadoop's s3a:// connector
s3_raw_data = 's3a://' + s3_bucket + '/raw'

# Display the loaded parameters
pd.DataFrame({"Param":
                  ["HOST", "DB_NAME", "DB_USER", "DB_PASSWORD", "DB_PORT", "ARN_IAM_ROLE", "S3_BUCKET", "S3_RAW_DATA"],
              "Value":
                  [HOST, DB_NAME, DB_USER, DB_PASSWORD, DB_PORT, ARN_IAM_ROLE, s3_bucket, s3_raw_data]
             })
```

| Param | Value |
|---|---|
| HOST | dwhcluster.cck8vmzifioi.us-west-2.redshift.ama... |
| DB_NAME | dwhcapstone |
| DB_USER | dwhuser |
| DB_PASSWORD | Passw0rd |
| DB_PORT | 5439 |
| ARN_IAM_ROLE | 'arn:aws:iam::473886897808:role/dwhRole' |
| S3_BUCKET | udacity-dend-capstone-nj |
| S3_RAW_DATA | s3a://udacity-dend-capstone-nj/raw |


### Step 1: Scope the Project and Gather Data

#### Scope 
This ETL will extract raw data from S3 into staging tables in RedShift. The data will then be transformed and loaded into the final fact and dimension tables to allow for immigration data analysis.

#### Describe and Gather Data 
* **I94 Immigration Data:** This data comes from the US National Tourism and Trade Office (https://travel.trade.gov/research/reports/i94/historical/2016.html). A data dictionary is included in the workspace; see **/raw_data/I94_SAS_Labels_Descriptions.SAS** for a detailed description of the columns.

* **World Temperature Data:** This dataset came from Kaggle.

* **U.S. City Demographic Data:** This data comes from OpenSoft.

* **Airport Code Table:** This is a simple table of airport codes and corresponding cities.

### Load data to S3 (Optional after first run)

The code below only needs to be run once to prepare your S3 bucket.

```python
# Spark session with the hadoop-aws (S3A) and spark-sas7bdat (SAS reader) packages
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0,saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport().getOrCreate()
```

```python
# Read each monthly I94 SAS extract with the spark-sas7bdat reader
i94_jan16_sub = spark.read.format('com.github.saurfang.sas.spark').load('/data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat')
i94_feb16_sub = spark.read.format('com.github.saurfang.sas.spark').load('/data/18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat')
i94_mar16_sub = spark.read.format('com.github.saurfang.sas.spark').load('/data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat')
i94_apr16_sub = spark.read.format('com.github.saurfang.sas.spark').load('/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
i94_may16_sub = spark.read.format('com.github.saurfang.sas.spark').load('/data/18-83510-I94-Data-2016/i94_may16_sub.sas7bdat')
i94_jun16_sub = spark.read.format('com.github.saurfang.sas.spark').load('/data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat')
i94_jul16_sub = spark.read.format('com.github.saurfang.sas.spark').load('/data/18-83510-I94-Data-2016/i94_jul16_sub.sas7bdat')
i94_aug16_sub = spark.read.format('com.github.saurfang.sas.spark').load('/data/18-83510-I94-Data-2016/i94_aug16_sub.sas7bdat')
i94_sep16_sub = spark.read.format('com.github.saurfang.sas.spark').load('/data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat')
i94_oct16_sub = spark.read.format('com.github.saurfang.sas.spark').load('/data/18-83510-I94-Data-2016/i94_oct16_sub.sas7bdat')
i94_nov16_sub = spark.read.format('com.github.saurfang.sas.spark').load('/data/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat')
i94_dec16_sub = spark.read.format('com.github.saurfang.sas.spark').load('/data/18-83510-I94-Data-2016/i94_dec16_sub.sas7bdat')
```

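```python
# The June extract has extra columns the other months lack; drop them so all months share one schema
columns_to_drop = ['validres', 'delete_days', 'delete_mexl', 'delete_dup', 'delete_visa', 'delete_recdup']
i94_jun16_sub = i94_jun16_sub.drop(*columns_to_drop)
```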

```python
df_i94 = [i94_jan16_sub,
          i94_feb16_sub,
          i94_mar16_sub,
          i94_apr16_sub,
          i94_may16_sub,
          i94_jun16_sub,
          i94_jul16_sub,
          i94_aug16_sub,
          i94_sep16_sub,
          i94_oct16_sub,
          i94_nov16_sub,
          i94_dec16_sub]

df_i94_names = ['i94_jan16_sub',
                'i94_feb16_sub',
                'i94_mar16_sub',
                'i94_apr16_sub',
                'i94_may16_sub',
                'i94_jun16_sub',
                'i94_jul16_sub',
                'i94_aug16_sub',
                'i94_sep16_sub',
                'i94_oct16_sub',
                'i94_nov16_sub',
                'i94_dec16_sub']

# Write each month to its own Parquet folder; mode('ignore') skips paths that already exist
for name, df in zip(df_i94_names, df_i94):
    df.write.mode('ignore').parquet(s3_raw_data + '/i94_immigration_data/' + name)
    print('Upload of ' + name + '...complete')
```

```text
Upload of i94_jan16_sub...complete
Upload of i94_feb16_sub...complete
Upload of i94_mar16_sub...complete
Upload of i94_apr16_sub...complete
Upload of i94_may16_sub...complete
Upload of i94_jun16_sub...complete
Upload of i94_jul16_sub...complete
Upload of i94_aug16_sub...complete
Upload of i94_sep16_sub...complete
Upload of i94_oct16_sub...complete
Upload of i94_nov16_sub...complete
Upload of i94_dec16_sub...complete
```
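
The code above drops extra columns from the June extract only. Before writing, a check like the following could confirm that every monthly DataFrame ends up with the same columns. It is a sketch that works on plain column-name lists (for a Spark DataFrame, `df.columns`); `find_extra_columns` is a hypothetical helper.

```python
from collections import Counter

def find_extra_columns(columns_by_name):
    """Return, per dataset, the columns that are not shared by every dataset.

    columns_by_name maps a dataset name to its list of column names.
    Datasets with no extra columns are left out of the result.
    """
    n = len(columns_by_name)
    # Count in how many datasets each column appears
    counts = Counter(c for cols in columns_by_name.values() for c in set(cols))
    common = {c for c, k in counts.items() if k == n}
    return {name: sorted(set(cols) - common)
            for name, cols in columns_by_name.items()
            if set(cols) - common}
```

In the notebook, `find_extra_columns({n: df.columns for n, df in zip(df_i94_names, df_i94)})` should return an empty dict once June has been trimmed.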


```python
# Upload the raw temperature, demographics and airport CSVs unchanged
s3.meta.client.upload_file('/data2/GlobalLandTemperaturesByCity.csv', s3_bucket, 'raw/world_temperature_data/GlobalLandTemperaturesByCity.csv')
s3.meta.client.upload_file('./raw_data/us-cities-demographics.csv', s3_bucket, 'raw/us_city_demographic_data/us-cities-demographics.csv')
s3.meta.client.upload_file('./raw_data/airport-codes_csv.csv', s3_bucket, 'raw/airport_code_data/airport-codes_csv.csv')
```

```python
# Upload the I94 lookup tables used to validate codes during cleaning
s3.meta.client.upload_file('./lookup_data/i94addrl.csv', s3_bucket, 'lookup/i94addrl.csv')
s3.meta.client.upload_file('./lookup_data/i94cntyl.csv', s3_bucket, 'lookup/i94cntyl.csv')
s3.meta.client.upload_file('./lookup_data/i94model.csv', s3_bucket, 'lookup/i94model.csv')
s3.meta.client.upload_file('./lookup_data/i94prtl.csv', s3_bucket, 'lookup/i94prtl.csv')
s3.meta.client.upload_file('./lookup_data/i94prtl_enriched.csv', s3_bucket, 'lookup/i94prtl_enriched.csv')
s3.meta.client.upload_file('./lookup_data/i94visal.csv', s3_bucket, 'lookup/i94visal.csv')
```
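
The lookup uploads all follow one pattern: a file in './lookup_data/' goes to the same name under the 'lookup/' prefix. A sketch of the same step as a loop; `upload_lookups` is a hypothetical helper, and the client is passed in so that in the notebook it can be `s3.meta.client`, whose `upload_file(Filename, Bucket, Key)` call the original cell already uses.

```python
LOOKUP_FILES = ['i94addrl.csv', 'i94cntyl.csv', 'i94model.csv',
                'i94prtl.csv', 'i94prtl_enriched.csv', 'i94visal.csv']

def upload_lookups(client, bucket, files=LOOKUP_FILES,
                   local_dir='./lookup_data', prefix='lookup'):
    """Upload each lookup CSV and return the S3 keys written (hypothetical helper)."""
    keys = []
    for name in files:
        key = prefix + '/' + name
        # Same call shape as the cell above: local path, bucket, object key
        client.upload_file(local_dir + '/' + name, bucket, key)
        keys.append(key)
    return keys
```

Usage in the notebook would be `upload_lookups(s3.meta.client, s3_bucket)`.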

### Step 2: Explore and Assess the Data

#### Cleaning Steps
**Airport Code Table**
1. state_cd - New field derived from 'iso_region'.
2. lattitude - New field derived from 'coordinates'.
3. longitude - New field derived from 'coordinates'.
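
A minimal pandas sketch of the three airport derivations, assuming 'iso_region' looks like 'US-CA' and 'coordinates' is a 'longitude, latitude' string (the layout of the public airport-codes file, but check your copy). The project does its cleaning in RedShift, so this Python version, the helper name, and the `latitude` spelling used here are illustrative only.

```python
import pandas as pd

def derive_airport_fields(df):
    """Add state_cd, longitude and latitude columns (illustrative names)."""
    out = df.copy()
    # 'US-CA' -> 'CA'; regions without a '-' become NaN
    out['state_cd'] = out['iso_region'].str.split('-', n=1).str[1]
    # Assumed layout of 'coordinates': 'longitude, latitude'
    parts = out['coordinates'].str.split(',', n=1, expand=True)
    out['longitude'] = pd.to_numeric(parts[0].str.strip(), errors='coerce')
    out['latitude'] = pd.to_numeric(parts[1].str.strip(), errors='coerce')
    return out
```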

**I94 Immigration Data**
1. i94cit - Normalize values not found in the lookup table to '999'.
2. i94res - Normalize values not found in the lookup table to '999'.
3. i94port - Normalize values not found in the lookup table to 'XXX'.
4. i94mode - Normalize values not found in the lookup table to '9'.
5. i94addr - Normalize values not found in the lookup table to '99'.
6. i94visa - Normalize values not found in the lookup table to '2'.
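
The normalization rule above can be sketched in pandas: any value missing from a column's lookup set (including nulls) is replaced with that column's fallback code. This is an illustration, not the project's RedShift SQL; `normalize_codes` is hypothetical, and codes are treated as strings here although some raw SAS columns are numeric, so cast as needed.

```python
import pandas as pd

# Fallback codes from the list above
DEFAULT_CODES = {'i94cit': '999', 'i94res': '999', 'i94port': 'XXX',
                 'i94mode': '9', 'i94addr': '99', 'i94visa': '2'}

def normalize_codes(df, valid_codes, defaults=DEFAULT_CODES):
    """Replace values not in each column's lookup set with its default code."""
    out = df.copy()
    for col, default in defaults.items():
        if col in out and col in valid_codes:
            known = out[col].isin(valid_codes[col])  # nulls count as unknown
            out[col] = out[col].where(known, default)
    return out
```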

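**U.S. City Demographic Data**

The source file has one row per city and race. The race columns below are pivoted out of that layout, each holding the 'count' for the matching race:
1. 'American Indian and Alaska Native' - Derived from 'count'.
2. 'Asian' - Derived from 'count'.
3. 'Black or African-American' - Derived from 'count'.
4. 'Hispanic or Latino' - Derived from 'count'.
5. 'White' - Derived from 'count'.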
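
The pivot can be sketched with pandas `pivot_table`, assuming the source column names 'City', 'State', 'Race' and 'Count'. The project performs this step in RedShift; `pivot_race_counts` is a hypothetical helper used only to show the reshaping.

```python
import pandas as pd

RACES = ['American Indian and Alaska Native', 'Asian',
         'Black or African-American', 'Hispanic or Latino', 'White']

def pivot_race_counts(df):
    """Turn one row per (city, race) into one row per city with a column per race."""
    wide = df.pivot_table(index=['City', 'State'], columns='Race',
                          values='Count', aggfunc='sum', fill_value=0)
    # Make sure every race column exists, even if absent from this extract
    wide = wide.reindex(columns=RACES, fill_value=0)
    wide.columns.name = None
    return wide.reset_index()
```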

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
![table_diagram](./images/table_diagram.png)

#### 3.2 Mapping Out Data Pipelines
1. Upload data to S3.
2. Load data from S3 to RedShift Staging Tables.
3. Transform data from RedShift Staging Tables and insert it into the Fact and Dimension Tables.
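
Step 2 relies on RedShift's `COPY` command. The real statements live in 'sql_queries.py'; the sketch below only shows how such statements might be built. Table names come from the staging checks in Step 4, while the helper names and chosen options are assumptions. Spark writes through the `s3a://` scheme, but `COPY` expects `s3://` paths, and the ARN in 'dwh.cfg' is stored with surrounding quotes (see the parameter table), so the sketch strips them.

```python
def to_redshift_path(path):
    """Swap Hadoop's s3a:// scheme for the s3:// scheme that COPY expects."""
    prefix = 's3a://'
    return 's3://' + path[len(prefix):] if path.startswith(prefix) else path

def copy_parquet(table, s3_path, iam_role):
    """COPY statement for Parquet files, such as the I94 data written by Spark."""
    role = iam_role.strip("'")  # the ARN in dwh.cfg includes quotes
    return (f"COPY {table} FROM '{s3_path}' "
            f"IAM_ROLE '{role}' FORMAT AS PARQUET;")

def copy_csv(table, s3_path, iam_role, delimiter=',', region='us-west-2'):
    """COPY statement for a CSV file with one header row."""
    role = iam_role.strip("'")
    return (f"COPY {table} FROM '{s3_path}' "
            f"IAM_ROLE '{role}' CSV DELIMITER '{delimiter}' "
            f"IGNOREHEADER 1 REGION '{region}';")
```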

### Step 4: Run Pipelines to Model the Data and Perform Data Quality Checks
#### 4.1 Create the data model
See 'sql_queries.py'.

```python
def run_queries(cur, conn, queries):
    """Execute each SQL statement in order, committing after each one."""
    for query in queries:
        cur.execute(query)
        conn.commit()

def drop_tables(cur, conn):
    run_queries(cur, conn, drop_table_queries)

def create_tables(cur, conn):
    run_queries(cur, conn, create_table_queries)

def load_staging_tables(cur, conn):
    # Load raw data from S3 into the staging tables
    run_queries(cur, conn, copy_table_queries)

def insert_tables(cur, conn):
    # Populate the fact and dimension tables from the staging tables
    run_queries(cur, conn, insert_table_queries)

def quality_checks(cur, conn, checks):
    """Run each check and compare the first value it returns with 'expected_result'.

    Prints passed and failed checks, then raises if any check failed.
    'conn' is unused but kept so all helpers share the same signature.
    """
    passed_tests = []
    failed_tests = []

    for check in checks:
        chk_sql = check.get('check_sql')
        exp_result = check.get('expected_result')

        cur.execute(chk_sql)
        row = cur.fetchone()
        actual = row[0] if row else None

        if actual != exp_result:
            failed_tests.append((chk_sql, exp_result, actual))
        else:
            passed_tests.append((chk_sql, exp_result))

    if passed_tests:
        print("PASSED QUALITY CHECKS:")
        for passed_test in passed_tests:
            print(passed_test)

    if failed_tests:
        print("FAILED QUALITY CHECKS:")
        for failed_test in failed_tests:
            print(failed_test)
        raise Exception('Data quality check FAILED!')
```
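
Each entry in 'staging_checks' and 'insert_checks' is read as a dict with 'check_sql' and 'expected_result' keys. The self-contained sketch below runs the same comparison against an in-memory SQLite database standing in for RedShift; the table, rows and checks are made up, and `run_checks` is a hypothetical variant that returns results instead of raising.

```python
import sqlite3

def run_checks(cur, checks):
    """Return (passed, failed) lists of (sql, expected, actual) tuples."""
    passed, failed = [], []
    for check in checks:
        cur.execute(check['check_sql'])
        row = cur.fetchone()
        actual = row[0] if row else None
        target = passed if actual == check['expected_result'] else failed
        target.append((check['check_sql'], check['expected_result'], actual))
    return passed, failed

# Tiny stand-in table with one NULL ident (made-up rows)
demo_conn = sqlite3.connect(':memory:')
demo_cur = demo_conn.cursor()
demo_cur.execute('CREATE TABLE airport_codes (ident TEXT, state_cd TEXT)')
demo_cur.executemany('INSERT INTO airport_codes VALUES (?, ?)',
                     [('00A', 'PA'), ('00AA', 'KS'), (None, 'TX')])
checks = [
    {'check_sql': 'SELECT COUNT(*) FROM airport_codes', 'expected_result': 3},
    {'check_sql': 'SELECT COUNT(*) FROM airport_codes WHERE ident IS NULL',
     'expected_result': 0},
]
passed, failed = run_checks(demo_cur, checks)
```

The row count check passes, while the NULL check fails with an actual value of 1, which is the case where `quality_checks` would raise.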

```python
conn = psycopg2.connect(host=HOST, dbname=DB_NAME, user=DB_USER,
                        password=DB_PASSWORD, port=DB_PORT)
cur = conn.cursor()

print('Begin ETL:')

drop_tables(cur, conn)
print('\n' + 'drop_tables...COMPLETE')

create_tables(cur, conn)
print('\n' + 'create_tables...COMPLETE')

# Approximate staging time: 2 min for ~50 million records on 2 nodes
%time load_staging_tables(cur, conn)
print('\n' + 'load_staging_tables...COMPLETE')

print('')
quality_checks(cur, conn, staging_checks)
print('\n' + 'staging_quality_checks...COMPLETE')

# Approximate insert time: 1.5 min for ~50 million records on 2 nodes
%time insert_tables(cur, conn)
print('\n' + 'insert_tables...COMPLETE')

print('')
quality_checks(cur, conn, insert_checks)
print('\n' + 'insert_quality_checks...COMPLETE')

conn.close()
print('\n' + 'End of ETL' + '\n')
```

```text
Begin ETL:

drop_tables...COMPLETE

create_tables...COMPLETE
CPU times: user 21.6 ms, sys: 5.45 ms, total: 27.1 ms
Wall time: 1min 57s

load_staging_tables...COMPLETE

PASSED QUALITY CHECKS:
('SELECT COUNT(*) FROM staging_airport_codes', 55075)
('SELECT COUNT(*) FROM staging_i94_immigration', 40790529)
('SELECT COUNT(*) FROM staging_us_city_demographics', 2891)
('SELECT COUNT(*) FROM staging_world_temperatures', 8599212)
('SELECT COUNT(*) FROM i94addrl', 55)
('SELECT COUNT(*) FROM i94cntyl', 289)
('SELECT COUNT(*) FROM i94model', 4)
('SELECT COUNT(*) FROM i94prtl', 697)
('SELECT COUNT(*) FROM i94visal', 3)

staging_quality_checks...COMPLETE
CPU times: user 16.3 ms, sys: 3.61 ms, total: 19.9 ms
Wall time: 1min 29s

insert_tables...COMPLETE

PASSED QUALITY CHECKS:
('SELECT COUNT(*) FROM airport_codes WHERE ident IS NULL', 0)
('SELECT COUNT(*) FROM i94_immigration', 40790529)
('SELECT COUNT(*) FROM us_city_demographics WHERE state_code IS NULL', 0)
('SELECT COUNT(*) FROM world_temperature
```
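
The timing comment says roughly 2 minutes for about 50 million records. Summing the staging counts reported above and dividing by the measured staging wall time (1min 57s) checks that figure. Extrapolating to 100x the data assumes constant throughput on the same 2-node cluster, which is only a rough approximation.

```python
# Row counts reported by the staging quality checks above
staging_counts = {
    'staging_airport_codes': 55075,
    'staging_i94_immigration': 40790529,
    'staging_us_city_demographics': 2891,
    'staging_world_temperatures': 8599212,
    'i94addrl': 55, 'i94cntyl': 289, 'i94model': 4, 'i94prtl': 697, 'i94visal': 3,
}
staging_wall_time_s = 1 * 60 + 57  # "Wall time: 1min 57s"

total_rows = sum(staging_counts.values())
rows_per_second = total_rows / staging_wall_time_s

print(f"{total_rows:,} rows in {staging_wall_time_s} s -> {rows_per_second:,.0f} rows/s")
# prints "49,448,755 rows in 117 s -> 422,639 rows/s"
print(f"100x at this rate: ~{100 * staging_wall_time_s / 60:.0f} min")
# prints "100x at this rate: ~195 min"
```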

#### 4.3 Data dictionary 
See 'data_dictionary.pdf' for details on columns and data types. The create table queries can be found in 'sql_queries.py' under the 'CREATE TABLE QUERIES' header.

### Step 5: Complete Project Write Up
**1. Rationale for the choice of tools and technologies for the project.**
* For performance, I loaded all data to S3 first and then ingested it into RedShift, keeping the S3 data and the RedShift cluster in the same AWS region.
* Python and Spark were used to process the raw files locally and upload them to S3.
* RedShift was used to read from S3, clean the data, and load it into the final fact and dimension tables.

**2. How often the data should be updated and why.**
* The data would be processed monthly, matching the granularity of the sources: the I94 immigration data is published as monthly extracts and the temperature data consists of monthly averages.
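
For a monthly run, the job has to pick up that month's extract. A sketch that maps a year and month to the file name and S3 prefix used in this notebook; the naming scheme is inferred from the 2016 files, so later releases may differ, and `monthly_i94_paths` is a hypothetical helper.

```python
MONTH_ABBR = ['jan', 'feb', 'mar', 'apr', 'may', 'jun',
              'jul', 'aug', 'sep', 'oct', 'nov', 'dec']

def monthly_i94_paths(year, month, bucket):
    """Return (sas_file, s3a_prefix) for one month's I94 extract.

    Follows the 2016 naming, e.g. 'i94_jun16_sub.sas7bdat'.
    """
    if not 1 <= month <= 12:
        raise ValueError(f"month must be 1-12, got {month}")
    name = f"i94_{MONTH_ABBR[month - 1]}{year % 100:02d}_sub"
    return (f"{name}.sas7bdat",
            f"s3a://{bucket}/raw/i94_immigration_data/{name}")
```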

**3. How to approach the problem differently under the following scenarios:**
* The data was increased by 100x.
    * RedShift could handle this increase by adding nodes, which makes fuller use of its massively parallel processing, columnar storage, data compression, query optimizer, result caching, and compiled code.
* The data populates a dashboard that must be updated by 7am every day.
    * To avoid truncating and reloading every table daily, we would convert the ETL and data model into a slowly changing dimension (SCD) pipeline that captures only new records and changes to existing records (see https://en.wikipedia.org/wiki/Slowly_changing_dimension for details).
* The database needed to be accessed by 100+ people.
    * The frequently queried data would be served from an in-memory processing engine such as Spark, to avoid the query queuing that a single RedShift cluster can experience when many users query it concurrently. S3 and RedShift would still store the data, but it would be cached in Spark, where users could access it quickly.
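
An SCD pipeline starts by finding which incoming rows are new and which have changed. A minimal pandas sketch of that change-detection step, assuming each row has a business key (the `state` key and `pop` column in the usage example are made up). It is not a full SCD Type 2 implementation: effective dates and versioning of replaced rows are left out, and `split_delta` is a hypothetical helper.

```python
import pandas as pd

def split_delta(existing, incoming, key, compare_cols):
    """Split incoming rows into (new_rows, changed_rows) relative to existing."""
    merged = incoming.merge(existing[[key] + compare_cols], on=key, how='left',
                            suffixes=('', '_old'), indicator=True)
    is_new = merged['_merge'] == 'left_only'
    changed = pd.Series(False, index=merged.index)
    for col in compare_cols:
        old = merged[col + '_old']
        both_null = merged[col].isna() & old.isna()  # treat NULL == NULL as unchanged
        changed |= (merged[col] != old) & ~both_null
    cols = list(incoming.columns)
    return merged.loc[is_new, cols], merged.loc[~is_new & changed, cols]

existing = pd.DataFrame({'state': ['CA', 'NY'], 'pop': [100, 200]})
incoming = pd.DataFrame({'state': ['CA', 'NY', 'TX'], 'pop': [100, 250, 300]})
new_rows, changed_rows = split_delta(existing, incoming, 'state', ['pop'])
```

Only these two small frames would then need to be written, instead of truncating and reloading the full tables.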