In [1]:
import os
import pandas as pd
import matplotlib.pyplot as plt
from pprint import pprint
import numpy as np
from sklearn.metrics import r2_score


from numpy.random import default_rng
RANDOM_STATE = 2 # random state for default_rng
rng = default_rng(RANDOM_STATE)


import random
#import altair as alt
from tqdm import tqdm
import datetime

# ---------------------------------------------------------------------------
# Run configuration.
# The flags are assigned in three passes, later passes overriding earlier ones:
#   1. per-environment defaults (home machine vs. anything else, e.g. Kaggle),
#   2. the FAST_PASS override,
#   3. NBR_FOR_SUBSET_OF_STOCK_IDS is cleared when all stock ids are used.
# ---------------------------------------------------------------------------
FAST_PASS = False # True for quick debug passes, not for ML

# CHECKLIST for Kaggle variant
# Use FAST_PASS True on first pass
# USE_ALL_STOCK_IDS False to check then True
# USE_TEST_LOCAL_6_ITEMS must be False else we override the local test data
# TRAIN_TEST False
# Check on Kaggle that "internet" is disabled
# First run with "USE_ALL_STOCK_IDS=False", flip to True, Save Version, it'll take 30 mins to run

# CHECKLIST for home variant
# USE_ALL_STOCK_IDS False for fast dev, True for proper testing
# USE_TEST_LOCAL_6_ITEMS False for fast dev, True for proper testing
# NBR_FOR_SUBSET_OF_STOCK_IDS 4 for quick testing

# wall-clock start; the submission cell subtracts this to report total runtime
t1_notebook_start = datetime.datetime.utcnow()

# the home machine is recognised purely by the USER environment variable
if os.environ.get('USER') == 'ian':
    ENV_HOME = True
    # memory reporting is only switched on at home (produces the
    # "In [n] used ... MiB RAM" lines after each cell)
    import ipython_memory_usage
    %ipython_memory_usage_start
    USE_ALL_STOCK_IDS = True
    NBR_FOR_SUBSET_OF_STOCK_IDS = 4
    TRAIN_TEST = True # run the cross-validation cell rather than fitting on all rows
    USE_TEST_LOCAL_6_ITEMS = False # robust local testing at home
    MEMORY_LOCATION = 'joblib_cache'
else:
    ENV_HOME = False
    USE_ALL_STOCK_IDS = False # for KAGGLE on first-upload for a quick test
    TRAIN_TEST = False
    USE_TEST_LOCAL_6_ITEMS = False
    NBR_FOR_SUBSET_OF_STOCK_IDS = 4
    MEMORY_LOCATION = '/kaggle/working/joblib_cache'
    # kaggle notes:
    # You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All"
    # You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

# FAST_PASS wins over the per-environment defaults set above
if FAST_PASS:
    USE_ALL_STOCK_IDS = False
    TRAIN_TEST = False
    USE_TEST_LOCAL_6_ITEMS = False # forced off while debugging, even at home

# joblib on-disk cache handle; note the only visible use (the @memory.cache
# decorator on load_data_build_features) is currently commented out
from joblib import Memory
memory = Memory(location=MEMORY_LOCATION, verbose=0)

# a subset size is meaningless when every stock id is used
if USE_ALL_STOCK_IDS:
    NBR_FOR_SUBSET_OF_STOCK_IDS=None
# echo the effective configuration so it is recorded in the cell output
print(f'ENV_HOME: {ENV_HOME}, TRAIN_TEST {TRAIN_TEST}, USE_ALL_STOCK_IDS {USE_ALL_STOCK_IDS}, USE_TEST_LOCAL_6_ITEMS {USE_TEST_LOCAL_6_ITEMS}')
print(f'NBR_FOR_SUBSET_OF_STOCK_IDS: {NBR_FOR_SUBSET_OF_STOCK_IDS}')

ENV_HOME: True, TRAIN_TEST True, USE_ALL_STOCK_IDS True, USE_TEST_LOCAL_6_ITEMS False
NBR_FOR_SUBSET_OF_STOCK_IDS: None
In [1] used 0.0000 MiB RAM in 0.10s, peaked 0.00 MiB above current, total RAM usage 159.38 MiB


In [2]:
# OR PASTE IN UTILITY CODE HERE FOR KAGGLE
from utility import make_unique_time_ids, get_training_stock_ids, rmspe_score
from utility import ROOT, TEST_CSV, TRAIN_CSV

Utility says ROOT is /home/ian/data/kaggle/optiver_volatility/
In [2] used 0.0000 MiB RAM in 0.11s, peaked 0.00 MiB above current, total RAM usage 159.38 MiB


## Load train set

In [3]:
# Every stock id found in the training book; optionally cut down to a
# random sample so development runs stay fast.
stock_ids = get_training_stock_ids('book_train.parquet')

if USE_ALL_STOCK_IDS:
    print("Using all")
else:
    print(f"Using a subset of {NBR_FOR_SUBSET_OF_STOCK_IDS}")
    # in-place shuffle with the seeded generator, then keep the first N
    rng.shuffle(stock_ids)
    stock_ids = stock_ids[:NBR_FOR_SUBSET_OF_STOCK_IDS]
# Sanity peek: 59, 58, 23 is expected when all ids are used. The subset case
# was previously recorded as 76, 73, 0 for RANDOM_STATE 1 (the seed is
# currently 2, so expect different values there).
stock_ids[:3]

Using all


[59, 58, 23]

In [3] used 0.2305 MiB RAM in 0.11s, peaked 0.00 MiB above current, total RAM usage 159.61 MiB


In [4]:
# Training targets keyed by (stock_id, time_id) so they line up with the
# per-stock feature frames built later.
df_train_all = pd.read_csv(TRAIN_CSV).set_index(['stock_id', 'time_id'])
print(df_train_all.shape)
#rows_for_stock_id_0 = df_train_all.query('stock_id == 0').shape[0]
#rows_for_stock_id_0

(428932, 1)
In [4] used 20.6211 MiB RAM in 0.29s, peaked 20.39 MiB above current, total RAM usage 180.23 MiB


In [5]:
def show_details(df):
    """Print a one-line structural summary of ``df``.

    The line reports the number of index levels, the shape, the total count
    of null cells and three flags read from the frame's internal block
    manager (``is_view``, ``is_single_block``, ``is_consolidated``).

    Parameters
    ----------
    df : pandas.DataFrame
        The frame to describe. All reported values are taken from this
        argument (previously the manager flags were read from the global
        ``df_train_all`` regardless of what was passed in).

    Returns
    -------
    None
        The summary is printed, not returned.
    """
    try:
        nbr_index_levels = len(df.index.levels)
    except AttributeError:
        # only a MultiIndex has .levels; anything else counts as one level
        nbr_index_levels = 1
    nbr_nulls = df.isnull().sum().sum()
    nulls_msg = f"{nbr_nulls} nulls"

    # The block manager is private pandas API: ``_mgr`` is the current name,
    # ``_data`` the older (deprecated) alias.
    block_manager = df._mgr if hasattr(df, '_mgr') else df._data

    def _manager_flag(name):
        # Flags are a mix of properties and methods; report None when this
        # pandas version does not expose the flag at all.
        flag = getattr(block_manager, name, None)
        return flag() if callable(flag) else flag

    is_view_msg = f"is_view {_manager_flag('is_view')}"
    is_single_block_msg = f"is_single_block {_manager_flag('is_single_block')}"
    is_consolidated_msg = f"is_consolidated {_manager_flag('is_consolidated')}"
    print(f'[{nbr_index_levels}c] {df.shape[0]:,}x{df.shape[1]:,}, {nulls_msg}, {is_view_msg}, {is_single_block_msg}, {is_consolidated_msg}')

show_details(df_train_all)

[2c] 428,932x1, 0 nulls, is_view True, is_single_block True, is_consolidated True
In [5] used 0.0000 MiB RAM in 0.11s, peaked 0.00 MiB above current, total RAM usage 180.23 MiB


In [6]:
# Disabled experiment: the literal False guard means this never executes.
# When enabled it would collect the distinct time_ids from the training
# targets and shuffle them in place with the seeded generator.
if False:
    all_time_ids = df_train_all.reset_index().time_id.unique()
    rng.shuffle(all_time_ids)

In [6] used 0.2891 MiB RAM in 0.10s, peaked 0.00 MiB above current, total RAM usage 180.52 MiB


In [7]:
def load_book(ROOT, filename, stock_id):
    """Read one stock's partition of a parquet dataset.

    Parameters
    ----------
    ROOT : str
        Directory that contains the dataset folder.
    filename : str
        Dataset folder name, e.g. ``'book_train.parquet'``.
    stock_id : int
        Selects the ``stock_id=<id>`` sub-folder.

    Returns
    -------
    pandas.DataFrame
        The partition's rows indexed by ``(stock_id, time_id)``; the
        ``stock_id`` level is filled with the requested id.
    """
    partition_path = os.path.join(ROOT, f"{filename}/stock_id={stock_id}")
    book = pd.read_parquet(partition_path)
    # the id is only encoded in the folder name, so add it back as a column
    book["stock_id"] = stock_id
    return book.set_index(['stock_id', 'time_id'])

df_book_train_stock_X = load_book(ROOT, 'book_train.parquet', 0)

In [7] used 178.5117 MiB RAM in 0.30s, peaked 0.00 MiB above current, total RAM usage 359.04 MiB


In [8]:
# make feature columns
def make_features_stats(df_book, agg_type, cols):
    """Aggregate ``cols`` per ``(stock_id, time_id)`` group.

    Parameters
    ----------
    df_book : pandas.DataFrame
        Book data that can be grouped by ``stock_id`` and ``time_id``.
    agg_type : str
        Aggregation passed to ``GroupBy.agg`` (e.g. ``'var'``, ``'mean'``,
        ``'size'``, ``'nunique'``).
    cols : list of str or str
        Column(s) to aggregate. A single column name is accepted and
        treated as a one-element list.

    Returns
    -------
    pandas.DataFrame
        One row per group. Columns are named ``"<col>_<agg_type>"``, except
        for aggregations that yield one value per group (``'size'``), which
        produce a single column named after ``agg_type``.
    """
    if isinstance(cols, str):
        # a bare string would select a SeriesGroupBy and break the naming below
        cols = [cols]
    aggregated = df_book.groupby(['stock_id', 'time_id'])[cols].agg(agg_type)
    if isinstance(aggregated, pd.Series):
        # e.g. 'size' yields a Series; to_frame keeps the values whatever the
        # Series is called (DataFrame(series, columns=[...]) would select by
        # name and silently give NaNs for a named Series)
        return aggregated.to_frame(name=agg_type)
    aggregated.columns = [f"{col}_{agg_type}" for col in cols]
    return aggregated

if True: # lightweight tests
    df_book_train_stock_X = load_book(ROOT, 'book_train.parquet', 0)
    display(make_features_stats(df_book_train_stock_X, 'nunique', ['ask_size1']).head())

Unnamed: 0_level_0,Unnamed: 1_level_0,ask_size1_nunique
stock_id,time_id,Unnamed: 2_level_1
0,5,67
0,11,26
0,16,22
0,31,30
0,62,54


In [8] used 97.9102 MiB RAM in 0.47s, peaked 40.14 MiB above current, total RAM usage 456.95 MiB


In [9]:


#def realized_volatility(series_log_return):
#    return np.sqrt(np.sum(series_log_return**2))

In [9] used -15.4062 MiB RAM in 0.10s, peaked 0.00 MiB above current, total RAM usage 441.54 MiB


In [10]:
def log_return(list_stock_prices):
    """Return the first difference of the natural log of a price series.

    The input needs a ``.diff()`` method after ``np.log`` is applied (a
    pandas Series does); the first element of the result is NaN.
    """
    log_prices = np.log(list_stock_prices)
    return log_prices.diff()

def _realized_volatility_weighted_sub(ser, weights):
    """Return sqrt(sum((ser * weights) ** 2)) for two equal-length arrays."""
    ser_weighted = ser * weights
    return np.sqrt(np.sum(ser_weighted**2))

def realized_volatility_weighted(ser, weights_type):
    """Weighted realized volatility of a sequence of (log) returns.

    Parameters
    ----------
    ser : pandas.Series or array-like
        The returns; converted to a numpy array before use.
    weights_type : str
        Weighting applied to each item before squaring and summing:

        * ``'uniform'``    - every item has weight 1
        * ``'linear'``     - weights rise linearly from 0.1 to 1
        * ``'half0half1'`` - first half weighted 0, remainder weighted 1
        * ``'geom'``       - weights rise geometrically from 0.01 to 1

    Returns
    -------
    float
        ``sqrt(sum((ser * weights) ** 2))``.

    Raises
    ------
    ValueError
        If ``weights_type`` is not one of the names above.
    """
    # Timing notes kept from the original author:
    # as a numpy array we drop from 12us to 3us by adding @njit to the _sub
    # function; _sub can't be a closure (it loses all compilation benefits)
    # and njit(cache=True) can't be used in Jupyter as it can't find a cache
    # location. As a Series we have 5us and 15us w/wo @njit respectively.
    if isinstance(ser, pd.Series):
        ser = ser.to_numpy()
    else:
        ser = np.asarray(ser)
    nbr_items = ser.shape[0]
    if weights_type == 'uniform':
        weights = np.ones(nbr_items)
    elif weights_type == 'linear':
        weights = np.linspace(0.1, 1, nbr_items) # linear increasing weight
    elif weights_type == 'half0half1':
        half_way = nbr_items // 2
        weights = np.concatenate((np.zeros(half_way), np.ones(nbr_items - half_way))) # 0s then 1s weight
    elif weights_type == 'geom':
        weights = np.geomspace(0.01, 1, nbr_items) # geometric increase
    else:
        # without this branch an unknown name surfaced as an UnboundLocalError
        raise ValueError(
            f"Unknown weights_type {weights_type!r}; expected one of "
            "'uniform', 'linear', 'half0half1', 'geom'"
        )
    return _realized_volatility_weighted_sub(ser, weights)

if True:
    series_log_return = pd.Series(np.linspace(0, 1, 600))
    print(realized_volatility_weighted(series_log_return, weights_type="uniform"))

    #%timeit realized_volatility_weighted(series_log_return, weights_type="uniform")

14.148036786359453
In [10] used -3.4805 MiB RAM in 0.11s, peaked 3.48 MiB above current, total RAM usage 438.06 MiB


In [12]:
def make_wap(df_book_data, num=1, wap_colname="wap"):
    """Add a weighted-average-price column to ``df_book_data`` in place.

    For book level ``num`` (1 or 2) the value is
    ``(bid_price * ask_size + ask_price * bid_size) / (bid_size + ask_size)``
    and is stored under ``wap_colname``. Nothing is returned.
    """
    assert num in (1, 2)
    bid_price = df_book_data[f'bid_price{num}']
    ask_size = df_book_data[f'ask_size{num}']
    ask_price = df_book_data[f'ask_price{num}']
    bid_size = df_book_data[f'bid_size{num}']
    # each price is weighted by the size on the opposite side of the book
    cross_weighted_prices = bid_price * ask_size + ask_price * bid_size
    total_size = bid_size + ask_size
    df_book_data[wap_colname] = cross_weighted_prices / total_size


def make_realized_volatility(df_book_data, col, weights="uniform"):
    """Weighted realized volatility of ``col`` per ``(stock_id, time_id)``.

    Parameters
    ----------
    df_book_data : pandas.DataFrame
        Book data that can be grouped by ``stock_id`` and ``time_id`` and
        contains ``col``. It is modified in place: the per-row log returns
        are stored in a new column named like the result column.
    col : str
        Column to take log returns of (e.g. ``'wap'``).
    weights : str
        Weighting scheme name forwarded to ``realized_volatility_weighted``.

    Returns
    -------
    pandas.DataFrame
        One row per group and a single column called
        ``"realized_vol_log_return_<col>_<weights>"``.
    """
    group_keys = ['stock_id', 'time_id']
    new_name = "realized_vol_log_return_" + col + "_" + weights
    # Log return inside each group: log(p_t) - log(p_{t-1}). A per-group shift
    # keeps the original row index, whereas groupby.apply() of a transform-like
    # function prepends the group keys to the index from pandas 2.0 onwards
    # (which breaks the column assignment) and is far slower.
    previous_price = df_book_data.groupby(group_keys)[col].shift()
    df_book_data[new_name] = np.log(df_book_data[col]) - np.log(previous_price)
    # the first row of every group has no predecessor, so its return is NaN
    rows_with_return = df_book_data[df_book_data[new_name].notnull()]
    realized_vol = rows_with_return.groupby(group_keys)[new_name].agg(realized_volatility_weighted, weights)
    return realized_vol.to_frame()

if True: # lightweight tests
    df_book_train_stock_X = load_book(ROOT, 'book_train.parquet', 0)
    make_wap(df_book_train_stock_X, 2, wap_colname="wap2") # adds 'wap' column
    df_realized_vol_per_stockX = make_realized_volatility(df_book_train_stock_X, col="wap2", weights='linear')
    display(df_realized_vol_per_stockX.head(2))

Unnamed: 0_level_0,Unnamed: 1_level_0,realized_vol_log_return_wap2_linear
stock_id,time_id,Unnamed: 2_level_1
0,5,0.0045
0,11,0.001749


In [12] used 30.3789 MiB RAM in 2.53s, peaked 93.36 MiB above current, total RAM usage 361.21 MiB


In [13]:
# Disabled cell: the literal False guard means nothing below runs, so
# make_volatility_ask_bid_diff is never actually defined in the kernel.
# Its body follows the same steps as make_realized_volatility and differs
# only in how the output column is named.
if False:
    def make_volatility_ask_bid_diff(df_book_data, col='ask1_bid1_diff', weights='uniform'):
                                     #new_name='ask1_bid1_diff_log_ret'):
        #new_name = col + '_log_return' # 'ask1_bid1_diff'
        """Weighted realized volatility of ``col`` per (stock_id, time_id).

        Adds a ``"<col>_log_return_<weights>"`` column to ``df_book_data``
        and returns a one-column DataFrame with the per-group result.
        """
        new_name = col + "_log_return_" + weights
        df_book_data[new_name] = df_book_data.groupby(['stock_id', 'time_id'])[col].apply(log_return)
        # drop rows whose log return is NaN before aggregating
        df_book_data = df_book_data[~df_book_data[new_name].isnull()]
        # makes a new dataframe
        df_realized_vol_ask_bid_diff_per_stock =  pd.DataFrame(df_book_data.groupby(['stock_id', 'time_id'])[new_name].agg(realized_volatility_weighted, weights))
        return df_realized_vol_ask_bid_diff_per_stock


    if True: # lightweight tests
        # same loading steps as load_book, written out by hand for stock 0
        df_book_train_stock_XX = pd.read_parquet(os.path.join(ROOT, f"book_train.parquet/stock_id=0"))
        df_book_train_stock_XX["stock_id"] = 0
        df_book_train_stock_XX = df_book_train_stock_XX.set_index(['stock_id', 'time_id'])
        # relative level-1 spread: ask / bid - 1
        df_book_train_stock_XX['ask1_bid1_diff'] = (df_book_train_stock_XX['ask_price1'] / df_book_train_stock_XX['bid_price1']) - 1
        df_realized_vol_ask_bid_diff_per_stock = make_volatility_ask_bid_diff(df_book_train_stock_XX, weights='linear')
        #make_wap(df_book_train_stock_XX, 2) # adds 'wap' column
        #df_realized_vol_per_stockXX = make_realized_volatility(df_book_train_stock_XX, log_return_name="log_return2", weights='linear')
        display(df_realized_vol_ask_bid_diff_per_stock.head(5))

In [13] used 0.0078 MiB RAM in 0.11s, peaked 0.00 MiB above current, total RAM usage 361.22 MiB


In [14]:
if True: # lightweight tests
    df_book_train_stock_X = load_book(ROOT, 'book_train.parquet', 0)
    df_book_train_stock_X['ask1_bid1_diff'] = (df_book_train_stock_X['ask_price1'] / df_book_train_stock_X['bid_price1']) - 1
    df_realized_vol_ask_bid_diff_per_stock = make_realized_volatility(df_book_train_stock_X, col='ask1_bid1_diff', weights='linear')
    #make_wap(df_book_train_stock_XX, 2) # adds 'wap' column
    #df_realized_vol_per_stockXX = make_realized_volatility(df_book_train_stock_XX, log_return_name="log_return2", weights='linear')
    display(df_realized_vol_ask_bid_diff_per_stock.head(5))

Unnamed: 0_level_0,Unnamed: 1_level_0,realized_vol_log_return_ask1_bid1_diff_linear
stock_id,time_id,Unnamed: 2_level_1
0,5,2.032456
0,11,1.646661
0,16,1.050446
0,31,1.069932
0,62,2.013495


In [14] used -7.5273 MiB RAM in 2.62s, peaked 81.75 MiB above current, total RAM usage 353.70 MiB


In [15]:
#@memory.cache
def load_data_build_features(stock_id, ROOT, book_filename, trade_filename, cols, df_target):
    """Build the per-``(stock_id, time_id)`` feature table for one stock.

    The stock's book and trade partitions are loaded, WAP and relative
    bid/ask spread columns are derived on the book, everything is aggregated
    per time bucket and the pieces are joined column-wise onto this stock's
    rows of ``df_target``.

    Parameters
    ----------
    stock_id : int
        Stock to process (Python or numpy integer).
    ROOT : str
        Directory containing the parquet dataset folders.
    book_filename : str
        Book dataset folder, e.g. ``'book_train.parquet'``.
    trade_filename : str
        Trade dataset folder, e.g. ``'trade_train.parquet'``.
    cols : list of str
        Book columns to summarise with var/mean/size/min/max/nunique.
    df_target : pandas.DataFrame
        Frame indexed by ``(stock_id, time_id)``; its rows for ``stock_id``
        form the left-most columns of the result.

    Returns
    -------
    pandas.DataFrame
        Columns in a fixed order: the ``df_target`` columns, the summary
        statistics, the realized-volatility features, then the two trade
        statistics (``trade_size_count``, ``trade_order_count_sum``).

    Raises
    ------
    AssertionError
        If ``stock_id`` is not an integer, if the book-derived pieces have
        different row counts, if a piece is not indexed by
        ``(stock_id, time_id)``, or if feature names are duplicated.
    """
    # filename e.g. book_train.parquet
    assert isinstance(stock_id, (int, np.integer))
    df_book_stock_X = load_book(ROOT, book_filename, stock_id)
    # the trade dataset uses the same stock_id=<id> partition layout
    df_trade_stock_X = load_book(ROOT, trade_filename, stock_id)

    # --- derived book columns -------------------------------------------
    make_wap(df_book_stock_X, 2, "wap2")
    make_wap(df_book_stock_X, 1, "wap") # adds 'wap' column
    # (new column, ask price column, bid price column): relative spread ask/bid - 1
    spread_columns = (
        ('ask1_bid1_diff', 'ask_price1', 'bid_price1'),
        ('ask2_bid2_diff', 'ask_price2', 'bid_price2'),
        ('ask1_bid2_diff', 'ask_price1', 'bid_price2'),
        ('ask2_bid1_diff', 'ask_price2', 'bid_price1'),
    )
    for spread_col, ask_col, bid_col in spread_columns:
        df_book_stock_X[spread_col] = (df_book_stock_X[ask_col] / df_book_stock_X[bid_col]) - 1

    # --- per-bucket summary statistics of the raw book columns ----------
    stat_features = [
        make_features_stats(df_book_stock_X, agg_type, cols)
        for agg_type in ('var', 'mean', 'size', 'min', 'max', 'nunique')
    ]

    # --- realized volatility features -----------------------------------
    # The order fixes the column order of the result (and therefore of the
    # model's feature matrix), so keep it stable.
    volatility_specs = (
        ('ask1_bid1_diff', 'linear'),
        ('ask1_bid1_diff', 'half0half1'),
        ('ask2_bid2_diff', 'linear'),
        ('ask2_bid2_diff', 'half0half1'),
        ('wap', 'uniform'),
        ('wap2', 'uniform'),
        ('wap', 'linear'),
        ('wap2', 'linear'),
        ('wap', 'half0half1'),
        ('wap2', 'half0half1'),
        ('ask1_bid1_diff', 'uniform'),
        ('ask2_bid2_diff', 'uniform'),
        ('ask1_bid2_diff', 'uniform'),
        ('ask2_bid1_diff', 'uniform'),
    )
    volatility_features = [
        make_realized_volatility(df_book_stock_X, col=vol_col, weights=vol_weights)
        for vol_col, vol_weights in volatility_specs
    ]

    # --- trade stats ----------------------------------------------------
    df_trade_basic_stats = df_trade_stock_X.groupby(['stock_id', 'time_id']).agg(
        trade_size_count=pd.NamedAgg('size', 'count'),
        trade_order_count_sum=pd.NamedAgg('order_count', 'sum'),
    )

    df_train_stock_X = df_target.query('stock_id == @stock_id')
    to_merge_book = [df_train_stock_X] + stat_features + volatility_features

    # some trade datasets are missing some time_ids, making the join a mess
    # we reindex and make the choice to fillna 0
    to_merge_trade = [df_trade_basic_stats.reindex(df_train_stock_X.index, fill_value=0)]

    row_lengths = [df.shape[0] for df in to_merge_book]
    assert len(set(row_lengths)) == 1, f"row_lengths are different for stock {stock_id}: {row_lengths}" # should all be same length
    to_merge = to_merge_book + to_merge_trade
    for idx, item_to_merge in enumerate(to_merge):
        assert item_to_merge.index.names == ['stock_id', 'time_id'], f"We must have the same index on idx {idx}"
    train_merged = pd.concat(to_merge, axis=1)

    if 'target' in train_merged.columns:
        features = train_merged.drop(columns='target').columns
        assert len(set(features)) == len(features), f"Feature duplication! {len(set(features))} vs {len(features)}"

    return train_merged

#if 'memory' in dir():
#    # only setup local cache if we're running locally in development
#    load_data_build_features = memory.cache(load_data_build_features)
    
cols = ['bid_price1', 'ask_price1', 'bid_price2', 'ask_price2',] 
cols += ['bid_size1', 'ask_size1', 'bid_size2', 'ask_size2']

if True:    
    # test...
    train_mergedXX = load_data_build_features(31, ROOT, 'book_train.parquet', 'trade_train.parquet', cols, df_train_all)
    display(train_mergedXX.head(5))

Unnamed: 0_level_0,Unnamed: 1_level_0,target,bid_price1_var,ask_price1_var,bid_price2_var,ask_price2_var,bid_size1_var,ask_size1_var,bid_size2_var,ask_size2_var,bid_price1_mean,...,realized_vol_log_return_wap_linear,realized_vol_log_return_wap2_linear,realized_vol_log_return_wap_half0half1,realized_vol_log_return_wap2_half0half1,realized_vol_log_return_ask1_bid1_diff_uniform,realized_vol_log_return_ask2_bid2_diff_uniform,realized_vol_log_return_ask1_bid2_diff_uniform,realized_vol_log_return_ask2_bid1_diff_uniform,trade_size_count,trade_order_count_sum
stock_id,time_id,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1
31,5,0.004113,2.692188e-06,2.68108e-06,2.692195e-06,2.681076e-06,22385560.0,36232300.0,18060150.0,12580870.0,0.997129,...,0.002836,0.003074,0.003343,0.003636,0.98144,0.408084,0.574537,0.574467,50,582
31,11,0.000956,2.032095e-08,2.031816e-08,2.032095e-08,2.031816e-08,45707350.0,183155700.0,17664190.0,162804700.0,1.000411,...,0.000324,0.001223,0.000292,0.000252,0.001358,0.001292,0.00126,0.001261,10,43
31,16,0.002127,1.15802e-06,1.158066e-06,1.157965e-06,1.157958e-06,180093500.0,203391000.0,643763400.0,273023100.0,0.999483,...,0.001487,0.002725,0.001602,0.002811,0.002366,0.002362,0.002336,0.002442,12,69
31,31,0.003748,4.79624e-07,4.79624e-07,4.79624e-07,4.79624e-07,84739590.0,40964410.0,37103870.0,124390100.0,0.994919,...,0.001739,0.00424,0.002036,0.005161,0.003457,0.003353,0.003121,0.003269,30,149
31,62,0.001573,2.165609e-07,2.166966e-07,2.165884e-07,2.167498e-07,169019100.0,131616000.0,64406200.0,78439080.0,0.999969,...,0.001121,0.001908,0.000994,0.001528,0.980168,0.406911,0.573358,0.57351,21,179


In [15] used 40.9141 MiB RAM in 39.20s, peaked 527.36 MiB above current, total RAM usage 394.61 MiB


In [None]:
from joblib import Parallel, delayed
print(f'Iterating over {len(stock_ids)} stocks:')

# one deferred feature-building call per stock, fanned out over all cores
build_features_tasks = (
    delayed(load_data_build_features)(stock_id, ROOT, 'book_train.parquet', 'trade_train.parquet', cols, df_train_all)
    for stock_id in stock_ids
)
all_train_merged = Parallel(n_jobs=-1, verbose=10)(build_features_tasks)

Iterating over 112 stocks:


[Parallel(n_jobs=-1)]: Using backend LokyBackend with 8 concurrent workers.
[Parallel(n_jobs=-1)]: Done   2 tasks      | elapsed:  1.1min
[Parallel(n_jobs=-1)]: Done   9 tasks      | elapsed:  2.1min
[Parallel(n_jobs=-1)]: Done  16 tasks      | elapsed:  2.4min
[Parallel(n_jobs=-1)]: Done  25 tasks      | elapsed:  4.3min


In [None]:
# join all the partial results back together
train_merged = pd.concat(all_train_merged)
show_details(train_merged)

In [None]:
train_merged.head()

In [None]:
features = train_merged.drop(columns='target').columns
print(features)
assert len(set(features)) == len(features), f"{len(set(features))} vs {len(features)} features, we should not have any duplicates"

# Features

In [None]:
# stock_id lives in the index of train_merged; it becomes a model input
# once the index is reset
feature_cols = [*features, 'stock_id']
print(feature_cols)
if not TRAIN_TEST:
    # probably we're building on Kaggle
    # we need all data for train, there is no test set
    df_train_merged = train_merged.reset_index()[feature_cols + ['time_id', 'target']]
    y_train = df_train_merged['target']
    X_train = df_train_merged.drop(columns=['target', 'time_id'])
    print(X_train.shape, y_train.shape)

# ML on a train/test split

In [None]:
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
from sklearn.ensemble import GradientBoostingRegressor
import xgboost as xgb
from lightgbm import LGBMRegressor
from sklearn.experimental import enable_hist_gradient_boosting 
from sklearn.ensemble import HistGradientBoostingRegressor

In [None]:
# Candidate estimators; exactly one `est = ...` line should be active.
#est = LinearRegression()
#est = RandomForestRegressor(n_estimators=10, n_jobs=-1, random_state=RANDOM_STATE) # default n_estimators==100
#est = RandomForestRegressor(n_estimators=100, n_jobs=-1, random_state=RANDOM_STATE) # default n_estimators==100
#est = GradientBoostingRegressor(random_state=RANDOM_STATE)
#est = HistGradientBoostingRegressor(random_state=RANDOM_STATE)

# https://xgboost.readthedocs.io/en/latest/python/python_api.html
#tree_method='exact' default
#est = xgb.XGBRegressor(objective ='reg:linear', colsample_bytree = 0.3, learning_rate = 0.1, max_depth = 5, alpha = 10, n_estimators = 10)
#est = xgb.XGBRegressor(tree_method='hist', )
# seeded like the other candidates so runs stay reproducible if sampling
# options (subsample / colsample_*) are switched on later
est = xgb.XGBRegressor(random_state=RANDOM_STATE)

#est = LGBMRegressor()

if not TRAIN_TEST:
    # no hold-out evaluation in this mode: train once on every row
    print('Fitting estimator on all the data')
    est.fit(X_train, y_train)

In [None]:
%%time

print(f"USE_ALL_STOCK_IDS: {USE_ALL_STOCK_IDS}")

print(f"{df_train_all.reset_index().stock_id.unique().shape[0]} unique stock ids")
print("Features:", feature_cols)
print(est)

# one {'r2': ..., 'rmspe': ...} dict per fold; stays empty when TRAIN_TEST is off
scores = []
if TRAIN_TEST:
    # https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.GroupKFold.html
    # note the splits appear to be deterministic, possibly on discovery order
    from sklearn.model_selection import GroupKFold
    train_merged_no_idx = train_merged.reset_index()
    # grouping by time_id keeps all rows of one time bucket in the same fold
    groups = train_merged_no_idx['time_id']
    group_kfold = GroupKFold(n_splits=3)
    X_all = train_merged_no_idx[feature_cols]
    y_all = train_merged_no_idx['target']
    print(group_kfold.get_n_splits(X_all, y_all, groups))
    for train_index, test_index in group_kfold.split(X_all, y_all, groups):
        print("TRAIN:", train_index, "TEST:", test_index)
        # split() yields positional indices, so select by position (.iloc)
        # rather than by label
        X_train, X_test = X_all.iloc[train_index], X_all.iloc[test_index]
        y_train, y_test = y_all.iloc[train_index], y_all.iloc[test_index]
        # the same estimator object is refit each fold, so after the loop
        # `est` holds the model trained on the last fold's training rows
        est.fit(X_train, y_train)
        y_pred = est.predict(X_test)
        score = r2_score(y_test, y_pred)
        rmspe = rmspe_score(y_test, y_pred)
        print(f"rmspe score {rmspe:0.3f}, r^2 score {score:0.3f} on {y_pred.shape[0]:,} predictions")
        scores.append({'r2': score, 'rmspe': rmspe})

In [None]:
if scores:
    # only show results if we've used cross validation
    # rows are the metrics, columns the folds
    df_scores = pd.DataFrame(scores).T
    folds = df_scores.columns.values
    per_fold_scores = df_scores[folds]
    # both summaries are computed from the fold columns only
    fold_std = per_fold_scores.std(axis=1)
    fold_mean = per_fold_scores.mean(axis=1)
    df_scores['std'] = fold_std
    df_scores['mean'] = fold_mean
    display(df_scores)

In [None]:
if TRAIN_TEST:
    df_preds = pd.DataFrame({'y_test': y_test, 'y_pred': y_pred})
    df_preds['abs_diff'] = (df_preds['y_test'] - df_preds['y_pred']).abs()
    display(df_preds.sort_values('abs_diff', ascending=False))

In [None]:
#item_to_debug = 32451
#train_merged.reset_index().loc[item_to_debug][['stock_id', 'time_id', 'target']]

In [None]:
try:
    #if X_test.shape[0] > 0:
    if TRAIN_TEST:
        from yellowbrick.regressor import PredictionError
        visualizer = PredictionError(est)
        visualizer.fit(X_train, y_train)  # Fit the training data to the visualizer
        visualizer.score(X_test, y_test)  # Evaluate the model on the test data
        ax_subplot = visualizer.show()        
except ModuleNotFoundError:
    print('no yellowbrick')

In [None]:
if ENV_HOME:
    import eli5
    display(eli5.show_weights(est, feature_names=feature_cols, top=30))

In [None]:
# Tree ensembles expose feature_importances_, linear models expose coef_.
if 'feature_importances_' in dir(est):
    feature_col = 'feature_importances_'
elif 'coef_' in dir(est):
    feature_col = 'coef_'
else:
    # previously this fell through to a NameError on feature_col
    raise AttributeError(
        f"{type(est).__name__} exposes neither 'feature_importances_' nor 'coef_'"
    )
# importance is used as the index so sort_index orders features by importance
df_features = pd.DataFrame(zip(getattr(est, feature_col), feature_cols), columns=['importance', 'feature']).set_index('importance')
df_features.sort_index(ascending=False)

# Make predictions

In [None]:
len(stock_ids) # expecting 112

In [None]:
if not USE_TEST_LOCAL_6_ITEMS:
    df_test_all = pd.read_csv(TEST_CSV)
    if df_test_all.shape[0] == 3: # kaggle test data
        # cut out 2 rows so predictions work
        df_test_all = df_test_all.iloc[:1]
    TEST_FOLDER = 'book_test.parquet'
else:
    # debugging path: a local test file stands in for the real test set
    df_test_all = pd.read_csv(os.path.join(ROOT, 'test_local.csv'))
    # move the known target out of the way; predictions are written to 'target'
    df_test_all = df_test_all.rename(columns={'target': 'train_target'})
    TEST_FOLDER = 'book_test_local.parquet'
    assert ENV_HOME == True
print(ROOT, TEST_FOLDER)
df_test_all = df_test_all.set_index(['stock_id', 'time_id'])

show_details(df_test_all)

In [None]:
stock_ids_test = get_training_stock_ids(TEST_FOLDER) # all stocks by default

# Collect one frame per stock and concatenate once at the end; growing a
# DataFrame with pd.concat inside the loop is quadratic in the row count.
per_stock_predictions = []
for stock_id in tqdm(stock_ids_test):
    df_test_all_X = df_test_all.query('stock_id==@stock_id').copy()
    test_merged = load_data_build_features(stock_id, ROOT, TEST_FOLDER, 'trade_test.parquet', cols, df_test_all)
    test_set_predictions_X = est.predict(test_merged.reset_index()[list(features) + ['stock_id']])
    # attach predictions by (stock_id, time_id) label rather than by position,
    # so a different row order in test_merged cannot misalign them
    df_test_all_X['target'] = pd.Series(test_set_predictions_X, index=test_merged.index)
    per_stock_predictions.append(df_test_all_X)

# pd.concat rejects an empty list, so fall back to an empty frame
df_test_predictions = pd.concat(per_stock_predictions) if per_stock_predictions else pd.DataFrame()

assert df_test_all.shape[0] == df_test_predictions.shape[0], "Expecting all rows to be predicted"

In [None]:
print(f"Writing {df_test_predictions.shape[0]} rows to submission.csv on {datetime.datetime.utcnow()}")
df_test_predictions.reset_index()[['row_id', 'target']].to_csv('submission.csv', index=False)
show_details(df_test_predictions)
print(f'Notebook took {datetime.datetime.utcnow()-t1_notebook_start} to run')

In [None]:
# Final guard for Kaggle runs: fail loudly if the notebook ran on a subset of
# stocks, since that output is not a complete submission.
if not ENV_HOME:
    assert USE_ALL_STOCK_IDS, "If we're on Kaggle but not using all stock_ids, we're not ready to submit, so fail here to remind me to change USE_ALL_STOCK_IDS!"