In [6]:
from engineer import engineer_data, load_source_data, wap_ewa
import numpy as np
import pandas as pd
from joblib import Parallel, delayed

from tqdm.notebook import tqdm
from typing import Tuple
import os
from pathlib import Path

In [23]:
DATA_DIR = Path(os.getcwd()).joinpath("data")

int_cols = ['stock_id', 'time_id', 'seconds_in_bucket', 'size_cs', 'order_count_cs']
float_cols = ["spread_ratio", "volume_imbalance", "wap1", "wap2", "rvol_0.1_1", 
              "rvol_0.1_0.8", "rvol_0_1", "rvol_0_0.8", "rvol_0.1_1_300", "rvol_0.1_0.8_300", 
              "rvol_0_1_300", "rvol_0_0.8_300", "mean_rvol"]

def engineer_seq_row(df_stock: pd.DataFrame, time_id: int) -> pd.DataFrame:
    """Generates a sequence of features for a given time bucket (time_id)"""
    dfj = pd.merge(
        pd.DataFrame({"seconds_in_bucket": [i for i in range(600)], "time_id": time_id}), 
        df_stock[df_stock['time_id'] == time_id], 
        how="left", 
        on=["seconds_in_bucket", "time_id"]
    ) 

    ffill_cols = ["bid_price1", "ask_price1", "bid_price2", "ask_price2", "bid_size1", "ask_size1", "bid_size2", "ask_size2"]
    dfj[ffill_cols] = dfj[ffill_cols].ffill()

    dfj[['size_cs', 'order_count_cs']] = dfj[['size', 'order_count']].fillna(0).cumsum().astype(np.uint16)

    dfj['spread_ratio'] = ((dfj['ask_price1'] / dfj['bid_price1'] - 1) * 10000).astype(np.float32)
    dfj['volume_imbalance'] = abs((dfj['ask_size1'] + dfj['ask_size2']) - (dfj['bid_size1'] + dfj['bid_size2'])).astype(np.float32)

    dfj['wap1'] = (dfj['bid_price1'] * dfj['ask_size1'] + dfj['ask_price1'] * dfj['bid_size1']) / (dfj['bid_size1'] + dfj['ask_size1'])
    dfj['wap2'] = (dfj['bid_price2'] * dfj['ask_size2'] + dfj['ask_price2'] * dfj['bid_size2']) / (dfj['bid_size2'] + dfj['ask_size2'])

    dfj = wap_ewa(dfj, wap2_wt=0, ewa_alpha=1)
    dfj = wap_ewa(dfj, wap2_wt=0, ewa_alpha=0.8)
    dfj = wap_ewa(dfj, wap2_wt=0.1, ewa_alpha=1)
    dfj = wap_ewa(dfj, wap2_wt=0.1, ewa_alpha=0.8)

    dfj["rvol_0.1_1"] = np.sqrt(np.square(np.log(dfj["wap_0.1_1"]).diff()).cumsum()).astype(np.float32)
    dfj["rvol_0.1_0.8"] = np.sqrt(np.square(np.log(dfj["wap_0.1_0.8"]).diff()).cumsum()).astype(np.float32)
    dfj["rvol_0_1"] = np.sqrt(np.square(np.log(dfj["wap_0_1"]).diff()).cumsum()).astype(np.float32)
    dfj["rvol_0_0.8"] = np.sqrt(np.square(np.log(dfj["wap_0_0.8"]).diff()).cumsum()).astype(np.float32)
    dfj["rvol_0.1_1_300"] = np.sqrt(np.square(np.log(dfj["wap_0.1_1"]).diff()).rolling(window=300).sum()).astype(np.float32)
    dfj["rvol_0.1_0.8_300"] = np.sqrt(np.square(np.log(dfj["wap_0.1_0.8"]).diff()).rolling(window=300).sum()).astype(np.float32)
    dfj["rvol_0_1_300"] = np.sqrt(np.square(np.log(dfj["wap_0_1"]).diff()).rolling(window=300).sum()).astype(np.float32)
    dfj["rvol_0_0.8_300"] = np.sqrt(np.square(np.log(dfj["wap_0_0.8"]).diff()).rolling(window=300).sum()).astype(np.float32)

    rvol_cols = ["rvol_0.1_1", "rvol_0.1_0.8", "rvol_0_1", "rvol_0_0.8", "rvol_0.1_1_300", "rvol_0.1_0.8_300", "rvol_0_1_300", "rvol_0_0.8_300"]
    dfj['mean_rvol'] = dfj[rvol_cols].mean(axis=1)

    feature_cols = ['size_cs', 'order_count_cs', 'spread_ratio', 'volume_imbalance' ,
                    "wap1", "wap2", 'rvol_0.1_1', 'rvol_0.1_0.8', 'rvol_0_1', 'rvol_0_0.8', 
                    'rvol_0.1_1_300', 'rvol_0.1_0.8_300', 'rvol_0_1_300', 'rvol_0_0.8_300', 'mean_rvol']

    id_cols = ['time_id', 'seconds_in_bucket']
    dfj[id_cols] = dfj[id_cols].astype(np.uint16)

    return dfj[id_cols + feature_cols]

In [24]:
df_train_targets = pd.read_csv(DATA_DIR.joinpath("train.csv"))
train_ids = df_train_targets["stock_id"].unique()

In [None]:
def for_joblib(stock_id: int, train_test: str) -> pd.DataFrame:
    df_book = pd.read_parquet(load_source_data(stock_id, "book", train_test))
    df_trade = pd.read_parquet(load_source_data(stock_id, "trade", train_test))
    
    df_joined = pd.merge(
        df_book,
        df_trade,
        how="outer",
        on=["time_id", "seconds_in_bucket"],
    )

    time_ids = df_joined["time_id"].unique()

    dfs = Parallel(n_jobs=4, verbose=1)(delayed(engineer_seq_row)(df_joined, t_id) for t_id in tqdm(time_ids))
    df = pd.concat(dfs, ignore_index=True)
    df["stock_id"] = np.uint8(stock_id)
    return df



dfs = Parallel(n_jobs=12, verbose=1)(delayed(for_joblib)(stock_id, "train") for stock_id in tqdm(train_ids)) # 10gb worth of data if loading whole dataset


  0%|          | 0/112 [00:00<?, ?it/s]

[Parallel(n_jobs=12)]: Using backend LokyBackend with 12 concurrent workers.
[Parallel(n_jobs=12)]: Done  26 tasks      | elapsed:  4.6min
[Parallel(n_jobs=12)]: Done 112 out of 112 | elapsed: 15.2min finished


In [2]:
import torch
torch.cuda.empty_cache()

In [None]:
feature_cols = ['seconds_in_bucket', 'size_cs', 'order_count_cs', 'spread_ratio', 'volume_imbalance' ,
                    "wap1", "wap2", 'rvol_0.1_1', 'rvol_0.1_0.8', 'rvol_0_1', 'rvol_0_0.8', 
                    'rvol_0.1_1_300', 'rvol_0.1_0.8_300', 'rvol_0_1_300', 'rvol_0_0.8_300', 'mean_rvol']


count = np.zeros(len(feature_cols))
mean = np.zeros(len(feature_cols))
M2 = np.zeros(len(feature_cols))

for _df in dfs:
    data = _df[feature_cols].to_numpy()  # Convert to numpy for easier calculations
    
    # Calculate batch mean and variance
    batch_mean = np.nanmean(data, axis=0)
    batch_var = np.nanvar(data, axis=0)
    n = np.sum(~np.isnan(data), axis=0)
    
    # Update the count, mean, and M2 for Welford's algorithm
    delta = batch_mean - mean
    mean += delta * n / (count + n)
    M2 += batch_var * n + delta**2 * count * n / (count + n)
    count += n

In [61]:
variance = M2 / (count - 1)
std = np.sqrt(variance)

In [62]:
np.save(DATA_DIR.joinpath('seq_features').joinpath("feature_means"), mean)
np.save(DATA_DIR.joinpath('seq_features').joinpath("feature_stds"), std)

In [63]:
int_cols = ['stock_id', 'time_id', 'seconds_in_bucket', 'size_cs', 'order_count_cs']
float_cols = ["spread_ratio", "volume_imbalance", "wap1", "wap2", "rvol_0.1_1", 
              "rvol_0.1_0.8", "rvol_0_1", "rvol_0_0.8", "rvol_0.1_1_300", "rvol_0.1_0.8_300", 
              "rvol_0_1_300", "rvol_0_0.8_300", "mean_rvol"]
for _df in tqdm(dfs):
    df_stock = _df.copy()
    stock_id = df_stock["stock_id"].iloc[0]
    df_stock.to_parquet(DATA_DIR.joinpath('seq_features').joinpath(f"stock_{stock_id}_seq.parquet"))

  0%|          | 0/112 [00:00<?, ?it/s]

In [76]:
import plotly.graph_objects as go
from engineer import wap_ewa

time_id = 2881
stock_id = 3

dfb = pd.read_parquet(load_source_data(stock_id, "book", "train"))
dft = pd.read_parquet(load_source_data(stock_id, "trade", "train"))

df_joined = pd.merge(dfb, dft, how="outer", on=["time_id", "seconds_in_bucket"])
dfj = pd.merge(
    pd.DataFrame({"seconds_in_bucket": [i for i in range(600)], "time_id": time_id}), 
    df_joined[df_joined['time_id'] == time_id], 
    how="left", 
    on=["seconds_in_bucket", "time_id"]
) 

dfj = wap_ewa(dfj, wap2_wt=0, ewa_alpha=1)
dfj = wap_ewa(dfj, wap2_wt=0, ewa_alpha=0.8)
dfj = wap_ewa(dfj, wap2_wt=0.1, ewa_alpha=1)
dfj = wap_ewa(dfj, wap2_wt=0.1, ewa_alpha=0.8)


fig = go.Figure()
fig.add_trace(go.Scatter(x=dfj["seconds_in_bucket"], y=dfj["wap1"], mode='lines', name='wap1'))
fig.add_trace(go.Scatter(x=dfj["seconds_in_bucket"], y=dfj["wap2"], mode='lines', name='wap2'))
fig.add_trace(go.Scatter(x=dfj["seconds_in_bucket"], y=dfj["wap_0_1"], mode='lines', name='wap_0_1'))
fig.add_trace(go.Scatter(x=dfj["seconds_in_bucket"], y=dfj["wap_0_0.8"], mode='lines', name='wap_0_0.8'))
fig.add_trace(go.Scatter(x=dfj["seconds_in_bucket"], y=dfj["wap_0.1_1"], mode='lines', name='wap_0.1_1'))
fig.add_trace(go.Scatter(x=dfj["seconds_in_bucket"], y=dfj["wap_0.1_0.8"], mode='lines', name='wap_0.1_0.8'))
fig.show()


In [77]:
dfj["rvol_0.1_1"] = np.sqrt(np.square(np.log(dfj["wap_0.1_1"]).diff()).cumsum())
dfj["rvol_0.1_0.8"] = np.sqrt(np.square(np.log(dfj["wap_0.1_0.8"]).diff()).cumsum())
dfj["rvol_0_1"] = np.sqrt(np.square(np.log(dfj["wap_0_1"]).diff()).cumsum())
dfj["rvol_0_0.8"] = np.sqrt(np.square(np.log(dfj["wap_0_0.8"]).diff()).cumsum())
dfj["rvol_0.1_1_300"] = np.sqrt(np.square(np.log(dfj["wap_0.1_1"]).diff()).rolling(window=300).sum())
dfj["rvol_0.1_0.8_300"] = np.sqrt(np.square(np.log(dfj["wap_0.1_0.8"]).diff()).rolling(window=300).sum())
dfj["rvol_0_1_300"] = np.sqrt(np.square(np.log(dfj["wap_0_1"]).diff()).rolling(window=300).sum())
dfj["rvol_0_0.8_300"] = np.sqrt(np.square(np.log(dfj["wap_0_0.8"]).diff()).rolling(window=300).sum())

rvol_cols = ["rvol_0.1_1", "rvol_0.1_0.8", "rvol_0_1", "rvol_0_0.8", "rvol_0.1_1_300", "rvol_0.1_0.8_300", "rvol_0_1_300", "rvol_0_0.8_300"]
dfj['mean_rvol'] = dfj[rvol_cols].mean(axis=1)


In [78]:
# Plot all rvols on secondary axis
from plotly.subplots import make_subplots
fig = make_subplots(specs=[[{"secondary_y": True}]])
fig.add_trace(go.Scatter(x=dfj["seconds_in_bucket"], y=dfj["rvol_0.1_1"], mode='lines', name='rvol_0.1_1'), secondary_y=True)
fig.add_trace(go.Scatter(x=dfj["seconds_in_bucket"], y=dfj["rvol_0.1_0.8"], mode='lines', name='rvol_0.1_0.8'), secondary_y=True)
fig.add_trace(go.Scatter(x=dfj["seconds_in_bucket"], y=dfj["rvol_0_1"], mode='lines', name='rvol_0_1'), secondary_y=True)
fig.add_trace(go.Scatter(x=dfj["seconds_in_bucket"], y=dfj["rvol_0_0.8"], mode='lines', name='rvol_0_0.8'), secondary_y=True)
fig.add_trace(go.Scatter(x=dfj["seconds_in_bucket"], y=dfj["rvol_0.1_1_300"], mode='lines', name='rvol_0.1_1_300'), secondary_y=True)
fig.add_trace(go.Scatter(x=dfj["seconds_in_bucket"], y=dfj["rvol_0.1_0.8_300"], mode='lines', name='rvol_0.1_0.8_300'), secondary_y=True)
fig.add_trace(go.Scatter(x=dfj["seconds_in_bucket"], y=dfj["rvol_0_1_300"], mode='lines', name='rvol_0_1_300'), secondary_y=True)
fig.add_trace(go.Scatter(x=dfj["seconds_in_bucket"], y=dfj["rvol_0_0.8_300"], mode='lines', name='rvol_0_0.8_300'), secondary_y=True)
fig.add_trace(go.Scatter(x=dfj["seconds_in_bucket"], y=dfj["mean_rvol"], mode='lines', name='mean_rvol'), secondary_y=True)

fig.add_trace(go.Scatter(x=dfj["seconds_in_bucket"], y=dfj["wap1"], mode='lines', name='wap1'))
fig.add_trace(go.Scatter(x=dfj["seconds_in_bucket"], y=dfj["wap2"], mode='lines', name='wap2'))
fig.add_trace(go.Scatter(x=dfj["seconds_in_bucket"], y=dfj["wap_0_1"], mode='lines', name='wap_0_1'))
fig.add_trace(go.Scatter(x=dfj["seconds_in_bucket"], y=dfj["wap_0_0.8"], mode='lines', name='wap_0_0.8'))
fig.add_trace(go.Scatter(x=dfj["seconds_in_bucket"], y=dfj["wap_0.1_1"], mode='lines', name='wap_0.1_1'))
fig.add_trace(go.Scatter(x=dfj["seconds_in_bucket"], y=dfj["wap_0.1_0.8"], mode='lines', name='wap_0.1_0.8'))
fig.show()

In [79]:
dfj[rvol_cols + ['mean_rvol']].tail(5)

Unnamed: 0,rvol_0.1_1,rvol_0.1_0.8,rvol_0_1,rvol_0_0.8,rvol_0.1_1_300,rvol_0.1_0.8_300,rvol_0_1_300,rvol_0_0.8_300,mean_rvol
595,0.004432,0.00401,0.004689,0.004253,0.003219,0.002818,0.00335,0.002936,0.003713
596,0.004432,0.00401,0.004689,0.004253,0.003219,0.002818,0.00335,0.002936,0.003713
597,0.004432,0.00401,0.004689,0.004253,0.003219,0.002818,0.00335,0.002936,0.003713
598,0.004432,0.00401,0.004689,0.004253,0.003219,0.002818,0.00335,0.002936,0.003713
599,0.004432,0.00401,0.004689,0.004253,0.003219,0.002818,0.00335,0.002936,0.003713


In [80]:
df_train_targets[(df_train_targets['stock_id'] == stock_id) & (df_train_targets['time_id'] == time_id)]

Unnamed: 0,stock_id,time_id,target
11843,3,2881,0.004625


In [81]:
df_stock_ex = pd.read_parquet(DATA_DIR.joinpath('seq_features').joinpath(f"stock_{stock_id}_seq.parquet"))
df_stock_ex[df_stock_ex['time_id'] == time_id][rvol_cols + ['mean_rvol']].tail(5)

Unnamed: 0,rvol_0.1_1,rvol_0.1_0.8,rvol_0_1,rvol_0_0.8,rvol_0.1_1_300,rvol_0.1_0.8_300,rvol_0_1_300,rvol_0_0.8_300,mean_rvol
212395,0.004432,0.003555,0.004689,0.003758,0.003219,0.002632,0.00335,0.002734,0.003546
212396,0.004432,0.003555,0.004689,0.003758,0.003219,0.002632,0.00335,0.002734,0.003546
212397,0.004432,0.003555,0.004689,0.003758,0.003219,0.002632,0.00335,0.002734,0.003546
212398,0.004432,0.003555,0.004689,0.003758,0.003219,0.002632,0.00335,0.002734,0.003546
212399,0.004432,0.003555,0.004689,0.003758,0.003219,0.002632,0.00335,0.002734,0.003546
