In [None]:
# Phils Section Start

# ETL (Extract-Transform-Load) Project

## Import Dependencies

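* Run the following cell to import the packages needed for this ETL. Database credentials are read from a local `config.py` that must define `username` and `password`.
* Besides pandas, NumPy, SQLAlchemy (with the psycopg2 PostgreSQL driver) and PyTables (which pandas uses for the `.h5` cache files), the one less common external package is **d6tstack**, so you will need to install it in your virtual environment.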
  * For more information on this package and how to install it, see <https://pypi.org/project/d6tstack/>.

In [None]:
# Dependencies
import pandas as pd
import numpy as np

# Database credentials
from config import username, password

# Used for making database connection.
from sqlalchemy import create_engine, inspect
from sqlalchemy.orm import Session
from sqlalchemy.engine import reflection
from sqlalchemy.schema import (
        MetaData,
        Table,
        DropTable,
        ForeignKeyConstraint,
        DropConstraint,
        )

# Used to abstract classes into tables.
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.automap import automap_base

# Used to declare column types.
from sqlalchemy import Column, Integer, String, Float, ForeignKey

# Used to load pandas dataframe into sql.
import d6tstack.utils
import time

# Ignore warnings.
import warnings
warnings.filterwarnings('ignore')

# Used for reading, writing to, and zipping files/folders.
from pathlib import Path
import os, zipfile
import shutil
import glob

# Used to percent-encode database credentials in connection URLs.
from urllib.parse import quote

## Constants

These are variables for items in this notebook that never change, such as csv file names, table names, database name, database connection string, etc.

In [None]:
# Data csv files
COMMODITY_TRADE_CSV = os.path.join(".", "Resources", "commodity_trade_statistics_data.csv")
FINANCIAL_AID_CSV = os.path.join(".", "Resources", "june-9-data-csv-1.csv")
COMMODITY_CODES_CSV = os.path.join(".", "Resources", "un_comtrade_commodity_classifications.csv")

# Database/tables
DB_NAME = "trade_db"
COMMODITY_TABLE = "commodity"
COMMODITY_CATEGORY_TABLE = "commodity_category"
COMMODITY_CODE_TABLE = "commodity_code"
COUNTRY_TABLE = "country"
FINANCIAL_AID_TABLE = 'financial_aid'
COMMODITY_EXPORTS_TABLE = 'commodity_exports'
COMMODITY_IMPORTS_TABLE = 'commodity_imports'

# Database connection string for loading data into database.
# The credentials are percent-encoded, so characters such as "@", ":" or "/" in the
# password cannot be mistaken for URL delimiters.
cfg_uri_psql = (
    f"postgresql+psycopg2://{quote(username, safe='')}:{quote(password, safe='')}"
    f"@localhost/{DB_NAME}"
)

## Unzip data files into the Resources folder

* Before running this cell, create a folder in the project root directory (the same directory as this notebook) called **Resources**.
* Then, manually download the data zip files from the team google drive folder and place inside the **Resources** folder you just created.
* After that, you can run this cell. Running the following cell will extract the data zip files into the **Resources** folder you created, which will contain the csv files needed for this project.

In [None]:
# Running this cell will unzip the data files in the Resources folder.
# Run it from the project root (the folder containing this notebook): the
# Resources path below is relative to the current working directory.
# The working directory is never changed, so a failure cannot leave the kernel
# stranded inside Resources.
extension = ".zip"

print(f"The current working directory is {os.getcwd()}.")
resources_dir = os.path.abspath("Resources")
print(f"Looking for zip files in the following directory: {resources_dir}.")

for item in sorted(os.listdir(resources_dir)):  # loop through the items in Resources
    if not item.endswith(extension):  # only handle ".zip" files
        continue
    zip_path = os.path.join(resources_dir, item)
    try:
        # The context manager closes the archive even if extraction fails part-way.
        with zipfile.ZipFile(zip_path) as zip_ref:
            zip_ref.extractall(resources_dir)  # extract next to the zip file
        print(f"Successfully unzipped {item} into the following folder: {resources_dir}.")
    except (zipfile.BadZipFile, OSError, RuntimeError) as err:
        # Best effort: report this archive and carry on with the others.
        # RuntimeError covers encrypted or unsupported archives.
        print("Error trying to unzip data file(s).")
        print("Make sure that the files are closed and you have the correct file/folder permissions.")
        print(f"Details for {item}: {err}")

## Store commodity csv into pandas dataframe

In [None]:
# Parsing the full commodity CSV is slow, so it is converted to HDF5 once and the
# next cell always loads from that cache. Delete commodity_trade.h5 to force a fresh
# read of the CSV (e.g. after re-extracting the data).
COMMODITY_TRADE_H5 = 'commodity_trade.h5'
if not os.path.exists(COMMODITY_TRADE_H5):
    commodity_trade_df = pd.read_csv(COMMODITY_TRADE_CSV, low_memory=False, encoding="utf-8")
    commodity_trade_df.to_hdf(COMMODITY_TRADE_H5, key='df', mode='w')

In [None]:
# Load the commodity data from the HDF5 copy; the author found read_hdf to be
# faster than read_csv for this large dataset.
commodity_trade_df = pd.read_hdf('commodity_trade.h5', key='df')

commodity_trade_df

### Rename columns for commodity dataframe

In [None]:
# Raw CSV column name -> clearer name used throughout the rest of the notebook.
commodity_column_names = {
    "comm_code": "commodity_code",
    "commodity": "commodity_description",
    "flow": "trade_flow",
    "trade_usd": "trade_value_usd",
}
commodity_trade_renamed_columns = commodity_trade_df.rename(columns=commodity_column_names)

commodity_trade_renamed_columns

### Drop null values from commodity dataframe

In [None]:
# Keep only rows where every column has a value.
commodity_trade_no_null = commodity_trade_renamed_columns.dropna(axis=0, how="any")

commodity_trade_no_null

### Verify commodity dataframe count

In [None]:
commodity_trade_no_null.notna().sum()  # non-null values per column

### Add auto incrementing id column to commodity dataframe

In [None]:
# Prepend a 0-based sequential id; it becomes the primary key of the trade tables.
commodity_trade_no_null.insert(loc=0, column='id', value=range(len(commodity_trade_no_null)))

commodity_trade_no_null

## Create new category dataframe from the commodity dataframe

In [None]:
# Split each category string on its first underscore. Column 0 holds the category
# id and column 1 the category name (they are renamed in the next cell).
category_df = commodity_trade_no_null["category"].str.split("_", n=1, expand=True)

# The commodity rows keep only the id; the combined text column is removed.
commodity_trade_no_null["category_id"] = category_df[0]
del commodity_trade_no_null["category"]

commodity_trade_no_null

In [None]:
# Name the split columns and keep one row per category id.
category_df = (
    category_df
    .rename(columns={0: "category_id", 1: "category_name"})
    .drop_duplicates("category_id")
)

category_df

## Create new commodity codes dataframe

In [None]:
# Create new commodity codes dataframe that contains commodity code and commodity description columns.

# The classification CSV is converted to HDF5 once and loaded from that cache
# afterwards. Previously it was parsed, written and immediately read back on every
# run, so the cache never saved any time. Delete commodity_codes.h5 to force a fresh read.
COMMODITY_CODES_H5 = 'commodity_codes.h5'
if not os.path.exists(COMMODITY_CODES_H5):
    pd.read_csv(COMMODITY_CODES_CSV, low_memory=False, encoding="utf-8").to_hdf(
        COMMODITY_CODES_H5, key='df', mode='w'
    )

# Rename to the database column names, keep the first row per code, and keep
# only the two columns stored in the commodity_code table.
commodity_codes_df = (
    pd.read_hdf(COMMODITY_CODES_H5, 'df')
    .rename(columns={"Code": "commodity_code", "Description": "commodity_description"})
    .drop_duplicates("commodity_code")
    [["commodity_code", "commodity_description"]]
)

commodity_codes_df

### Verify commodity codes dataframe count


In [None]:
commodity_codes_df.notna().sum()  # non-null values per column

### Remove duplicate commodity_description column from commodity dataframe

In [None]:
# Descriptions now live in commodity_codes_df, so remove the duplicate column in place.
commodity_trade_no_null.pop("commodity_description")

commodity_trade_no_null

## Create new countries dataframe from the commodity dataframe

In [None]:
# One row per distinct, non-null country/area name from the commodity data.
countries_df = (
    commodity_trade_no_null[["country_or_area"]]
    .drop_duplicates("country_or_area")
    .dropna(how="any")
)

countries_df

### Add auto-incrementing id column to countries dataframe


In [None]:
# Prepend a 0-based sequential id used as the country primary key.
countries_df.insert(loc=0, column='id', value=range(len(countries_df)))

countries_df

### Merge countries dataframe with commodity dataframe on country name

In [None]:
# Attach each commodity row's country id, then drop the country name.
# Both frames have an "id" column, so pandas suffixes them: id_x is the commodity
# id and id_y is the country id.
countries_commodities_merged = (
    pd.merge(commodity_trade_no_null, countries_df, on="country_or_area", how="outer")
    .rename(columns={"id_y": "country_id", "id_x": "id"})
    .drop(columns="country_or_area")
)

countries_commodities_merged

## Split commodity dataframe into 2 dataframes based on trade flow - exports and imports

### Find all possible values of trade_flow column

In [None]:
countries_commodities_merged.trade_flow.value_counts()  # rows per trade_flow value

### Create new dataframe for commodity exports

In [None]:
# Export-side trade flows.
is_export = countries_commodities_merged["trade_flow"].isin(["Export", "Re-Export"])
exports_df = countries_commodities_merged.loc[is_export]

exports_df

### Create new dataframe for commodity imports

In [None]:
# Import-side trade flows.
is_import = countries_commodities_merged["trade_flow"].isin(["Import", "Re-Import"])
imports_df = countries_commodities_merged.loc[is_import]

imports_df

## Connect to local database

In [None]:
# Connect to the local PostgreSQL database. The credentials are percent-encoded,
# so special characters in the password cannot corrupt the URL.
rds_connection_string = f"{quote(username, safe='')}:{quote(password, safe='')}@localhost:5432/{DB_NAME}"
engine = create_engine(f'postgresql://{rds_connection_string}')

## DANGEROUS: Drops everything in database

Use with caution!

In [None]:
def db_DropEverything(engine):
    """Drop every table, and the named foreign keys between them, from the database.

    Adapted from the SQLAlchemy "DropEverything" usage recipe. All named foreign-key
    constraints are dropped first, so the tables can then be dropped in any order.

    Parameters
    ----------
    engine : sqlalchemy Engine
        Engine connected to the database to wipe.

    All DDL runs in a single transaction. It is committed if every statement
    succeeds and rolled back otherwise, and the connection is always released.
    """
    # From http://www.sqlalchemy.org/trac/wiki/UsageRecipes/DropEverything
    inspector = inspect(engine)

    # Lightweight Table objects that carry only the FK constraint names.
    # That is all DropConstraint / DropTable need to emit their DDL.
    metadata = MetaData()
    tbs = []
    all_fks = []

    for table_name in inspector.get_table_names():
        fks = [
            ForeignKeyConstraint((), (), name=fk['name'])
            for fk in inspector.get_foreign_keys(table_name)
            if fk['name']  # unnamed constraints cannot be dropped by name
        ]
        tbs.append(Table(table_name, metadata, *fks))
        all_fks.extend(fks)

    # engine.begin() commits on success, rolls back on error and closes the
    # connection. The original left the connection open and never rolled back.
    with engine.begin() as conn:
        for fkc in all_fks:
            conn.execute(DropConstraint(fkc))

        for table in tbs:
            conn.execute(DropTable(table))

In [None]:
db_DropEverything(engine=engine)  # irreversibly drops every table in the connected database

## Create classes/schemas that will be associated with tables in the database.

In [None]:
# Fresh declarative base; the ORM classes below register their tables on Base.metadata.
Base = declarative_base()

In [None]:
# Drops only the tables registered on Base.metadata. Run straight after creating a
# fresh Base, no classes are registered yet, so this drops nothing; the full wipe
# is done by db_DropEverything.
Base.metadata.drop_all(engine)

In [None]:
# Create classes and define schemas for different tables.
# NOTE(review): the d6tstack loads below appear to copy dataframe columns into
# these tables by position, so keep each class's column order in sync with its
# dataframe. Confirm before reordering anything.
class CommodityCategory(Base):
    """Lookup table of commodity categories (id and name split from the raw category string)."""
    __tablename__ = COMMODITY_CATEGORY_TABLE
    category_id = Column(String(255), primary_key=True, nullable=False, unique=True)
    category_name = Column(String(255), nullable=False)

class CommodityCode(Base):
    """Lookup table mapping commodity codes to their descriptions."""
    __tablename__ = COMMODITY_CODE_TABLE
    commodity_code = Column(String(255), primary_key=True, nullable=False, unique=True)
    commodity_description = Column(String(400), nullable=False)

class Country(Base):
    """Distinct countries/areas from the commodity data, keyed by a generated integer id."""
    __tablename__ = COUNTRY_TABLE
    id = Column(Integer, primary_key=True, nullable=False, unique=True)
    country_or_area = Column(String(255), nullable=False)

class CommodityExports(Base):
    """Commodity trade records whose trade_flow is Export or Re-Export."""
    __tablename__ = COMMODITY_EXPORTS_TABLE
    id = Column(Integer, primary_key=True, nullable=False, unique=True)
    year = Column(Integer, nullable=False)
    # Spelled like the dataframe column and the commodity_code table (was "comodity_code").
    commodity_code = Column(String(255), ForeignKey(f"{COMMODITY_CODE_TABLE}.commodity_code"), nullable=False)
    trade_flow = Column(String(255), nullable=False)
    # Numeric (was String) so values can be summed and compared in SQL.
    trade_value_usd = Column(Float, nullable=False)
    weight_kg = Column(Float, nullable=False)
    quantity_name = Column(String(255), nullable=False)
    quantity = Column(Float, nullable=False)
    category_id = Column(String(255), ForeignKey(f"{COMMODITY_CATEGORY_TABLE}.category_id"), nullable=False)
    country_id = Column(Integer, ForeignKey(f"{COUNTRY_TABLE}.id"), nullable=False)

class CommodityImports(Base):
    """Commodity trade records whose trade_flow is Import or Re-Import."""
    __tablename__ = COMMODITY_IMPORTS_TABLE
    id = Column(Integer, primary_key=True, nullable=False, unique=True)
    year = Column(Integer, nullable=False)
    # Spelled like the dataframe column and the commodity_code table (was "comodity_code").
    commodity_code = Column(String(255), ForeignKey(f"{COMMODITY_CODE_TABLE}.commodity_code"), nullable=False)
    trade_flow = Column(String(255), nullable=False)
    # Numeric (was String) so values can be summed and compared in SQL.
    trade_value_usd = Column(Float, nullable=False)
    weight_kg = Column(Float, nullable=False)
    quantity_name = Column(String(255), nullable=False)
    quantity = Column(Float, nullable=False)
    category_id = Column(String(255), ForeignKey(f"{COMMODITY_CATEGORY_TABLE}.category_id"), nullable=False)
    country_id = Column(Integer, ForeignKey(f"{COUNTRY_TABLE}.id"), nullable=False)

# Create (if not already in existence) the table associated with class.
Base.metadata.create_all(engine)

## Check for tables

In [None]:
# List the tables now in the database. Engine.table_names() is deprecated in
# SQLAlchemy 1.4 and removed in 2.0, so the inspector is used instead.
inspect(engine).get_table_names()

## Load final pandas dataframes into sql

### Use pandas/d6tstack to load commodity category dataframe into sql

In [None]:
# Tab-separated, like the other loads. A comma separator breaks the bulk COPY as
# soon as a category name contains a comma.
start_time = time.time()
d6tstack.utils.pd_to_psql(category_df, cfg_uri_psql, COMMODITY_CATEGORY_TABLE, if_exists='append', sep='\t')
print("Time to load category dataframe into sql:")
print("--- %s seconds ---" % (time.time() - start_time))

### Use pandas/d6tstack to load commodity codes dataframe into sql

In [None]:
start_time = time.time()
d6tstack.utils.pd_to_psql(
    commodity_codes_df, cfg_uri_psql, COMMODITY_CODE_TABLE,
    if_exists='append', sep='\t',
)
elapsed = time.time() - start_time
print("Time to load codes dataframe into sql:")
print(f"--- {elapsed!s} seconds ---")

### Use pandas/d6tstack to load country dataframe into sql

In [None]:
start_time = time.time()
d6tstack.utils.pd_to_psql(
    countries_df, cfg_uri_psql, COUNTRY_TABLE,
    if_exists='append', sep='\t',
)
elapsed = time.time() - start_time
print("Time to load country dataframe into sql:")
print(f"--- {elapsed!s} seconds ---")

### Use pandas/d6tstack to load commodity exports dataframe into sql

In [None]:
start_time = time.time()
d6tstack.utils.pd_to_psql(
    exports_df, cfg_uri_psql, COMMODITY_EXPORTS_TABLE,
    if_exists='append', sep='\t',
)
elapsed = time.time() - start_time
print("Time to load commodity exports dataframe into sql:")
print(f"--- {elapsed!s} seconds ---")

### Use pandas/d6tstack to load commodity imports dataframe into sql

In [None]:
start_time = time.time()
d6tstack.utils.pd_to_psql(
    imports_df, cfg_uri_psql, COMMODITY_IMPORTS_TABLE,
    if_exists='append', sep='\t',
)
elapsed = time.time() - start_time
print("Time to load commodity imports dataframe into sql:")
print(f"--- {elapsed!s} seconds ---")

## Create session object to connect to database

In [None]:
session = Session(engine)  # ORM session bound to the trade database

## Confirm data from pandas dataframes have been added to database.

### Collect the names of the tables within the database

In [None]:
# Inspector over the live database; reused by the column-listing cells below.
inspector = inspect(engine)
table_names = inspector.get_table_names()
table_names

### Confirm category data has been added by querying the commodity category table

In [None]:
# Show the first 10 rows of the commodity category table.
for category in session.query(CommodityCategory).limit(10).all():
    print(f"id: {category.category_id}, category name: {category.category_name}")

In [None]:
# Print column names and types
for column in inspector.get_columns(COMMODITY_CATEGORY_TABLE):
    column_name, column_type = column["name"], column["type"]
    print(column_name, column_type)

### Confirm country data has been added by querying the country table

In [None]:
# Show the first 10 rows of the country table.
for country in session.query(Country).limit(10).all():
    print(f"id: {country.id}, country name: {country.country_or_area}")

In [None]:
# Print column names and types
for column in inspector.get_columns(COUNTRY_TABLE):
    column_name, column_type = column["name"], column["type"]
    print(column_name, column_type)

### Confirm commodity codes data has been added by querying the commodity codes table

In [None]:
# Show the first 10 rows of the commodity code table.
for code in session.query(CommodityCode).limit(10).all():
    print(f"code: {code.commodity_code}, description: {code.commodity_description}")

In [None]:
# Print column names and types
for column in inspector.get_columns(COMMODITY_CODE_TABLE):
    column_name, column_type = column["name"], column["type"]
    print(column_name, column_type)

### Confirm commodity exports data has been added by querying the exports table

In [None]:
# Show the first 10 rows of the exports table.
for commodity in session.query(CommodityExports).limit(10).all():
    print(f"commodity: {commodity.id}, trade flow: {commodity.trade_flow}")

In [None]:
# Print column names and types
for column in inspector.get_columns(COMMODITY_EXPORTS_TABLE):
    column_name, column_type = column["name"], column["type"]
    print(column_name, column_type)

### Confirm commodity imports data has been added by querying the imports table

In [None]:
# Show the first 10 rows of the imports table.
for commodity in session.query(CommodityImports).limit(10).all():
    print(f"commodity: {commodity.id}, trade flow: {commodity.trade_flow}")

In [None]:
# Print column names and types
for column in inspector.get_columns(COMMODITY_IMPORTS_TABLE):
    column_name, column_type = column["name"], column["type"]
    print(column_name, column_type)

## Join tables in database

### Reflect database into ORM classes

In [None]:
# Rebuild Base by reflecting the live database, so every existing table gets a
# mapped class. NOTE: this rebinds the name Base, so later cells that subclass
# Base get this automap base, not the declarative one.
Base = automap_base()
Base.prepare(engine, reflect=True)

Base.classes.keys()

### Map classes

In [None]:
# Short aliases for the reflected classes used in the join below.
CE, CO, CAT, COU = (
    Base.classes[table_name]
    for table_name in (
        COMMODITY_EXPORTS_TABLE,
        COMMODITY_CODE_TABLE,
        COMMODITY_CATEGORY_TABLE,
        COUNTRY_TABLE,
    )
)

### Join commodity exports table and country tables

In [None]:
# The exports table's code column is spelled "comodity_code" if it was created
# from the original schema, and "commodity_code" otherwise; support both.
export_code_col = CE.commodity_code if hasattr(CE, "commodity_code") else CE.comodity_code

sel = [CE.id, CE.year, CE.trade_flow, CE.trade_value_usd, CE.weight_kg, CE.quantity_name, CE.quantity,
       CE.country_id, CO.commodity_description, COU.country_or_area]

# Join exports to both lookup tables. Without the commodity-code condition the
# query was a cartesian product with the commodity_code table, so the descriptions
# were unrelated to the export rows.
query = (
    session.query(*sel)
    .filter(CE.country_id == COU.id)
    .filter(export_code_col == CO.commodity_code)
    .limit(10)
    .all()
)

# Print the row tuples directly. The original unpacked each row into CE/CO/COU
# attributes, which overwrote the mapped ORM columns with plain values.
for record in query:
    print(record)

In [None]:
# Phils Section End

In [None]:
#Connors Section Start

In [None]:
#############################
#       DO NOT RUN ##########
#############################
# Dependencies
import pandas as pd
import numpy as np

# Database credentials
from config import username, password

# Used for making database connection.
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

# Used to abstract classes into tables.
from sqlalchemy.ext.declarative import declarative_base

# Used to declare column types.
from sqlalchemy import Column, Integer, String, Float, ForeignKey

# Used to load pandas dataframe into sql.
import d6tstack.utils
import time

# Ignore warnings.
import warnings
warnings.filterwarnings('ignore')

# Used for reading, writing to, and zipping files/folders.
from pathlib import Path
import os, zipfile
import shutil
import glob

In [None]:
#############################
#       DO NOT RUN ##########
#############################
# Constants
COMMODITY_TRADE_CSV = os.path.join(".", "Resources", "commodity_trade_statistics_data.csv")
FINANCIAL_AID_CSV = os.path.join(".", "Resources", "june-9-data-csv-1.csv")
DB_NAME = "trade_db"
COMMODITY_TABLE = "commodity"
COMMODITY_CATEGORY_TABLE = "commodity_category"
COMMODITY_CODE_TABLE = "commodity_code"
# The credentials are percent-encoded, so special characters in them cannot break the URL.
cfg_uri_psql = (
    f"postgresql+psycopg2://{quote(username, safe='')}:{quote(password, safe='')}"
    f"@localhost/{DB_NAME}"
)
FINANCIAL_AID_TABLE = 'financial_aid'

In [None]:
# Running this cell will unzip the data files in the Resources folder.
# Run it from the project root (the folder containing this notebook): the
# Resources path below is relative to the current working directory.
# The working directory is never changed, so a failure cannot leave the kernel
# stranded inside Resources.
extension = ".zip"

print(f"The current working directory is {os.getcwd()}.")
resources_dir = os.path.abspath("Resources")
print(f"Looking for zip files in the following directory: {resources_dir}.")

for item in sorted(os.listdir(resources_dir)):  # loop through the items in Resources
    if not item.endswith(extension):  # only handle ".zip" files
        continue
    zip_path = os.path.join(resources_dir, item)
    try:
        # The context manager closes the archive even if extraction fails part-way.
        with zipfile.ZipFile(zip_path) as zip_ref:
            zip_ref.extractall(resources_dir)  # extract next to the zip file
        print(f"Successfully unzipped {item} into the following folder: {resources_dir}.")
    except (zipfile.BadZipFile, OSError, RuntimeError) as err:
        # Best effort: report this archive and carry on with the others.
        # RuntimeError covers encrypted or unsupported archives.
        print("Error trying to unzip data file(s).")
        print("Make sure that the files are closed and you have the correct file/folder permissions.")
        print(f"Details for {item}: {err}")

In [None]:
# Raw financial aid records.
financial_df = pd.read_csv(FINANCIAL_AID_CSV)
financial_df.head(5)

In [None]:
financial_df.notna().sum()  # non-null values per column

In [None]:
# Keep only the columns used downstream, then drop incomplete rows.
financial_columns = ['Donor Country', 'Donor Type', 'Aid Type', 'Receiver', 'Amount', 'Currency', 'USD Amount']
cleaned_financial_df = financial_df[financial_columns].dropna()
cleaned_financial_df.count()

In [None]:
# Snake-case the aid column names to match the financial_aid table columns.
financial_column_names = {
    'Donor Country': 'donor_country',
    'Donor Type': 'donor_type',
    'Aid Type': 'aid_type',
    'Receiver': 'receiver',
    'Amount': 'amount',
    'Currency': 'currency',
    'USD Amount': 'USD_amount',
}
rn_cleaned_financial_df = cleaned_financial_df.rename(columns=financial_column_names)

rn_cleaned_financial_df.head()

In [None]:
rn_cleaned_financial_df["donor_country"].unique()  # distinct donor country spellings

In [None]:
# Dropping bad values: placeholder donor countries that cannot match a real country.
PLACEHOLDER_DONORS = ['Not Applicable\r\n', 'Not Applicable', 'Not Known']
rn_cleaned_financial_df = rn_cleaned_financial_df[
    ~rn_cleaned_financial_df.donor_country.isin(PLACEHOLDER_DONORS)
]

# Export the remaining rows sorted by donor country, for manual comparison with the
# commodity country names. The original sorted combined_cleaned_df, which is only
# created in the next cell (NameError on a fresh kernel).
sorted_df = rn_cleaned_financial_df.sort_values(by='donor_country')
sorted_df.to_excel('connor.xlsx')
sorted_df.donor_country.unique()

In [None]:
# Align donor country spellings with the country names in the commodity data
# (countries_df.country_or_area) so the merge below can match them.
# Only donor_country is rewritten. A frame-wide replace would also silently
# rewrite matching values in other columns such as receiver.
DONOR_COUNTRY_FIXES = {
    'HOLY SEE (VATICAN CITY STATE)': 'Italy',
    'Korea, Republic of': 'Rep. of Korea',
    'TAIWAN, PROVINCE OF CHINA': 'China',
    'CANADA': 'Canada',
    'Baharain': 'Bahrain',
    'Czech Republic': 'Czech Rep.',
    'United States': 'USA',
    'Hong Kong': 'China',
    # NOTE(review): the original mapped 'Monaco' to 'Morocco', a different country,
    # which misattributed Monaco's aid. Mapping removed; add the correct target
    # name here if Monaco should match a country in the commodity data.
}
combined_cleaned_df = rn_cleaned_financial_df.replace({'donor_country': DONOR_COUNTRY_FIXES})

In [None]:
# Export the commodity-data countries for manual comparison with the donor list,
# then display the distinct names. The original listed them first, so they were never shown.
countries_df.to_excel('phil.xlsx')
countries_df.country_or_area.unique()

In [None]:
combined_cleaned_df.insert(loc=0, column='id', value=range(len(combined_cleaned_df)))  # 0-based row id

In [None]:
combined_cleaned_df.head()  # first 5 rows

In [None]:
# Attach the commodity-data country id to each aid row; unmatched donors are dropped.
combined_w_country_id = combined_cleaned_df.merge(
    countries_df, how='inner', left_on='donor_country', right_on='country_or_area'
)
combined_w_country_id

In [None]:
# Both merged frames had an "id" column, so pandas suffixed them: "id_x" is the
# aid row id and "id_y" is the country id. Renaming plain "id" (as before) was a
# no-op, so country_id never existed. Keep the country id as the foreign key and
# drop the old row id; a fresh "id" is inserted in the next cell. The plain "id" key
# covers the case where only countries_df carried an id column.
combined_w_country_id = combined_w_country_id.rename(
    columns={'id_y': 'country_id', 'id': 'country_id'}
).drop(columns=['id_x'], errors='ignore')
# NOTE(review): donor_country and country_or_area are not columns of the
# financial_aid table; they must be dropped before loading (commented out below).
#del combined_w_country_id['donor_country']
#del combined_w_country_id['country_or_area']

In [None]:

# Prepend a fresh 0-based id used as the financial_aid primary key.
combined_w_country_id.insert(loc=0, column='id', value=range(len(combined_w_country_id)))
combined_w_country_id

In [None]:
#AidCategory.__table__.drop()

class AidCategory(Base):
    """ORM mapping for the financial aid table, linked to the country table by country_id.

    NOTE(review): when the cells run in order, ``Base`` here is the automap base
    rebound during reflection, not the declarative base used for the commodity
    tables. Confirm that is intended.
    """
    __tablename__ = FINANCIAL_AID_TABLE
    id = Column(Integer, primary_key=True)
    donor_type = Column(String(255))
    aid_type = Column(String(255))
    receiver = Column(String(255))
    # Monetary amounts are stored as Float (was Integer). If pandas holds these
    # columns as floats, e.g. because they contained NaN before dropna, values are
    # written like "100.0", which an INTEGER column rejects. Fractional amounts also fit.
    amount = Column(Float)
    currency = Column(String(255))
    USD_amount = Column(Float)
    country_id = Column(Integer, ForeignKey(f"{COUNTRY_TABLE}.id"), nullable=False)

Base.metadata.create_all(engine)

In [None]:
# List the tables now in the database. Engine.table_names() is deprecated in
# SQLAlchemy 1.4 and removed in 2.0, so the inspector is used instead.
inspect(engine).get_table_names()

In [None]:
#start_time = time.time()
#d6tstack.utils.pd_to_psql(combined_w_country_id, cfg_uri_psql, FINANCIAL_AID_TABLE, if_exists='append', sep='\t')
#print("Time to load aid dataframe into sql:")
#print("--- %s seconds ---" % (time.time() - start_time))

In [None]:
#Connors Section End