# ETL Project: Technical Report
### World Development Indicators in PostgreSQL

## Overview

**Team Members:**
* Hai Pham
* Stephen Eldridge

**Data Sets:**<br>
Data is sourced from http://wdi.worldbank.org/table as CSV files. A total of 3 data sets are used:
* “Size of the economy” (wv1.csv)
* “Global goals: ending poverty and improving lives” (wv2.csv)
* “Global goals: promoting sustainability” (wv3.csv) 

**ETL Objectives:**<br>
We were interested in the data presented in the World Bank’s World Development Indicators series of data sets. In particular, we thought it would be interesting for someone to analyze the relationships between different indicators used in the various data sets. To this end, we decided to take a few of the data sets and perform ETL to create a relational database that would allow indicators from different data sets to be compared and analyzed in future projects.

Our method for accomplish this was:
* Create a new PostgreSQL database in pgAdmin called world_development_indicators with 3 tables: economy_size, ending_poverty, promoting_sustainability
* Extract all 3 CSV files to a different Pandas dataframe in a Jupyter Notebook.
* Transform each dataframe to match the schema for the PostgreSQL world_development_indicator_db database.
* Load the transformed dataframes to PostgreSQL world_development_indicator_db using the Pandas to_sql function in the Jupyter Notebook.
* Run a SQL query in pgAdmin to combine fields from each of the tables.


## Extract

We acquired the three CSV files for our data set from http://wdi.worldbank.org/table. In a Jupyter Notebook, we used the Pandas read_csv function to import them into dataframes.


In [None]:
#read economic data data file

wv1 = pd.read_csv("Resources\WV1.csv",header=[0,3])
wv1.head()

In [None]:
#read poverty data file

wv2 = pd.read_csv("Resources\WV2.csv",header=[0,3])
wv2.head()

In [None]:
#read sustainability data file

wv3 = pd.read_csv("Resources\WV3.csv",header=[0,3])
wv3.head()

## Transform


### Headings

In each file, the column headings were spread over multiple rows. We combined the two most salient rows into our headings. We then formatted column headings to be more amenable to a SQL database, replacing spaces with underscores and converting to lowercase. Finally, Certain columns needed to be renamed due to inconsistencies in the import from the CSV and minor issues with incompatible characters.

See the example code below.

In [None]:
wv1.columns=wv1.columns.map("_".join)
wv1.head()

In [None]:
#clean header names

wv1.columns = [x.lower().replace(" ","_").replace(",","") for x in wv1.columns]
                    
wv1.head()

In [None]:
wv1_df=wv1.rename(columns ={"unnamed:_0_level_0_unnamed:_0_level_1":"country","unnamed:_7_level_0_2019":"ppp_gni_per_capita_2019",\
                          "gross_domestic_product_2019":"gross_domestic_product_growth_2019", "unnamed:_9_level_0_2019":"per_capita_gdp_growth_2019"})
wv1_df.head()

### Data cleanup

Each file contained empty "notes" rows in the original CSV, which we dropped from our final dataframes. We also replaced invalid data and characters within our cells to ensure the string values could later be converted to float values.

In [None]:
#drop empty rows

wv1_df.drop(wv1.index[227:400],inplace=True)
wv2_df.drop(wv2.index[226:400],inplace=True)
wv3_df.drop(wv3.index[226:400],inplace=True)

wv1_df

In [None]:
#drop invalid cell data

wv1_df1=wv1_df.replace(to_replace="..", value="NaN")
wv2_df1=wv2_df.replace(to_replace="..", value="NaN")
wv3_df1=wv3_df.replace(to_replace="..", value="NaN")

wv1_df1

In [None]:
wv1_df2=wv1_df1.replace(",", "", regex=True)
wv2_df2=wv2_df1.replace(",", "", regex=True)
wv3_df2=wv3_df1.replace(",", "", regex=True)

wv3_df2

### Data consistency

To ensure the data entered into the tables could be compared and analyzed accurately, we converted all figures that had been listed in thousands, millions, billions etc. to absolute values. All percentages and rates per thousand or hundred thousand were converted to decimal values. The resulting numbers were all listed as floats, rounded to three decimal places.

In [None]:
wv1_df2['population_2019'] = round(1000000 * wv1_df2['population_2019'].astype(float), 3)
wv1_df2['surface_area_2019'] = round(1000 * wv1_df2['surface_area_2019'].astype(float), 3)
wv1_df2['population_density_2019'] = round(wv1_df2['population_density_2019'].astype(float), 3)
wv1_df2['gross_national_income_atlas_method_2019'] = round(1000000000 * wv1_df2['gross_national_income_atlas_method_2019'].astype(float), 3)
wv1_df2['gross_national_income_per_capita_atlas_method_2019'] = round(wv1_df2['gross_national_income_per_capita_atlas_method_2019'].astype(float), 3)
wv1_df2['purchasing_power_parity_gross_national_income_2019'] = round(1000000000 * wv1_df2['purchasing_power_parity_gross_national_income_2019'].astype(float), 3)
wv1_df2['ppp_gni_per_capita_2019'] = round(wv1_df2['ppp_gni_per_capita_2019'].astype(float), 3)
wv1_df2['gross_domestic_product_growth_2019'] = round(0.01 * wv1_df2['gross_domestic_product_growth_2019'].astype(float), 3)
wv1_df2['per_capita_gdp_growth_2019'] = round(0.01 * wv1_df2['per_capita_gdp_growth_2019'].astype(float), 3)

In [None]:
wv2_df2['percentage_share_of_income_or_consumption_2007_18'] = round(0.01 * wv2_df2['percentage_share_of_income_or_consumption_2007_18'].astype(float), 3)
wv2_df2['prevalence_of_child_malnutrition_2020'] = round(0.01 * wv2_df2['prevalence_of_child_malnutrition_2020'].astype(float), 3)
wv2_df2['maternal_mortality_ratio_2017'] = round(0.00001 * wv2_df2['maternal_mortality_ratio_2017'].astype(float), 3)
wv2_df2['under_five_mortality_rate_2019'] = round(0.001 * wv2_df2['under_five_mortality_rate_2019'].astype(float), 3)
wv2_df2['incidence_of_hiv_ages_15_49_per1000_2020'] = round(0.001 * wv2_df2['incidence_of_hiv_ages_15_49_per1000_2020'].astype(float), 3)
wv2_df2['incidence_of_tuberculosis_2019'] = round(0.00001 * wv2_df2['incidence_of_tuberculosis_2019'].astype(float), 3)
wv2_df2['mortality_caused_by_road_traffic_injury_2016'] = round(0.00001 * wv2_df2['mortality_caused_by_road_traffic_injury_2016'].astype(float), 3)
wv2_df2['primary_completion_rate_2018'] = round(0.01 * wv2_df2['primary_completion_rate_2018'].astype(float), 3)
wv2_df2['vulnerable_employment_2019'] = round(0.01 * wv2_df2['vulnerable_employment_2019'].astype(float), 3)
wv2_df2['contributing_family_workers_and_own_account_workers_female'] = round(0.01 * wv2_df2['contributing_family_workers_and_own_account_workers_female'].astype(float), 3)
wv2_df2['labor_productivity_growth_2015_18'] = round(0.01 * wv2_df2['labor_productivity_growth_2015_18'].astype(float), 3)

In [None]:
wv3_df2['people_using_safely_managed_drinking_water_services_2017'] = round(0.01 * wv3_df2['people_using_safely_managed_drinking_water_services_2017'].astype(float), 3)
wv3_df2['people_using_safely_managed_sanitation_services_2017'] = round(0.01 * wv3_df2['people_using_safely_managed_sanitation_services_2017'].astype(float), 3)
wv3_df2['access_to_electricity_2017'] = round(0.01 * wv3_df2['access_to_electricity_2017'].astype(float), 3)
wv3_df2['renewable_energy_consumption_2015'] = round(0.01 * wv3_df2['renewable_energy_consumption_2015'].astype(float), 3)
wv3_df2['expenditures_for_rd_2015'] = round(0.01 * wv3_df2['expenditures_for_rd_2015'].astype(float), 3)
wv3_df2['urban_population_living_in_slums_2014'] = round(0.01 * wv3_df2['urban_population_living_in_slums_2014'].astype(float), 3)
wv3_df2['ambient_pm2_5_air_pollution_2016'] = round(wv3_df2['ambient_pm2_5_air_pollution_2016'].astype(float), 3)
wv3_df2['adjusted_net_savings_2017'] = round(0.01 * wv3_df2['adjusted_net_savings_2017'].astype(float), 3)
wv3_df2['carbon_dioxide_emissions_2014'] = round(wv3_df2['carbon_dioxide_emissions_2014'].astype(float), 3)
wv3_df2['nationally_protected_terrestrial_and_marine_areas_2018'] = round(0.01 * wv3_df2['nationally_protected_terrestrial_and_marine_areas_2018'].astype(float), 3)
wv3_df2['intentional_homicides_2015'] = round(0.00001 * wv3_df2['intentional_homicides_2015'].astype(float), 3)
wv3_df2['internet_use_2017'] = round(0.01 * wv3_df2['internet_use_2017'].astype(float), 3)

## Load

To store our data, we created a relational database using PostgreSQL. For this example, we decided to create one table for each set of data to preserve the existing data structure as much as possible. However, were we to apply this to a future project, we may decide to consolidate to a single  table for greater simplicity.

### Creating tables

We created our tables using a SQL query, as below. We chose to use `country` as the primary key in each table, as each record in each table is tied to a unique country or group of countries. For a larger dataset, we would have put the country field in its own table and used references to it in each of our tables. However, with only about 230 records in each table, it didn't seem necessary for this exercise, so we chose again to maintain the existing data structure where possible.

In [None]:
-- Create tables for raw data to be loaded into
CREATE TABLE economy_size (
    country TEXT PRIMARY KEY,
    population_2019 FLOAT,
    surface_area_2019 FLOAT,
    population_density_2019 FLOAT,
    gross_national_income_atlas_method_2019 FLOAT,
    gross_national_income_per_capita_atlas_method_2019 FLOAT,
    purchasing_power_parity_gross_national_income_2019 FLOAT,
    ppp_gni_per_capita_2019 FLOAT,
    gross_domestic_product_growth_2019 FLOAT,
    per_capita_gdp_growth_2019 FLOAT
);

CREATE TABLE ending_poverty (
    country TEXT PRIMARY KEY,
    percentage_share_of_income_or_consumption_2007_18 FLOAT,
    prevalence_of_child_malnutrition_2020 FLOAT,
    maternal_mortality_ratio_2017 FLOAT,
    under_five_mortality_rate_2019 FLOAT,
    incidence_of_hiv_ages_15_49_per1000_2020 FLOAT,
    incidence_of_tuberculosis_2019 FLOAT,
    mortality_caused_by_road_traffic_injury_2016 FLOAT,
    primary_completion_rate_2018 FLOAT,
    vulnerable_employment_2019 FLOAT,
    contributing_family_workers_and_own_account_workers_female FLOAT,
    labor_productivity_growth_2015_18 FLOAT
);

CREATE TABLE promoting_sustainability (
    country TEXT PRIMARY KEY,
    people_using_safely_managed_drinking_water_services_2017 FLOAT,
    people_using_safely_managed_sanitation_services_2017 FLOAT,
    access_to_electricity_2017 FLOAT,
    renewable_energy_consumption_2015 FLOAT,
    expenditures_for_rd_2015 FLOAT,
    urban_population_living_in_slums_2014 FLOAT,
    ambient_pm2_5_air_pollution_2016 FLOAT,
    adjusted_net_savings_2017 FLOAT,
    carbon_dioxide_emissions_2014 FLOAT,
    nationally_protected_terrestrial_and_marine_areas_2018 FLOAT,
    intentional_homicides_2015 FLOAT,
    internet_use_2017 FLOAT
);

### Loading data into tables

We used an SQAlchemy engine to connect to our database in PostgreSQL. For each table, we appended records where a country did not already exist, and dropped the index because with `country` serving as primary key, it was superfluous.

In [None]:
#create postgres connection

rds_connection_string = username + ":" + password + "@localhost:5432/world_development_indicators"
engine = create_engine(f'postgresql://{rds_connection_string}')

In [None]:
engine.table_names()

In [None]:
#write data to economy_size table

wv1_df2.to_sql(name='economy_size', con=engine, if_exists='append', index=False)

In [None]:
#write data to ending_poverty table

wv2_df2.to_sql(name='ending_poverty', con=engine, if_exists='append', index=False)

In [None]:
#write data to promoting_sustainability table

wv3_df2.to_sql(name='promoting_sustainability', con=engine, if_exists='append', index=False)

### Testing with a join

To ensure our database functioned as expected, we wrote a join in SQL that would pull columns from each database based on the table's `country` field. This allowed us to be certain that the data fields had populated with the expected values, and that our goal of making it possible to compare datapoints from across the different data sets was accomplished. That code is below.

In [None]:

-- Query to check successful load
SELECT * FROM economy_size;

SELECT * FROM ending_poverty;

SELECT * FROM promoting_sustainability;

-- Join tables
SELECT es.country, es.population_2019, es.gross_domestic_product_growth_2019, ep.percentage_share_of_income_or_consumption_2007_18, ep.prevalence_of_child_malnutrition_2020, ep.under_five_mortality_rate_2019, ps.people_using_safely_managed_drinking_water_services_2017, ps.ambient_pm2_5_air_pollution_2016
FROM economy_size as es
INNER JOIN ending_poverty as ep
ON es.country = ep.country
INNER JOIN promoting_sustainability as ps
ON es.country = ps.country
;