## Global Commodity Trade ETL

In [111]:
import os
from urllib.parse import quote_plus

import pandas as pd
from sqlalchemy import create_engine, text

### Extraction

In [112]:
# Source-database settings. The defaults are the same values as the
# docker-compose file; set DB_HOST / DB_PORT / SOURCE_DB_NAME / DB_USER /
# DB_PASSWORD in the environment to override them without editing (or
# committing) credentials in the notebook.
HOST = os.environ.get("DB_HOST", "localhost")
PORT = int(os.environ.get("DB_PORT", "5433"))
DATABASE = os.environ.get("SOURCE_DB_NAME", "etl_python")
USER = os.environ.get("DB_USER", "my_user")
PASSWORD = os.environ.get("DB_PASSWORD", "my_password")

In [113]:
# Connect to the source database. USER and PASSWORD are URL-escaped so that
# characters such as '@', ':' or '/' inside them cannot corrupt the URL.
engine = create_engine(
    f'postgresql+psycopg2://{quote_plus(USER)}:{quote_plus(PASSWORD)}@{HOST}:{PORT}/{DATABASE}'
)

In [114]:
# engine.begin() hands out a connection wrapped in a transaction. Leaving the
# `with` block commits and closes the connection; if an exception escapes,
# the transaction is rolled back and the connection is still closed.
with engine.begin() as conn:
    chunk_size = 1000  # rows fetched per batch
    query = text("SELECT * FROM trades")
    # With chunksize set, read_sql_query returns a generator of DataFrames,
    # one per batch of rows ...
    trades_generator = pd.read_sql_query(query, conn, chunksize=chunk_size)
    # ... which pd.concat stitches back into a single DataFrame. Every batch
    # is kept, so df_trades ends up holding the whole table in memory.
    df_trades = pd.concat(trades_generator)

In [115]:
# Load the country reference data (codes, names, regions) from JSON.
df_countries = pd.read_json(path_or_buf='./sources/country_data.json')

In [116]:
# Load the product (commodity) code table from CSV.
df_codes = pd.read_csv(filepath_or_buffer='./sources/hs_codes.csv')

In [117]:
# Level-2 product codes. get_clean_code looks up a code's parent description
# in this frame. It is taken from the full table, before the Code_comm null
# filter in the next section.
df_parents = df_codes.loc[df_codes['Level'].eq(2)].copy()

### Transformation

##### Clean product codes

In [118]:
# Keep only the rows that have a Code_comm value.
# .copy() makes df_codes an independent frame, so the columns added to it in
# the next cells are set on it directly (no SettingWithCopyWarning).
df_codes = df_codes[df_codes['Code_comm'].notnull()].copy()

In [119]:
def get_clean_code(product_code, parents=None):
    """
    Split a product code into its trade-matching key and parent description.

    The leading digits of the Code column in df_codes will later allow us
    to match with trades in df_trades. The rows with a 1-digit Code_comm
    have an 11-digit Code column, while the rest have a 12-digit Code
    column, so 5 digits are kept for 11-digit codes and 6 for 12-digit
    codes. Likewise, the parent code is the first 1 or 2 digits.

    Keyword arguments:
    product_code -- the Code value (any type; converted with str())
    parents -- DataFrame with 'Code_comm' and 'Description' columns used to
               look up the parent; defaults to the global df_parents

    Return: tuple(clean_code, parent_description). clean_code is a string;
    parent_description is None when no parent row matches.

    Raises KeyError if str(product_code) is neither 11 nor 12 characters.
    """
    if parents is None:
        parents = df_parents

    code = str(product_code)

    # Code length -> (digits kept for clean_code, digits kept for parent code)
    slice_indices = {11: (5, 1), 12: (6, 2)}
    clean_len, parent_len = slice_indices[len(code)]

    clean_code = code[:clean_len]
    parent_code = code[:parent_len]

    # An empty match means "no parent". Checking for it explicitly avoids a
    # bare except, which would also hide real errors such as a missing
    # column or an undefined df_parents.
    # NOTE(review): parent_code is a string; if Code_comm was parsed as a
    # number, this comparison never matches. Confirm the column's dtype.
    matches = parents.loc[parents['Code_comm'] == parent_code, 'Description']
    parent_description = matches.iloc[0] if not matches.empty else None
    return (clean_code, parent_description)

In [120]:
# Run get_clean_code on every Code value and store the two parts of each
# result tuple in the new clean_code and parent_description columns.
df_codes[['clean_code', 'parent_description']] = pd.DataFrame(
    [get_clean_code(code) for code in df_codes['Code']],
    index=df_codes.index,
    columns=['clean_code', 'parent_description'],
)

In [121]:
# Keep only the rows that produced a clean_code, and only the three columns
# used later. A single .loc call replaces chained indexing, and .copy() lets
# the next cells add and convert columns on an independent frame.
df_codes = df_codes.loc[df_codes['clean_code'].notnull(),
                        ['clean_code', 'Description', 'parent_description']].copy()

In [122]:
# Unique identifier for each product code: the row index + 1. This matches
# the 1-based convention used for id_country and id_trades, so no id is 0.
# The index may have gaps left by the earlier row filters.
df_codes['id_code'] = df_codes.index + 1

In [123]:
# clean_code was produced by string slicing. Store it as a 64-bit integer
# before it is used as the join key against comm_code.
df_codes = df_codes.astype({'clean_code': 'int64'})

##### Clean countries data

In [124]:
# Keep the country attributes used downstream. The alpha-3 code will later
# be matched against country_code in df_trades.
# Hyphenated names ('alpha-3', 'sub-region') agree with every later
# reference to these columns (filter, merge and final column order).
# NOTE(review): presumably this is how country_data.json spells them; confirm.
df_countries = df_countries[['alpha-3', 'country', 'region', 'sub-region']]

In [125]:
# Drop countries without an alpha-3 code, because trades are matched on it.
# .copy() gives an independent frame, so the id_country column added next is
# set on it directly (no SettingWithCopyWarning).
df_countries = df_countries[df_countries['alpha-3'].notnull()].copy()

In [126]:
# Unique identifier for each country: the row index shifted by one. After
# the null filter the index can have gaps, so the ids are unique but not
# necessarily contiguous.
df_countries = df_countries.assign(id_country=df_countries.index + 1)

##### Merge dataframes

In [127]:
# Attach the product-code id to every trade by matching comm_code against
# clean_code. This is a left join, so trades without a matching code get a
# NaN id_code.
# NOTE(review): if clean_code is not unique in df_codes, this join duplicates
# trade rows. Consider validate='many_to_one' to surface that.
df_trades_clean = pd.merge(
    df_trades,
    df_codes[['clean_code', 'id_code']],
    how='left',
    left_on='comm_code',
    right_on='clean_code',
)

In [128]:
# Attach the country id to every trade by matching country_code against the
# alpha-3 code. This is a left join, so unmatched trades get a NaN id_country.
df_trades_clean = pd.merge(
    df_trades_clean,
    df_countries[['alpha-3', 'id_country']],
    how='left',
    left_on='country_code',
    right_on='alpha-3',
)

In [129]:
def create_dimension(data, id_name):
    """
    Build a dimension table that gives each value a sequential surrogate key.

    Keyword arguments:
    data -- array-like of values with a length (e.g. the result of
            Series.unique()); its order determines the ids
    id_name -- name of the surrogate-key column

    Return: A pandas DataFrame with columns [id_name, 'values'], where
    id_name runs 1..len(data).
    """
    return pd.DataFrame({id_name: range(1, len(data) + 1), 'values': data})

In [130]:
# One dimension table per categorical trade column, each holding the
# column's distinct values with a 1-based id.
df_quantity, df_flow, df_year = (
    create_dimension(df_trades_clean[source_col].unique(), key_col)
    for source_col, key_col in (
        ('quantity_name', 'id_quantity'),
        ('flow', 'id_flow'),
        ('year', 'id_year'),
    )
)

In [133]:
# Look up each dimension's id for every trade (left joins, so unmatched
# values get NaN ids). Each dimension's generic 'values' column is renamed
# to the trade column it was built from, and the join uses on=<that column>.
# This way no extra 'values' column is added to df_trades_clean. Joining
# repeatedly on right_on='values' would instead leave 'values', 'values_x'
# and 'values_y' behind through suffix clashes.
df_trades_clean = (
    df_trades_clean
    .merge(df_quantity.rename(columns={'values': 'quantity_name'}), how='left', on='quantity_name')
    .merge(df_flow.rename(columns={'values': 'flow'}), how='left', on='flow')
    .merge(df_year.rename(columns={'values': 'year'}), how='left', on='year')
)

In [134]:
# Unique identifier for each trade. The merges above return a fresh 0-based
# index, so index + 1 numbers the trades 1..n.
df_trades_clean = df_trades_clean.assign(id_trades=df_trades_clean.index + 1)

In [136]:
# Fact table: a unique identifier, the measures (trade_usd, kg, quantity)
# and the ids that relate each trade to the dimension tables.
df_trades_final = df_trades_clean.loc[
    :,
    ['id_trades', 'trade_usd', 'kg', 'quantity',
     'id_code', 'id_country', 'id_quantity', 'id_flow', 'id_year'],
].copy()

##### Rearrange the columns

In [137]:
# Put each table's identifier first, followed by its attributes.
df_countries = df_countries.loc[:, ['id_country', 'alpha-3', 'country', 'region', 'sub-region']]
df_codes = df_codes.loc[:, ['id_code', 'clean_code', 'Description', 'parent_description']]

##### Rename columns to match the database schema

In [159]:
# Rename columns to the names expected by the target tables. Each frame is
# reassigned rather than renamed in place.
# NOTE(review): df_flow keeps its generic 'values' column, unlike df_year and
# df_quantity. Confirm that matches the flow table's schema.
df_year = df_year.rename(columns={'values': 'year'})
df_quantity = df_quantity.rename(columns={'values': 'quantity'})
df_codes = df_codes.rename(columns={'clean_code': 'code', 'Description': 'description'})

### Load

In [162]:
# Target ("trades") database settings. The defaults match the previous
# hard-coded values; set DB_HOST / DB_PORT / TARGET_DB_NAME / DB_USER /
# DB_PASSWORD in the environment to override them without editing (or
# committing) credentials in the notebook.
HOST = os.environ.get("DB_HOST", "localhost")
PORT = int(os.environ.get("DB_PORT", "5433"))
DATABASE = os.environ.get("TARGET_DB_NAME", "trades")
USER = os.environ.get("DB_USER", "my_user")
PASSWORD = os.environ.get("DB_PASSWORD", "my_password")

In [163]:
# Connect to the target database. USER and PASSWORD are URL-escaped so that
# characters such as '@', ':' or '/' inside them cannot corrupt the URL.
engine = create_engine(
    f'postgresql+psycopg2://{quote_plus(USER)}:{quote_plus(PASSWORD)}@{HOST}:{PORT}/{DATABASE}'
)

In [170]:
# Write all tables inside one transaction (engine.begin()): the dimension
# tables first, the trades fact table last. If any insert raises, the whole
# load is rolled back.
# NOTE(review): if_exists='append' means re-running this cell appends the
# rows again (or fails on any unique constraints); truncate first if a full
# reload is intended.
with engine.begin() as conn:
    df_year.to_sql("years", conn, if_exists='append', index=False, chunksize=1000)
    df_flow.to_sql("flow", conn, if_exists='append', index=False, chunksize=1000)
    df_quantity.to_sql("quantity", conn, if_exists='append', index=False, chunksize=1000)
    df_countries.to_sql("countries", conn, if_exists='append', index=False, chunksize=1000)
    df_codes.to_sql("codes", conn, if_exists='append', index=False, chunksize=1000)
    df_trades_final.to_sql("trades", conn, if_exists='append', index=False, chunksize=1000)

In [None]:
# Preview the first five rows of the fact table.
df_trades_final.head(n=5)