In [1]:
# imports
import os
import gc
import pickle
import numpy as np
import pandas as pd
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
from joblib import Parallel, delayed
from sklearn.model_selection import KFold

import lightgbm as lgb

tqdm.pandas()
%matplotlib inline


import warnings
warnings.filterwarnings("ignore")

In [24]:
# config
class cfg:
    """
    Notebook configuration: data/model paths, per-column aggregation recipes and model
    hyper-parameters.

    The aggregation recipes use two custom reducers, ``calculate_rv`` and ``count_unique``.
    They are defined inside this class body so that this cell also runs on a fresh kernel.
    The module-level functions with the same names live in the later feature-utils cell, so
    referencing them here raised NameError under Restart & Run All.

    Keep the reducer names unchanged. The aggregated column names are built from them
    (e.g. ``log_return1_calculate_rv``), and downstream code selects those columns by name.
    """

    # --- custom reducers referenced by the feature dicts below (see class docstring) ---
    def calculate_rv(series):
        """Realized volatility: square root of the sum of squared values."""
        return np.sqrt(np.sum(np.square(series)))

    def count_unique(series):
        """Number of distinct values in ``series``."""
        return len(np.unique(series))

    paths = {
        # train path
        "train_csv": "./../inp/Raw Data/train.csv",
        "train_book": "./../inp/Raw Data/book_train.parquet",
        "train_trade": "./../inp/Raw Data/trade_train.parquet",

        # test path
        "test_csv": "./../inp/Raw Data/test.csv",
        "test_book": "./../inp/Raw Data/book_test.parquet",
        "test_trade": "./../inp/Raw Data/trade_test.parquet",

        # model paths
        "xgb_baseline": "./../models/xgbBaseline/",
        "lgb_baseline": "./../models/lgbBaseline/"
    }

    # order-book column -> reducers applied per time_id (and per trailing window)
    feature_dict_book = {
        "wap1": [np.sum, np.mean, np.std],
        "wap2": [np.sum, np.mean, np.std],
        "log_return1": [np.sum, calculate_rv, np.mean, np.std],
        "log_return2": [np.sum, calculate_rv, np.mean, np.std],
        "wap_balance": [np.sum, np.mean, np.std],
        "volume_imbalance": [np.sum, np.mean, np.std],
        "total_volume": [np.sum, np.mean, np.std],
        "price_spread1": [np.sum, np.mean, np.std],
        "price_spread2": [np.sum, np.mean, np.std],
        "bid_spread": [np.sum, np.mean, np.std],
        "ask_spread": [np.sum, np.mean, np.std],
    }

    # trade column -> reducers applied per time_id (and per trailing window)
    feature_dict_trade = {
        "log_return": [calculate_rv],
        "seconds_in_bucket": [count_unique],
        "size": [np.sum],
        "order_count": [np.mean]
    }

    model_params = {
        "xgb_bl": {
            "objective": "reg:squarederror",
            "booster": "gbtree",
            "nthread": -1,
            "eta": 0.3,
            "max_depth": 8,
            "min_child_weight": 1,
            "sampling_method": "uniform",
            # "tree_method": "gpu_hist"
        },
        "lgb_bl": {
            "objective": "rmse",
            "boosting_type": "gbdt",
            "learning_rate": 0.05,
        }
    }

In [25]:
# feature utils
def calculate_wap(df, rank="1"):
    """
    Weighted average price (WAP) of a single order-book level.

    Each side's price is weighted by the *opposite* side's size:

        (bid_price{rank} * ask_size{rank} + bid_size{rank} * ask_price{rank})
        / (bid_size{rank} + ask_size{rank})

    Summing numerator and denominator over several levels would give a multi-level WAP;
    this function evaluates one level only.

    :param df: order-book DataFrame holding the bid/ask price and size columns for ``rank``
    :param rank: level suffix appended to the column names ("1", "2", ...)
    :return: Series with the WAP of every row in ``df``
    """
    bid_px, ask_px = df[f"bid_price{rank}"], df[f"ask_price{rank}"]
    bid_sz, ask_sz = df[f"bid_size{rank}"], df[f"ask_size{rank}"]

    cross_weighted = bid_px * ask_sz + bid_sz * ask_px
    return cross_weighted / (bid_sz + ask_sz)


def calculate_inter_wap(df, rank="1"):
    """
    Size-weighted average price of a single order-book level.

    Unlike ``calculate_wap``, each price is weighted by its *own* side's size:

        (bid_price{rank} * bid_size{rank} + ask_size{rank} * ask_price{rank})
        / (bid_size{rank} + ask_size{rank})

    :param df: order-book DataFrame holding the bid/ask price and size columns for ``rank``
    :param rank: level suffix appended to the column names ("1", "2", ...)
    :return: Series with one value per row of ``df``
    """
    bid_sz = df[f"bid_size{rank}"]
    ask_sz = df[f"ask_size{rank}"]
    same_side_weighted = df[f"bid_price{rank}"] * bid_sz + ask_sz * df[f"ask_price{rank}"]
    return same_side_weighted / (bid_sz + ask_sz)


def calculate_log_return(series):
    """
    Consecutive log returns of a price series: log(p_t) - log(p_{t-1}).

    The first element has no predecessor and is therefore NaN.
    """
    log_prices = np.log(series)
    return log_prices.diff(periods=1)


def calculate_rv(series):
    """Realized volatility: square root of the sum of squared values (e.g. log returns)."""
    sum_of_squares = np.sum(np.square(series))
    return np.sqrt(sum_of_squares)


def count_unique(series):
    """Number of distinct values in ``series``."""
    distinct_values = np.unique(series)
    return distinct_values.size


def get_stats_window(df, seconds_in_bucket, features_dict, add_suffix=False):
    """
    Aggregate rows with ``seconds_in_bucket >= seconds_in_bucket`` per ``time_id``.

    The two-level column index produced by ``agg`` is flattened with "_". The group key
    therefore becomes ``time_id_`` and each statistic becomes ``<column>_<reducer>``.

    :param df: rows with ``time_id`` and ``seconds_in_bucket`` plus the columns in ``features_dict``
    :param seconds_in_bucket: lower bound (inclusive) of the window
    :param features_dict: pandas ``agg`` spec, column -> list of reducers
    :param add_suffix: when True, append ``_<seconds_in_bucket>`` to every column name
    :return: one row per time_id that has rows in the window
    """
    in_window = df.loc[df["seconds_in_bucket"] >= seconds_in_bucket]
    aggregated = in_window.groupby(["time_id"]).agg(features_dict).reset_index()
    aggregated.columns = ["_".join(level_names) for level_names in aggregated.columns]

    if not add_suffix:
        return aggregated
    return aggregated.add_suffix(f"_{seconds_in_bucket}")


def window_stats(df, feature_dict, second_windows):
    """
    Per-``time_id`` statistics over the whole bucket plus one block per trailing window.

    The full-bucket stats (window start 0, columns ``<col>_<agg>`` keyed by ``time_id_``) are
    left-joined on time_id with each window ``w``'s stats (columns ``<col>_<agg>_<w>``). The
    window's own key column ``time_id__<w>`` is dropped after the join.

    :param df: rows with ``time_id`` and ``seconds_in_bucket`` plus the columns in ``feature_dict``
    :param feature_dict: pandas ``agg`` spec, column -> list of reducers
    :param second_windows: iterable of window start offsets (seconds)
    :return: one row per time_id that has rows with ``seconds_in_bucket >= 0``
    """
    windowed_frames = [
        (window, get_stats_window(df, seconds_in_bucket=window, features_dict=feature_dict, add_suffix=True))
        for window in second_windows
    ]

    combined = get_stats_window(df, seconds_in_bucket=0, features_dict=feature_dict)
    for window, frame in windowed_frames:
        key_col = f"time_id__{window}"
        combined = (
            combined
            .merge(frame, how="left", left_on="time_id_", right_on=key_col)
            .drop(columns=[key_col])
        )
    return combined

In [26]:
# order book features
def get_book_features(file_path):
    """
    Build per-``time_id`` order-book features for one stock partition.

    Reads the parquet partition at ``file_path`` and derives row-level book columns:
    WAPs, log returns, balance/volume and spread measures. These are aggregated over the
    full bucket and over the 450/300/150-second trailing windows using
    ``cfg.feature_dict_book``.

    :param file_path: partition directory whose last component is ``stock_id=<id>``
        (as built by ``GetData.process_features``)
    :return: DataFrame with one row per time_id, keyed by ``row_id`` = "<stock_id>-<time_id>"
    """
    book_df = pd.read_parquet(file_path)

    # calculate wap
    book_df['wap1'] = calculate_wap(book_df, rank="1")
    book_df['wap2'] = calculate_wap(book_df, rank="2")
    book_df['iwap1'] = calculate_inter_wap(book_df, rank="1")
    book_df['iwap2'] = calculate_inter_wap(book_df, rank="2")

    # Per-time_id log returns. `transform` returns a result aligned with book_df's rows.
    # `groupby().apply` on a like-indexed result prepends the group key to the index in
    # pandas >= 2.0, which breaks this column assignment.
    for price_col, return_col in (("wap1", "log_return1"), ("wap2", "log_return2"),
                                  ("iwap1", "inter_log_return1"), ("iwap2", "inter_log_return2")):
        book_df[return_col] = book_df.groupby(["time_id"])[price_col].transform(calculate_log_return)

    # calculate balance
    book_df["wap_balance"] = abs(book_df["wap1"] - book_df["wap2"])
    book_df["volume_imbalance"] = abs(
        (book_df["ask_size1"] + book_df["ask_size2"]) - (book_df["bid_size1"] + book_df["bid_size2"]))
    book_df["total_volume"] = (book_df["ask_size1"] + book_df["ask_size2"]
                               + book_df["bid_size1"] + book_df["bid_size2"])

    # calculate spread (relative to the level's mid price)
    book_df["price_spread1"] = (book_df["ask_price1"] - book_df["bid_price1"]) / (
            (book_df["ask_price1"] + book_df["bid_price1"]) / 2)
    book_df["price_spread2"] = (book_df["ask_price2"] - book_df["bid_price2"]) / (
            (book_df["ask_price2"] + book_df["bid_price2"]) / 2)

    book_df["bid_spread"] = book_df["bid_price1"] - book_df["bid_price2"]
    book_df["ask_spread"] = book_df["ask_price1"] - book_df["ask_price2"]

    book_df_merged = window_stats(book_df, cfg.feature_dict_book, [450, 300, 150])

    # Take the id after the LAST '=' so an '=' elsewhere in the path cannot corrupt it.
    stock_id = file_path.rsplit("=", 1)[-1]
    book_df_merged["row_id"] = f"{stock_id}-" + book_df_merged["time_id_"].astype(str)

    return book_df_merged.drop(columns=["time_id_"])
                                                                
# trade features
def get_trade_features(file_path):
    """
    Build per-``time_id`` trade features for one stock partition.

    Adds per-time_id log returns of ``price`` and aggregates the trade columns with
    ``cfg.feature_dict_trade`` over the full bucket and the 450/300/150-second windows.
    Every resulting column is prefixed with ``trade_``.

    :param file_path: partition directory whose last component is ``stock_id=<id>``
    :return: DataFrame with one row per time_id, keyed by ``row_id`` = "<stock_id>-<time_id>"
    """
    trade_df = pd.read_parquet(file_path)

    # `transform` keeps trade_df's row index. `groupby().apply` adds the group key to the
    # index in pandas >= 2.0, which would break this assignment.
    trade_df["log_return"] = trade_df.groupby(["time_id"])["price"].transform(calculate_log_return)

    trade_df_merged = window_stats(trade_df, cfg.feature_dict_trade, [450, 300, 150]).add_prefix("trade_")

    # Take the id after the LAST '=' so an '=' elsewhere in the path cannot corrupt it.
    stock_id = file_path.rsplit("=", 1)[-1]
    trade_df_merged["row_id"] = f"{stock_id}-" + trade_df_merged["trade_time_id_"].astype(str)

    return trade_df_merged.drop(columns=["trade_time_id_"])

In [27]:
# create dataset
class GetData:
    """
    Assemble the model feature frame for one data split (train or test).

    Starting from the split's csv frame (``stock_id``, ``time_id``, ...), it:
    1. adds a ``row_id`` key;
    2. merges per-stock book/trade features computed in parallel;
    3. adds per-stock and per-time_id aggregates of the realized-volatility columns.

    The aggregates are computed from the rows of this split only.

    :param df: split frame; it is copied, never mutated
    :param book_path: root directory of the book parquet partitions (``stock_id=<id>``)
    :param trade_path: root directory of the trade parquet partitions (``stock_id=<id>``)
    :param n_jobs: number of joblib workers used by ``process_features`` (default 4)
    """

    # Realized-volatility columns (full bucket + 450/300/150 s windows) that receive
    # per-stock and per-time_id aggregates in get_time_stock.
    VOL_COLS = (
        'log_return1_calculate_rv', 'log_return2_calculate_rv',
        'log_return1_calculate_rv_450', 'log_return2_calculate_rv_450',
        'log_return1_calculate_rv_300', 'log_return2_calculate_rv_300',
        'log_return1_calculate_rv_150', 'log_return2_calculate_rv_150',
        'trade_log_return_calculate_rv', 'trade_log_return_calculate_rv_450',
        'trade_log_return_calculate_rv_300', 'trade_log_return_calculate_rv_150',
    )

    def __init__(self, df, book_path, trade_path, n_jobs=4):
        self.df = df.copy(deep=True)
        self.order_book_path = book_path
        self.trade_path = trade_path
        self.n_jobs = n_jobs

        self._get_rowid()
        # Pristine copy (input + row_id). get_features rebuilds from it, so calling it again
        # does not merge features on top of already-merged columns.
        self._base_df = self.df.copy(deep=True)

    def _get_rowid(self):
        """Add the join key ``row_id`` = "<stock_id>-<time_id>" to ``self.df``."""
        self.df["row_id"] = self.df["stock_id"].astype(str) + "-" + self.df["time_id"].astype(str)

    def get_time_stock(self):
        """
        Append mean/std/max/min of ``VOL_COLS`` grouped by stock_id (``*_stock`` columns) and
        by time_id (``*_time`` columns) to ``self.df``.

        Both aggregate tables are computed from ``self.df`` before either merge.

        :return: the updated ``self.df``
        """
        vol_cols = list(self.VOL_COLS)  # a list (not tuple) is required for multi-column selection

        def _aggregate(key, suffix):
            # One row per `key`; columns flattened to "<col>_<stat>" and suffixed with "_<suffix>".
            agg = self.df.groupby([key])[vol_cols].agg(['mean', 'std', 'max', 'min']).reset_index()
            agg.columns = ['_'.join(col) for col in agg.columns]
            return agg.add_suffix('_' + suffix)

        df_stock_id = _aggregate('stock_id', 'stock')
        df_time_id = _aggregate('time_id', 'time')

        merged = self.df.merge(df_stock_id, how='left', left_on=['stock_id'], right_on=['stock_id__stock'])
        merged = merged.merge(df_time_id, how='left', left_on=['time_id'], right_on=['time_id__time'])
        self.df = merged.drop(columns=['stock_id__stock', 'time_id__time'])
        return self.df

    def process_features(self, list_stock_ids):
        """
        Compute book + trade features for each stock in parallel and stack the results.

        :param list_stock_ids: stock ids whose ``stock_id=<id>`` partitions are read
        :return: DataFrame keyed by ``row_id``
        """
        # Capture only the paths (not `self`) so the worker closure does not carry self.df.
        book_root, trade_root = self.order_book_path, self.trade_path

        def parallel_helper(stock_id):
            book_sample_path = os.path.join(book_root, f"stock_id={stock_id}")
            trade_sample_path = os.path.join(trade_root, f"stock_id={stock_id}")

            # left join: keep every book time_id even if it has no trade rows
            return pd.merge(get_book_features(book_sample_path), get_trade_features(trade_sample_path),
                            on="row_id",
                            how="left")

        per_stock_frames = Parallel(n_jobs=self.n_jobs, verbose=1)(
            delayed(parallel_helper)(stock_id) for stock_id in list_stock_ids
        )
        return pd.concat(per_stock_frames, ignore_index=True)

    def get_features(self):
        """
        Build the full feature frame: split rows + per-stock features + stock/time aggregates.

        The frame is always rebuilt from the input captured at construction, so repeated calls
        return the same columns.

        :return: the feature DataFrame (also stored as ``self.df``)
        """
        features_df = self.process_features(self._base_df["stock_id"].unique())
        self.df = self._base_df.merge(features_df, on=["row_id"], how="left")

        return self.get_time_stock()

In [28]:
# metric utils
def rmspe(y_true, y_pred):
    """
    Root mean squared percentage error.

    sqrt(mean(((y_true - y_pred) / y_true) ** 2)); ``y_true`` must not contain zeros.
    """
    relative_error = (y_true - y_pred) / y_true
    return np.sqrt(np.mean(np.square(relative_error)))


def feval_rmspe(y_pred, model, is_xgb=True):
    """
    Custom RMSPE evaluation hook for gradient-boosting training loops.

    :param y_pred: predictions for the evaluation set
    :param model: the object being evaluated; only its ``get_label()`` is used
        (presumably the LightGBM Dataset / XGBoost DMatrix, despite the name)
    :param is_xgb: True -> ``("RMSPE", score)``;
        False -> ``("RMSPE", score, False)``, where the trailing False means lower is better
    :return: tuple in the shape described above
    """
    score = rmspe(model.get_label(), y_pred)
    return ("RMSPE", score) if is_xgb else ("RMSPE", score, False)

In [31]:
# model utils
def feval_wrapper(y_pred, model):
    """``feval`` adapter for lgb.train: RMSPE as a 3-tuple flagged lower-is-better."""
    return feval_rmspe(y_pred=y_pred, model=model, is_xgb=False)

class TrainFer:
    """
    K-fold LightGBM trainer and fold-averaging predictor.

    ``train`` fits one booster per fold and pickles it as ``lgb_bl_{fold}.pkl`` in
    ``model_path``. ``infer`` loads exactly those files and averages their predictions.

    :param params_dict: LightGBM parameter dict passed to ``lgb.train``
    :param n_splits: number of KFold splits, and of fold models saved / loaded
    :param model_path: directory for the pickled fold models; created if missing
    :param random_state: KFold shuffle seed
    """

    def __init__(self, params_dict, n_splits, model_path, random_state=2021):
        self.params = params_dict
        self.n_splits = n_splits
        self.random_state = random_state
        self.model_path = model_path
        os.makedirs(model_path, exist_ok=True)

    def _fold_model_path(self, fold):
        """Path of the pickled booster for ``fold``; shared by train and infer."""
        return os.path.join(self.model_path, f"lgb_bl_{fold}.pkl")

    @staticmethod
    def _logging_kwargs(verbose_every=250, patience=200):
        """
        Early-stopping / eval-logging arguments for ``lgb.train`` across LightGBM versions.

        LightGBM 4.x removed the ``verbose_eval`` and ``early_stopping_rounds`` keywords in
        favour of callbacks. Releases without ``lgb.log_evaluation`` still need the keywords.
        """
        if hasattr(lgb, "log_evaluation"):
            return {"callbacks": [lgb.early_stopping(patience), lgb.log_evaluation(verbose_every)]}
        return {"verbose_eval": verbose_every, "early_stopping_rounds": patience}

    def train(self, X, y):
        """
        Cross-validated training; prints per-fold and overall out-of-fold RMSPE.

        :param X: feature DataFrame; must contain ``stock_id`` (treated as categorical)
        :param y: target Series aligned with ``X``
        Fold models are written to ``model_path`` as ``lgb_bl_{fold}.pkl``.
        """
        oof_predictions = np.zeros(X.shape[0])
        kfold = KFold(n_splits=self.n_splits, random_state=self.random_state, shuffle=True)
        oof_scores = []

        for fold, (train_idx, val_idx) in enumerate(kfold.split(X)):
            print(f"\nFold - {fold}\n")

            x_train, y_train = X.iloc[train_idx], y.iloc[train_idx]
            x_val, y_val = X.iloc[val_idx], y.iloc[val_idx]

            # Weights 1/y^2 turn the squared-error objective into squared percentage error (RMSPE).
            dtrain = lgb.Dataset(x_train, y_train, weight=1 / np.square(y_train), categorical_feature=["stock_id"])
            dval = lgb.Dataset(x_val, y_val, weight=1 / np.square(y_val), categorical_feature=["stock_id"])

            model = lgb.train(params=self.params,
                              num_boost_round=10000,
                              train_set=dtrain,
                              valid_sets=[dval],
                              feval=feval_wrapper,
                              **self._logging_kwargs())

            with open(self._fold_model_path(fold), "wb") as fh:
                pickle.dump(model, fh)
            fold_preds = model.predict(x_val)
            oof_score = rmspe(y_val, fold_preds)
            print(f"\nRMSPE of fold {fold}: {oof_score}")

            oof_scores.append(oof_score)
            oof_predictions[val_idx] = fold_preds

        print(f"\nOOF Scores: {oof_scores}\n")
        rmspe_score = rmspe(y, oof_predictions)
        print(f"OOF RMSPE: {rmspe_score}")

    def infer(self, x_test):
        """
        Average the predictions of the ``n_splits`` fold models saved by ``train``.

        Only ``lgb_bl_0.pkl`` .. ``lgb_bl_{n_splits-1}.pkl`` are loaded; other files in
        ``model_path`` are ignored. The mean is taken over ``n_splits``, not a fixed 5.

        :param x_test: features with the same columns (and order) as used in ``train``
        :return: ndarray of averaged predictions, one per row of ``x_test``
        :raises FileNotFoundError: if any expected fold model is missing
        """
        model_files = [self._fold_model_path(fold) for fold in range(self.n_splits)]
        missing = [path for path in model_files if not os.path.isfile(path)]
        if missing:
            raise FileNotFoundError(f"Missing fold model(s): {missing}")

        test_predictions = np.zeros(x_test.shape[0])
        for path in model_files:
            # NOTE: pickle.load can execute arbitrary code; only load model files you produced.
            with open(path, "rb") as fh:
                model = pickle.load(fh)
            test_predictions += model.predict(x_test)

        return test_predictions / self.n_splits

In [37]:
if __name__=="__main__":
    # Entry point. Either fit the K-fold LightGBM models on the train split, or build the
    # test-split features and predict with the fold models saved under cfg.paths["lgb_baseline"].
    # Toggle between the two modes with `is_train`.
    _ = gc.collect()  # collect unreachable objects left over from earlier cells
    is_train = False  # False -> inference only; requires previously trained fold models on disk

    # Same TrainFer for both modes. Keep n_splits consistent with the fold models saved in lgb_baseline.
    model = TrainFer(cfg.model_params["lgb_bl"], n_splits=5, model_path=cfg.paths["lgb_baseline"])
    
    if is_train:
        train = pd.read_csv(cfg.paths["train_csv"])
        train_data = GetData(train, cfg.paths["train_book"], cfg.paths["train_trade"])
        train_df = train_data.get_features()
        
        # Key columns and the label are not model inputs; everything else (incl. stock_id) is a feature.
        model.train(train_df.drop(columns=["row_id", "target", "time_id"]), train_df["target"])
    else:
        test = pd.read_csv(cfg.paths["test_csv"])
        # NOTE(review): GetData.get_time_stock computes stock/time_id aggregates from this split's
        # own rows, so test aggregates come from test data rather than train — confirm intended.
        test_data = GetData(test, cfg.paths["test_book"], cfg.paths["test_trade"])
        test_df = test_data.get_features()
        
        # Drop the same key columns as in training so the remaining feature columns line up.
        preds = model.infer(test_df.drop(columns=["row_id", "time_id"])) 
        # Assumes test_df rows are in the same order as `test` (GetData builds it with left merges) — verify.
        # No submission file is written here; only the head is printed.
        test["target"] = preds
        print(test.head())

[Parallel(n_jobs=4)]: Using backend LokyBackend with 4 concurrent workers.
[Parallel(n_jobs=4)]: Done   1 out of   1 | elapsed:    0.7s finished


   stock_id  time_id row_id    target
0         0        4    0-4  0.001727
1         0       32   0-32  0.001797
2         0       34   0-34  0.001797


EOF