In [2]:
import os
import gc
import glob

import numpy as np
import pandas as pd

from itertools import islice

from multiprocessing import Pool
from sklearn.preprocessing import MinMaxScaler, StandardScaler

import tensorflow as tf
import keras.backend as K

from tensorflow.keras.layers import Dense, Lambda, Dot, Activation, Concatenate
from tensorflow.keras.layers import Layer

from sklearn.model_selection import train_test_split

from tqdm.auto import tqdm
tqdm.pandas()

import warnings
warnings.filterwarnings('ignore')

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
NTHREADS = 4
SEED = 42
TRAIN_BATCH_SIZE = 256
TEST_BATCH_SIZE = 256
BUCKET_WINDOWS2 = [(0, 100), (100, 200), (200, 300), (300, 400), (400, 500), (500, 600)]

DATA_PATH = '/content/drive/MyDrive/Colab Notebooks/OptiverVolatility'
BOOK_TRAIN_PATH = '/content/drive/MyDrive/Colab Notebooks/OptiverVolatility/book_train.parquet'
TRADE_TRAIN_PATH = '/content/drive/MyDrive/Colab Notebooks/OptiverVolatility/trade_train.parquet'
BOOK_TEST_PATH = '/content/drive/MyDrive/Colab Notebooks/OptiverVolatility/book_test.parquet'
TRADE_TEST_PATH = '/content/drive/MyDrive/Colab Notebooks/OptiverVolatility/trade_test.parquet'
CHECKPOINT = './model_checkpoint/model_01'

book_skip_columns = trade_skip_columns = ['time_id', 'row_id', 'target']

In [5]:
def get_path_dict(f, v):

    f_dict = {}
    for i in tqdm(v):
        fpath = f'{f}/stock_id={i}'
        flist = glob.glob(os.path.join(fpath, '*.parquet'))

        if len(flist) > 0:
            f_dict[i] = flist[0]

    return f_dict

In [6]:
train_ds = pd.read_csv(os.path.join(DATA_PATH, 'train.csv'))
test_ds = pd.read_csv(os.path.join(DATA_PATH, 'test.csv'))
print(f'Train ds shape: {train_ds.shape}')
print(f'Test ds shape: {test_ds.shape}')
train_ds['row_id'] = train_ds['stock_id'].astype(str) + '-' + train_ds['time_id'].astype(str)

Train ds shape: (428932, 3)
Test ds shape: (3, 3)


In [7]:
book_train_dict = get_path_dict(BOOK_TRAIN_PATH, train_ds['stock_id'].unique())
trade_train_dict = get_path_dict(TRADE_TRAIN_PATH, train_ds['stock_id'].unique())

book_test_dict = get_path_dict(BOOK_TEST_PATH, test_ds['stock_id'].unique())
trade_test_dict = get_path_dict(TRADE_TEST_PATH, test_ds['stock_id'].unique())

  0%|          | 0/112 [00:00<?, ?it/s]

  0%|          | 0/112 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

In [8]:
def calc_wap1(df):
    # Function to calculate first WAP
    wap = (df['bid_price1'] * df['ask_size1'] + df['ask_price1'] * df['bid_size1']) / (df['bid_size1'] + df['ask_size1'])
    return wap

def calc_wap2(df):
    # Function to calculate second WAP
    wap = (df['bid_price2'] * df['ask_size2'] + df['ask_price2'] * df['bid_size2']) / (df['bid_size2'] + df['ask_size2'])
    return wap

def calc_wap3(df):
    wap = (df['bid_price1'] * df['bid_size1'] + df['ask_price1'] * df['ask_size1']) / (df['bid_size1'] + df['ask_size1'])
    return wap

def calc_wap4(df):
    wap = (df['bid_price2'] * df['bid_size2'] + df['ask_price2'] * df['ask_size2']) / (df['bid_size2'] + df['ask_size2'])
    return wap

def calc_ma(df, colname, window_size):
    a = df[colname].rolling(window=window_size).mean()
    return a

def calc_mstd(df, colname, window_size):
    a = df[colname].rolling(window=window_size).std()
    return a

def calc_memw(df, colname, window_size):
    a = df[colname].ewm(span=10).mean()
    return a

def log_return(series):
    # Function to calculate the log of the return
    return np.log(series).diff().fillna(0)

def realized_volatility(series):
    # Calculate the realized volatility
    return np.sqrt(np.sum(series**2))

def count_unique(series):
    # Function to count unique elements of a series
    return len(np.unique(series))

def book_ds_fe(df):
    # Calculate Wap
    df['wap1'] = calc_wap1(df)
    df['wap2'] = calc_wap2(df)

    # Calculate log returns
    df['log_return1'] = df.groupby(['time_id'])['wap1'].apply(log_return).reset_index(level=0, drop=True)
    df['log_return2'] = df.groupby(['time_id'])['wap2'].apply(log_return).reset_index(level=0, drop=True)

    # Calculate wap balance
    df['wap_balance'] = abs(df['wap1'] - df['wap2'])

    # Calculate spread
    df['price_spread'] = (df['ask_price1'] - df['bid_price1']) / ((df['ask_price1'] + df['bid_price1']) / 2)
    df['price_spread2'] = (df['ask_price2'] - df['bid_price2']) / ((df['ask_price2'] + df['bid_price2']) / 2)

    df['bid_spread'] = df['bid_price1'] - df['bid_price2']
    df['ask_spread'] = df['ask_price1'] - df['ask_price2']
    df["bid_ask_spread"] = abs(df['bid_spread'] - df['ask_spread'])

    df['total_volume'] = (df['ask_size1'] + df['ask_size2']) + (df['bid_size1'] + df['bid_size2'])
    df['volume_imbalance'] = abs((df['ask_size1'] + df['ask_size2']) - (df['bid_size1'] + df['bid_size2']))

    #####
    win_size = 10
    bid_price1_ma = calc_ma(df, 'bid_price1', win_size)
    ask_size1_ma = calc_ma(df, 'ask_size1', win_size)
    ask_price1_ma = calc_ma(df, 'ask_price1', win_size)
    bid_size1_ma = calc_ma(df, 'bid_size1', win_size)

    bid_price2_ma = calc_ma(df, 'bid_price2', win_size)
    ask_size2_ma = calc_ma(df, 'ask_size2', win_size)
    ask_price2_ma = calc_ma(df, 'ask_price2', win_size)
    bid_size2_ma = calc_ma(df, 'bid_size2', win_size)

    df['wap1_ma'] = (bid_price1_ma * ask_size1_ma + ask_price1_ma * bid_size1_ma) / (bid_size1_ma + ask_size1_ma)
    df['wap2_ma'] = (bid_price2_ma * ask_size2_ma + ask_price2_ma * bid_size2_ma) / (bid_size2_ma + ask_size2_ma)

    return df

def trade_ds_fe(df):
    df['log_return'] = df.groupby('time_id')['price'].transform(log_return)
    df['amount'] = df['price'] * df['size']
    return df


In [9]:
import librosa

''' MFCC coefficients contain information about the rate changes in the different spectrum bands '''
def get_mfcc(a):
    r = np.zeros((1, a.shape[1]))
    for i in range(a.shape[1]):
        mfcc = librosa.feature.mfcc(a[:, i])
        mfcc_mean = mfcc.mean(axis=1)
        #print(mfcc_mean)
        #r[:, i] = np.mean(mfcc_mean)
        r[:, i] = mfcc_mean[1]
    return r

In [10]:
%%capture
!pip install tsfresh

In [11]:
from tsfresh.feature_extraction import feature_calculators

''' Number of peaks '''
def get_number_peaks(a):
    r = np.zeros((1, a.shape[1]))
    for i in range(a.shape[1]):
        r[:, i] = feature_calculators.number_peaks(a[:, i], 2)
    return r

In [12]:
def np_seq_stat(a, s):
    ''' a - array, s - seconds_in_bucket'''

    r = []
    for w in BUCKET_WINDOWS2:

        idx = np.where(np.logical_and(s >= w[0], s < w[1]))[0]

        s_min = np.zeros((1, a.shape[1]))
        s_max = np.zeros((1, a.shape[1]))
        s_mean = np.zeros((1, a.shape[1]))
        s_std = np.zeros((1, a.shape[1]))
        s_median = np.zeros((1, a.shape[1]))
        s_sum = np.zeros((1, a.shape[1]))
        #s_mfcc = np.zeros((1, a.shape[1]))
        #s_peaks = np.zeros((1, a.shape[1]))

        if a[idx].shape[0] > 0:
            s_min = np.min(a[idx], axis=0, keepdims=True)
            s_max = np.max(a[idx], axis=0, keepdims=True)
            s_mean = np.mean(a[idx], axis=0, keepdims=True)
            s_std = np.std(a[idx], axis=0, keepdims=True)
            s_median = np.median(a[idx], axis=0, keepdims=True)
            s_sum = np.sum(a[idx], axis=0, keepdims=True)

            #s_peaks = get_number_peaks(a[idx]) # <- it gives small boost
            #s_mfcc = get_mfcc(a[idx])

        r.append(np.concatenate((s_min, s_max, s_mean, s_std, s_median, s_sum), axis=0))

    return np.nan_to_num(np.concatenate(r, axis=0).transpose())

In [13]:
def process_optiver_ds(ds, f_dict, fe_func, skip_cols, train_flg=True):

    x = []
    y = []

    for stock_id, stock_fnmame in tqdm(f_dict.items()):

        optiver_ds = pd.read_parquet(stock_fnmame)
        optiver_ds['row_id'] = str(stock_id) + '-' + optiver_ds['time_id'].astype(str)

        sds = ds[ds['stock_id'] == stock_id]

        cols = ['time_id', 'target']
        if train_flg == False:
            cols = ['time_id']

        merge_ds = pd.merge(sds[cols], optiver_ds, on='time_id', how='left')
        merge_ds = fe_func(merge_ds).fillna(0)

        cols = [c for c in merge_ds.columns if c not in skip_cols]

        np_ds = merge_ds[cols].to_numpy(dtype=np.float16)
        seconds_in_bucket = merge_ds['seconds_in_bucket'].to_numpy()
        g_idx = merge_ds[['time_id']].to_numpy()

        l = np.unique(g_idx, return_index=True)[1][1:]
        a_list = np.split(np_ds, l)
        s_list = np.split(seconds_in_bucket, l)

        stat = list(map(np_seq_stat, a_list, s_list))
        b = np.transpose(np.dstack(stat), (2, 1, 0))
        b = b.astype(np.float16)

        r = []
        if train_flg:
            targets = merge_ds[['target']].to_numpy(dtype=np.float16)
            t_list = np.split(targets, l)
            r = [t[0][0] for t in t_list]

        x.append(b)
        y.append(r)
        #break
    return x, y

In [14]:
def chunks(data, SIZE=10000):
    it = iter(data)
    for i in range(0, len(data), SIZE):
        yield {k:data[k] for k in islice(it, SIZE)}

def process_book_train_chunk(chunk_ds):
    return process_optiver_ds(train_ds, chunk_ds, book_ds_fe, book_skip_columns)
def process_trade_train_chunk(chunk_ds):
    return process_optiver_ds(train_ds, chunk_ds, trade_ds_fe, trade_skip_columns)
def process_book_test_chunk(chunk_ds):
    return process_optiver_ds(test_ds, chunk_ds, book_ds_fe, book_skip_columns, False)
def process_trade_test_chunk(chunk_ds):
    return process_optiver_ds(test_ds, chunk_ds, trade_ds_fe, trade_skip_columns, False)

book_train_chunks = [i for i in chunks(book_train_dict, int(len(book_train_dict)/NTHREADS))]
trade_train_chunks = [i for i in chunks(trade_train_dict, int(len(trade_train_dict)/NTHREADS))]

z = 1 if len(book_test_dict) < NTHREADS else NTHREADS
book_test_chunks = [i for i in chunks(book_test_dict, int(len(book_test_dict)/z))]
trade_test_chunks = [i for i in chunks(trade_test_dict, int(len(trade_test_dict)/z))]

In [None]:
%%time
pool = Pool(NTHREADS)
r = pool.map(process_book_train_chunk, book_train_chunks)
pool.close()

a1, a2 = zip(*r)
np_books = [np.concatenate(a1[i], axis=0) for i in range(len(a1))]
np_books = np.concatenate(np_books, axis=0)

targets = [np.concatenate(a2[i], axis=0) for i in range(len(a2))]
targets = np.concatenate(targets, axis=0)

In [None]:
%%time
pool = Pool(NTHREADS)
r = pool.map(process_trade_train_chunk, trade_train_chunks)
pool.close()

a1, _ = zip(*r)
np_trades = [np.concatenate(a1[i], axis=0) for i in range(len(a1))]
np_trades = np.concatenate(np_trades, axis=0)

In [None]:
print(np_books.shape, np_trades.shape, targets.shape)
np_train = np.concatenate((np_books, np_trades), axis=2)
print(np_train.shape, targets.shape)

In [None]:
idx = np.arange(np_train.shape[0])
train_idx, valid_idx = train_test_split(idx, shuffle=False, test_size=0.1, random_state=SEED)

In [None]:
# Scaler
transformers = []
for i in tqdm(range(np_train.shape[1])):
    a = np.nan_to_num(np_train[train_idx, i, :])
    b = np.nan_to_num(np_train[valid_idx, i, :])

    transformer = StandardScaler() # StandardScaler is very useful!
    np_train[train_idx, i, :] = transformer.fit_transform(a)
    np_train[valid_idx, i, :] = transformer.transform(b)
    transformers.append(transformer) # Save Scalers for the inference stage

In [None]:
np_train = np.nan_to_num(np_train)

In [None]:
# Loss function
def rmspe(y_true, y_pred):
    return K.sqrt(K.mean(K.square((y_true - y_pred) / y_true)))

In [None]:
# https://github.com/philipperemy/keras-attention-mechanism
class Attention(Layer):

    def __init__(self, units=128, **kwargs):
        self.units = units
        super().__init__(**kwargs)

    def __call__(self, inputs):
        """
        Many-to-one attention mechanism for Keras.
        @param inputs: 3D tensor with shape (batch_size, time_steps, input_dim).
        @return: 2D tensor with shape (batch_size, 128)
        @author: felixhao28, philipperemy.
        """
        hidden_states = inputs
        hidden_size = int(hidden_states.shape[2])
        # Inside dense layer
        #              hidden_states            dot               W            =>           score_first_part
        # (batch_size, time_steps, hidden_size) dot (hidden_size, hidden_size) => (batch_size, time_steps, hidden_size)
        # W is the trainable weight matrix of attention Luong's multiplicative style score
        score_first_part = Dense(hidden_size, use_bias=False, name='attention_score_vec')(hidden_states)
        #            score_first_part           dot        last_hidden_state     => attention_weights
        # (batch_size, time_steps, hidden_size) dot   (batch_size, hidden_size)  => (batch_size, time_steps)
        h_t = Lambda(lambda x: x[:, -1, :], output_shape=(hidden_size,), name='last_hidden_state')(hidden_states)
        score = Dot(axes=[1, 2], name='attention_score')([h_t, score_first_part])
        attention_weights = Activation('softmax', name='attention_weight')(score)
        # (batch_size, time_steps, hidden_size) dot (batch_size, time_steps) => (batch_size, hidden_size)
        context_vector = Dot(axes=[1, 1], name='context_vector')([hidden_states, attention_weights])
        pre_activation = Concatenate(name='attention_output')([context_vector, h_t])
        attention_vector = Dense(self.units, use_bias=False, activation='tanh', name='attention_vector')(pre_activation)
        return attention_vector

    def get_config(self):
        return {'units': self.units}

    @classmethod
    def from_config(cls, config):
        return cls(**config)

In [None]:
class DataGenerator(tf.keras.utils.Sequence):
    'Generates data for Keras'
    def __init__(self, ds, targets, batch_size, shape=(32,32,32), shuffle=True):
        'Initialization'
        self.batch_size = batch_size
        self.targets = targets
        self.shape = shape
        self.ds = ds
        self.ids = np.arange(ds.shape[0])
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        'Denotes the number of batches per epoch'
        return int(np.floor(len(self.ids) / self.batch_size))

    def __getitem__(self, index):

        'Generate one batch of data'
        # Generate indexes of the batch
        indexes = self.ids[index*self.batch_size:(index+1)*self.batch_size]

        # Find list of IDs
        ids_temp = [self.ids[k] for k in indexes]


        x = self.ds[ids_temp, :, :]
        y = self.targets[ids_temp]

        return x, y

    def on_epoch_end(self):
        'Updates indexes after each epoch'
        self.ids = np.arange(self.ds.shape[0])
        if self.shuffle == True:
            np.random.shuffle(self.ids)

In [None]:
def get_model_v1():
    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.LSTM(50, input_shape=(np_train.shape[1], np_train.shape[2]), return_sequences=True))
    model.add(tf.keras.layers.LSTM(50, input_shape=(np_train.shape[1], np_train.shape[2]), return_sequences=True))
    model.add(Attention(256)) # the gain is small, but ...
    model.add(tf.keras.layers.Dense(1))

    model.compile(loss=rmspe, optimizer='adam')
    model.summary()
    return model

In [None]:
training_generator = DataGenerator(np_train[train_idx, :, :], targets[train_idx], batch_size=TRAIN_BATCH_SIZE)
validation_generator = DataGenerator(np_train[valid_idx, :, :], targets[valid_idx], batch_size=TRAIN_BATCH_SIZE)

model = get_model_v1()

checkpoint_filepath = CHECKPOINT + '.weights.h5'
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_filepath,
    save_weights_only=True,
    monitor='val_loss',
    mode='min',
    save_best_only=True)

model_earlystopping_callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=20)

NEPOCHS = 50
history = model.fit(x=training_generator,
                              callbacks=[model_checkpoint_callback, model_earlystopping_callback],
                              epochs=NEPOCHS,
                              validation_data=validation_generator,
                              # use_multiprocessing=False,
                              # workers=NTHREADS
)

In [None]:
import matplotlib.pyplot as plt

plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

In [None]:
a = np.min(history.history['val_loss'])
print(f'The best val_loss is {a:.4f}')

In [None]:
del np_train, np_books, np_trades
z = gc.collect()

In [None]:
%%time
pool = Pool(NTHREADS)
r = pool.map(process_book_test_chunk, book_test_chunks)
pool.close()

a1, _ = zip(*r)
np_books = [np.concatenate(a1[i], axis=0) for i in range(len(a1))]
np_books = np.concatenate(np_books, axis=0)

In [None]:
%%time
pool = Pool(NTHREADS)
r = pool.map(process_trade_test_chunk, trade_test_chunks)
pool.close()

a1, _ = zip(*r)
np_trades = [np.concatenate(a1[i], axis=0) for i in range(len(a1))]
np_trades = np.concatenate(np_trades, axis=0)

In [None]:
print(np_books.shape, np_trades.shape)
np_test = np.concatenate((np_books, np_trades), axis=2)
print(np_test.shape)

In [None]:
# Scaler
for i in tqdm(range(np_test.shape[1])):
    transformer = transformers[i]
    np_test[:, i, :] = transformer.transform(np.nan_to_num(np_test[:, i, :]))

In [None]:
np_test = np.nan_to_num(np_test)

In [None]:
model.load_weights(checkpoint_filepath)
res = model.predict(np_test, batch_size=TEST_BATCH_SIZE)
res = np.clip(res, 0, 1)

In [None]:
res[:3]

In [None]:
import shutil
shutil.rmtree('./model_checkpoint')

In [None]:
submission_ds = pd.DataFrame()
submission_ds['row_id'] = test_ds['row_id']
submission_ds['target'] = res
submission_ds.to_csv('submission.csv', index=False)

In [None]:
submission_ds[:3]