# Project Title
### Data Engineering Capstone Project

#### Project Summary
For this project my goal is to perform ETL (extraction, transformation, and loading)
of data from a variety of given sources into a backend (Redshift) database, creating a
data model that allows for analysis of immigration data, with various additional metadata
sources acting as dimensions to enrich the immigration facts.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession
import configparser
import psycopg2

### Step 1: Scope the Project and Gather Data

#### Scope 
The overall scope of the project involves examining the given data sources, determining the best
transformations for that data, applying those, then loading it into a backend set of tables in
Amazon Redshift. With the loaded data, along with a representative data model for analysts to work
off of, this will act as an analytics platform to perform research around immigration into the
United States, answering questions such as:

1) Which areas/countries do the most immigrants to the United States come from?
2) What types of visas are they immigrating with?
3) What are the top destinations for immigrants coming to the United States?
4) Do certain immigrant nationalities prefer certain regions in the United States?
5) Are there certain criteria (similar temperature, etc.) driving immigration patterns?

By building out this platform, all of these questions, and more, can be researched.

#### Describe and Gather Data 
The datasets for this project consist of the following:

##### I94 Immigration Data
This data comes from the [I-94 Visitor Arrivals Program](https://travel.trade.gov/research/reports/i94/historical/2016.html), run by the National Travel and Tourism Office (NTTO). From their website:
"The National Travel and Tourism Office (NTTO) works cooperatively with the U.S. Department of Homeland Security (DHS)/U.S. Customs and Border Protection (CBP) to release I-94 Visitor Arrivals Program data, providing a comprehensive count of all visitors (overseas all travel modes plus Mexico air and sea) entering the United States."

In [2]:
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
immigration_df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")

In [None]:
# !!! This command errors out in the environment. Without viable testing,
# utilizing Spark became untenable. Support has known about this issue
# for over a year, without resolution. !!! #

# spark = SparkSession.builder.\
#     config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11").\
#     enableHiveSupport().getOrCreate()

This dataset will act as our main fact data. For this EDA (exploratory data analysis), we will
only be looking at April of 2016.

##### World Temperature Data
This dataset came from Kaggle, which in turn has sourced it from the Berkeley Earth project. You can read more about it [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).

From Berkeley Earth's site: "The Berkeley Earth averaging process generates a variety of Output data including a set of gridded temperature fields, regional averages, and bias-corrected station data. Source data consists of the raw temperature reports that form the foundation of our averaging system. Source observations are provided as originally reported and will contain many quality control and redundancy issues. Intermediate data is constructed from the source data by merging redundant records, identifying a variety of quality control problems, and creating monthly averages from daily reports when necessary."

In [3]:
global_temp_df = pd.read_csv('../../data2/GlobalLandTemperaturesByCity.csv')

##### U.S. City Demographic Data
This data comes from OpenDataSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).

From OpenDataSoft's site: "This dataset contains information about the demographics of all US cities and census-designated places with a population greater or equal to 65,000. 

This data comes from the US Census Bureau's 2015 American Community Survey."

In [4]:
us_city_demo_df = pd.read_csv('us-cities-demographics.csv', sep=';')

##### Airport Code Table
This is a simple table of airport codes and corresponding cities, sourced from datahub.io. It comes from [here](https://datahub.io/core/airport-codes#data). 

From their website: "'airport-codes.csv' contains the list of all airport codes, the attributes are identified in datapackage description. Some of the columns contain attributes identifying airport locations, other codes (IATA, local if exist) that are relevant to identification of an airport.

Original source url is http://ourairports.com/data/airports.csv (stored in archive/data.csv)"

In [5]:
airport_df = pd.read_csv('airport-codes_csv.csv')

##### SAS Label Descriptions

By utilizing this text file, we can come up with a number of dimensions surrounding the
different codes present in the I94 dataset. By building these out, we save the analyst
(or, further down the analytics pipeline, a business user reading a report) from having to
look up given codes for country of citizenship, etc.

As we can source multiple dimensions from the single text file, we'll build a function to
read a given set of rows, and process them into a dataset:

In [6]:
def create_df_from_lables(start_range, end_range, columns):
    """Parses a range of lines from the I94 SAS Label Descriptions file, returns dataframe
    Args:
        start_range (int): index of the first line to read (zero-based, inclusive)
        end_range (int): index of the line to stop at (exclusive)
        columns (list): list of column names for returned data frame.
    Return:
        Dataframe
    """
    sas_labels = ''

    with open('I94_SAS_Labels_Descriptions.SAS') as f:
        sas_labels = f.read()

    value_list = sas_labels.split('\n')
    value_list = value_list[start_range:end_range]

    codes = []
    descriptions = []

    for line in value_list:

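        if '=' in line:
            # split on the first '=' only, in case a description contains '='
            code, desc = line.split('=', 1)
            code = code.strip()
            desc = desc.strip()

            # strip surrounding quotes; startswith() is safe on empty strings
            if code.startswith("'"):
                code = code[1:-1]

            if desc.startswith("'"):
                desc = desc[1:-1]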

            codes.append(code)
            descriptions.append(desc)

    final_list = list(zip(codes,descriptions))

    return_df = pd.DataFrame(final_list, columns=columns)

    return return_df
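
To see what the parser does without opening the full file, here is a minimal, self-contained sketch of the same idea applied to a few hand-typed lines. The sample lines and the helper name `parse_label_lines` are assumptions for illustration (they mimic the `code = 'description'` layout; the real file is not reproduced here). Unlike the function above, this variant removes quotes with `str.strip`, so a trailing `;` or stray quote does not end up in the description.

```python
def parse_label_lines(lines):
    """Turn "code = 'description'" lines into (code, description) tuples."""
    pairs = []
    for line in lines:
        if '=' not in line:
            continue  # skip comments, headers, and blank lines
        # Split on the first '=' only, so descriptions may contain '='
        code, desc = line.split('=', 1)
        # Remove whitespace, a trailing ';', and surrounding single quotes
        code = code.strip().strip("'")
        desc = desc.strip().rstrip(';').strip().strip("'")
        pairs.append((code, desc))
    return pairs


# Hypothetical sample in the same layout as the label file
sample_lines = [
    "/* I94MODE - travel mode */",
    "   1 = 'Air'",
    "   2 = 'Sea'",
    "   9 = 'Not reported' ;",
]

travel_modes = parse_label_lines(sample_lines)
# A plain dict gives the code -> description lookup that a dimension table provides
travel_mode_lookup = dict(travel_modes)
print(travel_mode_lookup['9'])
```

The resulting list of tuples can be passed straight to `pd.DataFrame(..., columns=[...])`, as the function above does.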

Utilizing this function, we can now build out additional dimensions surrounding
country, port of entry, travel mode, U.S. states, and visa type:

In [7]:
country_df = create_df_from_lables(9, 298, ['country_code', 'country_desc'])

ports_df = create_df_from_lables(302, 962, ['port_code', 'port_desc'])

travel_mode_df = create_df_from_lables(972, 976, ['travel_mode_code', 'travel_mode_desc'])

state_df = create_df_from_lables(981, 1036, ['state_code', 'state_full_name'])

visa_type_df = create_df_from_lables(1046, 1049, ['visa_type_code', 'visa_type_desc'])

### Step 2: Explore and Assess the Data

#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

##### I94 Immigration Data

In [9]:
print(immigration_df.count())
print(immigration_df.info())

pd.options.display.max_columns = None
immigration_df.head(50)

cicid       3096313
i94yr       3096313
i94mon      3096313
i94cit      3096313
i94res      3096313
i94port     3096313
arrdate     3096313
i94mode     3096074
i94addr     2943941
depdate     2953856
i94bir      3095511
i94visa     3096313
count       3096313
dtadfile    3096312
visapost    1215063
occup          8126
entdepa     3096075
entdepd     2957884
entdepu         392
matflag     2957884
biryear     3095511
dtaddto     3095836
gender      2682044
insnum       113708
airline     3012686
admnum      3096313
fltno       3076764
visatype    3096313
dtype: int64
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3096313 entries, 0 to 3096312
Data columns (total 28 columns):
cicid       float64
i94yr       float64
i94mon      float64
i94cit      float64
i94res      float64
i94port     object
arrdate     float64
i94mode     float64
i94addr     object
depdate     float64
i94bir      float64
i94visa     float64
count       float64
dtadfile    object
visapost    object
occup       object

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,37.0,2.0,1.0,,,,T,,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,20130811.0,SEO,,G,,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,20160401.0,,,T,O,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,28.0,2.0,1.0,20160401.0,,,O,O,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,20160401.0,,,O,O,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2
5,18.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MI,20555.0,57.0,1.0,1.0,20160401.0,,,O,O,,M,1959.0,09302016,,,AZ,92471040000.0,602.0,B1
6,19.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,63.0,2.0,1.0,20160401.0,,,O,K,,M,1953.0,09302016,,,AZ,92471400000.0,602.0,B2
7,20.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,57.0,2.0,1.0,20160401.0,,,O,K,,M,1959.0,09302016,,,AZ,92471610000.0,602.0,B2
8,21.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20553.0,46.0,2.0,1.0,20160401.0,,,O,O,,M,1970.0,09302016,,,AZ,92470800000.0,602.0,B2
9,22.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20562.0,48.0,1.0,1.0,20160401.0,,,O,O,,M,1968.0,09302016,,,AZ,92478490000.0,608.0,B1


The DataFrame count and info output gives us the overall layout of the dataset, the total number of
records (3,096,313), and the non-null counts for each of the given fields. Many of the
'important' fields (from a research perspective) are fully populated, like immigrant citizenship and
residency, but others, like occupation (occup), are predominantly empty, limiting their usefulness in analytics.

We can also see that there are a number of codes in this dataset, for items like country of citizenship, 
that we should create dimension tables around. This will make it much easier for analysts to both analyze
and report on these features.

##### I94 Immigration Data - Countries

In [10]:
print(country_df.count())
print(country_df.info())

pd.options.display.max_columns = None
country_df.head()

country_code    289
country_desc    289
dtype: int64
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 289 entries, 0 to 288
Data columns (total 2 columns):
country_code    289 non-null object
country_desc    289 non-null object
dtypes: object(2)
memory usage: 4.6+ KB
None


Unnamed: 0,country_code,country_desc
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


This dimension will contain all the country codes from the I94 data,
as well as their corresponding country descriptions.

##### I94 Immigration Data - Ports

In [11]:
print(ports_df.count())
print(ports_df.info())

pd.options.display.max_columns = None
ports_df.head()

port_code    660
port_desc    660
dtype: int64
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 660 entries, 0 to 659
Data columns (total 2 columns):
port_code    660 non-null object
port_desc    660 non-null object
dtypes: object(2)
memory usage: 10.4+ KB
None


Unnamed: 0,port_code,port_desc
0,ALC,"ALCAN, AK"
1,ANC,"ANCHORAGE, AK"
2,BAR,"BAKER AAF - BAKER ISLAND, AK"
3,DAC,"DALTONS CACHE, AK"
4,PIZ,"DEW STATION PT LAY DEW, AK"


This dimension will contain all the port codes from the I94 data,
as well as their corresponding port descriptions.

##### I94 Immigration Data - Travel Mode

In [12]:
print(travel_mode_df.count())
print(travel_mode_df.info())

pd.options.display.max_columns = None
travel_mode_df.head()

travel_mode_code    4
travel_mode_desc    4
dtype: int64
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4 entries, 0 to 3
Data columns (total 2 columns):
travel_mode_code    4 non-null object
travel_mode_desc    4 non-null object
dtypes: object(2)
memory usage: 144.0+ bytes
None


Unnamed: 0,travel_mode_code,travel_mode_desc
0,1,Air
1,2,Sea
2,3,Land
3,9,Not reported'


This dimension will contain all the mode of travel codes from the 
I94 data, as well as their corresponding mode descriptions.

##### I94 Immigration Data - U.S. States

In [13]:
print(state_df.count())
print(state_df.info())

pd.options.display.max_columns = None
state_df.head()

state_code         55
state_full_name    55
dtype: int64
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55 entries, 0 to 54
Data columns (total 2 columns):
state_code         55 non-null object
state_full_name    55 non-null object
dtypes: object(2)
memory usage: 960.0+ bytes
None


Unnamed: 0,state_code,state_full_name
0,AL,ALABAMA
1,AK,ALASKA
2,AZ,ARIZONA
3,AR,ARKANSAS
4,CA,CALIFORNIA


This dimension will contain all the state codes from the I94 data,
as well as their corresponding state descriptions.

##### I94 Immigration Data - Visa Type

In [14]:
print(visa_type_df.count())
print(visa_type_df.info())

pd.options.display.max_columns = None
visa_type_df.head()

visa_type_code    3
visa_type_desc    3
dtype: int64
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 2 columns):
visa_type_code    3 non-null object
visa_type_desc    3 non-null object
dtypes: object(2)
memory usage: 128.0+ bytes
None


Unnamed: 0,visa_type_code,visa_type_desc
0,1,Business
1,2,Pleasure
2,3,Student


This dimension will contain all the visa codes from the I94 data,
as well as their corresponding visa descriptions.

##### Global Temperature Data

In [15]:
print(global_temp_df.count())
print(global_temp_df.info())

pd.options.display.max_columns = None
global_temp_df.head(20)

dt                               8599212
AverageTemperature               8235082
AverageTemperatureUncertainty    8235082
City                             8599212
Country                          8599212
Latitude                         8599212
Longitude                        8599212
dtype: int64
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8599212 entries, 0 to 8599211
Data columns (total 7 columns):
dt                               object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                             object
Country                          object
Latitude                         object
Longitude                        object
dtypes: float64(2), object(5)
memory usage: 459.2+ MB
None


Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E
5,1744-04-01,5.788,3.624,Århus,Denmark,57.05N,10.33E
6,1744-05-01,10.644,1.283,Århus,Denmark,57.05N,10.33E
7,1744-06-01,14.051,1.347,Århus,Denmark,57.05N,10.33E
8,1744-07-01,16.082,1.396,Århus,Denmark,57.05N,10.33E
9,1744-08-01,,,Århus,Denmark,57.05N,10.33E


The DataFrame count and info output gives us the overall layout of the dataset, the total number of
records (8,599,212), and the non-null counts for each of the given fields. Many of the
'important' fields (from a research perspective) are fully populated, like city and country, while the two temperature fields are just under 96% populated.



For our needs, this dataset covers far too long a time span for what we're trying to analyze. Even taking into account things like temperature trends over time driving immigration, the fact that the data goes back to the mid-1700s makes the set too unwieldy for fast analytics. While we could argue for almost any cutoff date, limiting the data to 1970 onward shrinks it by quite a bit (XXX records, only XX% of the original size), while still giving several decades of trend data.

##### U.S. City Demographic Data

In [16]:
print(us_city_demo_df.info())

pd.options.display.max_columns = None
us_city_demo_df.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int64
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 271.1+ KB
None


Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


Looking at this demographic data, we can see that almost all of the fields in the set are populated. 

##### Airport Code Table

In [17]:
print(airport_df.count())
print(airport_df.info())

pd.options.display.max_columns = None
airport_df.sample(n=50)

ident           55075
type            55075
name            55075
elevation_ft    48069
continent       27356
iso_country     54828
iso_region      55075
municipality    49399
gps_code        41030
iata_code        9189
local_code      28686
coordinates     55075
dtype: int64
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
ident           55075 non-null object
type            55075 non-null object
name            55075 non-null object
elevation_ft    48069 non-null float64
continent       27356 non-null object
iso_country     54828 non-null object
iso_region      55075 non-null object
municipality    49399 non-null object
gps_code        41030 non-null object
iata_code       9189 non-null object
local_code      28686 non-null object
coordinates     55075 non-null object
dtypes: float64(1), object(11)
memory usage: 5.0+ MB
None


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
43736,SJTC,medium_airport,Bauru - Arealva Airport,1949.0,SA,BR,BR-SP,Bauru,SBAE,JTC,,"-49.0502866745, -22.166859140899998"
13840,CAS2,small_airport,Moose Lake (Lodge) Airport,3500.0,,CA,CA-BC,Moose Lake,CAS2,,CAS2,"-125.40899658203125, 53.073299407958984"
38896,PL-0096,small_airport,Chełmżyca Airstrip,323.0,EU,PL,PL-U-A,Chełmżyca,,,,"19.4611503, 53.6704867"
54491,YXQU,heliport,Quirindi Hospital Helipad,1365.0,OC,AU,AU-NSW,Quirindi,YXQU,,YXQU,"150.676606, -31.50107"
15609,CPD5,seaplane_base,Paudash Lake Seaplane Base,1126.0,,CA,CA-ON,Paudash,CPD5,,CPD5,"-78.036835, 44.959206"
35303,MY54,small_airport,Cloverleaf-East Bemidji Airport,1445.0,,US,US-MN,Bemidji,MY54,,MY54,"-94.81220245361328, 47.43830108642578"
27686,KJKL,medium_airport,Julian Carroll Airport,1381.0,,US,US-KY,Jackson,KJKL,,JKL,"-83.31729888916016, 37.59389877319336"
30061,KT03,small_airport,Tuba City Airport,4513.0,,US,US-AZ,Tuba City,KT03,TBC,T03,"-111.383003235, 36.0928001404"
12809,BW-0019,small_airport,Etsha Airport,3205.0,AF,BW,BW-NW,Etsha,,,,"22.261864, -19.113219"
50864,VE-0102,small_airport,Kanaripo Airport,302.0,SA,VE,VE-Z,,,,,"-66.94999694824219, 4.066667079925537"


Looking at this set, we can see that the majority of the airport fields are populated. We can
also see that this file contains international airports as well, while the immigration data only
looks at US ports on ingress. Based on this, we will want to subset this dataset to include only
US airports before loading it into Redshift.

#### Cleaning Steps
In order to make the data as usable as possible, as well as reduce the data footprint 
(giving us both faster queries and storage cost savings), we'll want to remove the
columns that won't be adding much value to our future analytics.
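
One way to pick those columns is to compute how populated each one is from the non-null counts printed earlier. The sketch below does that for the immigration columns that have any nulls. The 50% cutoff is an assumption made for illustration, not a rule stated in this notebook; with it, the flagged columns happen to match the four dropped in the next cell.

```python
# Non-null counts copied from the immigration_df.count() output above
TOTAL_ROWS = 3096313
non_null_counts = {
    'i94mode': 3096074, 'i94addr': 2943941, 'depdate': 2953856,
    'visapost': 1215063, 'occup': 8126, 'entdepd': 2957884,
    'entdepu': 392, 'gender': 2682044, 'insnum': 113708,
    'airline': 3012686, 'fltno': 3076764,
}

# Assumed cutoff: a column is "underpopulated" if fewer than half its rows are filled
MIN_POPULATED_FRACTION = 0.5

populated_fraction = {
    col: count / TOTAL_ROWS for col, count in non_null_counts.items()
}
underpopulated = sorted(
    col for col, frac in populated_fraction.items()
    if frac < MIN_POPULATED_FRACTION
)

for col in underpopulated:
    print("{}: {:.2%} populated".format(col, populated_fraction[col]))
```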

In [18]:
# drop underpopulated columns from immigration data
immigration_df.drop(columns=["insnum", "entdepu", "occup", "visapost"], inplace=True)

In [19]:
# drop airports without a designated iata code (primary key)
airport_df.dropna(subset=['iata_code'], inplace=True)

In [20]:
# get only temp data after 1/1/1970
global_temp_subset_df = global_temp_df[(global_temp_df['dt'] > '1969-12-31')]
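
The filter above compares `dt` as plain strings rather than as dates. That works because the values are zero-padded `YYYY-MM-DD` strings, which sort in the same order as the dates they represent. A minimal sketch with a few hand-picked sample values (assumed for illustration):

```python
from datetime import date

sample_dates = ['1743-11-01', '1969-12-01', '1970-01-01', '2013-09-01']
cutoff = '1969-12-31'

# String comparison, as in the pandas filter above
kept_as_strings = [d for d in sample_dates if d > cutoff]

# Same filter after parsing each value into a real date object
kept_as_dates = [
    d for d in sample_dates
    if date.fromisoformat(d) > date.fromisoformat(cutoff)
]

# Both approaches keep the same rows
assert kept_as_strings == kept_as_dates
print(kept_as_strings)
```

The shortcut relies on the column staying in this ISO format; a layout such as `MM/DD/YYYY` would not sort chronologically as text.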

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
insert pic here
For this model, I chose a star schema, with our I94 immigration data as the fact table and the other
related data sets as its dimensions.
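
To make the star schema concrete, the sketch below builds the kind of query an analyst might run against it: count arrivals in the fact table, grouped by a dimension's description. Table and column names are taken from the INSERT statements in Step 4. The join keys (for example `fct_immigration.cit` to `dim_i94_country.country_cd`) and the helper name `build_top_n_query` are assumptions, since the table definitions are not shown in this notebook.

```python
# Assumed join keys: dimension table -> (fact column, dimension key, description column)
STAR_JOINS = {
    'dim_i94_country': ('cit', 'country_cd', 'country_desc'),
    'dim_i94_ports': ('port', 'port_cd', 'port_desc'),
    'dim_i94_travel_mode': ('travel_mode', 'travel_mode_cd', 'travel_mode_desc'),
    'dim_i94_state': ('addr', 'state_cd', 'state_full_nm'),
    'dim_i94_visa_type': ('visa', 'visa_type_cd', 'visa_type_desc'),
}


def build_top_n_query(dim_table, n=10):
    """Return SQL counting fact rows per dimension description, largest first."""
    fact_col, dim_key, dim_desc = STAR_JOINS[dim_table]
    return (
        "SELECT d.{desc}, COUNT(*) AS arrivals\n"
        "FROM fct_immigration f\n"
        "JOIN {dim} d ON f.{fact_col} = d.{dim_key}\n"
        "GROUP BY d.{desc}\n"
        "ORDER BY arrivals DESC\n"
        "LIMIT {n}"
    ).format(desc=dim_desc, dim=dim_table, fact_col=fact_col,
             dim_key=dim_key, n=int(n))


top_countries_sql = build_top_n_query('dim_i94_country', n=5)
print(top_countries_sql)
```

For the joins to match, the codes need the same type on both sides; the fact data holds codes such as `692.0` as floats, while the label-derived dimensions hold them as strings.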

#### 3.2 Mapping Out Data Pipelines
For our pipelines, I currently have Python doing the heavy lifting of ingesting the data
from the different data sets we have, including the 'sas7bdat' files. Given the sheer amount of
records we have (3+ million for one month of immigration data alone), I would typically be
leveraging Spark to ingest, as we've done in previous projects, but there was an unavoidable
bug with Spark in the Jupyter environment that Support was unable to mitigate, forcing me to
use straight Pandas instead.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [23]:
config = configparser.ConfigParser()
config.read('dwh.cfg')

conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
cur = conn.cursor()


In [None]:
fct_immigration_insert = """
INSERT INTO fct_immigration (cicid, year, month, cit, res, port, arrdate, travel_mode, addr, depdate, bir, visa, count,  
    dtadfile, entdepa, entdepd, matflag, biryear, dtaddto, gender, airline, admnum, fltno, visa_type)
    VALUES (%s, %s, %s, %s, %s, %s,%s, %s, %s,%s, %s, %s,%s, %s, %s,%s, %s, %s,%s, %s, %s,%s, %s, %s)"""

for index, row in immigration_df.iterrows():
    cur.execute(fct_immigration_insert, list(row.values))
    conn.commit()
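
Committing after every row runs one transaction per record, which is slow for 3+ million rows. A common alternative is to send rows in batches and commit once per batch. The sketch below shows that batching logic with the standard DB-API `executemany` call; it uses an in-memory SQLite database as a stand-in so that it runs without a cluster. The helper name `insert_in_batches`, the batch size, and the demo table are assumptions for illustration. With psycopg2 the placeholders would be `%s` rather than `?`, and for Redshift in particular AWS recommends bulk loading with `COPY` over row-wise inserts.

```python
import sqlite3
from itertools import islice


def insert_in_batches(conn, sql, rows, batch_size=1000):
    """Insert rows in batches, committing once per batch; returns rows inserted."""
    cur = conn.cursor()
    row_iter = iter(rows)
    total = 0
    while True:
        batch = list(islice(row_iter, batch_size))
        if not batch:
            break
        cur.executemany(sql, batch)
        conn.commit()  # one commit per batch instead of one per row
        total += len(batch)
    return total


# Stand-in database and table (hypothetical, for demonstration only)
demo_conn = sqlite3.connect(':memory:')
demo_conn.execute(
    "CREATE TABLE dim_i94_visa_type (visa_type_cd TEXT, visa_type_desc TEXT)"
)

visa_rows = [('1', 'Business'), ('2', 'Pleasure'), ('3', 'Student')]
inserted = insert_in_batches(
    demo_conn,
    "INSERT INTO dim_i94_visa_type (visa_type_cd, visa_type_desc) VALUES (?, ?)",
    visa_rows,
    batch_size=2,
)
print(inserted)
```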

In [None]:
dim_global_temp_insert = """
INSERT INTO dim_global_temp (avg_temp, avg_temp_unc, city, country, lat, long) VALUES (%s, %s, %s, %s, %s, %s)"""

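# The INSERT lists six columns, so select exactly those six (in order) from the
# post-1970 subset; 'dt' has no matching column in the statement above.
temp_cols = ['AverageTemperature', 'AverageTemperatureUncertainty', 'City', 'Country', 'Latitude', 'Longitude']

for index, row in global_temp_subset_df[temp_cols].iterrows():
    cur.execute(dim_global_temp_insert, list(row.values))
    conn.commit()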

In [None]:
dim_us_city_demo_insert = """
INSERT INTO dim_us_city_demo (city, state, median_age, male_pop, female_pop, total_pop, veteran_cnt, foreign_cnt,
avg_household_sz, state_code, race, count) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""

for index, row in us_city_demo_df.iterrows():
    cur.execute(dim_us_city_demo_insert, list(row.values))
    conn.commit()

In [None]:
dim_airports_insert = """
INSERT INTO dim_airports (airport_id, type, name, elevation_ft, region, muni, gps_code, local_code, coords) 
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)"""

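# Iterate over the airport data (not the immigration data), selecting the nine
# columns named in the INSERT. Assumption: airport_id maps to iata_code, which
# the cleaning step treats as the primary key.
airport_cols = ['iata_code', 'type', 'name', 'elevation_ft', 'iso_region', 'municipality', 'gps_code', 'local_code', 'coordinates']

for index, row in airport_df[airport_cols].iterrows():
    cur.execute(dim_airports_insert, list(row.values))
    conn.commit()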

In [None]:
dim_i94_country_insert = """
INSERT INTO dim_i94_country (country_cd, country_desc) VALUES (%s, %s)"""

for index, row in country_df.iterrows():
    cur.execute(dim_i94_country_insert, list(row.values))
    conn.commit()

In [None]:
dim_i94_ports_insert = """
INSERT INTO dim_i94_ports (port_cd, port_desc) VALUES (%s, %s)"""

for index, row in ports_df.iterrows():
    cur.execute(dim_i94_ports_insert, list(row.values))
    conn.commit()

In [None]:
dim_i94_travel_mode_insert = """
INSERT INTO dim_i94_travel_mode (travel_mode_cd, travel_mode_desc) VALUES (%s, %s)"""

for index, row in travel_mode_df.iterrows():
    cur.execute(dim_i94_travel_mode_insert, list(row.values))
    conn.commit()

In [None]:
dim_i94_state_insert = """
INSERT INTO dim_i94_state (state_cd, state_full_nm) VALUES (%s, %s)"""

for index, row in state_df.iterrows():
    cur.execute(dim_i94_state_insert, list(row.values))
    conn.commit()

In [None]:
dim_i94_visa_type_insert = """
INSERT INTO dim_i94_visa_type (visa_type_cd, visa_type_desc) VALUES (%s, %s)"""

for index, row in visa_type_df.iterrows():
    cur.execute(dim_i94_visa_type_insert, list(row.values))
    conn.commit()

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# SELECT COUNT(*) always returns exactly one row, so cur.rowcount cannot detect
# an empty table; read the count value itself with fetchone() instead.
tables = [
    "fct_immigration",
    "dim_global_temp",
    "dim_airports",
    "dim_us_city_demo",
    "dim_i94_country",
    "dim_i94_ports",
    "dim_i94_travel_mode",
    "dim_i94_state",
    "dim_i94_visa_type",
]

for table in tables:
    cur.execute("SELECT COUNT(*) FROM {}".format(table))
    row_count = cur.fetchone()[0]
    if row_count < 1:
        print("No data found in table {}".format(table))
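
Beyond checking that a table is non-empty, a source/count check compares the number of rows loaded with the number of rows in the source DataFrame. A minimal sketch, using in-memory SQLite as a stand-in for Redshift so that it runs anywhere; the helper name `check_row_count` and the demo table are assumptions for illustration:

```python
import sqlite3


def check_row_count(cur, table, expected_rows):
    """Return True if the table holds exactly expected_rows rows."""
    # Table names cannot be bound as query parameters, so only pass trusted names
    cur.execute("SELECT COUNT(*) FROM {}".format(table))
    actual_rows = cur.fetchone()[0]
    if actual_rows != expected_rows:
        print("Count mismatch for {}: expected {}, found {}".format(
            table, expected_rows, actual_rows))
        return False
    return True


# Stand-in table holding the four travel modes shown earlier
demo_conn = sqlite3.connect(':memory:')
demo_cur = demo_conn.cursor()
demo_cur.execute(
    "CREATE TABLE dim_i94_travel_mode (travel_mode_cd TEXT, travel_mode_desc TEXT)"
)
demo_cur.executemany(
    "INSERT INTO dim_i94_travel_mode VALUES (?, ?)",
    [('1', 'Air'), ('2', 'Sea'), ('3', 'Land'), ('9', 'Not reported')],
)

complete = check_row_count(demo_cur, 'dim_i94_travel_mode', expected_rows=4)
incomplete = check_row_count(demo_cur, 'dim_i94_travel_mode', expected_rows=5)
```

In the notebook, the expected value would be the length of the source DataFrame, for example `len(travel_mode_df)`.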

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

From a scalability standpoint, Redshift / Spark / Pandas was the way to go for this project. The
ability to scale both the processing technology (Spark) and the backend database technology
(Redshift) means that we always have the option to scale out, by adding nodes, as well as
up, by leveraging AWS to add compute power if necessary.

Because a lot of these datasets are time-based, and are not updated daily, I wouldn't suggest that
the ETL process be run on a fixed schedule. Rather, I would run an update when a given dataset
is updated at the source (e.g., a new month/year of immigration data is released). This would
both ensure that the data the analysts are working with is the most current available and
save the business money by not having Spark clusters spun up needlessly.
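
One way to trigger the ETL only when a source changes is to keep a fingerprint of each source file and rerun the load when the fingerprint differs. The sketch below is an assumption about how that could look, not part of the current pipeline; `file_digest`, `needs_reload`, and the temporary demo file are hypothetical names used for illustration.

```python
import hashlib
import os
import tempfile


def file_digest(path, chunk_size=1024 * 1024):
    """Return the SHA-256 hex digest of a file, read in chunks to limit memory use."""
    digest = hashlib.sha256()
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()


def needs_reload(path, last_digest):
    """True when the file's content differs from the digest seen at the last load."""
    return file_digest(path) != last_digest


# Demo with a temporary file standing in for a source dataset
with tempfile.NamedTemporaryFile('w', suffix='.csv', delete=False) as tmp:
    tmp.write("city;state\n")
    demo_path = tmp.name

digest_at_last_load = file_digest(demo_path)
unchanged = needs_reload(demo_path, digest_at_last_load)

with open(demo_path, 'a') as f:
    f.write("Newark;New Jersey\n")  # simulate a new release of the source
changed = needs_reload(demo_path, digest_at_last_load)

os.remove(demo_path)  # clean up the demo file
```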

If the data was increased 100x, or if 100+ people need to access the data, we are in the clear with
utilizing Redshift as our backend platform. Redshift's ability to scale means that we can store
petabytes of information, and serve a large number of concurrent users (with workload management
to handle query queues), without having to change technologies and with little downtime. The
ability to spin up additional nodes means we can scale up, or down, as the business demands.

For the other listed scenario, where there is a dashboard that must be updated on a daily basis 
by 7am every day, I would suggest utilizing a job scheduling technology like Airflow. Any aggregation
or additional transformation jobs that needed to be run to populate the semantic layer that the
dashboard runs against could easily be handled by Airflow, running on an EC2 instance and reading
and writing from/to Redshift. This would also save the business on data egress charges.