# ETL Pipeline

In [None]:
import importlib
from pathlib import Path
import time

import pandas as pd

import helpers
importlib.reload(helpers)
from helpers import arrange_and_convert_columns, clean_tickers, create_df, map_id_column, match_string

import utils
importlib.reload(utils)
from utils import logger, run_sql

start_time = time.time()
logger.info('ETL pipeline started...')

# Render floats with 4 decimal places in notebook output. This is a display option only:
# it changes how values are printed, not the precision of the underlying data.
pd.set_option('display.float_format', lambda x: '%.4f' % x)

# Raw extracts are read from <cwd>/../data; derive that directory once and reuse it.
data_dir = Path.cwd().parent / 'data'
account_positions_csv_file = data_dir / 'account_positions.csv'
accounts_csv_file = data_dir / 'accounts.csv'
price_history_csv_file = data_dir / 'price_history.csv'
transactions_csv_file = data_dir / 'transactions.csv'

# Extract: load each CSV into its own dataframe.
account_positions = pd.read_csv(account_positions_csv_file)
accounts = pd.read_csv(accounts_csv_file)
price_history = pd.read_csv(price_history_csv_file)
transactions = pd.read_csv(transactions_csv_file)

logger.info('Dataframes created from .csv files.')


In [None]:
#===================================
# Dimension ticker dataframe.
#===================================
# Build the ticker dimension from the ticker_symbol column of price_history,
# then replace whatever index create_df produced with a fresh 0..n-1 RangeIndex.
dim_ticker = create_df('dim_ticker', price_history, ['ticker_symbol'], None, None, 'ticker')
dim_ticker = dim_ticker.reset_index(drop=True)


In [None]:
# Put columns in dwh.dim_ticker order and cast each to its storage dtype.
dim_ticker = arrange_and_convert_columns(
    dim_ticker,
    ['ticker_id', 'ticker_symbol'],
    dict(ticker_id='int64', ticker_symbol='string'),
    'Dimension ticker',
)


In [None]:
#=================================
# Dimension date dataframe.
#=================================
# Pull the date-bearing column out of each raw extract and rename it to a common
# 'date' column so the four frames can later be stacked into one date dimension.
account_position_dates = create_df('account_position_dates', account_positions, ['last_updated'])
account_position_dates = account_position_dates.rename(columns={'last_updated': 'date'})

account_dates = create_df('account_dates', accounts, ['created_at'])
account_dates = account_dates.rename(columns={'created_at': 'date'})

price_history_dates = create_df('price_history_dates', price_history, ['date'])

transaction_dates = create_df('transaction_dates', transactions, ['trade_date'])
transaction_dates = transaction_dates.rename(columns={'trade_date': 'date'})


In [None]:
# Stack the per-source date frames into one frame and keep each distinct row once.
unique_dates = pd.concat([account_position_dates, account_dates, price_history_dates, transaction_dates])
unique_dates = unique_dates.drop_duplicates()

# Build the date dimension from the distinct dates (create_df handles ordering/ID assignment).
dim_date = create_df('dim_date', unique_dates, ['date'], None, 'date', 'date', 'date')


In [None]:
# Put columns in dwh.dim_date order and cast each to its storage dtype.
dim_date = arrange_and_convert_columns(
    dim_date,
    ['date_id', 'date'],
    dict(date_id='int64', date='datetime64[ns]'),
    'Dimension date',
)

In [None]:
#====================================
# Dimension account dataframe.
#====================================
# Build the account dimension. The account_id subset passed as the fourth argument drops rows
# whose account ID is NaN, since no positions or transactions can be linked to them.
dim_account = create_df(
    'dim_account',
    accounts,
    ['account_id', 'user_name', 'first_name', 'last_name', 'account_type', 'created_at'],
    ['account_id'],
    'created_at',
    None,
    'created_at',
)


In [None]:
# Fix typos in the account type column by passing every value through helpers.match_string
# against the list of valid types (presumably it returns the closest valid type; see helpers).
# Series.map applies it once per value without the per-row .loc lookups/writes of a manual loop;
# NaN values are still passed to match_string, exactly as before.
correct_account_types = ['joint', 'individual', 'retirement']

dim_account['account_type'] = dim_account['account_type'].map(
    lambda account_type: match_string(account_type, correct_account_types)
)

# Encode account types as single characters to fit the CHAR(1) account_type column:
# i: individual, j: joint, n: n/a, r: retirement. Other values are left unchanged.
dim_account['account_type'] = dim_account['account_type'].replace({'joint': 'j', 'individual': 'i', 'n/a': 'n', 'retirement': 'r'})

logger.info('Account type errors in dimension account fixed and replaced with single characters.')


In [None]:
# Swap the created_at value for its surrogate key from dim_date, stored as created_at_date_id.
dim_account = map_id_column(dim_date, 'date', 'date_id', dim_account, 'created_at')
dim_account = dim_account.rename(columns={'date_id': 'created_at_date_id'})


In [None]:
# Put columns in dwh.dim_account order and cast each to its storage dtype
# (nullable Int64 for created_at_date_id, which may be missing).
dim_account = arrange_and_convert_columns(
    dim_account,
    ['account_id', 'user_name', 'first_name', 'last_name', 'account_type', 'created_at_date_id'],
    dict(
        account_id='int64',
        user_name='string',
        first_name='string',
        last_name='string',
        account_type='category',
        created_at_date_id='Int64',
    ),
    'Dimension account',
)


In [None]:
#===================================
# Fact account position dataframe.
#===================================
# Build the account position fact from the raw positions; every source column is passed
# as the fourth create_df argument (the same slot dim_account uses for its NaN-drop subset).
fact_account_position = create_df(
    'fact_account_position',
    account_positions,
    ['account_id', 'ticker_symbol', 'shares_held', 'last_updated'],
    list(account_positions.columns),
    'last_updated',
    'account_position',
    'last_updated',
)


In [None]:
# Drop rows with invalid tickers; the validity rule lives in helpers.clean_tickers.
fact_account_position = clean_tickers(fact_account_position)


In [None]:
# Replace the natural keys with dimension surrogate keys:
# ticker_symbol -> ticker_id (dim_ticker), last_updated -> last_updated_date_id (dim_date).
fact_account_position = map_id_column(dim_ticker, 'ticker_symbol', 'ticker_id', fact_account_position, 'ticker_symbol')

fact_account_position = map_id_column(dim_date, 'date', 'date_id', fact_account_position, 'last_updated')
fact_account_position = fact_account_position.rename(columns={'date_id': 'last_updated_date_id'})


In [None]:
# Put columns in dwh.fact_account_position order and cast each to its storage dtype.
fact_account_position = arrange_and_convert_columns(
    fact_account_position,
    ['account_position_id', 'shares_held', 'account_id', 'ticker_id', 'last_updated_date_id'],
    dict(
        account_position_id='int64',
        shares_held='float64',
        account_id='int64',
        ticker_id='int64',
        last_updated_date_id='int64',
    ),
    'Fact account position',
)


In [None]:
#===================================
# Fact price_history dataframe.
#===================================
# Build the price history fact from every column of the raw price history extract.
fact_price_history = create_df(
    'fact_price_history',
    price_history,
    list(price_history.columns),
    None,
    'date',
    'price_history',
    'date',
)


In [None]:
# Replace the natural keys with dimension surrogate keys:
# ticker_symbol -> ticker_id (dim_ticker), then date -> date_id (dim_date).
fact_price_history = map_id_column(
    dim_ticker, 'ticker_symbol', 'ticker_id',
    fact_price_history, 'ticker_symbol',
)
fact_price_history = map_id_column(
    dim_date, 'date', 'date_id',
    fact_price_history, 'date',
)


In [None]:
# Put columns in dwh.fact_price_history order and cast each to its storage dtype.
fact_price_history = arrange_and_convert_columns(
    fact_price_history,
    ['price_history_id', 'open', 'high', 'low', 'close', 'adj_close', 'volume', 'ticker_id', 'date_id'],
    dict(
        price_history_id='int64',
        open='float64',
        high='float64',
        low='float64',
        close='float64',
        adj_close='float64',
        volume='float64',
        ticker_id='int64',
        date_id='int64',
    ),
    'Fact price history',
)


In [None]:
#===================================
# Fact transaction dataframe.
#===================================
# Build the transaction fact from every column of the raw transactions extract;
# the full column list is also passed as the fourth create_df argument.
fact_transaction = create_df(
    'fact_transaction',
    transactions,
    list(transactions.columns),
    list(transactions.columns),
    'trade_date',
    'transaction',
    'trade_date',
)


In [None]:
# Fix typos in the trade type column by passing each non-null value through helpers.match_string
# against the valid trade types, the same helper used to fix account types.
# Regex character classes such as r'[buy]' / r'[sell]' are NOT suitable here: '[buy]' matches any
# single 'b', 'u' or 'y', so e.g. a 'sell' typo like 'sely' would be classified as 'buy'.
# NaN values are left untouched. Values match_string cannot resolve follow its own fallback
# behaviour (see helpers.match_string).
correct_trade_types = ['buy', 'sell']

has_trade_type = fact_transaction['trade_type'].notna()
fact_transaction.loc[has_trade_type, 'trade_type'] = (
    fact_transaction.loc[has_trade_type, 'trade_type']
    .astype(str)
    .str.strip()
    .str.lower()  # case-insensitive, like the previous case=False matching
    .map(lambda trade_type: match_string(trade_type, correct_trade_types))
)

logger.info('Typos in trade type column fixed.')


In [None]:
# Encode trade types as single characters for the CHAR(1) column: 'b' = buy, 's' = sell.
# Values not in the mapping pass through unchanged.
fact_transaction['trade_type'] = fact_transaction['trade_type'].replace(dict(buy='b', sell='s'))
logger.info('Trade types replaced with single characters.')


In [None]:
# Drop rows with invalid tickers; the validity rule lives in helpers.clean_tickers.
fact_transaction = clean_tickers(fact_transaction)


In [None]:
# Replace the natural keys with dimension surrogate keys:
# trade_date -> trade_date_id (dim_date), then ticker_symbol -> ticker_id (dim_ticker).
fact_transaction = map_id_column(dim_date, 'date', 'date_id', fact_transaction, 'trade_date')
fact_transaction = fact_transaction.rename(columns={'date_id': 'trade_date_id'})

fact_transaction = map_id_column(dim_ticker, 'ticker_symbol', 'ticker_id', fact_transaction, 'ticker_symbol')


In [None]:
# Put columns in the transaction fact table's order and cast each to its storage dtype.
fact_transaction = arrange_and_convert_columns(
    fact_transaction,
    ['transaction_id', 'trade_type', 'shares', 'price', 'account_id', 'ticker_id', 'trade_date_id'],
    dict(
        transaction_id='int64',
        trade_type='category',
        shares='float64',
        price='float64',
        account_id='int64',
        ticker_id='int64',
        trade_date_id='int64',
    ),
    'Fact transaction',
)


In [None]:
# Ensure the "dwh" schema exists before any tables are created in it;
# IF NOT EXISTS makes re-running this cell a no-op once the schema is present.
create_schema = "CREATE SCHEMA IF NOT EXISTS dwh;"
run_sql('Creating data warehouse schema "dwh"...', create_schema)


In [None]:
# Create Postgres tables.
# Ordering matters: all tables are dropped first (facts before dimensions), then created in
# dependency order. dim_account references dim_date, so creating it first fails on a fresh
# database, and a later "DROP TABLE dwh.dim_date CASCADE" would silently remove dim_account's
# foreign key on re-runs.
# Price, share and volume columns use NUMERIC(18,4): NUMERIC(6,2) tops out at 9999.99, which is
# too small for values such as trading volume, and keeps fewer decimals than the notebook shows.
# NOTE(review): the transaction table is named fact_transactions while the DataFrame and the other
# fact tables use singular names; confirm which name the load step expects.
dw_ddl = """
-- Drop fact tables first, then the dimensions they reference.
DROP TABLE IF EXISTS dwh.fact_transactions CASCADE;
DROP TABLE IF EXISTS dwh.fact_price_history CASCADE;
DROP TABLE IF EXISTS dwh.fact_account_position CASCADE;
DROP TABLE IF EXISTS dwh.dim_account CASCADE;
DROP TABLE IF EXISTS dwh.dim_ticker CASCADE;
DROP TABLE IF EXISTS dwh.dim_date CASCADE;

-- Dimension tables (dim_date and dim_ticker first: other tables reference them)
CREATE TABLE dwh.dim_date (
    date_id INT PRIMARY KEY NOT NULL,
    date TIMESTAMP NOT NULL
);

CREATE TABLE dwh.dim_ticker (
    ticker_id INT PRIMARY KEY NOT NULL,
    ticker_symbol VARCHAR(6) NOT NULL
);

CREATE TABLE dwh.dim_account (
    account_id INT PRIMARY KEY NOT NULL,
    user_name VARCHAR(100),
    first_name VARCHAR(50),
    last_name VARCHAR(50),
    account_type CHAR(1) NOT NULL,
    created_at_date_id INT,
    CONSTRAINT fk_dim_account_date FOREIGN KEY (created_at_date_id)
        REFERENCES dwh.dim_date (date_id)
);

-- Fact tables
CREATE TABLE dwh.fact_account_position (
    account_position_id INT PRIMARY KEY NOT NULL,
    shares_held NUMERIC(18,4) NOT NULL,
    account_id INT NOT NULL,
    ticker_id INT NOT NULL,
    last_updated_date_id INT NOT NULL,
    CONSTRAINT fk_fact_account_position_account FOREIGN KEY (account_id)
        REFERENCES dwh.dim_account (account_id),
    CONSTRAINT fk_fact_account_position_ticker FOREIGN KEY (ticker_id)
        REFERENCES dwh.dim_ticker (ticker_id),
    CONSTRAINT fk_fact_account_position_date FOREIGN KEY (last_updated_date_id)
        REFERENCES dwh.dim_date (date_id)
);

CREATE TABLE dwh.fact_price_history (
    price_history_id INT PRIMARY KEY NOT NULL,
    open NUMERIC(18,4),
    high NUMERIC(18,4),
    low NUMERIC(18,4),
    close NUMERIC(18,4),
    adj_close NUMERIC(18,4),
    volume NUMERIC(18,4),
    ticker_id INT NOT NULL,
    date_id INT NOT NULL,
    CONSTRAINT fk_fact_price_history_ticker FOREIGN KEY (ticker_id)
        REFERENCES dwh.dim_ticker (ticker_id),
    CONSTRAINT fk_fact_price_history_date FOREIGN KEY (date_id)
        REFERENCES dwh.dim_date (date_id)
);

CREATE TABLE dwh.fact_transactions (
    transaction_id INT PRIMARY KEY NOT NULL,
    trade_type CHAR(1) NOT NULL,
    shares NUMERIC(18,4) NOT NULL,
    price NUMERIC(18,4) NOT NULL,
    account_id INT NOT NULL,
    ticker_id INT NOT NULL,
    trade_date_id INT NOT NULL,
    CONSTRAINT fk_fact_transactions_account FOREIGN KEY (account_id)
        REFERENCES dwh.dim_account (account_id),
    CONSTRAINT fk_fact_transactions_ticker FOREIGN KEY (ticker_id)
        REFERENCES dwh.dim_ticker (ticker_id),
    CONSTRAINT fk_fact_transactions_date FOREIGN KEY (trade_date_id)
        REFERENCES dwh.dim_date (date_id)
);
"""

# Execute the DDL, as the schema cell does for create_schema; without this call the
# tables were never created.
# NOTE(review): assumes run_sql can execute a multi-statement script in one call; confirm in utils.
run_sql('Creating data warehouse tables...', dw_ddl)


In [None]:
end_time = time.time()
execution_time = end_time - start_time

# Log total wall-clock time since start_time. The leading symbol is U+2234 '∴' ("therefore");
# a UTF-8 -> cp1252 mis-decode renders it as the garbage sequence 'âˆ´'.
logger.info(f'∴ ETL pipeline finished.\nExecution time: {execution_time:.4f} seconds.')
