## US Weather and Immigration Study
### Data Engineering Capstone Project

#### Project Summary
In this project, an ETL pipeline is created to combine data from 4 different data sets (immigration, temperature, demographics, and airports) to assess immigration patterns in the US.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [2]:
# Imports
import pandas as pd
import os
import psycopg2
import boto3
from pyspark.sql import SparkSession
from io import StringIO # python3; python2: BytesIO 

In [3]:
# Read connection settings from environment variables so that no credentials
# are stored in the notebook.
# NOTE: despite the names, AWS_SECRET holds the access key ID and AWS_KEY holds
# the secret access key; write_to_staging passes them to boto3 in that order.
AWS_SECRET = os.environ['AWS_SECRET']
AWS_KEY = os.environ['AWS_KEY']
DB_USER = os.environ['DB_USER']
DB_PASSWORD = os.environ['DB_PASSWORD']
ARN = os.environ['ARN']

DB = 'dev'
HOST = 'redshift-cluster-1.ca7m8qui9aaj.us-west-2.redshift.amazonaws.com'

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project, immigration data sets are combined with airport and demographic data sets to evaluate the flight patterns of immigrants into the United States.  The data is aggregated to evaluate the total number of immigrant visits by airport type, airport, state, and the majority ethnicity of the city.  First, the data sets are explored, cleaned, and uploaded to s3 storage.  Then the stored files are copied to staging tables in Amazon Redshift.  Finally, the staging tables are queried to make fact and dimension tables as well as tables with aggregate values.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

The following datasets were used for this project:

I94 Immigration Data: This data comes from the US National Tourism and Trade Office. A single month of United States immigration data was used.  The data set contains information on immigration, such as details about the immigrants, their immigration status, and their trips.  The data sets can be found here:
link - https://travel.trade.gov/research/reports/i94/historical/2016.html

World Temperature Data: This dataset came from Kaggle. It contains date, average temperature, temperature uncertainty, city, country, latitude, and longitude.
link - https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data

US City Demographic Data: This data comes from OpenDataSoft.  It contains demographic information for US cities.
link - https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/

Airport Code Table: This dataset comes from datahub. This is a simple table of airport codes and corresponding cities.
link - https://datahub.io/core/airport-codes#data





### Read in datasets

In [None]:
# Read Temperature Data
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
pd_temp_df = pd.read_csv(fname)

In [None]:
# Create Spark Session
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()
#df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
#write to parquet
#df_spark.write.parquet("sas_data")

In [None]:
# Read Immigration Data
sprk_im_df = spark.read.parquet("sas_data")

In [None]:
# Read Demographic Data
pd_demographic_df = pd.read_csv('us-cities-demographics.csv')

In [34]:
# Read in Airport Data
pd_airport_df = pd.read_csv('airport-codes_csv.csv')

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [52]:
# helper function to write dfs to S3 storage for staging
def write_to_staging(df, bucket, csv_name):
    """Function that writes a df to csv in S3
    Args:
        df (dataframe): pandas dataframe
        bucket (str): S3 bucket name
        csv_name (str): name for output csv
    """
    csv_buffer = StringIO()
    df.to_csv(csv_buffer, index=False)
    s3_resource = boto3.resource('s3',
                                aws_access_key_id=AWS_SECRET, 
                                aws_secret_access_key=AWS_KEY, 
                                region_name='us-west-2')
    s3_resource.Object(bucket, csv_name).put(Body=csv_buffer.getvalue())

### Temperature

In [None]:
pd_temp_df.shape

In [None]:
pd_temp_df.columns

In [None]:
# countries = pd_temp_df['Country'].unique()
# countries.sort()
# countries

In [None]:
pd_temp_df = pd_temp_df[pd_temp_df['Country']=='United States']
pd_temp_df.shape

In [None]:
# remove rows with null values
pd_temp_df = pd_temp_df.dropna()
pd_temp_df.shape

In [None]:
pd_temp_df.head(2)

In [None]:
# format lat and lon
pd_temp_df['Latitude'] = pd_temp_df['Latitude'].str[:-1] 
pd_temp_df['Longitude'] = '-' + pd_temp_df['Longitude'].str[:-1] 
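
The cell above drops the trailing hemisphere letter and prefixes every longitude with a minus sign, which assumes that all remaining rows lie in the northern and western hemispheres. A minimal sketch of a more general conversion follows, assuming the raw values look like `32.95N` or `100.53W`; the helper name `to_signed_degrees` is hypothetical and not part of the notebook.

```python
def to_signed_degrees(value):
    """Convert a coordinate such as '32.95N' or '100.53W' to signed decimal degrees."""
    magnitude = float(value[:-1])
    hemisphere = value[-1].upper()
    if hemisphere not in {"N", "S", "E", "W"}:
        raise ValueError(f"unexpected hemisphere suffix in {value!r}")
    # South and West are negative by convention.
    return -magnitude if hemisphere in {"S", "W"} else magnitude


print(to_signed_degrees("32.95N"))
print(to_signed_degrees("100.53W"))
```

With pandas, a helper like this could be applied per column with `.apply(to_signed_degrees)`, which would also leave the columns as floats rather than strings.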

In [None]:
# convert from metric to english units
pd_temp_df['AverageTemperature'] = pd_temp_df['AverageTemperature'].apply(lambda x: (1.8*x)+32) 
# the uncertainty is a temperature difference, so it is scaled without the +32 offset
pd_temp_df['AverageTemperatureUncertainty'] = pd_temp_df['AverageTemperatureUncertainty'].apply(lambda x: 1.8*x) 
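
One detail worth checking in a Celsius to Fahrenheit conversion is that a temperature reading and a temperature uncertainty convert differently: the reading needs both the 1.8 scale factor and the 32 degree offset, while an uncertainty is a difference between temperatures and only needs the scale factor. The short worked example below illustrates this with made-up values; the function names are hypothetical.

```python
def celsius_to_fahrenheit(temp_c):
    """Convert an absolute temperature reading from Celsius to Fahrenheit."""
    return temp_c * 1.8 + 32.0


def celsius_interval_to_fahrenheit(delta_c):
    """Convert a temperature difference (such as an uncertainty) to Fahrenheit degrees."""
    return delta_c * 1.8


temp_c, uncertainty_c = 20.0, 0.5  # illustrative values only
print(celsius_to_fahrenheit(temp_c))                  # 68.0
print(celsius_interval_to_fahrenheit(uncertainty_c))  # 0.9
```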

In [None]:
# remove country
pd_temp_df = pd_temp_df.drop('Country', axis=1)

In [None]:
pd_temp_df.head(2)

In [None]:
temperature_data_dict = {
                          'dt': 'date' , 
                          'AverageTemperature': 'average temperature in Fahrenheit' , 
                          'AverageTemperatureUncertainty': 'temperature uncertainty in Fahrenheit', 
                          'City': 'City', 
                          'Latitude': 'latitude', 
                          'Longitude': 'longitude'
                        }

In [None]:
pd_temp_df['dt']

#### The temperature data only goes to 2013, but the immigration data is from 2016, so the temperature data will not be used in further analysis

### Immigration

In [None]:
#for col in sprk_im_df.limit(1).toPandas().columns:
#    print(col)

In [None]:
# create a table view to query against
sprk_im_df.createOrReplaceTempView("immigration")

In [None]:
# we do not need to keep i94yr as it has a single value
years = spark.sql('''
    SELECT 
        DISTINCT i94yr
    FROM
        immigration
''')
years.toPandas()

In [None]:
# we do not need to keep i94mon as it has a single value
months = spark.sql('''
    SELECT 
        DISTINCT i94mon
    FROM
        immigration
''')
months.toPandas()

In [None]:
i94visa = spark.sql('''
    SELECT 
        DISTINCT i94visa
    FROM
        immigration
''')
i94visa.toPandas()

In [None]:
# visatype = spark.sql('''
#     SELECT 
#         DISTINCT visatype
#     FROM
#         immigration
# ''')
# visatype.toPandas()

In [None]:
# Create a data dict for kept columns and use it to remove unnecessary columns
immigration_data_dict = {'i94cit': 'country of citizenship code' , 
                          'i94port': 'port code' , 
                          'i94mode':  'transportation code', 
                          'arrdate': 'arrival date',
                          'i94res': 'country of residence code', 
                          'i94addr': 'state code', 
                          'depDate': 'departure date',
                          'i94bir': 'immigrant age', 
                          'i94visa': 'visa type',
                          'gender': 'gender'}

immigration_keep_cols = list(immigration_data_dict.keys())

sprk_im_df = sprk_im_df.select(immigration_keep_cols)

sprk_im_df.limit(3).toPandas()
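
The arrival and departure dates are kept as they come out of the SAS file. Assuming they are stored as SAS date numbers (a count of days since 1960-01-01, which is the usual SAS convention), they could be turned into calendar dates with a small helper like the one sketched here. The name `sas_to_date` and the sample value are illustrative, not taken from the notebook.

```python
from datetime import date, timedelta

# SAS dates count days from 1 January 1960.
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days):
    """Convert a SAS date number (days since 1960-01-01) to a datetime.date."""
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20545.0))  # 2016-04-01
```

Storing the converted value would also allow the `arrdate` and `depDate` columns to be typed as dates in Redshift rather than as `varchar`.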



In [None]:
pd_im_df = sprk_im_df.toPandas()

In [None]:
pd_im_df.shape

In [None]:
# drop rows with null values
pd_im_df = pd_im_df.dropna()

In [None]:
pd_im_df['i94port'].unique()

In [None]:
# drop any duplicate rows
pd_im_df = pd_im_df.drop_duplicates()

In [None]:
# write the cleaned immigration data to csv in staging
write_to_staging(pd_im_df, 'qscapstone', 'immigration.csv')

### Airport 

In [41]:
pd_airport_df.head(2)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates,state
440,07FA,small_airport,Ocean Reef Club Airport,8.0,,US,US-FL,Key Largo,07FA,OCA,07FA,"-80.274803161621, 25.325399398804",FL
594,0AK,small_airport,Pilot Station Airport,305.0,,US,US-AK,Pilot Station,,PQS,0AK,"-162.899994, 61.934601",AK


In [40]:
# parse state out of iso_region
pd_airport_df['state'] = pd_airport_df['iso_region'].str[-2:]

In [36]:
pd_airport_df.shape

(55075, 12)

In [None]:
#pd_airport_df['iso_country'].unique()

In [42]:
pd_airport_df = pd_airport_df[(pd_airport_df['iso_country']=='US') & (pd_airport_df['iata_code'].notnull())]

In [43]:
pd_airport_df.shape

(2019, 13)

In [39]:
pd_airport_df.head(2)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
440,07FA,small_airport,Ocean Reef Club Airport,8.0,,US,US-FL,Key Largo,07FA,OCA,07FA,"-80.274803161621, 25.325399398804"
594,0AK,small_airport,Pilot Station Airport,305.0,,US,US-AK,Pilot Station,,PQS,0AK,"-162.899994, 61.934601"


In [44]:
pd_airport_df['same_id'] = pd_airport_df['ident'] == pd_airport_df['local_code']

In [45]:
pd_airport_df['same_id'].sum()

210

In [46]:
pd_airport_df[pd_airport_df['same_id']==False].head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates,state,same_id
3143,2IG4,small_airport,Ed-Air Airport,426.0,,US,US-IN,Oaktown,I20,OTN,I20,"-87.4997024536, 38.851398468",IN,False
3643,2Z1,seaplane_base,Entrance Island Seaplane Base,0.0,,US,US-AK,Entrance Island,,HBH,,"-133.43848, 57.412201",AK,False
10470,AHT,closed,Amchitka Army Airfield,215.0,,US,US-AK,Amchitka Island,PAHT,AHT,,"179.259166667, 51.3777777778",AK,False
11498,ARX,closed,Asbury Park Neptune Air Terminal,95.0,,US,US-NJ,Asbury Park,,ARX,,"-74.0908333333, 40.2193055556",NJ,False
11676,AUS,closed,Austin Robert Mueller Municipal,,,US,US-TX,,KAUS,AUS,,"-97.6997852325, 30.2987223546",TX,False


In [47]:
im_ports = pd_im_df['i94port'].unique()
im_ports.sort()
im_ports

NameError: name 'pd_im_df' is not defined

In [None]:
iata_codes = pd_airport_df['iata_code'].unique()
iata_codes.sort()
iata_codes

In [None]:
# Here we can see that we can join the immigration data to the airport data on pd_im_df.i94port = pd_airport_df.iata_code
len(list(set(iata_codes).intersection(set(im_ports))))
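
The size of the intersection shows how many codes the two data sets share, but not how much of the immigration data an inner join would keep. A rough way to check that, sketched with plain Python sets and made-up codes, is to look at the share of fact-side keys that find a match and to list the ones that do not. The function name `join_coverage` and the sample codes are assumptions for illustration.

```python
def join_coverage(fact_keys, dimension_keys):
    """Return the share of distinct fact keys found in the dimension, plus the unmatched keys."""
    fact_set, dim_set = set(fact_keys), set(dimension_keys)
    matched = fact_set & dim_set
    unmatched = sorted(fact_set - dim_set)
    share = len(matched) / len(fact_set) if fact_set else 0.0
    return share, unmatched


ports = ["AAA", "BBB", "XXX", "CCC"]  # stand-in for i94port values
iata = ["AAA", "BBB", "CCC", "DDD"]   # stand-in for iata_code values
share, missing = join_coverage(ports, iata)
print(share, missing)  # 0.75 ['XXX']
```

Any port code listed as unmatched would be dropped by the inner joins used later in the pipeline, so it is worth reviewing that list before relying on the aggregate counts.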

In [None]:
pd_airport_df.head(1)

In [48]:
# split coordinates ("longitude, latitude") into latitude and longitude
pd_airport_df['latitude'] = pd_airport_df['coordinates'].str.split(',').str[1].str.strip()
pd_airport_df['longitude'] = pd_airport_df['coordinates'].str.split(',').str[0].str.strip()
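
The `coordinates` column stores longitude first and latitude second, separated by a comma and a space, as in the sample rows shown earlier. A minimal sketch of the same split for a single value, returning floats instead of strings, is given below. The helper name `split_coordinates` is hypothetical.

```python
def split_coordinates(coordinates):
    """Split a 'longitude, latitude' string into a (latitude, longitude) tuple of floats."""
    lon_text, lat_text = coordinates.split(",")
    # float() ignores surrounding whitespace, so the space after the comma is harmless.
    return float(lat_text), float(lon_text)


latitude, longitude = split_coordinates("-80.274803161621, 25.325399398804")
print(latitude, longitude)
```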

In [50]:
# drop unnecessary columns
air_data_dict = {
    'ident': 'primary key identifier', 
    'type': 'type of airport', 
    'name': 'airport name', 
    'iso_region': 'airport region', 
    'state': 'state',
    'municipality': 'airport municipality', 
    'iata_code': 'code for airport, maps to i94port in immigration',
    'latitude': 'latitude',
    'longitude': 'longitude'  
}

air_keep_cols = list(air_data_dict.keys())
pd_airport_df = pd_airport_df[air_keep_cols]
pd_airport_df.head()

Unnamed: 0,ident,type,name,iso_region,state,municipality,iata_code,latitude,longitude
440,07FA,small_airport,Ocean Reef Club Airport,US-FL,FL,Key Largo,OCA,25.325399398804,-80.274803161621
594,0AK,small_airport,Pilot Station Airport,US-AK,AK,Pilot Station,PQS,61.934601,-162.899994
673,0CO2,small_airport,Crested Butte Airpark,US-CO,CO,Crested Butte,CSE,38.851918,-106.928341
1088,0TE7,small_airport,LBJ Ranch Airport,US-TX,TX,Johnson City,JCY,30.251800537100003,-98.6224975586
1402,13MA,small_airport,Metropolitan Airport,US-MA,MA,Palmer,PMX,42.2233009338,-72.31140136719999


In [53]:
# write airports to s3 for staging
pd_airport_df = pd_airport_df.drop_duplicates()
write_to_staging(pd_airport_df, 'qscapstone', 'airports.csv')

### Demographic 

In [None]:
pd_demographic_df.head(1)
# here we can see that we will need to perform some string parsing 
# to get this dataset into a usable form

In [None]:
# parse out the table columns and reformat certain characters
col_string = pd_demographic_df.columns[0]
original_col = col_string
# split column names out
cols = col_string.split(';')
# replace spaces with underscores
for i in range(len(cols)):
    cols[i] = cols[i].replace(' ', '_')
    cols[i] = cols[i].replace('-', '_')

cols

In [None]:
# expand the single column into a dataframe
pd_demographic_df = pd_demographic_df[original_col].str.split(';',expand=True)
pd_demographic_df.columns = cols

In [None]:
# write demographics to csv in s3 storage
pd_demographic_df = pd_demographic_df.drop_duplicates()
write_to_staging(pd_demographic_df, 'qscapstone', 'demographics.csv')
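
The demographics file uses semicolons as its delimiter, which is why it first loads as a single column. As an alternative sketch, the delimiter can be declared when the file is read, so no manual splitting is needed. The example below uses the standard `csv` module on a small made-up sample; with pandas, passing `sep=';'` to `pd.read_csv` has the same effect. The function name and the sample rows are assumptions.

```python
import csv
from io import StringIO

# Made-up sample in the same shape as the demographics export.
raw = "City;State;Median Age;Foreign-born\nExampleville;Maryland;33.8;1000\n"


def read_semicolon_csv(text):
    """Parse semicolon-delimited text and normalise header names to use underscores."""
    reader = csv.reader(StringIO(text), delimiter=";")
    header = [name.replace(" ", "_").replace("-", "_") for name in next(reader)]
    return [dict(zip(header, row)) for row in reader]


rows = read_semicolon_csv(raw)
print(rows[0])
```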

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The data model for this project conforms to a star schema in which the fact table holds the immigration data and is connected to dimension tables for airports and demographics.  The airports and demographics tables are used for aggregation: how many immigrants arrived via each airport type and each airport, how many visited each state, and how visits break down by the majority ethnicity of the cities they visited.
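
To make the star-schema idea concrete, here is a small self-contained sketch that uses SQLite in memory instead of Redshift. It follows the conventional layout in which each dimension row is stored once and the fact table refers to it by key, which is not exactly how the tables are built later in this notebook. The table names, columns, and rows are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE airports_dim (iata_code TEXT PRIMARY KEY, name TEXT, type TEXT);
CREATE TABLE immigration_fact (im_id INTEGER PRIMARY KEY, i94port TEXT, i94visa TEXT);
INSERT INTO airports_dim VALUES
    ('AAA', 'Alpha International', 'large_airport'),
    ('BBB', 'Beta Field', 'small_airport');
INSERT INTO immigration_fact VALUES (1, 'AAA', '2'), (2, 'AAA', '1'), (3, 'BBB', '2');
""")

# Join each fact row to its airport and count visits per airport type.
rows = conn.execute("""
SELECT d.type, COUNT(*) AS visit_counts
FROM immigration_fact AS f
JOIN airports_dim AS d ON f.i94port = d.iata_code
GROUP BY d.type
ORDER BY d.type
""").fetchall()
print(rows)  # [('large_airport', 2), ('small_airport', 1)]
conn.close()
```

Keeping one row per airport in the dimension means the aggregate counts come from the join itself, rather than from copying airport attributes onto every immigration row.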

#### 3.2 Mapping Out Data Pipelines
1) Clean the data sets and write them to csv files for staging in the s3 bucket "qscapstone":
    immigration.csv
    airports.csv
    demographics.csv
    
2) Upload the cleaned data to s3 storage
3) Copy the csv files from s3 to staging tables in Amazon Redshift
4) Create the immigration_facts fact table
5) Create the airports_dimension table, which joins to immigration_facts
6) Create the demographics_dimension table, which joins to immigration_facts
7) Perform aggregation on airports
8) Perform aggregation on demographics
9) Run tests to ensure data integrity and pipeline success

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [8]:
def connect_to_reshift(host, dbname, user, password, port):
    """Function that returns a psycopg2 db connection
    Args:
        host (str): Redshift cluster endpoint
        dbname (str): database
        user (str): database user
        password (str): user password
        port (int): port the cluster listens on
    """
    connection_string = f'host={host} dbname={dbname} user={user} password={password} port={port}'
    conn = psycopg2.connect(connection_string)
    
    return conn       

In [9]:
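def drop_tables(conn, tables):
    """Function that drops all tables
    Args:
        conn (connection): psycopg2 connection
        tables ([str]): names of the tables to drop
    Returns:
        None
    """
    
    cur = conn.cursor()
    
    for table in tables:
        try:
            # IF EXISTS avoids an error when the table has not been created yet
            cur.execute('DROP TABLE IF EXISTS ' + table)
            # commit per table so a later failure cannot roll back earlier drops
            conn.commit()
        except Exception as e:
            print('failed to drop table: ', table)
            print(e)
            conn.rollback()
        
    cur.close()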

In [10]:
# tables = ['staging_airports', 'staging_demographics', 'staging_immigration']
# drop_tables(conn, ['airports_dimension', 'demographics_dimension', 'immigration_facts'])

In [75]:
def create_tables(conn):
    '''Function that creates tables in redshift
    Args:
        conn (connection): psycopg2 connection
    Returns:
        None
    '''
    
    # staging tables
    create_staging_immigration = '''
    CREATE TABLE IF NOT EXISTS staging_immigration(
        im_id integer identity(0,1) PRIMARY KEY,
        i94cit varchar, 
        i94port varchar NOT NULL, 
        i94mode varchar,
        arrdate varchar,
        i94res varchar,
        i94addr varchar,
        depDate varchar,
        i94bir varchar,
        i94visa varchar,
        gender varchar
    );
    '''
    
    create_staging_airports = '''
    CREATE TABLE IF NOT EXISTS staging_airports(
        ident varchar PRIMARY KEY,
        type varchar,
        name varchar, 
        iso_region varchar, 
        state varchar,
        municipality varchar, 
        iata_code varchar, 
        latitude float, 
        longitude float
    );
    '''
    
    create_staging_demographics = '''
    CREATE TABLE IF NOT EXISTS staging_demographics(
         city_id integer identity(0,1) PRIMARY KEY,
         City varchar,
         State varchar,
         Median_Age real,
         Male_Population integer,
         Female_Population integer,
         Total_Population integer,
         Number_of_Veterans integer,
         Foreign_born integer,
         Average_Household_Size real,
         State_Code varchar,
         Race varchar,
         Count integer,
         UNIQUE(City, State)
    );
    '''
    
    # facts table
    create_immigration_facts = '''
    CREATE TABLE IF NOT EXISTS immigration_facts(
        im_id integer PRIMARY KEY,
        i94port varchar NOT NULL,
        city_id integer NOT NULL,
        i94cit varchar, 
        i94mode varchar,
        arrdate varchar,
        i94res varchar,
        i94addr varchar,
        depDate varchar,
        i94bir varchar,
        i94visa varchar,
        gender varchar
    );
    '''
    
    # dimension tables
    create_airports_dimension = '''
    CREATE TABLE IF NOT EXISTS airports_dimension(
        im_id integer NOT NULL PRIMARY KEY,
        ident varchar NOT NULL,
        type varchar,
        name varchar, 
        iso_region varchar, 
        state varchar,
        municipality varchar, 
        iata_code varchar, 
        latitude float, 
        longitude float
    );
    '''
    
    create_demographics_dimension = '''
    CREATE TABLE IF NOT EXISTS demographics_dimension(
         im_id integer NOT NULL PRIMARY KEY,
         city_id integer NOT NULL,
         City varchar,
         State varchar,
         Median_Age real,
         Male_Population integer,
         Female_Population integer,
         Total_Population integer,
         Number_of_Veterans integer,
         Foreign_born integer,
         Average_Household_Size real,
         State_Code varchar,
         Race varchar,
         Count integer,
         UNIQUE(City, State)
    );
    '''
    
    # aggregations
    create_type_counts = '''
    CREATE TABLE IF NOT EXISTS type_counts(
        type varchar PRIMARY KEY,
        visit_counts integer NOT NULL
    )
    '''
    
    create_airport_counts = '''
    CREATE TABLE IF NOT EXISTS airport_counts(
        name varchar PRIMARY KEY,
        visit_counts integer NOT NULL
    )
    '''
    
    create_state_counts = '''
    CREATE TABLE IF NOT EXISTS state_counts(
        State_Code varchar PRIMARY KEY,
        visit_counts integer NOT NULL
    )
    '''
    
    create_race_counts = '''
    CREATE TABLE IF NOT EXISTS race_counts(
        Race varchar PRIMARY KEY,
        visit_counts integer NOT NULL
    )
    '''
    
    cur = conn.cursor()
    
    create_queries = [
                      create_staging_immigration, 
                      create_staging_airports, 
                      create_staging_demographics,
                      create_immigration_facts, 
                      create_airports_dimension, 
                      create_demographics_dimension,
                      create_type_counts,
                      create_airport_counts,
                      create_state_counts,
                      create_race_counts
    ]
    
    for query in create_queries:
        try:
            cur.execute(query)
            # commit each statement so one failure does not roll back the others
            conn.commit()
        except Exception as e:
            conn.rollback()
            print('error in making table: ', query)
            print(e)
            
    cur.close()

In [56]:
# conn = connect_to_reshift(HOST, DB, DB_USER, DB_PASSWORD, 5439)
# create_tables(conn)

In [57]:
# cur.execute('SElect * from staging_demographics')
# result = cur.fetchone()
# result

In [58]:
def copy_staging(conn, bucket, csv_files, tables):
    '''Function that copies csv files to staging tables
    Args:
        conn (connection): psycopg2 connection
        bucket (str): aws bucket
        csv_files ([str]): list of csv_files
        tables ([str]): corresponding tables for copying
    Returns:
        None
    '''
    
    staging_copy = """
    COPY {}
    FROM '{}'
    IAM_ROLE '{}'
    REGION 'us-west-2'
    CSV IGNOREHEADER 1;
    """
    
    cur =conn.cursor()
                    
    for i in range(len(csv_files)):
        path = 's3://{}/{}'.format(bucket, csv_files[i])
        query = staging_copy.format(tables[i], path, ARN)
        try:
            cur.execute(query)
            # commit each copy so a later failure does not roll it back
            conn.commit()
        except Exception as e:
            conn.rollback()
            print('The following query failed: ', query)
            print(e)
    
    cur.close()
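
To see what `copy_staging` sends to Redshift, it can help to render one statement on its own. The sketch below builds the same COPY text as the template in the function above. The helper name `build_copy_statement`, the bucket name, and the role ARN are placeholders, not real resources.

```python
def build_copy_statement(table, bucket, key, iam_role, region="us-west-2"):
    """Render a Redshift COPY statement for one csv file with a header row."""
    return (
        f"COPY {table}\n"
        f"FROM 's3://{bucket}/{key}'\n"
        f"IAM_ROLE '{iam_role}'\n"
        f"REGION '{region}'\n"
        "CSV IGNOREHEADER 1;"
    )


statement = build_copy_statement(
    "staging_airports",
    "example-bucket",                               # placeholder bucket
    "airports.csv",
    "arn:aws:iam::000000000000:role/exampleRole",   # placeholder role ARN
)
print(statement)
```

Printing the statement before executing it makes it easier to spot a wrong path or table name when a copy fails.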

In [59]:
# conn = connect_to_reshift(HOST, DB, DB_USER, DB_PASSWORD, 5439)
# copy_staging(conn, 'qscapstone', ['airports.csv', 'demographics.csv', 'immigration.csv'], 
#              ['staging_airports', 'staging_demographics', 'staging_immigration'])

In [94]:
def insert_facts_table(conn):
    """Function that inserts into the facts table
    Args:
        conn (connection): psycopg2 connection
    Returns
        None
    """
    
    cur = conn.cursor()
    
    query = '''
    INSERT INTO immigration_facts
    (SELECT 
        si.im_id,
        si.i94port,
        sd.city_id,
        si.i94cit,
        si.i94mode,
        si.arrdate,
        si.i94res,
        si.i94addr,
        si.depDate,
        si.i94bir,
        si.i94visa,
        si.gender
    FROM
        staging_immigration AS si INNER JOIN 
        staging_airports AS sa ON si.i94port = sa.iata_code  AND sa.state = si.i94addr INNER JOIN
        staging_demographics AS sd ON sa.municipality = sd.city and sa.state = sd.state_code);
    '''
    
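    try:
        cur.execute(query)
    except Exception as e:
        # report the failure instead of silently leaving the table empty
        print('query failed: ', query)
        print(e)
        conn.rollback()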
        
    conn.commit()
    cur.close()   

In [61]:
# insert_facts_table(conn)

In [95]:
def insert_dimensions_table(conn):
    """Function that inserts into the dimension tables
    Args:
        conn (connection): psycopg2 connection
    Returns
        None
    """
    
    airports_query = '''
    INSERT INTO airports_dimension 
    (
    SELECT 
        imf.im_id,
        sa.ident,
        sa.type,
        sa.name,
        sa.iso_region,
        sa.state,
        sa.municipality,
        sa.iata_code,
        sa.latitude,
        sa.longitude
    FROM
        immigration_facts AS imf INNER JOIN staging_airports AS sa
        ON imf.i94port = sa.iata_code AND imf.i94addr = sa.state
    )
    '''
    
    demographics_query = '''
    INSERT INTO demographics_dimension
    (
    SELECT
        imf.im_id,
        sd.city_id,
        sd.City,
        sd.State,
        sd.Median_Age,
        sd.Male_Population,
        sd.Female_Population,
        sd.Total_Population,
        sd.Number_of_Veterans,
        sd.Foreign_born,
        sd.Average_Household_Size,
        sd.State_Code,
        sd.Race,
        sd.Count
    FROM
        immigration_facts AS imf INNER JOIN staging_demographics AS sd
        ON imf.city_id = sd.city_id AND imf.i94addr = sd.state_code
    )
    '''
    
    queries = [airports_query, demographics_query]
    
    cur = conn.cursor()
    for query in queries:
        try:
            cur.execute(query)
            # commit each insert so a later failure does not roll it back
            conn.commit()
        except Exception as e:
            print('query failed: ', query)
            print(e)
            conn.rollback()
    
    cur.close()

In [71]:
def perform_aggregation(conn):
    """Function that performs aggregation on dimensions tables
    Args:
        conn (connection): psycopg2 connection
    Returns
        None
    """
    
    group_type = '''
    INSERT INTO type_counts
    (
    SELECT
        type,
        COUNT(*) as num_visits
    FROM airports_dimension
    GROUP BY type
    )
    '''
    
    group_name = '''
    INSERT INTO airport_counts
    (
    SELECT
        name,
        COUNT(*) as num_visits
    FROM airports_dimension
    GROUP BY name
    )
    '''
    
    group_state = '''
    INSERT INTO state_counts
    (
    SELECT
        State_Code,
        COUNT(*) as num_visits
    FROM demographics_dimension
    GROUP BY State_Code
    )
    '''
    
    group_race = '''
    INSERT INTO race_counts
    (
    SELECT
        Race,
        COUNT(*) as num_visits
    FROM demographics_dimension
    GROUP BY Race
    )
    '''
    
    queries = [group_type, group_name, group_state, group_race]
    
    cur = conn.cursor()
    
    for query in queries:
        try:
            cur.execute(query)
            # commit each aggregate so a later failure does not roll it back
            conn.commit()
        except Exception as e:
            print('query failed: ', query)
            print(e)
            conn.rollback()
    
    cur.close()

In [64]:
# perform_aggregation(conn)

In [96]:
def Pipeline():
    '''Function to run a data pipeline
    Args:
        None
    Returns:
        None
    '''
    # connect to Redshift
    print('Connecting to Redshift...')
    conn = connect_to_reshift(HOST, DB, DB_USER, DB_PASSWORD, 5439) 
    
    # drop tables
    tables = [
        'staging_immigration',
        'staging_airports',
        'staging_demographics',
        'immigration_facts',
        'airports_dimension',
        'demographics_dimension',
        'type_counts', 
        'airport_counts', 
        'state_counts', 
        'race_counts'
    ]
    
    print('Dropping Tables...')
    drop_tables(conn, tables)
    
    print('Creating Tables...')
    # create tables
    create_tables(conn)
    
    print('Copying Staging Tables...')
    # copy staging tables
    copy_staging(conn, 'qscapstone', ['airports.csv', 'demographics.csv', 'immigration.csv'], 
             ['staging_airports', 'staging_demographics', 'staging_immigration'])
    
    print('Inserting Facts Table...')
    # insert facts table
    insert_facts_table(conn)
    
    print('Inserting Dimension Tables...')
    # insert dimension tables
    insert_dimensions_table(conn)
    
    print('Inserting Aggregate Tables...')
    # create aggregation tables
    perform_aggregation(conn)
    
    print('Done')

In [97]:
Pipeline()

Connecting to Redshift...
Dropping Tables...
Creating Tables...
Copying Staging Tables...
Inserting Facts Table...
Inserting Dimension Tables...
Inserting Aggregate Tables...
Done


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks
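
The count and uniqueness checks listed above can be sketched with two small query helpers. Redshift treats `PRIMARY KEY` and `UNIQUE` constraints as informational and does not enforce them, so duplicate keys have to be detected with an explicit query. The example below runs against an in-memory SQLite table with made-up rows so that it is self-contained; the helper names are hypothetical.

```python
import sqlite3


def count_rows(conn, table):
    """Return the number of rows in a table."""
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


def count_duplicate_keys(conn, table, key):
    """Return how many distinct key values appear more than once."""
    query = (
        f"SELECT COUNT(*) FROM "
        f"(SELECT {key} FROM {table} GROUP BY {key} HAVING COUNT(*) > 1) AS d"
    )
    return conn.execute(query).fetchone()[0]


conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE staging_airports (ident TEXT, iata_code TEXT);
INSERT INTO staging_airports VALUES ('A1', 'AAA'), ('B2', 'BBB'), ('B2', 'BBB');
""")
row_count = count_rows(conn, "staging_airports")
duplicate_keys = count_duplicate_keys(conn, "staging_airports", "ident")
print(row_count, duplicate_keys)  # 3 1
conn.close()
```

A table passes when its row count is above zero and its duplicate-key count is zero; the table and key names should come from a fixed list, since they are interpolated into the SQL text.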

In [None]:
# Perform quality checks here
def check_data():
    '''Function to perform data quality checks on the loaded tables
    Args:
        None
    Returns
        (bool): success or failure
    '''
    conn = connect_to_reshift(HOST, DB, DB_USER, DB_PASSWORD, 5439) 
    
    # check for rows in all tables
    tables = [
              'staging_airports', 
              'staging_demographics', 
              'staging_immigration',
              'immigration_facts',
              'airports_dimension',
              'demographics_dimension',
              'type_counts',
              'airport_counts',
              'state_counts',
              'race_counts'
             ]
    
    cur = conn.cursor()
    
    for table in tables:
        query = 'SELECT COUNT(*) FROM ' + table
        try:
            cur.execute(query)
            result = cur.fetchone()
            if(result[0] <= 0):
                print('table is missing rows: ', table)
                return False
        except Exception as e:
            print('query failed: ', query)
            print(e)
            conn.rollback()
            
    # make sure data isn't being repeated
    airports_dimension = 'SELECT COUNT(*) FROM airports_dimension'
    cur.execute(airports_dimension)
    airports_counts = cur.fetchone()[0]
    
    airports_staging = 'SELECT count(*) FROM staging_airports'
    cur.execute(airports_staging)
    airports_staging_counts = cur.fetchone()[0]
    
    if airports_counts < airports_staging_counts:
        print('Error in airport dimensions')
        return False
            
    cur.close()
    
    return True

In [113]:
check_data()

ProgrammingError: relation "airports_staging" does not exist


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

In [None]:
### Table Wise Data Dictionary
staging_immigration:
        im_id integer - primary key identifying immigration row
        i94cit varchar - country of citizenship code
        i94port varchar - port code
        i94mode varchar - transportation code
        arrdate varchar - arrival date
        i94res varchar - country of residence code
        i94addr varchar - state code
        depDate varchar - departure date
        i94bir varchar - immigrant age
        i94visa varchar - type of visa
        gender varchar - gender
   
staging_airports:
        ident varchar - primary key identifying airport
        type varchar - type of airport
        name varchar  - name of airport
        iso_region varchar - region code
        state varchar - state code parsed from iso_region
        municipality varchar  - city
        iata_code varchar - airport code, used to join to immigration
        latitude float - latitude
        longitude float - longitude
   
staging_demographics:
        city_id integer - primary key for cities
        City varchar - city name
        State varchar - state
        Median_Age real - median age
        Male_Population integer - number of males
        Female_Population integer - number of females
        Total_Population integer - total population
        Number_of_Veterans integer - number of veterans
        Foreign_born integer - number of foreign-born residents
        Average_Household_Size real - average household size
        State_Code varchar - state code
        Race varchar - majority race
        Count integer - number of majority race
    
immigration_facts:
        im_id integer - primary key identifying immigration row
        i94cit varchar - country of citizenship code
        i94port varchar - port code
        city_id integer - id for city, joins to demographics
        i94mode varchar - transportation code
        arrdate varchar - arrival date
        i94res varchar - country of residence code
        i94addr varchar - state code
        depDate varchar - departure date
        i94bir varchar - immigrant age
        i94visa varchar - type of visa
        gender varchar - gender
    
airports_dimension:
        im_id integer - immigration id, joins to immigration
        ident varchar - airport identifier
        type varchar - type of airport
        name varchar  - name of airport
        iso_region varchar - region code
        state varchar - state code parsed from iso_region
        municipality varchar  - city
        iata_code varchar - airport code, used to join to immigration
        latitude float - latitude
        longitude float - longitude
    
demographics_dimension:
        im_id integer - immigration id, joins to immigration
        city_id integer - id for city, taken from staging_demographics
        City varchar - city name
        State varchar - state
        Median_Age real - median age
        Male_Population integer - number of males
        Female_Population integer - number of females
        Total_Population integer - total population
        Number_of_Veterans integer - number of veterans
        Foreign_born integer - number of foreign-born residents
        Average_Household_Size real - average household size
        State_Code varchar - state code
        Race varchar - majority race
        Count integer - number of majority race
    
type_counts:
        type varchar - type of airport
        visit_counts integer - total number of visits
    
airport_counts:
        name varchar - airport name
        visit_counts integer - total number of visits 
    
state_counts:
        State_Code varchar - state code
        visit_counts integer - total number of visits 
    
race_counts:
        Race varchar - majority race in city
        visit_counts integer - total number of visits 

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

### Project Description:
In this project, data from four different data sets was cleaned, evaluated, and combined to make a database that can be used to assess immigration patterns in the United States.  In the first part of the project, each data set was cleaned and the relevant data was uploaded as csv files to an s3 storage bucket, which gives a quick and efficient way to get the data into an Amazon Redshift database.  After uploading the data to s3 storage, a database was created in Amazon Redshift.  Redshift was chosen for its columnar storage, which allows analytical queries to run quickly.  The original csv data was copied into staging tables.  Next, fact and dimension tables were created by querying the staging tables.  After the fact and dimension tables were made, aggregation analysis was run to examine the total number of immigrant visits relative to the type of airport, the airport name, the state, and the majority race of the city.

### Pipeline Scheduling

The immigration dataset for this project consists of a single month of air travel immigration data for the US.  Therefore, the pipeline should be run every month after the data sets have been collected.  Additional analytical queries can be run on the database in an ad hoc manner whenever necessary to gain further insights into the data.

### If The Data Sets Changed 
#### The data was increased by 100x.

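The s3 storage used should be sufficient to handle these data demands.  The database should also cope, because a Redshift cluster can be resized with additional nodes to handle the larger workload.  The cleaning step would need to change, however: the notebook converts the immigration data to a pandas dataframe, which is unlikely to fit in memory at 100x the size, so the cleaning would have to stay in Spark.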

#### The data populates a dashboard that must be updated on a daily basis by 7am every day.
In this case, we would want to use a data pipeline scheduler such as Airflow to schedule jobs to run at the end of each day.  Trial runs would be needed to assess how long the pipeline takes.  Once a timeframe was established, SLAs could be used to ensure that the pipeline is running within the expected time.  If the jobs ran past the SLA, emails could be sent to the DB admin to indicate that there is a problem with the pipeline.
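
The SLA idea can be illustrated independently of any scheduler: record when a run starts and finishes, compare the duration to the agreed limit, and raise an alert when the limit is exceeded. The sketch below uses only the standard library; the two-hour limit and the timestamps are made-up values, and `sla_missed` is a hypothetical helper rather than a scheduler API.

```python
from datetime import datetime, timedelta


def sla_missed(started_at, finished_at, sla):
    """Return True when a pipeline run took longer than the allowed duration."""
    return (finished_at - started_at) > sla


start = datetime(2016, 5, 1, 5, 0)   # run starts at 05:00
end = datetime(2016, 5, 1, 7, 30)    # run finishes at 07:30
if sla_missed(start, end, timedelta(hours=2)):
    print("SLA missed: notify the DB admin")
```

With a 7am deadline, the start time and the SLA together determine whether the dashboard is ready in time, so both would be set from the trial runs described above.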

#### The database needed to be accessed by 100+ people.
Redshift should be able to scale to handle the load.  If it cannot, Apache Cassandra could be used instead.


