# Refugees in the Age of Global Warming
### Data Engineering Capstone Project

#### Project Summary
This project builds a data warehouse for monitoring refugee and population figures around the world alongside temperature changes over time.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Installs
# pip3 install -U country_converter 
# pip3 install -U pandasql 
# pip3 install -U us

# Imports 
import pandas as pd
import numpy as np
import os
from datetime import datetime
from io import StringIO
import boto3
import psycopg2
import configparser
config = configparser.ConfigParser()
from sqlalchemy.engine import create_engine
# Source: https://github.com/konstantinstadler/country_converter
import country_converter as coco
from pandasql import sqldf
# Source: https://github.com/unitedstates/python-us
import us
# Source: https://stackoverflow.com/questions/23668427/pandas-three-way-joining-multiple-dataframes-on-columns
from functools import reduce

### Step 1: Scope the Project and Gather Data

#### Scope 
The plan is to build a data warehouse for analytical processes, so analysts can design recurring and ad hoc reports over time using SQL. There is a strong emphasis on ensuring the warehouse is easy to interpret, performant, and quality assured.
 
#### Data Sources and Content

There are four source datasets:
 1. City_temperature.csv
     - Summary: average daily temperature for all major cities in the world from 1995 - 2020
     - Source: University of Dayton - separate txt files available for each city [here](https://academic.udayton.edu/kissock/http/Weather/default.htm). The data is available for research and non-commercial purposes only. Refer to [this page](https://academic.udayton.edu/kissock/http/Weather/default.htm) for license.
     - Secondary source: SRK via Kaggle - [link](https://www.kaggle.com/sudalairajkumar/daily-temperature-of-major-cities)
 2. Country_population_total_long.csv
     - Summary: annual population counts by country from 1960 - 2017
     - Source: The World Bank - [link](https://data.worldbank.org/indicator/SP.POP.TOTL)
     - Secondary source: Devakumar kp via Kaggle - [link](https://www.kaggle.com/imdevskp/world-population-19602018?select=population_total_long.csv)
 3. UNdata_City_Population_20210315.csv
     - Summary: annual population counts by city from 1970 - 2020 (contains gaps in the 1970s)
     - Source: UN Data - [link](https://data.un.org/Data.aspx?d=POP&f=tableCode%3A240)
 4. UNdata_Refugees_20210217.csv
     - Summary: annual refugee counts from 1975 - 2016 by country of residence and country of origin
     - Source: UN Data - [link](http://data.un.org/Data.aspx?d=UNHCR&f=indID%3aType-Ref)

### Read in Each Dataset

#### Temperature Data

In [2]:
temp_df = pd.read_csv('Data/temperature_data/city_temperature.csv', engine = 'python')
temp_df.head()

Unnamed: 0,Region,Country,State,City,Month,Day,Year,AvgTemperature
0,Africa,Algeria,,Algiers,1,1,1995,64.2
1,Africa,Algeria,,Algiers,1,2,1995,49.4
2,Africa,Algeria,,Algiers,1,3,1995,48.8
3,Africa,Algeria,,Algiers,1,4,1995,46.4
4,Africa,Algeria,,Algiers,1,5,1995,47.9


#### Population Counts by Country and Year

In [3]:
country_pop_df = pd.read_csv('Data/country_population_data/country_population_total_long.csv', engine = 'python')
country_pop_df.head()

Unnamed: 0,Country Name,Year,Count
0,Aruba,1960,54211
1,Afghanistan,1960,8996973
2,Angola,1960,5454933
3,Albania,1960,1608800
4,Andorra,1960,13411


#### Population Counts by City and Year

In [4]:
city_pop_df = pd.read_csv('Data/city_population_data/UNdata_City_Population_20210315.csv', engine = 'python')
city_pop_df.head()

Unnamed: 0,Country or Area,Year,Area,Sex,City,City type,Record Type,Reliability,Source Year,Value,Value Footnotes
0,Åland Islands,2019,Total,Both Sexes,MARIEHAMN,City proper,Estimate - de jure,"Final figure, complete",2020,11711.0,1
1,Åland Islands,2019,Total,Male,MARIEHAMN,City proper,Estimate - de jure,"Final figure, complete",2020,5606.0,1
2,Åland Islands,2019,Total,Female,MARIEHAMN,City proper,Estimate - de jure,"Final figure, complete",2020,6105.0,1
3,Åland Islands,2018,Total,Both Sexes,MARIEHAMN,City proper,Estimate - de jure,"Final figure, complete",2019,11709.0,1
4,Åland Islands,2018,Total,Male,MARIEHAMN,City proper,Estimate - de jure,"Final figure, complete",2019,5620.5,1


#### Refugee Counts by Year, Country of Residence, and Country of Origin

In [5]:
refugee_df = pd.read_csv('Data/refugee_data/UNdata_Refugees_20210317.csv', engine = 'python')
refugee_df.head()

Unnamed: 0,Country or territory of asylum or residence,Country or territory of origin,Year,Refugees,Refugees assisted by UNHCR,Total refugees and people in refugee-like situations,Total refugees and people in refugee-like situations assisted by UNHCR
0,Afghanistan,Iraq,2016,1.0,1.0,1.0,1.0
1,Afghanistan,Islamic Rep. of Iran,2016,33.0,33.0,33.0,33.0
2,Afghanistan,Pakistan,2016,59737.0,59737.0,59737.0,59737.0
3,Albania,China,2016,11.0,11.0,11.0,11.0
4,Albania,Dem. Rep. of the Congo,2016,3.0,3.0,3.0,3.0


### Step 2: Explore, Assess, and Clean the Data

#### Cleaning Steps Based on Exploring the Data in the Following Step
Document steps necessary to clean the data

In [6]:
# Cleaning steps based on exploring the data in the following step

# Temperature data

# There are records with a day value of 0 and year values of 200 and 201,
# and missing temperatures are represented as -99. Exclude all of them.
# Copy the filtered frame so the assignments below do not raise SettingWithCopyWarning.
temp_df = temp_df[(temp_df.Day > 0) & (temp_df.Year >= 1995) & (temp_df.AvgTemperature != -99)].copy()

# Add a date field
temp_df['Date'] = pd.to_datetime(temp_df[["Year", "Month", "Day"]]).dt.date

# Add a date key
temp_df['DateKey'] = temp_df.Date.apply(lambda x: x.strftime('%Y%m%d'))

# Correct spelling (use .loc to avoid chained assignment)
temp_df.loc[temp_df.Country == 'Equador', 'Country'] = 'Ecuador'

# Create a state field that contains no null values so it can be grouped on
temp_df['StateNoNull'] = temp_df['State'].fillna('State')

# There are records with duplicate content, but two different temperatures listed.
# Replace existing average temperature field with max to resolve duplication.
# Source: https://laptrinhx.com/sql-like-window-functions-in-pandas-1608955182/
temp_df['AvgTemperature'] = temp_df.groupby(['Country', 'StateNoNull', 'City', 'Date'])['AvgTemperature']\
                        .transform('max')

# Drop state field that contains no null values
temp_df = temp_df.drop(columns=['StateNoNull'])

# Drop duplicates
temp_df = temp_df.drop_duplicates()

# Population Counts by Country and Year

# Rename columns to be more descriptive
country_pop_df.columns = ['Country', 'Year', 'Country_Population']

country_pop_df = country_pop_df.drop_duplicates()

# Population Counts by City and Year

# Filter on sex to only include both sexes since other sources don't include this breakdown.
# Note: I verified there are 4,751 distinct cities and all cities have a both sexes row
city_pop_df[['City']].nunique()
city_pop_df[['Sex','City']].groupby(['Sex']).nunique()
city_pop_df = city_pop_df[city_pop_df.Sex == 'Both Sexes']

# By filtering the sex breakdown to one value, the field can be dropped
city_pop_df = city_pop_df.drop(columns=['Sex'])

# Check how many distinct values are in the Area column
city_pop_df[['Area']].drop_duplicates()

# Remove the Area column since it contains only one value
city_pop_df = city_pop_df.drop(columns=['Area'])

# Change population format to integer
city_pop_df['Value'] = city_pop_df['Value'].astype('int64')

# Rename columns for naming consistencies
city_pop_df.columns = ['Country',
                       'Year',
                       'City',
                       'City_Type',
                       'Record_Type',
                       'Reliability',
                       'Source_Year',
                       'City_Population',
                       'Population_Notes']

# Drop duplicates
city_pop_df = city_pop_df.drop_duplicates()

# Refugee Counts by Year, Country of Residence, and Country of Origin

# Rename columns for naming consistencies
refugee_df.columns = ['Asylum_Country',
                      'Origin_Country',
                      'Year',
                      'Refugees',
                      'Refugees_Assisted_by_UNHCR',
                      'Refugee_Like_Population',
                      'Refugee_Like_Population_Assisted_by_UNHCR']

# Simplify name (use .loc to avoid chained assignment)
serbia_long = 'Serbia (and Kosovo: S/RES/1244 (1999))'
refugee_df.loc[refugee_df.Asylum_Country == serbia_long, 'Asylum_Country'] = 'Serbia'
refugee_df.loc[refugee_df.Origin_Country == serbia_long, 'Origin_Country'] = 'Serbia'

# Drop duplicates
refugee_df = refugee_df.drop_duplicates()
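
The temperature cleaning rules can be tried on a toy frame before running them on the full file. The sketch below is only an illustration: the rows are hypothetical (the second Algiers reading of 60.0 is invented to create a conflicting duplicate), and the names `raw` and `clean` are not part of the notebook.

```python
import pandas as pd

# Hypothetical rows that mimic the problems found in the temperature source
raw = pd.DataFrame({
    "Country": ["Algeria"] * 5,
    "State": [None] * 5,
    "City": ["Algiers"] * 5,
    "Month": [1, 1, 1, 1, 1],
    "Day": [1, 1, 2, 0, 3],
    "Year": [1995, 1995, 1995, 1995, 200],
    "AvgTemperature": [64.2, 60.0, -99.0, 50.0, 48.8],
})

# Drop day 0, truncated years, and the -99 missing-value sentinel
clean = raw[(raw.Day > 0) & (raw.Year >= 1995) & (raw.AvgTemperature != -99)].copy()

# Build the date and the YYYYMMDD key
clean["Date"] = pd.to_datetime(clean[["Year", "Month", "Day"]]).dt.date
clean["DateKey"] = clean["Date"].apply(lambda d: d.strftime("%Y%m%d"))

# Null states would be dropped by groupby, so fill them with a placeholder first
clean["StateNoNull"] = clean["State"].fillna("State")

# Conflicting readings for the same city and date collapse to their maximum
clean["AvgTemperature"] = (
    clean.groupby(["Country", "StateNoNull", "City", "Date"])["AvgTemperature"]
    .transform("max")
)
clean = clean.drop(columns=["StateNoNull"]).drop_duplicates()

print(clean[["City", "DateKey", "AvgTemperature"]])
```

Taking the maximum is a simple tie-breaker; an average or the first reading would resolve the duplication equally well if a different rule is preferred.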


#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

In [7]:
def summary_stats(df, GroupByList, SelectList, AggList = ['min', 'max', 'nunique']):
    """
    Returns summary level information
    
    Inputs:
    df - dataframe
    GroupByList - columns to summarize on
    SelectList - columns to return
    AggList - how to summarize the results
    """
    return df.groupby(GroupByList)[SelectList].agg(AggList).head(50)

# Temperature data

# Note that there are seven regions
summary_stats(df = temp_df,
              GroupByList = ['Region'],
              SelectList = ['Date', 'AvgTemperature'])

# Very consistent going back to 1995
summary_stats(df = temp_df,
              GroupByList = ['Year'],
              SelectList = ['City'],
              AggList = ['nunique'])

# Country populations over time data

summary_stats(df = country_pop_df,
              GroupByList = ['Country'],
              SelectList = ['Year', 'Country_Population'])

# Very consistent going back to 1960
summary_stats(df = country_pop_df,
              GroupByList = ['Year'],
              SelectList = ['Country'],
              AggList = ['nunique'])

# City populations over time data

# There are gaps in time.
summary_stats(df = city_pop_df,
              GroupByList = ['Country'],
              SelectList = ['Year', 'City_Population'])

# By year 2000, data collection is much more complete across cities
summary_stats(df = city_pop_df,
              GroupByList = ['Year'],
              SelectList = ['City'],
              AggList = ['nunique'])

# Refugee Counts by Year, Country of Residence, and Country of Origin

# Takeaways from the following three comparisons:
# UNHCR related counts are populated substantially less
# Refugees vs. refugee-like: they differ slightly
summary_stats(df = refugee_df,
              GroupByList = ['Asylum_Country'],
              SelectList = ['Refugees', 'Refugees_Assisted_by_UNHCR'])


summary_stats(df = refugee_df,
              GroupByList = ['Asylum_Country'],
              SelectList = ['Refugee_Like_Population', 'Refugee_Like_Population_Assisted_by_UNHCR'])


summary_stats(df = refugee_df,
              GroupByList = ['Asylum_Country'],
              SelectList = ['Refugee_Like_Population', 'Refugees'])

# There are gaps in time.
summary_stats(df = refugee_df,
              GroupByList = ['Asylum_Country'],
              SelectList = ['Year'])

# Seems consistent by year, especially considering new countries forming over time
summary_stats(df = refugee_df,
              GroupByList = ['Year'],
              SelectList = ['Asylum_Country'],
              AggList = ['nunique'])


Year,Asylum_Country (nunique)
1975,50
1976,53
1977,72
1978,82
1979,88
1980,90
1981,92
1982,94
1983,97
1984,96


In [8]:
def duplicate_check(df, DuplicateList, SortList):
    """
    Returns every row whose values in DuplicateList appear more than once
    
    Inputs:
    df - dataframe
    DuplicateList - columns to check for duplicates
    SortList - columns to sort by
    """
    return df[df.duplicated(DuplicateList, keep=False)].sort_values(by = SortList)

# Originally 94 duplicates, but after the fix above, no duplicates remain
duplicate_check(df = temp_df,
                DuplicateList = ['State', 'City', 'Date'],
                SortList = ['City', 'Date'])

# No duplicates
duplicate_check(df = country_pop_df,
                DuplicateList = ['Country', 'Year'],
                SortList = ['Country', 'Year'])

# No duplicates
duplicate_check(df = city_pop_df,
                DuplicateList = ['City', 'Year', 'City_Type', 'Record_Type',
                                 'Reliability', 'Source_Year', 'Population_Notes'],
                SortList = ['City', 'Year'])

# No duplicates
duplicate_check(df = refugee_df,
                DuplicateList = ['Asylum_Country', 'Origin_Country', 'Year'],
                SortList = ['Asylum_Country', 'Origin_Country', 'Year'])


Unnamed: 0,Asylum_Country,Origin_Country,Year,Refugees,Refugees_Assisted_by_UNHCR,Refugee_Like_Population,Refugee_Like_Population_Assisted_by_UNHCR
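
An empty result like the one above means no key combination repeats. As a small sketch of what `duplicate_check` returns when duplicates do exist, the toy frame below (hypothetical values) shows that `keep=False` flags every member of a duplicated group rather than only the later occurrences.

```python
import pandas as pd

# Hypothetical readings: Algiers appears twice for the same date
toy = pd.DataFrame({
    "City": ["Algiers", "Algiers", "Tirana"],
    "Date": ["1995-01-01", "1995-01-01", "1995-01-01"],
    "AvgTemperature": [64.2, 60.0, 40.1],
})

# keep=False marks all rows in a duplicated group, so both Algiers rows are returned
dupes = toy[toy.duplicated(["City", "Date"], keep=False)].sort_values(by=["City", "Date"])

print(dupes)
```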


In [9]:
def summary_stats_in_region(GroupBy, Region):
    """
    Returns summary information within a specified region
    
    Inputs:
    GroupBy - column to summarize on
    Region - region to filter on
    
    Input Options:
    GroupBy: City, Country, State
    Region: Africa, Asia, Australia/South Pacific,
            Europe, Middle East, North America,
            South/Central America & Carribean
    """
    return temp_df[[GroupBy, 'Date', 'AvgTemperature']].where(temp_df.Region == Region) \
               .groupby([GroupBy]) \
               .agg(['min', 'max', 'nunique'])

# Note: there are some time gaps in cities.
summary_stats_in_region(GroupBy = 'Country', Region = 'South/Central America & Carribean')

Country,Date (min),Date (max),Date (nunique),AvgTemperature (min),AvgTemperature (max),AvgTemperature (nunique)
Argentina,1995-01-01,2020-05-13,9231,35.3,90.9,507
Bahamas,1995-01-01,2020-05-12,9161,58.7,91.8,293
Barbados,1995-01-01,2018-05-20,8349,74.2,88.0,120
Belize,1995-01-03,2020-05-12,8867,64.6,92.9,233
Bermuda,1995-01-01,2010-08-22,5558,51.1,85.4,307
Bolivia,1995-01-01,2020-05-13,9225,32.8,63.4,211
Brazil,1995-01-01,2020-05-13,9208,44.8,93.4,407
Colombia,1995-01-01,2020-05-13,9196,46.7,66.7,154
Costa Rica,1995-01-01,2020-05-13,9130,63.1,85.6,160
Cuba,1995-01-02,2020-05-12,9094,46.9,88.3,301


In [10]:
def null_check(df, ColumnList):
    """
    Returns rows that contain null values within the column list
    
    Inputs:
    df - dataframe
    ColumnList - columns to check for nulls
    """
    return df.loc[pd.isnull(df[ColumnList]).any(axis=1), :]

# Only state has nulls, which is to be expected.
null_check(df = temp_df,
           ColumnList = ['Region', 'Country', 'City', 'Date', 'AvgTemperature'])

# No null values detected.
null_check(df = country_pop_df,
           ColumnList = ['Country', 'Year', 'Country_Population'])

# No null values detected.
null_check(df = city_pop_df,
           ColumnList = ['Country', 'Year', 'City', 'City_Type', 'Record_Type',
                         'Reliability', 'Source_Year', 'City_Population'])

# No unexpected nulls. Not all refugee counts populate, which is okay.
null_check(df = refugee_df,
           ColumnList = ['Asylum_Country', 'Origin_Country', 'Year',
                         'Refugees', 'Refugee_Like_Population'])


Unnamed: 0,Asylum_Country,Origin_Country,Year,Refugees,Refugees_Assisted_by_UNHCR,Refugee_Like_Population,Refugee_Like_Population_Assisted_by_UNHCR
2453,Israel,Dem. Rep. of the Congo,2016,,,208.0,50.0
2941,Malaysia,Kenya,2016,,,1.0,1.0
2953,Malaysia,Rep. of Moldova,2016,,,1.0,1.0
2966,Malaysia,United States,2016,,,1.0,1.0
2967,Malaysia,Viet Nam,2016,,,1.0,1.0
4043,Saudi Arabia,Liberia,2016,,,7.0,7.0
4091,Serbia,Various,2016,,,1150.0,1150.0
7889,Israel,Dem. Rep. of the Congo,2015,,,208.0,50.0
8361,Malaysia,Kenya,2015,,,1.0,1.0
9441,Saudi Arabia,Liberia,2015,,,7.0,7.0
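
The row-wise null test used by `null_check` can be seen in isolation on two rows taken from the outputs above. This is a minimal sketch; the frame name `toy` is illustrative only.

```python
import pandas as pd

# Two refugee rows from the outputs above: one with a missing Refugees count
toy = pd.DataFrame({
    "Asylum_Country": ["Israel", "Albania"],
    "Origin_Country": ["Dem. Rep. of the Congo", "China"],
    "Refugees": [float("nan"), 11.0],
    "Refugee_Like_Population": [208.0, 11.0],
})

# any(axis=1) is True for a row when at least one checked column is null
has_null = toy[["Refugees", "Refugee_Like_Population"]].isnull().any(axis=1)
rows_with_nulls = toy.loc[has_null, :]

print(rows_with_nulls)
```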


#### Define a country naming standard across data sources

In [11]:
# Pull unique country values from each source data set
unique_refugee_origin = refugee_df[['Origin_Country']].drop_duplicates()

unique_refugee_asylum = refugee_df[['Asylum_Country']].drop_duplicates()

unique_citypop_country = city_pop_df[['Country']].drop_duplicates()

unique_countrypop = country_pop_df[['Country']].drop_duplicates()

unique_temp_country = temp_df[['Country']].drop_duplicates()

In [12]:
def country_standard(df, rename):
    """
    Rename column to be unique across data sources
    Convert dataframe to a list
    Create a standardized country name and add it to the dataframe
    Replace values 'not found' (default designation) with source value
    
    Inputs:
    df - dataframe
    rename - new name for the column
    """
    # Rename column to be unique across data sources
    df.columns = [rename]
    
    # Convert dataframe to a list
    country_list = df[rename].tolist()
    
    # Source: https://github.com/konstantinstadler/country_converter
    # Create a standardized country name and add it to the dataframe
    standard_names = coco.convert(names = country_list, to = 'name_short')
    df['CountryStandard'] = standard_names
    
    # Replace values 'not found' (default designation) with source value
    not_found = df.CountryStandard == 'not found'
    df.loc[not_found, 'CountryStandard'] = df.loc[not_found, rename]

country_standard(df = unique_countrypop, rename = 'Country_CountryPop')
country_standard(df = unique_temp_country, rename = 'Country_Temp')
country_standard(df = unique_citypop_country, rename = 'Country_CityPop')
country_standard(df = unique_refugee_asylum, rename = 'Asylum_Country')
country_standard(df = unique_refugee_origin, rename = 'Origin_Country')

Channel Islands not found in regex
Caribbean small states not found in regex
Pacific island small states not found in regex
Serbia-Montenegro not found in regex
Yugoslavia not found in regex
Various not found in regex
Stateless not found in regex
Various not found in regex
Tibetans not found in regex


In [13]:
# Full outer join dataframes for mapping purposes
# Source: https://stackoverflow.com/questions/23668427/pandas-three-way-joining-multiple-dataframes-on-columns
dfs = [unique_countrypop,
       unique_temp_country,
       unique_citypop_country,
       unique_refugee_asylum,
       unique_refugee_origin]

country_map = reduce(lambda left,right: pd.merge(left, right, on = 'CountryStandard', how = 'outer'), dfs)

country_map.head()

Unnamed: 0,Country_CountryPop,CountryStandard,Country_Temp,Country_CityPop,Asylum_Country,Origin_Country
0,Aruba,Aruba,,Aruba,Aruba,
1,Afghanistan,Afghanistan,,,Afghanistan,Afghanistan
2,Angola,Angola,,,Angola,Angola
3,Albania,Albania,Albania,Albania,Albania,Albania
4,Andorra,Andorra,,Andorra,,Andorra
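
The nulls in the mapping above come from the full outer join: a country missing from one source still gets a row. A reduced sketch with three toy frames (hypothetical subsets of the real lists) shows the same `reduce` and `pd.merge` pattern.

```python
from functools import reduce

import pandas as pd

# Hypothetical subsets of three of the per-source country lists
pop = pd.DataFrame({"Country_CountryPop": ["Aruba", "Albania"],
                    "CountryStandard": ["Aruba", "Albania"]})
temp = pd.DataFrame({"Country_Temp": ["Albania", "US"],
                     "CountryStandard": ["Albania", "United States"]})
asylum = pd.DataFrame({"Asylum_Country": ["Aruba"],
                       "CountryStandard": ["Aruba"]})

# Fold the list pairwise into one frame, keeping keys found in any source
toy_map = reduce(
    lambda left, right: pd.merge(left, right, on="CountryStandard", how="outer"),
    [pop, temp, asylum],
)

print(toy_map)
```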


#### Define a city and state naming standard across data sources

In [14]:
# Pull unique country, state, and city values from each applicable source data set
unique_citypop_city = city_pop_df[['Country', 'City']].drop_duplicates()

unique_citypop_city.columns = ['Country_CityPop', 'City_CityPop']

unique_temp_city = temp_df[['Country', 'State', 'City']].drop_duplicates()

unique_temp_city.columns = ['Country_Temp', 'State_Temp', 'City_Temp']

In [15]:
# Create state and abbreviation mapping dataframe
# Source: https://github.com/unitedstates/python-us

# Place state and abbreviation in a dictionary
state = us.states.mapping('abbr', 'name')

# Convert dictionary into a dataframe
state_abbr = pd.DataFrame(list(state.items()))
state_abbr.columns = ['StateAbbreviation', 'State']

In [16]:
# Add country standard and state info to both datasets containing cities
unique_temp_city = sqldf("""
    SELECT DISTINCT 
        country_map.CountryStandard,
        state_abbr.StateAbbreviation,
        temp.*
    FROM unique_temp_city as temp
    LEFT JOIN country_map
        ON country_map.Country_Temp = temp.Country_Temp
    LEFT JOIN state_abbr
        ON temp.State_Temp = state_abbr.State
    """)

unique_citypop_city = sqldf("""
    SELECT DISTINCT 
        country_map.CountryStandard,
        State, 
        StateAbbreviation,
        CityPop.*
    FROM unique_citypop_city AS CityPop
    LEFT JOIN country_map
        ON country_map.Country_CityPop = CityPop.Country_CityPop
    LEFT JOIN state_abbr
        ON SUBSTR(CityPop.City_CityPop, length(CityPop.City_CityPop) - 2, 2) = state_abbr.StateAbbreviation
            AND country_map.CountryStandard = 'United States'
    """)

unique_citypop_city.head()

Unnamed: 0,CountryStandard,State,StateAbbreviation,Country_CityPop,City_CityPop
0,Aland Islands,,,Åland Islands,MARIEHAMN
1,Albania,,,Albania,Durrës
2,Albania,,,Albania,TIRANA
3,Algeria,,,Algeria,Adrar
4,Algeria,,,Algeria,Ain Defla
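
US city names in the population source carry the state in parentheses, for example `Abilene (TX)`, and the `SUBSTR` offsets above depend on that exact layout. Since pandasql runs on SQLite by default, the arithmetic can be checked directly with the standard `sqlite3` module. This is a sketch on a single literal value; the aliases `state_abbr` and `bare_city` are illustrative.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# SUBSTR is 1-indexed: for 'Abilene (TX)' (12 characters) the abbreviation
# starts at position 10, and the bare city name is the first 7 characters.
row = conn.execute(
    """
    SELECT
        SUBSTR(city, LENGTH(city) - 2, 2)        AS state_abbr,
        SUBSTR(UPPER(city), 1, LENGTH(city) - 5) AS bare_city
    FROM (SELECT 'Abilene (TX)' AS city)
    """
).fetchone()
conn.close()

print(row)
```

The offsets assume a two-letter abbreviation wrapped in parentheses and preceded by one space; names in any other layout would produce a wrong or empty match.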


In [17]:
# Build mapping for state, city, and country standards
# The temperature source joins to the city population source several times
# to populate the city field as best as possible without too much hardcoding.
temp_city_map = sqldf("""
    SELECT DISTINCT 
        unique_temp_city.CountryStandard as CountryStandard,
        unique_temp_city.Country_Temp,
        --unique_citypop_city.Country_CityPop,
        unique_temp_city.State_Temp as StateStandard,
        unique_temp_city.State_Temp,
        --COALESCE(CityUS.State, CityUSPartial.State) as State_CityPop,
        unique_temp_city.City_Temp as CityStandard,
        unique_temp_city.City_Temp,
        -- unique_citypop_city.City_CityPop,
        -- CityUS.City_CityPop as CityUS,
        -- CityUSPartial.City_CityPop as CityUSPartial,
        -- partial6.City_CityPop as CityPartial6,
        -- partial5.City_CityPop as CityPartial5,
        -- partial4.City_CityPop as CityPartial4,
        -- partial3.City_CityPop as CityPartial3,
        COALESCE(unique_citypop_city.City_CityPop, CityUS.City_CityPop, CityUSPartial.City_CityPop, 
                 partial6.City_CityPop, partial5.City_CityPop, partial4.City_CityPop, partial3.City_CityPop) AS City_CityPop
    FROM unique_temp_city
    -- Join on city (case protected) and country
    LEFT JOIN unique_citypop_city
        ON UPPER(unique_citypop_city.City_CityPop) = UPPER(unique_temp_city.City_Temp)
            AND unique_citypop_city.CountryStandard = unique_temp_city.CountryStandard
    -- Strip the US abbreviation at the end of the city 
    -- and join on city, state, and country (US only)
    LEFT JOIN unique_citypop_city AS CityUS
        ON SUBSTR(UPPER(CityUS.City_CityPop),1,length(CityUS.City_CityPop) - 5) = UPPER(unique_temp_city.City_Temp)
            AND CityUS.CountryStandard = unique_temp_city.CountryStandard
            AND CityUS.State = unique_temp_city.State_Temp
            AND unique_temp_city.CountryStandard = 'United States'
            AND unique_citypop_city.City_CityPop IS NULL
    -- Join on city matching the first five characters, state, and country (US only)
    LEFT JOIN unique_citypop_city AS CityUSPartial
        ON SUBSTR(UPPER(CityUSPartial.City_CityPop),1,5) = SUBSTR(UPPER(unique_temp_city.City_Temp),1,5)
            AND CityUSPartial.CountryStandard = unique_temp_city.CountryStandard
            AND CityUSPartial.State = unique_temp_city.State_Temp
            AND unique_temp_city.CountryStandard = 'United States'
            AND unique_citypop_city.City_CityPop IS NULL
            AND CityUS.City_CityPop IS NULL
    -- Join on city matching the first six characters and country (non US)
    LEFT JOIN unique_citypop_city AS partial6
        ON SUBSTR(UPPER(partial6.City_CityPop),1,6) = SUBSTR(UPPER(unique_temp_city.City_Temp),1,6)
            AND partial6.CountryStandard = unique_temp_city.CountryStandard
            AND unique_temp_city.CountryStandard <> 'United States'
            AND unique_citypop_city.City_CityPop IS NULL
            AND CityUS.City_CityPop IS NULL
            AND CityUSPartial.City_CityPop IS NULL
    -- Join on city matching the first five characters and country (non US)
    LEFT JOIN unique_citypop_city AS partial5
        ON SUBSTR(UPPER(partial5.City_CityPop),1,5) = SUBSTR(UPPER(unique_temp_city.City_Temp),1,5)
            AND partial5.CountryStandard = unique_temp_city.CountryStandard
            AND unique_temp_city.CountryStandard <> 'United States'
            AND unique_citypop_city.City_CityPop IS NULL
            AND CityUS.City_CityPop IS NULL
            AND CityUSPartial.City_CityPop IS NULL
            AND partial6.City_CityPop IS NULL
    -- Join on city matching the first four characters and country (non US)
    LEFT JOIN unique_citypop_city AS partial4
        ON SUBSTR(UPPER(partial4.City_CityPop),1,4) = SUBSTR(UPPER(unique_temp_city.City_Temp),1,4)
            AND partial4.CountryStandard = unique_temp_city.CountryStandard
            AND unique_temp_city.CountryStandard <> 'United States'
            AND unique_citypop_city.City_CityPop IS NULL
            AND CityUS.City_CityPop IS NULL
            AND CityUSPartial.City_CityPop IS NULL
            AND partial6.City_CityPop IS NULL
            AND partial5.City_CityPop IS NULL
    -- Join on city matching the first three characters and country (non US)
    LEFT JOIN unique_citypop_city AS partial3
        ON SUBSTR(UPPER(partial3.City_CityPop),1,3) = SUBSTR(UPPER(unique_temp_city.City_Temp),1,3)
            AND partial3.CountryStandard = unique_temp_city.CountryStandard
            AND unique_temp_city.CountryStandard <> 'United States'
            AND partial3.City_CityPop NOT IN ('BELMOPAN', 'Bommanahalli', 'Brugge')
            AND unique_citypop_city.City_CityPop IS NULL
            AND CityUS.City_CityPop IS NULL
            AND CityUSPartial.City_CityPop IS NULL
            AND partial6.City_CityPop IS NULL
            AND partial5.City_CityPop IS NULL
            AND partial4.City_CityPop IS NULL
    --WHERE unique_citypop_city.City_CityPop IS NULL 
    --    and CityUS is null
    --    and CityUSPartial is not null
      --  and partial6.City_CityPop IS NULL
      --  and partial5.City_CityPop IS NULL
      --  and partial4.City_CityPop IS NULL
      --  and partial3.City_CityPop IS not NULL
    ORDER BY unique_temp_city.City_Temp
    """)

# 109/321 link on initial join
# 211/321 link with second join added
# 235/321 link with third join added
# 244/321 link with fourth join added
# 249/321 link with fifth join added
# 253/321 link with sixth join added
# 264/321 link with seventh join added
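
The query above chains progressively looser joins, and each one only fires when every stricter join found nothing. A reduced two-stage version of that pattern is sketched below with the standard `sqlite3` module. The table names `temp` and `pop` and the single five-character prefix stage are simplifications for illustration, not the notebook's full logic.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE temp (city TEXT, country TEXT);
    CREATE TABLE pop  (city TEXT, country TEXT);
    INSERT INTO temp VALUES
        ('Addis Ababa', 'Ethiopia'),
        ('Akron Canton', 'United States'),
        ('Abu Dhabi', 'United Arab Emirates');
    INSERT INTO pop VALUES
        ('ADDIS ABABA', 'Ethiopia'),
        ('Akron (OH)', 'United States');
    """
)

matches = conn.execute(
    """
    SELECT t.city, COALESCE(exact.city, prefix.city) AS matched
    FROM temp AS t
    -- Stage 1: case-insensitive exact match within the same country
    LEFT JOIN pop AS exact
        ON UPPER(exact.city) = UPPER(t.city)
            AND exact.country = t.country
    -- Stage 2: five-character prefix match, only when stage 1 found nothing
    LEFT JOIN pop AS prefix
        ON SUBSTR(UPPER(prefix.city), 1, 5) = SUBSTR(UPPER(t.city), 1, 5)
            AND prefix.country = t.country
            AND exact.city IS NULL
    ORDER BY t.city
    """
).fetchall()
conn.close()

for city, matched in matches:
    print(city, "->", matched)
```

The `IS NULL` guard on the earlier alias is what keeps a loose prefix match from overriding or duplicating a row that already matched exactly.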

In [18]:
# Join mapping to city population set to have standard naming across sources
citypop_city_map = sqldf("""
    SELECT DISTINCT 
        CityPop.CountryStandard,
        COALESCE(temp_city_map.StateStandard, CityPop.State) as StateStandard,
        COALESCE(temp_city_map.CityStandard, CityPop.City_CityPop) as CityStandard, 
        CityPop.Country_CityPop,
        CityPop.State as State_CityPop,
        CityPop.City_CityPop
    FROM unique_citypop_city AS CityPop
    LEFT JOIN temp_city_map
        ON CityPop.City_CityPop = temp_city_map.City_CityPop
            AND CityPop.CountryStandard = temp_city_map.CountryStandard
    """)

# Title-case the city standard so it presents cleanly
citypop_city_map["CityStandard"] = citypop_city_map["CityStandard"].str.title()

# Drop city name to avoid duplicate fields in the following step
temp_city_map = temp_city_map.drop(columns=['City_CityPop'])

In [19]:
# Full outer join on city maps
city_map = pd.merge(temp_city_map, citypop_city_map, on=['CityStandard', 'CountryStandard', 'StateStandard'], how='outer')
city_map.head()

Unnamed: 0,CountryStandard,Country_Temp,StateStandard,State_Temp,CityStandard,City_Temp,Country_CityPop,State_CityPop,City_CityPop
0,Cote d'Ivoire,Ivory Coast,,,Abidjan,Abidjan,Côte d'Ivoire,,Abidjan
1,United States,US,Texas,Texas,Abilene,Abilene,United States of America,Texas,Abilene (TX)
2,United Arab Emirates,United Arab Emirates,,,Abu Dhabi,Abu Dhabi,,,
3,Ethiopia,Ethiopia,,,Addis Ababa,Addis Ababa,Ethiopia,,ADDIS ABABA
4,United States,US,Ohio,Ohio,Akron Canton,Akron Canton,United States of America,Ohio,Akron (OH)


In [20]:
# Remove city population source content from temperature mapping
temp_city_map = temp_city_map[['Country_Temp', 'State_Temp', 'City_Temp',
                               'CountryStandard', 'StateStandard', 'CityStandard']].drop_duplicates()

In [21]:
# Transform the source datasets to have standardized names
staging_temp = sqldf("""
    SELECT 
        temp_df.Region,
        temp_city_map.CountryStandard,
        temp_city_map.StateStandard,
        temp_city_map.CityStandard,
        temp_df.Month,
        temp_df.Day,
        temp_df.Year,
        temp_df.AvgTemperature,
        temp_df.Date,
        temp_df.DateKey
    FROM temp_df
    LEFT JOIN temp_city_map
        ON temp_df.Country = temp_city_map.Country_Temp
            AND Coalesce(temp_df.State, 'State') = Coalesce(temp_city_map.State_Temp, 'State')
            AND temp_df.City = temp_city_map.City_Temp
    """)

staging_country_pop = sqldf("""
    SELECT 
        country_map.CountryStandard,
        country_pop_df.Year,
        country_pop_df.Country_Population
    FROM country_pop_df
    LEFT JOIN country_map
        ON country_pop_df.Country = country_map.Country_CountryPop
    """)

staging_city_pop = sqldf("""
    SELECT 
        city_map.CountryStandard,
        city_map.StateStandard,
        city_pop_df.Year,
        city_map.CityStandard,
        city_pop_df.City_Type,
        city_pop_df.Record_Type,
        city_pop_df.Reliability,
        city_pop_df.Source_Year,
        city_pop_df.City_Population,
        city_pop_df.Population_Notes
    FROM city_pop_df
    LEFT JOIN citypop_city_map AS city_map
        ON city_pop_df.Country = city_map.Country_CityPop
            AND city_pop_df.City = city_map.City_CityPop
    """)

staging_refugee = sqldf("""
    SELECT 
        Asylum.CountryStandard as AsylumCountry,
        Origin.CountryStandard as OriginCountry,
        refugee_df.Year,
        refugee_df.Refugees,
        refugee_df.Refugees_Assisted_by_UNHCR,
        refugee_df.Refugee_Like_Population,
        refugee_df.Refugee_Like_Population_Assisted_by_UNHCR
    FROM refugee_df
    LEFT JOIN country_map AS Asylum
        ON refugee_df.Asylum_Country = Asylum.Asylum_Country
    LEFT JOIN country_map AS Origin
        ON refugee_df.Origin_Country = Origin.Origin_Country
    """)

# Define source_dfs list for function used below.
source_dfs = [staging_temp,
              staging_country_pop,
              staging_city_pop,
              staging_refugee]

#### Create S3 buckets that reflect the path designs designated in dwh.cfg

In [22]:
# Read in parameters needed for Redshift cluster
config = configparser.ConfigParser()
config.read('dwh.cfg')

# Define bucket name and buffer
bucket = 'capstone-refugee'
csv_buffer = StringIO()

# Connect to S3
s3 = boto3.resource('s3',
                    region_name = "us-west-2",
                    aws_access_key_id = config.get('AWS','KEY'),
                    aws_secret_access_key = config.get('AWS','SECRET'))

# Create S3 buckets
# s3.create_bucket(Bucket = bucket, CreateBucketConfiguration={
#     'LocationConstraint': 'us-west-2'})

In [25]:
# Define s3_files for function below. 
# Make sure they align with source_dfs list.
s3_files = ['temperature/staging_temp.csv',
            'country_poulation/staging_country_pop.csv',
            'city_population/staging_city_pop.csv',
            'refugee/staging_refugee.csv']

def df_to_s3():
    """
    Write each dataframe to the CSV buffer
    Store the CSV content at the matching S3 path
    Rewind the buffer and clear its content before the next dataframe
    """
    
    # For each source df and s3 path
    for df, file in list(zip(source_dfs, s3_files)):
        # Write the dataframe to the CSV buffer
        df.to_csv(csv_buffer, index = False)
        
        # Store the csv content into the s3 path
        s3.Object(bucket, file).put(Body = csv_buffer.getvalue())
        
        # Go to the beginning of the buffer and reset content
        csv_buffer.seek(0)
        csv_buffer.truncate(0)

# Call the function
df_to_s3()
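
The function above reuses a single buffer and resets it with `seek(0)` and `truncate(0)` after each upload. The sketch below isolates that reset pattern using only the standard library. The row lists and the `uploaded` dict are hypothetical stand-ins for the staging dataframes and the S3 bucket, and `csv.writer` stands in for `DataFrame.to_csv`.

```python
import csv
from io import StringIO

# Hypothetical stand-ins: small row lists instead of dataframes,
# and a plain dict instead of an S3 bucket.
tables = {
    "a.csv": [["Country", "Year"], ["Kenya", 2000]],
    "b.csv": [["City"], ["Nairobi"]],
}
uploaded = {}

buffer = StringIO()
for key, rows in tables.items():
    # Write the rows into the shared buffer as CSV text
    csv.writer(buffer).writerows(rows)
    uploaded[key] = buffer.getvalue()
    # Rewind and clear so the next file does not inherit this content
    buffer.seek(0)
    buffer.truncate(0)

print(uploaded["b.csv"])
```

Without the reset, the second file would also contain the rows of the first. Creating a fresh `StringIO()` inside the loop is an alternative that avoids shared state altogether.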

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The data warehouse uses a relational data model organized as a star schema. A relational model suits analytical workloads, particularly when business requirements are expected to change over time. The dimensional layout gives end users an intuitive structure, the flexibility to query with SQL, and high data integrity.

Fact Tables
 1. **TemperatureFact** - records from the temperature dataset associated with daily average temperatures in cities around the world
     - TemperatureKey, AvgTemperature, DateKey, CountryKey, CityKey
 2. **CountryPopulationFact** - records from the country population dataset reflecting population counts by year and country
     - CountryPopulationKey, CountryPopulation, CountryKey
 3. **CityPopulationFact** - records from the city population dataset reflecting population counts by year and city
     - CityPopulationKey, CityPopulation, SourceKey, CityKey, CountryKey
 4. **RefugeeFact** - records from the refugee dataset associated with refugee and refugee-like populations by country and year
     - RefugeeKey, RefugeePopulation, RefugeesAssistedByUNHCR, RefugeeLikePopulation, RefugeeLikePopulationAssistedByUNHCR, AsylumCountryKey, OriginCountryKey

Dimension Tables
 1. **DateDim** - dates of temperature recordings
     - DateKey, Date, Month, Day, Year
 2. **CountryDim** - country, year, and region content in climate refugee database
     - CountryKey, Country, Region, Year
 3. **CityDim** - city, year, and state content in climate refugee database
     - CityKey, City, CityType, State, Year
 4. **SourceDim** - source details in climate refugee database
     - SourceKey, SourceYear, CityPopulationNotes, RecordType, Reliability
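
To illustrate how an analyst might query this star schema, here is a minimal sketch that builds a cut-down copy of `TemperatureFact`, `DateDim`, and `CityDim` in an in-memory SQLite database and computes a yearly average per city. SQLite is only a local stand-in for Redshift, and the sample rows are made up for the example.

```python
import sqlite3

demo_conn = sqlite3.connect(":memory:")
demo_cur = demo_conn.cursor()

# Cut-down versions of the tables listed above, with made-up sample rows
demo_cur.executescript("""
    CREATE TABLE DateDim(DateKey INTEGER PRIMARY KEY, Date TEXT, Month INT, Day INT, Year INT);
    CREATE TABLE CityDim(CityKey INTEGER PRIMARY KEY, City TEXT, CityType TEXT, State TEXT, Year INT);
    CREATE TABLE TemperatureFact(TemperatureKey INTEGER PRIMARY KEY, AvgTemperature REAL,
                                 DateKey INT, CountryKey INT, CityKey INT);
    INSERT INTO DateDim VALUES (20000101, '2000-01-01', 1, 1, 2000);
    INSERT INTO DateDim VALUES (20000102, '2000-01-02', 1, 2, 2000);
    INSERT INTO CityDim VALUES (1, 'Nairobi', NULL, NULL, 2000);
    INSERT INTO TemperatureFact VALUES (1, 60.0, 20000101, 1, 1);
    INSERT INTO TemperatureFact VALUES (2, 64.0, 20000102, 1, 1);
""")

# A typical star-schema query: join the fact table to its dimensions, then aggregate
yearly_avg = demo_cur.execute("""
    SELECT CityDim.City, DateDim.Year, AVG(TemperatureFact.AvgTemperature)
    FROM TemperatureFact
    JOIN DateDim ON TemperatureFact.DateKey = DateDim.DateKey
    JOIN CityDim ON TemperatureFact.CityKey = CityDim.CityKey
    GROUP BY CityDim.City, DateDim.Year
""").fetchall()

print(yearly_avg)
demo_conn.close()
```

The fact table carries only keys and measures, so descriptive attributes such as city name or year always come from a join to the relevant dimension.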

#### 3.2 Mapping Out Data Pipelines

**Create Table Schemas Based on Conceptual Model**
 1. Write create table and drop table statements
 2. Add logic to connect the tables to the Amazon Redshift database
 3. Launch a Redshift cluster and attach an IAM role with S3 read access
 4. Add the cluster and IAM role content to dwh.cfg
 5. Test execution by verifying the empty tables exist in the Redshift database
 
**Build ETL Pipeline**
 1. Create an S3 bucket and load the source data sets into S3
 2. Stage source datasets from S3 into the analytics database
 3. Transform the staged datasets into tables reflecting the conceptual model
 4. Load the tables into the analytics database
 5. Perform data quality checks and revise accordingly as needed
 6. Once finished, delete the Redshift cluster
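
The notebook reads `dwh.cfg` through `configparser`, using the sections `AWS`, `CLUSTER`, `IAM_ROLE`, and `S3`. The sketch below shows one possible layout for that file. The key names inside `CLUSTER` are assumptions (the notebook only reads that section's values in order), and every value is a placeholder except the port, which is the Redshift default.

```python
import configparser

# Placeholder values only; the CLUSTER key names are assumed for illustration.
sample_cfg = """
[AWS]
KEY = <access-key-id>
SECRET = <secret-access-key>

[CLUSTER]
HOST = <cluster-endpoint>
DB_NAME = <database>
DB_USER = <user>
DB_PASSWORD = <password>
DB_PORT = 5439

[IAM_ROLE]
ARN = <iam-role-arn>

[S3]
TEMP_DATA = 's3://<bucket>/<temperature-csv>'
COUNTRY_POP_DATA = 's3://<bucket>/<country-population-csv>'
CITY_POP_DATA = 's3://<bucket>/<city-population-csv>'
REFUGEE_DATA = 's3://<bucket>/<refugee-csv>'
"""

demo_config = configparser.ConfigParser()
demo_config.read_string(sample_cfg)

# Named lookups avoid depending on the order of keys within the section
dsn = "host={} dbname={} user={} password={} port={}".format(
    demo_config.get("CLUSTER", "HOST"),
    demo_config.get("CLUSTER", "DB_NAME"),
    demo_config.get("CLUSTER", "DB_USER"),
    demo_config.get("CLUSTER", "DB_PASSWORD"),
    demo_config.get("CLUSTER", "DB_PORT"),
)
print(demo_config.sections())
```

The S3 paths are written with their single quotes here because the `COPY` statement in Step 4 inserts the configured value directly after `FROM`.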

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

In [26]:
# Table list used to drop tables
table_list = ['staging_temperatures',
              'staging_country_populations',
              'staging_city_populations',
              'staging_refugees',
              'TemperatureFact',
              'CountryPopulationFact',
              'CityPopulationFact',
              'RefugeeFact',
              'DateDim',
              'CountryDim',
              'CityDim',
              'SourceDim']

def drop_tables(cur, conn):
    """
    Drops each table in table_list if it exists
    
    INPUTS:
    * cur - the cursor available
    * conn - database connection
    """
    for table in table_list:
        cur.execute("DROP TABLE IF EXISTS " + table)
        conn.commit()

In [27]:
# Create tables
create_staging_temperatures = ("""CREATE TABLE IF NOT EXISTS staging_temperatures(
                                      Region varchar NOT NULL,
                                      Country varchar NOT NULL distkey,
                                      State varchar,
                                      City varchar NOT NULL,
                                      Month int NOT NULL,
                                      Day int NOT NULL,
                                      Year int NOT NULL,
                                      AvgTemperature decimal(5,1) NOT NULL,
                                      Date date NOT NULL,
                                      DateKey int NOT NULL)""")

create_staging_country_populations = ("""CREATE TABLE IF NOT EXISTS staging_country_populations(
                                             Country varchar NOT NULL distkey,
                                             Year int NOT NULL,
                                             CountryPopulation int NOT NULL)""")

create_staging_city_populations = ("""CREATE TABLE IF NOT EXISTS staging_city_populations(
                                          Country varchar NOT NULL distkey,
                                          State varchar,
                                          Year int NOT NULL,
                                          City varchar NOT NULL,
                                          CityType varchar NOT NULL,
                                          RecordType varchar NOT NULL,
                                          Reliability varchar NOT NULL,
                                          SourceYear int NOT NULL,
                                          CityPopulation int NOT NULL,
                                          PopulationNotesKey varchar NOT NULL)""")

create_staging_refugees = ("""CREATE TABLE IF NOT EXISTS staging_refugees(
                                  AsylumCountry varchar NOT NULL distkey,
                                  OriginCountry varchar NOT NULL,
                                  Year int NOT NULL,
                                  RefugeePopulation decimal,
                                  RefugeesAssistedByUNHCR decimal,
                                  RefugeeLikePopulation decimal,
                                  RefugeeLikePopulationAssistedByUNHCR decimal)""")

create_TemperatureFact = ("""CREATE TABLE IF NOT EXISTS TemperatureFact(
                                 TemperatureKey int IDENTITY(0,1),
                                 DateKey int NOT NULL REFERENCES DateDim sortkey,
                                 CountryKey int NOT NULL REFERENCES CountryDim,
                                 CityKey int NOT NULL REFERENCES CityDim,
                                 AvgTemperature decimal(5,1) NOT NULL,
                                 PRIMARY KEY(TemperatureKey))""")

create_CountryPopulationFact = ("""CREATE TABLE IF NOT EXISTS CountryPopulationFact(
                                       CountryPopulationKey int IDENTITY(0,1),
                                       CountryPopulation int NOT NULL,
                                       CountryKey int NOT NULL REFERENCES CountryDim sortkey,
                                       PRIMARY KEY(CountryPopulationKey))""")

create_CityPopulationFact = ("""CREATE TABLE IF NOT EXISTS CityPopulationFact(
                                    CityPopulationKey int IDENTITY(0,1),
                                    CityPopulation int NOT NULL,
                                    CityKey int NOT NULL REFERENCES CityDim sortkey,
                                    CountryKey int NOT NULL REFERENCES CountryDim,
                                    SourceKey int NOT NULL REFERENCES SourceDim,
                                    PRIMARY KEY(CityPopulationKey))""")

create_RefugeeFact = ("""CREATE TABLE IF NOT EXISTS RefugeeFact(
                             RefugeeKey int IDENTITY(0,1),
                             RefugeePopulation int,
                             RefugeesAssistedByUNHCR int,
                             RefugeeLikePopulation int,
                             RefugeeLikePopulationAssistedByUNHCR int,
                             AsylumCountryKey int NOT NULL REFERENCES CountryDim sortkey,
                             OriginCountryKey int NOT NULL REFERENCES CountryDim,
                             PRIMARY KEY(RefugeeKey))""")

create_DateDim = ("""CREATE TABLE IF NOT EXISTS DateDim(
                         DateKey int sortkey,
                         Date date,
                         Month int,
                         Day int,
                         Year int,
                         PRIMARY KEY(DateKey))
                         diststyle all""")

create_CountryDim = ("""CREATE TABLE IF NOT EXISTS CountryDim(
                            CountryKey int sortkey IDENTITY(0,1),
                            Country varchar NOT NULL,
                            Region varchar NOT NULL,
                            Year int NOT NULL,
                            PRIMARY KEY(CountryKey))
                            diststyle all""")

create_CityDim = ("""CREATE TABLE IF NOT EXISTS CityDim(
                            CityKey int sortkey IDENTITY(0,1),
                            City varchar NOT NULL,
                            CityType varchar,
                            State varchar,
                            Year int NOT NULL,
                            PRIMARY KEY(CityKey))
                            diststyle all""")

create_SourceDim = ("""CREATE TABLE IF NOT EXISTS SourceDim(
                           SourceKey int sortkey IDENTITY(0,1),
                           Reliability varchar NOT NULL,
                           RecordType varchar NOT NULL,
                           CityPopulationNotes varchar,
                           SourceYear int NOT NULL,
                           PRIMARY KEY(SourceKey))
                           diststyle all""")

# Create table query list
create_table_queries = [create_staging_temperatures,
                        create_staging_country_populations,
                        create_staging_city_populations,
                        create_staging_refugees,
                        create_DateDim,
                        create_CountryDim,
                        create_CityDim,
                        create_SourceDim,
                        create_TemperatureFact,
                        create_CountryPopulationFact,
                        create_CityPopulationFact,
                        create_RefugeeFact]

def create_tables(cur, conn):
    """
    Creates each table, if it does not already exist, by executing the
    queries in the `create_table_queries` list
    
    INPUTS:
    * cur - the cursor available
    * conn - database connection
    """
    for query in create_table_queries:
        cur.execute(query)
        conn.commit()

In [28]:
# Connect to the Redshift cluster and get a cursor
conn = psycopg2.connect("""host={} 
                           dbname={} 
                           user={} 
                           password={} 
                           port={}""".format(*config['CLUSTER'].values()))
cur = conn.cursor()

In [29]:
# Drops all tables by calling the drop_tables function
drop_tables(cur, conn)

# Creates all tables by calling the create_tables function
create_tables(cur, conn)

In [32]:
# Staging table names
staging_tables = ['staging_temperatures',
                  'staging_country_populations',
                  'staging_city_populations',
                  'staging_refugees']

# Paths to source csv files in S3
s3_storage_paths = ['TEMP_DATA',
                    'COUNTRY_POP_DATA',
                    'CITY_POP_DATA',
                    'REFUGEE_DATA']

def load_staging_tables(cur, conn):
    """
    Copy source datasets from S3 to Redshift

    INPUTS:
    * cur - the cursor available
    * conn - database connection
    """
    for table, path in list(zip(staging_tables, s3_storage_paths)):
        cur.execute(("""COPY {} FROM {}
                        CREDENTIALS 'aws_iam_role={}'
                        csv REGION 'us-west-2'
                        IGNOREHEADER 1
                     """).format(table,
                                 config.get('S3', path),
                                 config.get('IAM_ROLE', 'ARN')))
        conn.commit()

In [33]:
# Loads all staging tables by calling the load_staging_tables function
load_staging_tables(cur, conn)
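
To inspect what each `COPY` statement looks like before it is sent to Redshift, the template can be rendered on its own. This is a minimal sketch; `render_copy` is a hypothetical helper that is not part of the notebook, and the path and ARN are placeholders.

```python
def render_copy(table, s3_path, iam_role_arn, region="us-west-2"):
    """Return the COPY statement text for one staging table."""
    return (
        "COPY {} FROM {}\n"
        "CREDENTIALS 'aws_iam_role={}'\n"
        "csv REGION '{}'\n"
        "IGNOREHEADER 1"
    ).format(table, s3_path, iam_role_arn, region)

# Placeholder path and ARN; the path is assumed to be stored with quotes in dwh.cfg
statement = render_copy("staging_refugees", "'s3://<bucket>/<key>'", "<iam-role-arn>")
print(statement)
```

Printing the rendered text makes it easy to confirm that the S3 path arrives already quoted and that `IGNOREHEADER 1` skips the header row written by `to_csv`.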

In [120]:
# Transform the staged datasets into tables reflecting the conceptual model

insert_DateDim = ("""
    INSERT INTO DateDim(DateKey, Date, Month, Day, Year)
    SELECT DISTINCT
        DateKey,
        Date,
        Month,
        Day,
        Year
    FROM staging_temperatures
    """)

insert_CountryDim = ("""
    INSERT INTO CountryDim(Country, Region, Year)
    SELECT DISTINCT
        Country,
        max(Region) over(partition by Country) as Region,
        Year
    FROM (SELECT DISTINCT 
              Country,
              Region,
              Year 
          FROM staging_temperatures
          UNION
          SELECT DISTINCT 
              Country,
              '' as Region,
              Year 
          FROM staging_city_populations
          UNION
          SELECT DISTINCT 
              Country,
              '' as Region,
              Year 
          FROM staging_country_populations
          UNION
          SELECT DISTINCT 
              AsylumCountry,
              '' as Region,
              Year 
          FROM staging_refugees
          UNION
          SELECT DISTINCT 
              OriginCountry,
              '' as Region,
              Year 
          FROM staging_refugees
         ) AS CountryInfo  
    """)

insert_CityDim = ("""
    INSERT INTO CityDim(City, CityType, State, Year)
    SELECT DISTINCT
        Coalesce(city_pop.City, temp.City) as City,
        CityType,
        Coalesce(city_pop.State, temp.State) as State,
        Coalesce(city_pop.Year, temp.Year) as Year
    FROM staging_city_populations as city_pop
    FULL OUTER JOIN staging_temperatures as temp
        ON city_pop.City = temp.City
            AND Coalesce(city_pop.State, 'State') = Coalesce(temp.State, 'State')
            AND city_pop.Year = temp.Year
    """)

insert_SourceDim = ("""
    INSERT INTO SourceDim(Reliability, RecordType, CityPopulationNotes, SourceYear)
    SELECT DISTINCT
        Reliability,
        RecordType,
        PopulationNotesKey as CityPopulationNotes,
        SourceYear
    FROM staging_city_populations
    """)

insert_TemperatureFact = ("""
    INSERT INTO TemperatureFact(DateKey, CountryKey, CityKey, AvgTemperature)
    SELECT DISTINCT
        DateDim.DateKey,
        CountryDim.CountryKey,
        CityDim.CityKey,
        temp.AvgTemperature
    FROM staging_temperatures as temp
    LEFT JOIN DateDim
        ON temp.Date = DateDim.Date
    LEFT JOIN CountryDim
        ON temp.Country = CountryDim.Country
            AND temp.Year = CountryDim.Year
    LEFT JOIN CityDim
        ON temp.City = CityDim.City
            AND Coalesce(temp.State, 'State') = Coalesce(CityDim.State, 'State')
            AND temp.Year = CityDim.Year
    """)

insert_CountryPopulationFact = ("""
    INSERT INTO CountryPopulationFact(CountryPopulation, CountryKey)
    SELECT DISTINCT
        country_pop.CountryPopulation,
        CountryDim.CountryKey
    FROM staging_country_populations as country_pop
    LEFT JOIN CountryDim
        ON country_pop.Country = CountryDim.Country
            AND country_pop.Year = CountryDim.Year
    """)

insert_CityPopulationFact = ("""
    INSERT INTO CityPopulationFact(CityPopulation, CityKey, CountryKey, SourceKey)
    SELECT
        city_pop.CityPopulation,
        CityDim.CityKey,
        CountryDim.CountryKey,
        SourceDim.SourceKey
    FROM staging_city_populations as city_pop
    LEFT JOIN SourceDim
        ON city_pop.Reliability = SourceDim.Reliability
            AND city_pop.RecordType = SourceDim.RecordType
            AND city_pop.PopulationNotesKey = SourceDim.CityPopulationNotes
            AND city_pop.SourceYear = SourceDim.SourceYear
    LEFT JOIN CountryDim
        ON city_pop.Country = CountryDim.Country
            AND city_pop.Year = CountryDim.Year
    LEFT JOIN CityDim
        ON city_pop.City = CityDim.City
            AND Coalesce(city_pop.State, 'State') = Coalesce(CityDim.State, 'State')
            AND city_pop.Year = CityDim.Year
    """)

insert_RefugeeFact = ("""
    INSERT INTO RefugeeFact(RefugeePopulation, RefugeesAssistedByUNHCR, RefugeeLikePopulation,
                            RefugeeLikePopulationAssistedByUNHCR, AsylumCountryKey, OriginCountryKey)
    SELECT DISTINCT
        staging_refugees.RefugeePopulation,
        staging_refugees.RefugeesAssistedByUNHCR,
        staging_refugees.RefugeeLikePopulation,
        staging_refugees.RefugeeLikePopulationAssistedByUNHCR,
        Asylum.CountryKey as AsylumCountryKey,
        Origin.CountryKey as OriginCountryKey
    FROM staging_refugees 
    LEFT JOIN CountryDim AS Asylum
        ON staging_refugees.AsylumCountry = Asylum.Country
            AND staging_refugees.Year = Asylum.Year
    LEFT JOIN CountryDim AS Origin
        ON staging_refugees.OriginCountry = Origin.Country
            AND staging_refugees.Year = Origin.Year
    """)

insert_table_queries = [insert_DateDim,
                        insert_CountryDim,
                        insert_CityDim,
                        insert_SourceDim,
                        insert_TemperatureFact,
                        insert_CountryPopulationFact,
                        insert_CityPopulationFact,
                        insert_RefugeeFact]
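
Because `CountryDim` holds one row per country and year, the join conditions used for key lookups determine how many dimension rows each staged record matches. The sketch below illustrates this with an in-memory SQLite database as a local stand-in for Redshift; the table contents are made up for the example.

```python
import sqlite3

demo = sqlite3.connect(":memory:")
demo.executescript("""
    CREATE TABLE CountryDim(CountryKey INTEGER PRIMARY KEY, Country TEXT, Year INT);
    CREATE TABLE staging_refugees(AsylumCountry TEXT, OriginCountry TEXT,
                                  Year INT, RefugeePopulation INT);
    INSERT INTO CountryDim(Country, Year) VALUES ('Kenya', 2000);
    INSERT INTO CountryDim(Country, Year) VALUES ('Kenya', 2001);
    INSERT INTO CountryDim(Country, Year) VALUES ('Somalia', 2000);
    INSERT INTO CountryDim(Country, Year) VALUES ('Somalia', 2001);
    INSERT INTO staging_refugees VALUES ('Kenya', 'Somalia', 2000, 100);
""")

# Lookup on country name only: every year's dimension row matches
name_only = demo.execute("""
    SELECT COUNT(*)
    FROM staging_refugees AS r
    LEFT JOIN CountryDim AS Asylum ON r.AsylumCountry = Asylum.Country
    LEFT JOIN CountryDim AS Origin ON r.OriginCountry = Origin.Country
""").fetchone()[0]

# Lookup on country name and year: exactly one dimension row matches per side
name_and_year = demo.execute("""
    SELECT COUNT(*)
    FROM staging_refugees AS r
    LEFT JOIN CountryDim AS Asylum
        ON r.AsylumCountry = Asylum.Country AND r.Year = Asylum.Year
    LEFT JOIN CountryDim AS Origin
        ON r.OriginCountry = Origin.Country AND r.Year = Origin.Year
""").fetchone()[0]

print(name_only, name_and_year)
demo.close()
```

With two years per country, the single staged record expands to four rows when the lookup uses the country name alone, and stays at one row when the year is part of the join.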

In [121]:
def insert_tables(cur, conn, config):
    """
    Insert data into created fact and dimension tables
    
    INPUTS:
    * cur - the cursor available
    * conn - database connection
    * config - parameters to Redshift cluster
    """
    
    for query in insert_table_queries:
        cur.execute(query)
        conn.commit()

In [124]:
# Call insert table function
insert_tables(cur, conn, config)

In [123]:
# Close the connection
conn.close()

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks
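
As a starting point for the source/count checks listed above, here is a minimal sketch of two reusable checks. `check_has_rows` and `check_counts_match` are hypothetical helpers, not part of the notebook. They accept any DB-API cursor, so against Redshift they would need a cursor from a new `psycopg2` connection; an in-memory SQLite database with made-up rows stands in here.

```python
import sqlite3

def check_has_rows(cur, table):
    """Raise if the table is empty; otherwise return its row count."""
    cur.execute("SELECT COUNT(*) FROM " + table)
    count = cur.fetchone()[0]
    if count < 1:
        raise ValueError("Data quality check failed: {} is empty".format(table))
    return count

def check_counts_match(cur, source_table, target_table):
    """Raise if the two tables have different row counts."""
    cur.execute("SELECT COUNT(*) FROM " + source_table)
    source_count = cur.fetchone()[0]
    cur.execute("SELECT COUNT(*) FROM " + target_table)
    target_count = cur.fetchone()[0]
    if source_count != target_count:
        raise ValueError("Data quality check failed: {} has {} rows, {} has {}".format(
            source_table, source_count, target_table, target_count))
    return target_count

# Stand-in database with made-up rows
qc_conn = sqlite3.connect(":memory:")
qc_cur = qc_conn.cursor()
qc_cur.execute("CREATE TABLE staging_refugees(AsylumCountry TEXT, Year INT)")
qc_cur.execute("CREATE TABLE RefugeeFact(RefugeeKey INTEGER PRIMARY KEY, AsylumCountryKey INT)")
qc_cur.executemany("INSERT INTO staging_refugees VALUES (?, ?)",
                   [("Kenya", 2000), ("Kenya", 2001)])
qc_cur.executemany("INSERT INTO RefugeeFact(AsylumCountryKey) VALUES (?)", [(0,), (1,)])

fact_rows = check_has_rows(qc_cur, "RefugeeFact")
matched_rows = check_counts_match(qc_cur, "staging_refugees", "RefugeeFact")
print(fact_rows, matched_rows)
qc_conn.close()
```

Several fact inserts use `SELECT DISTINCT`, so a fact table can legitimately hold fewer rows than its staging table; the count comparison is best treated as a prompt to investigate rather than a strict pass/fail rule.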

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from.