# Optiver Realized Volatility Prediction - Train

**This notebook seeks to EDITS HERE**
---------

## Files
**book_[train/test].parquet** - A [parquet](https://arrow.apache.org/docs/python/parquet.html) file partitioned by `stock_id`. Provides order book data on the most competitive buy and sell orders entered into the market. The top two levels of the book are shared. The first level of the book will be more competitive in price terms, it will then receive execution priority over the second level.

 - `stock_id` - ID code for the stock. Not all `stock_id`s exist in every time bucket. Parquet coerces this column to the categorical data type when loaded; you may wish to convert it to int8.
 - `time_id` - ID code for the time bucket. `time_id`s are not necessarily sequential but are consistent across all stocks.
 - `seconds_in_bucket` - Number of seconds from the start of the bucket, always starting from 0.
 - `bid_price[1/2]` - Normalized prices of the most/second most competitive buy level.
 - `ask_price[1/2]` - Normalized prices of the most/second most competitive sell level.
 - `bid_size[1/2]` - The number of shares on the most/second most competitive buy level.
 - `ask_size[1/2]` - The number of shares on the most/second most competitive sell level.
 
**trade_[train/test].parquet** - A [parquet](https://arrow.apache.org/docs/python/parquet.html) file partitioned by `stock_id`. Contains data on trades that actually executed. Usually, in the market, there are more passive buy/sell intention updates (book updates) than actual trades, therefore one may expect this file to be more sparse than the order book.

 - `stock_id` - Same as above.
 - `time_id` - Same as above.
 - `seconds_in_bucket` - Same as above. Note that since trade and book data are taken from the same time window and trade data is more sparse in general, this field is not necessarily starting from 0.
 - `price` - The average price of executed transactions happening in one second. Prices have been normalized and the average has been weighted by the number of shares traded in each transaction.
 - `size` - The sum number of shares traded.
 - `order_count` - The number of unique trade orders taking place.
 
**train.csv** The ground truth values for the training set.

 - `stock_id` - Same as above, but since this is a csv the column will load as an integer instead of categorical.
 - `time_id` - Same as above.
 - `target` - The realized volatility computed over the 10 minute window following the feature data under the same `stock_id`/`time_id`. There is no overlap between feature and target data. 
 
**test.csv** Provides the mapping between the other data files and the submission file. As with other test files, most of the data is only available to your notebook upon submission with just the first few rows available for download.

 - `stock_id` - Same as above.
 - `time_id` - Same as above.
 - `row_id` - Unique identifier for the submission row. There is one row for each existing `stock_id`/`time_id` pair. Each time window is not necessarily containing every individual stock.
 
**sample_submission.csv** - A sample submission file in the correct format.

 - `row_id` - Same as in test.csv.
 - `target` - Same definition as in **train.csv**. The benchmark is using the median target value from **train.csv**.
 
## Prepare Environment
### Import Packages

In [1]:
# General packages
import pandas as pd
import numpy as np
import pyarrow.parquet as pq # To handle parquet files
import os
import gc
import random
from tqdm import tqdm, tqdm_notebook
from pathlib import Path
import multiprocessing

import time
import warnings
warnings.filterwarnings('ignore')

# Data vis packages
import matplotlib.pyplot as plt
%matplotlib inline

# Data prep
from sklearn.preprocessing import RobustScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.feature_selection import SelectFromModel
from sklearn.decomposition import PCA

# Modelling packages
import tensorflow as tf
from tensorflow import keras
from tensorflow.python.keras import backend as k
# Key layers
from tensorflow.keras.models import Model, Sequential, load_model
from tensorflow.keras.layers import Input, Add, Dense, Flatten
# Activation layers
from tensorflow.keras.layers import ReLU, LeakyReLU, ELU, ThresholdedReLU
# Dropout layers
from tensorflow.keras.layers import Dropout, AlphaDropout, GaussianDropout
# Normalisation layers
from tensorflow.keras.layers import BatchNormalization
# Embedding layers
from tensorflow.keras.layers import Embedding, Concatenate, Reshape
# Callbacks
from tensorflow.keras.callbacks import Callback, EarlyStopping, LearningRateScheduler, ModelCheckpoint
# Optimisers
from tensorflow.keras.optimizers import SGD, RMSprop, Adam, Adadelta, Adagrad, Adamax, Nadam, Ftrl
# Model cross validation and evaluation
from sklearn.model_selection import StratifiedKFold
from tensorflow.keras.losses import binary_crossentropy

# For Bayesian hyperparameter searching
from skopt import gbrt_minimize, gp_minimize
from skopt.utils import use_named_args
from skopt.space import Real, Categorical, Integer

In [2]:
strategy = tf.distribute.get_strategy()
REPLICAS = strategy.num_replicas_in_sync

# Data access
gpu_options = tf.compat.v1.GPUOptions(allow_growth=True)

# Get number of cpu cores for multiprocessing
try:
    cpus = int(multiprocessing.cpu_count() / 2)
except NotImplementedError:
    cpus = 1 # Default number of cores
    
print(f"Num GPUs Available: {len(tf.config.experimental.list_physical_devices('GPU'))}")
print(f"Num CPU Threads Available: {cpus}")
print(f'REPLICAS: {REPLICAS}')

Num GPUs Available: 1
Num CPU Threads Available: 64
REPLICAS: 1


### Read in Data

In [3]:
# Data paths
comp_dir_path = Path("../input/optiver-realized-volatility-prediction")

# Train paths
train_book_path   = comp_dir_path/"book_train.parquet"
train_trade_path  = comp_dir_path/"trade_train.parquet"
train_labels_path = comp_dir_path/"train.csv"

# Test paths
test_book_path   = comp_dir_path/"book_test.parquet"
test_trade_path  = comp_dir_path/"trade_test.parquet"
test_labels_path = comp_dir_path/"test.csv"

# Sample submission path
sample_sub_path = comp_dir_path/"sample_submission.csv"

In [4]:
# Define helper functions for data reading
def get_stock_ids_list(data_dir_path):
    data_dir = os.listdir(data_dir_path)
    # Get list of stock ids in directory
    stock_ids = list(map(lambda x: x.split("=")[1], data_dir))
    return stock_ids
    
    
def load_book_stock_id_data(stock_id):
    # Get stock id extension
    stock_id_ext = f"stock_id={stock_id}"
    
    # Read individual stock parquet file
    if is_train_test == "train":
        book_stock_id_path = os.path.join(train_book_path, stock_id_ext)
    elif is_train_test == "test":
        book_stock_id_path = os.path.join(test_book_path, stock_id_ext)
    book_stock_id = pd.read_parquet(book_stock_id_path)
    
    # Add stock id feature from filename
    book_stock_id["stock_id"] = stock_id
            
    return book_stock_id

def load_trade_stock_id_data(stock_id):
    # Get stock id extension
    stock_id_ext = f"stock_id={stock_id}"
    
    # Read individual stock parquet file
    if is_train_test == "train":
        trade_stock_id_path = os.path.join(train_trade_path, stock_id_ext)
    elif is_train_test == "test":
        trade_stock_id_path = os.path.join(test_trade_path, stock_id_ext)
    trade_stock_id = pd.read_parquet(trade_stock_id_path)
    
    # Add stock id feature from filename
    trade_stock_id["stock_id"] = stock_id
            
    return trade_stock_id

In [5]:
%%time
# Get list of stock ids
train_stock_ids = get_stock_ids_list(train_book_path)
test_stock_ids = get_stock_ids_list(test_book_path)

# Read train data
is_train_test = "train"
# Create worker pool and read
pool         = multiprocessing.Pool(processes=cpus)
train_book   = pd.concat(pool.map(load_book_stock_id_data, train_stock_ids[0:2]))
train_trade  = pd.concat(pool.map(load_trade_stock_id_data, train_stock_ids[0:2]))
train_labels = pd.read_csv(train_labels_path)
# Close worker pool
pool.close()
pool.join()

# Read test data
is_train_test = "test"
# Create worker pool and read
pool        = multiprocessing.Pool(processes=cpus)
test_book   = pd.concat(pool.map(load_book_stock_id_data, test_stock_ids))
test_trade  = pd.concat(pool.map(load_trade_stock_id_data, test_stock_ids))
test_labels = pd.read_csv(test_labels_path)

# Read sample submission
sample_sub = pd.read_csv(sample_sub_path)

# Print data dimensions
print("TRAIN DATA DIMENSIONS")
print(f"train_book shape: {train_book.shape}")
print(f"train_trade shape: {train_trade.shape}")
print(f"train_labels shape: {train_labels.shape}")

print("\nTEST DATA DIMENSIONS")
print(f"test_book shape: {test_book.shape}")
print(f"test_trade shape: {test_trade.shape}")
print(f"test_labels shape: {test_labels.shape}\n")

TRAIN DATA DIMENSIONS
train_book shape: (2425085, 11)
train_trade shape: (419653, 6)
train_labels shape: (428932, 3)

TEST DATA DIMENSIONS
test_book shape: (3, 11)
test_trade shape: (3, 6)
test_labels shape: (3, 3)

CPU times: user 226 ms, sys: 766 ms, total: 993 ms
Wall time: 1.37 s


## Data Preparation
### Feature Engineering

In [6]:
# Define helper functions for data manipulation
def get_log_return(list_stock_prices):
    return np.log(list_stock_prices).diff()


def get_trade_log_return(df_trade, col_stock_id, col_time_id, col_price):
    """
    Returns the Log Return at each time ID.
    """
    trade_log_return = df_trade.groupby([col_stock_id, col_time_id])[col_price].apply(get_log_return)
    trade_log_return = trade_log_return.fillna(0)
    return trade_log_return


def get_agg_feature(df, col_name, func):
    """
    Returns aggregated feature by stock ID and time ID based on input df and feature.
    """
    if "function" in str(func):
        func_str = str(func).split(" ")[1]
        agg_feat_col_name = f"{col_name}_{func_str}"
    else:
        agg_feat_col_name = f"{col_name}_{func}"
    
    agg_feat = df.groupby(by=["stock_id", "time_id"])[col_name].agg(func)
    agg_feat = agg_feat.reset_index().rename(columns={col_name: agg_feat_col_name})
    
    return agg_feat


def get_wap(df_book, col_bid_price, col_ask_price, col_bid_size, col_ask_size):
    """
    Returns Weighted Average Price. 
    """
    wap_numerator = df_book[col_bid_price]  * df_book[col_ask_size]
    wap_numerator += df_book[col_ask_price] * df_book[col_bid_size]
    
    wap_denominator = df_book[col_bid_size] + df_book[col_ask_size]
    
    return wap_numerator / wap_denominator


def get_wap_combined(df_book, col_bid_price1, col_ask_price1, col_bid_size1, col_ask_size1,
                     col_bid_price2, col_ask_price2, col_bid_size2, col_ask_size2):    
    """
    Returns the Combined Weighted Average Price for both Bid and Ask features.
    """
    wap_numerator1  = df_book[col_bid_price1] * df_book[col_ask_size1]
    wap_numerator1 += df_book[col_ask_price1] * df_book[col_bid_size1]
    wap_numerator2  = df_book[col_bid_price2] * df_book[col_ask_size2]
    wap_numerator2 += df_book[col_ask_price2] * df_book[col_bid_size2]
    
    wap_denominator  = df_book[col_bid_size1] + df_book[col_ask_size1]
    wap_denominator += df_book[col_bid_size2] + df_book[col_ask_size2]
    
    return (wap_numerator1 + wap_numerator2) / wap_denominator


def get_wap_avg(df_book, col_bid_price1, col_ask_price1, col_bid_size1, col_ask_size1,
                col_bid_price2, col_ask_price2, col_bid_size2, col_ask_size2):
    """
    Returns the Combined Average Weighted Average Price for both Bid and Ask features.
    """
    wap_numerator1  = df_book[col_bid_price1] * df_book[col_ask_size1]
    wap_numerator1 += df_book[col_ask_price1] * df_book[col_bid_size1]
    wap_numerator1 /= df_book[col_bid_size1] + df_book[col_ask_size1]
    
    wap_numerator2  = df_book[col_bid_price2] * df_book[col_ask_size2]
    wap_numerator2 += df_book[col_ask_price2] * df_book[col_bid_size2]
    wap_numerator2 /= df_book[col_bid_size2] + df_book[col_ask_size2]
    
    return (wap_numerator1 + wap_numerator2) / 2


def get_vol_wap(df_book, col_stock_id, col_time_id, col_wap):
    """
    Returns the Volume Weighted Average Price at each time ID.
    """
    vol_wap = df_book.groupby([col_stock_id, col_time_id])[col_wap].apply(get_log_return)
    vol_wap = vol_wap.fillna(0)
    return vol_wap


def get_bid_ask_spread(df_book, col_bid_price1, col_ask_price1, col_bid_price2, col_ask_price2):
    """
    Get Combined bid ask spread using both Bid and Ask features.
    """
    bas_numerator   = df_book[[col_ask_price1, col_ask_price2]].min(axis=1)
    bas_denominator = df_book[[col_bid_price1, col_bid_price2]].max(axis=1) - 1
    
    return bas_numerator / bas_denominator


def get_vertical_spread(df_book, col_price1, col_price2):
    """
    Returns the vertical spread for Bid/Ask price features inputted.
    """
    v_spread = df_book[col_price1] - df_book[col_price2]
    return v_spread


def get_spread_feature(df_book, col_price_a, col_price_b):
    """
    Returns a spread feature based on the price features inputted.
    """
    spread_feat = df_book[col_price_a] - df_book[col_price_b]
    return spread_feat


def realized_volatility(series_log_return):
    """
    Returns the realized volatility for a given period.
    """
    return np.sqrt(np.sum(series_log_return**2))


def rmspe(y_true, y_pred):
    """
    Returns the Root Mean Squared Prediction Error.
    """
    rmspe = np.sqrt(np.mean(np.square((y_true - y_pred) / y_true)))
    return rmspe


def get_row_id(df, col_stock_id, col_time_id):
    """
    Returns row ids in format required for submission. 
    """
    row_ids = df[col_stock_id].astype("str") + "-" + df[col_time_id].astype("str")
    return row_ids

In [7]:
# Compile data manipulation helper functions into complete functions
def extract_trade_feature_set(df_trade):
    """
    Returns engineered trade dataset, where each row is a unique stock ID/time ID pair.
    """
    # Get the Log return for trades by stock ID and time ID
    df_trade["trade_log_return"] = get_trade_log_return(df_trade, "stock_id", "time_id", "price")
    #df_trade["trade_log_return"] = df_trade["trade_log_return"].fillna(0)
    
    # Get aggregate statistics for specified numerical features
    trade_features = ["price", "size", "order_count", "trade_log_return"]
    
    for trade_feature in trade_features:
        # Get min aggregations
        df_trade = df_trade.merge(
            get_agg_feature(df=df_trade, col_name=trade_feature, func="min", rename=True),
            how="left",
            on=["stock_id", "time_id"]
        )
        # Get max aggregations
        df_trade = df_trade.merge(
            get_agg_feature(df=df_trade, col_name=trade_feature, func="max", rename=True),
            how="left",
            on=["stock_id", "time_id"]
        )
        # Get mean aggregations
        df_trade = df_trade.merge(
            get_agg_feature(df=df_trade, col_name=trade_feature, func="mean", rename=True),
            how="left",
            on=["stock_id", "time_id"]
        )
        # Get std aggregations
        df_trade = df_trade.merge(
            get_agg_feature(df=df_trade, col_name=trade_feature, func="std", rename=True),
            how="left",
            on=["stock_id", "time_id"]
        )
        # Get sum aggregations
        df_trade = df_trade.merge(
            get_agg_feature(df=df_trade, col_name=trade_feature, func="sum", rename=True),
            how="left",
            on=["stock_id", "time_id"]
        )
    
    # Reduce trade df to just unique stock ID and time ID pairs
    df_trade = df_trade.drop(["seconds_in_bucket", "price", "size", "order_count", "trade_log_return"], axis=1)
    df_trade = df_trade.drop_duplicates().reset_index(drop=True)
    
    return df_trade


def extract_book_feature_set(df_book):
    """
    Returns engineered book dataset, where each row is a unique stock ID/time ID pair.
    """
    # WAP for both bid/ask price/size features
    df_book["wap1"] = get_wap(df_book, "bid_price1", "ask_price1", "bid_size1", "ask_size1")
    df_book["wap2"] = get_wap(df_book, "bid_price2", "ask_price2", "bid_size2", "ask_size2")
    # Combined WAP
    df_book["wap_combined"] = get_wap_combined(
        df_book, "bid_price1", "ask_price1", "bid_size1", "ask_size1", 
        "bid_price2", "ask_price2", "bid_size2", "ask_size2"
    )
    # Average WAP for both bid/ask price/size features
    df_book["wap_avg"] = get_wap_avg(
        df_book, "bid_price1", "ask_price1", "bid_size1", "ask_size1", 
        "bid_price2", "ask_price2", "bid_size2", "ask_size2"
    )
    
    # Get VWAPS based on different WAP features
    df_book["vol_wap1"]         = get_vol_wap(df_book, "stock_id", "time_id", "wap1")
    df_book["vol_wap2"]         = get_vol_wap(df_book, "stock_id", "time_id", "wap2")
    df_book["vol_wap_combined"] = get_vol_wap(df_book, "stock_id", "time_id", "wap_combined")
    df_book["vol_wap_avg"]      = get_vol_wap(df_book, "stock_id", "time_id", "wap_avg")
    
    # Get different spread features
    df_book["bid_ask_spread"] = get_bid_ask_spread(df_book, "bid_price1", "ask_price1", "bid_price2","ask_price2")
    df_book["bid_v_spread"]   = get_vertical_spread(df_book, "bid_price1", "bid_price2")
    df_book["ask_v_spread"]   = get_vertical_spread(df_book, "ask_price1", "ask_price2")
    df_book["h_spread1"]      = get_spread_feature(df_book, "ask_price1", "bid_price1")
    df_book["h_spread2"]      = get_spread_feature(df_book, "ask_price2", "bid_price2")
    df_book["spread_diff1"]   = get_spread_feature(df_book, "ask_price1", "bid_price2")
    df_book["spread_diff2"]   = get_spread_feature(df_book, "ask_price2", "bid_price1")
    
    # Get aggregated volatility features for each VWAP
    vol_features = ["vol_wap1", "vol_wap2", "vol_wap_combined", "vol_wap_avg"]
    
    for vol_feature in vol_features:
         df_book = df_book.merge(
             get_agg_feature(df=df_book, col_name=vol_feature, func=realized_volatility),
             how="left",
             on=["stock_id", "time_id"]
         )
            
    # Get aggregated features for different spread features
    spread_features = [
        "bid_ask_spread", "bid_v_spread", "ask_v_spread", "h_spread1", 
        "h_spread2", "spread_diff1", "spread_diff2"
    ]
    
    for spread_feature in spread_features:
        # Get min aggregations
        df_book = df_book.merge(
            get_agg_feature(df=df_book, col_name=spread_feature, func="min"),
            how="left",
            on=["stock_id", "time_id"]
        )
        # Get max aggregations
        df_book = df_book.merge(
            get_agg_feature(df=df_book, col_name=spread_feature, func="max"),
            how="left",
            on=["stock_id", "time_id"]
        )
        # Get mean aggregations
        df_book = df_book.merge(
             get_agg_feature(df=df_book, col_name=spread_feature, func="mean"),
             how="left",
             on=["stock_id", "time_id"]
        )
        # Get std aggregations
        df_book = df_book.merge(
            get_agg_feature(df=df_book, col_name=spread_feature, func="std"),
            how="left",
            on=["stock_id", "time_id"]
        )
        # Get sum aggregations
        df_book = df_book.merge(
            get_agg_feature(df=df_book, col_name=spread_feature, func="sum"),
            how="left",
            on=["stock_id", "time_id"]
        )

    # Reduce trade df to just unique stock ID and time ID pairs
    df_book = df_book.drop([
        "seconds_in_bucket", "bid_price1", "ask_price1", "bid_price2", 
        "ask_price2", "bid_size1", "ask_size1", "bid_size2", "ask_size2",
        # WAP features
        "wap1", "wap2", "wap_combined", "wap_avg", "vol_wap1", 
        "vol_wap2", "vol_wap_combined", "vol_wap_avg", 
        # Spread features
        "bid_ask_spread", "bid_v_spread", "ask_v_spread", "h_spread1", 
        "h_spread2", "spread_diff1", "spread_diff2" 
    ], axis=1)
    df_book = df_book.drop_duplicates().reset_index(drop=True)
    
    return df_book

In [8]:
extract_book_feature_set(train_book)

Unnamed: 0,time_id,stock_id,vol_wap1_realized_volatility,vol_wap2_realized_volatility,vol_wap_combined_realized_volatility,vol_wap_avg_realized_volatility,bid_ask_spread_min,bid_ask_spread_max,bid_ask_spread_mean,bid_ask_spread_std,...,spread_diff1_min,spread_diff1_max,spread_diff1_mean,spread_diff1_std,spread_diff1_sum,spread_diff2_min,spread_diff2_max,spread_diff2_mean,spread_diff2_std,spread_diff2_sum
0,5,0,0.004499,0.006999,0.004106,0.004115,235.541718,7.051342e+02,320.050354,99.705377,...,0.000414,0.001551,0.001031,0.000223,0.311224,0.000517,0.001500,0.001006,0.000185,0.303775
1,11,0,0.001204,0.002476,0.001507,0.001268,-39871.871094,3.978736e+04,1878.224854,17613.091377,...,0.000251,0.000954,0.000536,0.000192,0.107207,0.000251,0.001054,0.000529,0.000170,0.105850
2,16,0,0.002369,0.004801,0.002469,0.002719,-13928.000000,8.362587e+03,-1408.515747,3016.430184,...,0.000431,0.001388,0.000921,0.000246,0.173237,0.000479,0.001628,0.000923,0.000252,0.173524
3,31,0,0.002574,0.003637,0.002709,0.002625,-5406.040039,-3.786276e+02,-1293.191162,1691.972493,...,0.000416,0.001989,0.001049,0.000381,0.125890,0.000555,0.001666,0.000968,0.000262,0.116127
4,62,0,0.001894,0.003257,0.001932,0.001901,-4768.609375,-1.045971e+03,-2068.633545,1141.670493,...,0.000233,0.001026,0.000588,0.000156,0.103437,0.000233,0.000979,0.000506,0.000150,0.089079
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
7655,32751,1,0.003723,0.004996,0.003322,0.003101,-14897.615234,inf,inf,,...,0.000201,0.001142,0.000755,0.000178,0.231676,0.000336,0.001142,0.000716,0.000161,0.219720
7656,32753,1,0.010829,0.012168,0.010422,0.009786,-14362.764648,inf,inf,,...,0.000279,0.002508,0.001090,0.000365,0.538242,0.000209,0.002159,0.001056,0.000335,0.521524
7657,32758,1,0.003135,0.004268,0.002797,0.002765,-11208.745117,inf,inf,,...,0.000268,0.001072,0.000789,0.000137,0.246919,0.000447,0.001250,0.000780,0.000158,0.244062
7658,32763,1,0.003750,0.005773,0.003814,0.003890,-16265.990234,1.626199e+04,7.209878,2403.935707,...,0.000246,0.001353,0.000612,0.000224,0.265700,0.000246,0.001476,0.000654,0.000242,0.283659
