In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import tensorflow as tf # tensorflow 
from tensorflow import keras # Keras
from keras import layers # Keras.Layers

from sklearn.metrics import r2_score


# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
#for dirname, _, filenames in os.walk('/kaggle/input'):
#    for filename in filenames:
#        print(os.path.join(dirname, filename))
        
# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [78]:
# Variables & functions

# Kaggle input locations for the Optiver realized-volatility competition data.
book_input_dir='/kaggle/input/optiver-realized-volatility-prediction/book_train.parquet/'
# Same directory as book_input_dir: validation files are a subset of the training book files.
# NOTE(review): this name is not referenced elsewhere in the visible cells.
book_input_val_dir='/kaggle/input/optiver-realized-volatility-prediction/book_train.parquet/'
book_input_test_dir='/kaggle/input/optiver-realized-volatility-prediction/book_test.parquet/'
trade_input_dir='/kaggle/input/optiver-realized-volatility-prediction/trade_train.parquet/'
trade_input_test_dir='/kaggle/input/optiver-realized-volatility-prediction/trade_test.parquet/'
# Training targets are loaded eagerly; preprocess_rnn_data merges them on key_fields in "train" mode.
target_train_data=pd.read_csv('/kaggle/input/optiver-realized-volatility-prediction/train.csv')
# Path only (not loaded here).
target_test= '/kaggle/input/optiver-realized-volatility-prediction/test.csv'
#time_split=[0,60,120,180,240,300,360,420,480,540,600]
# Bin edges applied to `seconds_in_bucket` with pd.cut in preprocess_rnn_data.
time_split=[0,100,200,300,400,500,600]
# Number of time bins (one fewer than the number of edges).
TIME_INTERVAL=len(time_split)-1
# Number of feature columns stacked per time bin in preprocess_rnn_data; also the model input width.
NUM_FEATURES_RNN=6
# Columns used as group-by keys and as the merge key with the targets.
key_fields=['time_id','stock_id']
# Filled by the file-partition loop further down in this cell.
val_files_list = []
train_files_list = []
test_files_list = []


def log_return(list_stock_prices):
    """Return the first difference of the natural log of the prices.

    The input must expose `.diff()` after `np.log` (e.g. a pandas Series);
    the first element of the result is NaN because it has no predecessor.
    """
    log_prices = np.log(list_stock_prices)
    return log_prices.diff()

def realized_volatility(series_log_return):
    """Return the square root of the sum of squared log returns."""
    squared_returns = series_log_return ** 2
    sum_of_squares = np.sum(squared_returns)
    return np.sqrt(sum_of_squares)

def get_wap1(df):
    """Level-1 weighted average price.

    Each side's price is weighted by the *opposite* side's size and the sum
    is divided by the total level-1 size.
    """
    bid_px, ask_px = df['bid_price1'], df['ask_price1']
    bid_sz, ask_sz = df['bid_size1'], df['ask_size1']
    cross_weighted = bid_px * ask_sz + ask_px * bid_sz
    return cross_weighted / (bid_sz + ask_sz)

def get_wap2(df):
    """Level-2 weighted average price.

    Each side's price is weighted by the *opposite* side's size and the sum
    is divided by the total level-2 size.
    """
    bid_px, ask_px = df['bid_price2'], df['ask_price2']
    bid_sz, ask_sz = df['bid_size2'], df['ask_size2']
    cross_weighted = bid_px * ask_sz + ask_px * bid_sz
    return cross_weighted / (bid_sz + ask_sz)

def get_wap(df):
    """Weighted average price over both book levels.

    The cross-weighted (price x opposite-side size) sums of level 1 and
    level 2 are added and divided by the total size of all four quotes.
    """
    level1 = df['bid_price1'] * df['ask_size1'] + df['ask_price1'] * df['bid_size1']
    level2 = df['bid_price2'] * df['ask_size2'] + df['ask_price2'] * df['bid_size2']
    total_size = df['bid_size1'] + df['ask_size1'] + df['bid_size2'] + df['ask_size2']
    combined = level1 + level2
    return combined / total_size


def get_wap3(df):
    """Level-1 price average where each price is weighted by its *own* side's size."""
    bid_sz, ask_sz = df['bid_size1'], df['ask_size1']
    own_weighted = df['bid_price1'] * bid_sz + df['ask_price1'] * ask_sz
    return own_weighted / (bid_sz + ask_sz)

def get_wap4(df):
    """Level-2 price average where each price is weighted by its *own* side's size."""
    bid_sz, ask_sz = df['bid_size2'], df['ask_size2']
    own_weighted = df['bid_price2'] * bid_sz + df['ask_price2'] * ask_sz
    return own_weighted / (bid_sz + ask_sz)

def get_trade_wap(df):
    """Return `price * size / size`, computed row by row.

    NOTE(review): element-wise this reduces to `price` (up to rounding, and
    NaN where size is 0). A size-weighted average would need sums over a
    group; confirm what was intended before relying on this.
    """
    notional = df['price'] * df['size']
    return notional / df['size']

def get_trade_wap_count(df):
    """Return `price * size * order_count / (size * order_count)`, row by row.

    NOTE(review): element-wise this reduces to `price` (up to rounding, and
    NaN where the weight is 0). Confirm whether a grouped weighted average
    was intended.
    """
    weighted_price = df['price'] * df['size'] * df['order_count']
    weight = df['size'] * df['order_count']
    return weighted_price / weight


def _min_max_normalize(df, feature_names):
    """Return a copy of `df` with each named column min-max scaled to [0, 1].

    Args:
        df: input DataFrame; it is not modified.
        feature_names: iterable of column names to rescale. Other columns
            are copied through unchanged.

    Returns:
        A new DataFrame. A column whose values are all equal is mapped to
        0.0 rather than to the NaN that 0/0 would produce.

    Raises:
        KeyError: if a named column is missing from `df`.
    """
    result = df.copy()
    for feature_name in feature_names:
        min_value = df[feature_name].min()
        value_range = df[feature_name].max() - min_value
        # Zero range means a constant column; dividing by 1 keeps the
        # result finite (all zeros). A NaN range (empty/all-NaN column)
        # is left as is and propagates NaN, as before.
        divisor = value_range if value_range != 0 else 1
        result[feature_name] = (df[feature_name] - min_value) / divisor
    return result


def normalize_trade(df):
    """Min-max scale the 'size' and 'order_count' columns; returns a copy."""
    return _min_max_normalize(df, ['size', 'order_count'])


def normalize_trade_osum(df):
    """Min-max scale the 'osum' column; returns a copy."""
    return _min_max_normalize(df, ['osum'])


def normalize_book(df):
    """Min-max scale the 'log_return' and 'wap' columns; returns a copy."""
    return _min_max_normalize(df, ['log_return', 'wap'])


def normalize_book_data(df):
    """Min-max scale the five spread columns; returns a copy."""
    return _min_max_normalize(
        df,
        ['price_spread', 'price_spread2', 'bid_spread', 'ask_spread', 'bid_ask_spread'],
    )

# Stock IDs whose book files are held out for validation.
val_file = (8,19,27,36,45,54,63,72,82,99,110,121)

# Route every training book file to the validation or the training list.
book_files = tf.data.Dataset.list_files(book_input_dir + "*")
for book_file in book_files:
    # The text after the last "=" in the path is parsed as the numeric stock ID.
    stock_id = tf.strings.to_number(tf.strings.split(book_file, "=")[-1], tf.int32).numpy()

    if stock_id in val_file:
        val_files_list.append(book_file)
        continue

    # NOTE(review): only stock IDs below 10 are used for training; files that
    # match neither condition are dropped. The broader alternative
    # "stock_id not in val_file" was left commented out by the author.
    if stock_id < 10:
        train_files_list.append(book_file)

# Every file in the test book directory is used.
test_files = tf.data.Dataset.list_files(book_input_test_dir + "*")
for test_file in test_files:
    test_files_list.append(test_file)

***How is the data defined for the RNN?***
TBC 

In [74]:
# Preprocess function
# reads the book parquet file as a dataframe (trade loading is currently commented out)
# - adds stock ID to dataframe
# - derives four WAP variants and their log returns for book data
# - derives price / bid / ask spread columns
# - aggregates realized volatility and spread std-dev as per time bin
# - in "train" mode, merges the book aggregates with the target value (key - time_id and stock_id)
# returns the feature tensor (built with tf.ragged.constant) plus stock IDs, and the target values in "train" mode.

def preprocess_rnn_data(mode, book_file, trade_file, stock_id):
    """Build per-time-bin order-book features for one stock.

    Reads the book parquet file, derives four WAP variants with their log
    returns plus several spread columns, aggregates them per
    (time_id, stock_id, time bin) and stacks six of the aggregates into one
    matrix per time_id.

    Args:
        mode: "train" merges the aggregated features with the module-level
            `target_train_data` on `key_fields` and also returns the
            targets; any other value skips the merge.
        book_file: path handed to `pd.read_parquet`.
        trade_file: only printed; loading trade data is commented out.
        stock_id: written to the `stock_id` column and used as the fill
            value of the `stockID` tensor.

    Returns:
        In "train" mode the tuple
        `({'data': rnn_data, 'stockID': sid}, data['target'])`; otherwise
        just the `{'data': rnn_data, 'stockID': sid}` dict.
    """
    
    print(book_file, trade_file, type(stock_id))
    
    # Get All data in required format and combine with target
    book = pd.read_parquet(book_file)
    #trade = pd.read_parquet(trade_file)
    
    book['stock_id'] = stock_id
    #trade['stock_id'] = stock_id
    
    #Process book data
    # 'id' is "<stock_id>-<time_id>"; it is not used again in this function.
    book['id'] = str(stock_id) + '-' + book['time_id'].astype(str) 
    # Each of the four groups below adds one WAP variant and its log return,
    # then drops the rows whose log return is null.
    # NOTE(review): log_return() diffs over the whole frame, not per time_id, so
    # (presumably) the first row of a time_id is differenced against the last row
    # of the previous one - confirm this is intended.
    book['wap1'] = get_wap1(book) 
    book.loc[:,'log_return1'] = log_return(book['wap1'])
    book = book[~book['log_return1'].isnull()]
    
    book['wap2'] = get_wap2(book) 
    book.loc[:,'log_return2'] = log_return(book['wap2'])
    book = book[~book['log_return2'].isnull()]
    
    book['wap3'] = get_wap3(book) 
    book.loc[:,'log_return3'] = log_return(book['wap3'])
    book = book[~book['log_return3'].isnull()]
    
    book['wap4'] = get_wap4(book) 
    book.loc[:,'log_return4'] = log_return(book['wap4'])
    book = book[~book['log_return4'].isnull()]
    
    # Relative bid/ask spread (spread divided by mid price) for each book level.
    book['price_spread'] = (book['ask_price1'] - book['bid_price1']) / ((book['ask_price1'] + book['bid_price1']) / 2)
    book['price_spread2'] = (book['ask_price2'] - book['bid_price2']) / ((book['ask_price2'] + book['bid_price2']) / 2)
    
    # Price gap between level 1 and level 2 on each side, and the absolute difference of the two gaps.
    book['bid_spread'] = book['bid_price1'] - book['bid_price2']
    book['ask_spread'] = book['ask_price1'] - book['ask_price2']
    book["bid_ask_spread"] = abs(book['bid_spread'] - book['ask_spread'])
    # total_volume is computed but not aggregated below.
    book['total_volume'] = (book['ask_size1'] + book['ask_size2']) + (book['bid_size1'] + book['bid_size2'])
    # drop columns
    #book = book.drop(columns=['bid_price1','ask_price1','bid_price2','ask_price2','bid_size1','ask_size1','bid_size2','ask_size2'])
    
    # aggregate by time bins
    #book = normalize_book(book)
    # NOTE(review): with pd.cut's defaults the first interval is open on the left,
    # so rows with seconds_in_bucket == 0 presumably get no bin - confirm.
    bins = pd.cut(book['seconds_in_bucket'], time_split) # bin edges come from time_split
    # NOTE(review): vol3 and vol4 are aggregated here but are not used when
    # 'final' is assembled further down.
    book_data = book.groupby(['time_id','stock_id',bins]).aggregate(
        vol1=('log_return1', realized_volatility),
        vol3=('log_return3', realized_volatility),
        vol4=('log_return4', realized_volatility),
        vol2=('log_return2', realized_volatility),
        #wap1=('wap1', np.std),
        #wap2=('wap2', np.std),
        #wap3=('wap3', np.std),
        #wap4=('wap4', np.std),
        ps1=('price_spread', np.std),
        ps2=('price_spread2', np.std),
        #bs_std=('bid_spread', np.std),
        #as_std=('ask_spread', np.std),
        bas_std=('bid_ask_spread', np.std)
    ).fillna(0)
   
    # Collapse the bin level: every feature column becomes a list of per-bin
    # values for each (time_id, stock_id) pair.
    book_data = book_data.groupby(key_fields).agg(tuple).applymap(list).reset_index()

    # Only "train" mode attaches the 'target' column (inner merge on key_fields).
    if mode == "train":
        data = pd.merge(book_data,target_train_data,on=key_fields)
    else:
        data = book_data
    
    
    # Stack six per-bin feature lists column-wise into one matrix per row
    # (presumably TIME_INTERVAL x NUM_FEATURES_RNN - TODO confirm).
    # NOTE(review): 'bas_std' is stacked twice - confirm whether another feature
    # was meant for the last column.
    data['final'] = data.apply(lambda d: np.column_stack((d['vol1'], d['vol2'], d['bas_std'], d['ps1'], d['ps2'], d['bas_std'])), axis=1)
    #size = data['target'].size
    size = len(data)
    # NOTE(review): the Series is wrapped in an extra list while inner_shape names
    # the full (size, TIME_INTERVAL, NUM_FEATURES_RNN) shape - verify the resulting
    # tensor shape against the dataset output_signature.
    rnn_data = tf.ragged.constant(
        [data['final']],
        inner_shape=(size, TIME_INTERVAL, NUM_FEATURES_RNN))
    # stock_id repeated for the rows of `data`.
    # NOTE(review): tf.fill documents `dims` as a 1-D shape; a scalar is passed here - confirm.
    sid = tf.fill(size, stock_id)
    
    if mode == "train":
        return {'data': rnn_data, 'stockID': sid}, data['target']
    else:
        return {'data': rnn_data, 'stockID': sid}

In [71]:
# Manual check: run the preprocessing in "train" mode on the book file of stock 66.
# `data` receives the (inputs dict, targets) tuple; it is not referenced by the other visible cells.
data = preprocess_rnn_data("train", '/kaggle/input/optiver-realized-volatility-prediction/book_train.parquet/stock_id=66',
                '/kaggle/input/optiver-realized-volatility-prediction/trade_train.parquet/stock_id=66',
               66)

In [75]:
# RNN Generator
def _gen_rnn_dataset(book_files):
    """Yield the "train"-mode preprocessing result for each book file.

    Args:
        book_files: iterable of scalar string tensors holding book file
            paths whose last path component looks like "stock_id=<N>".

    Yields:
        The `(inputs dict, targets)` tuple returned by
        `preprocess_rnn_data("train", ...)`, one per file.
    """
    for book_file in book_files:
        # Last path component, e.g. "stock_id=66"; the trade data uses the
        # same component under trade_input_dir.
        stock_dir = tf.strings.split(book_file, "/")[-1]
        stock_id_num = tf.strings.split(stock_dir, '=')[-1]
        trade_file = trade_input_dir + stock_dir
        yield preprocess_rnn_data(
            "train",
            book_file.numpy().decode('ascii'),
            trade_file.numpy().decode('ascii'),
            tf.strings.to_number(stock_id_num, tf.int32).numpy(),
        )


def gen_rnn_train_dataset():
    """Generator over the preprocessed training files (train_files_list)."""
    yield from _gen_rnn_dataset(train_files_list)


def gen_rnn_val_dataset():
    """Generator over the preprocessed validation files (val_files_list).

    Validation files come from the training directories, so they are
    processed in "train" mode and carry targets.
    """
    yield from _gen_rnn_dataset(val_files_list)
            
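# TensorFlow Dataset
# - built from the generators above; each generated element holds all rows of one book file
# - preprocessing creates realized volatility and spread features for every time bin
# - planned: combine with trade data (avg(price), avg(order)) per time bin;
#   trade loading is currently commented out in preprocess_rnn_data
# - elements are unbatched into single rows, re-batched and cached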

# Element spec shared by the training and validation generators: one element
# per book file, holding all of that file's rows (leading dimension None).
# NOTE(review): preprocess_rnn_data parses stock IDs as int32 while the spec
# declares int16 - confirm the conversion behaves as expected.
rnn_output_signature = (
    {'data': tf.TensorSpec(shape=(None, TIME_INTERVAL, NUM_FEATURES_RNN), dtype=tf.float64, name='data'),
     'stockID': tf.TensorSpec(shape=(None,), dtype=tf.int16, name='stockID')},
    tf.TensorSpec(shape=(None,), dtype=tf.float64, name='target'),
)

# Per-file elements are split into single rows, re-batched to 32 and cached.
# prefetch is the last step so that it overlaps with training on every epoch;
# placed before cache() it would only help while the cache is being filled.
train_rnn_data = (
    tf.data.Dataset.from_generator(gen_rnn_train_dataset,
                                   output_signature=rnn_output_signature)
    .unbatch()
    .batch(32)
    .cache()
    .prefetch(buffer_size=5)
)
# Shuffle,repeat,batch ??

#val dataset
val_rnn_data = (
    tf.data.Dataset.from_generator(gen_rnn_val_dataset,
                                   output_signature=rnn_output_signature)
    .unbatch()
    .batch(32)
    .cache()
    .prefetch(buffer_size=5)
)

In [76]:
def rmspe(y_true, y_pred):
    """Root mean squared percentage error: sqrt(mean(((y_true - y_pred) / y_true)^2))."""
    relative_error = (y_true - y_pred) / y_true
    mean_squared = tf.reduce_mean(tf.square(relative_error))
    return tf.sqrt(mean_squared)

def r2score(y, y_pred):
    """Coefficient of determination: 1 - SS_residual / SS_total."""
    ss_residual = tf.reduce_sum(tf.square(tf.subtract(y, y_pred)))
    deviation_from_mean = tf.subtract(y, tf.reduce_mean(y))
    ss_total = tf.reduce_sum(tf.square(deviation_from_mean))
    return tf.subtract(1.0, tf.divide(ss_residual, ss_total))

In [139]:
#Build Model
# Layer-size hyperparameters.
CELLS=256
EMBEDDING_SIZE=8
OUTPUT_RNN=32
OUTPUT_FRNN=8

# Two named inputs; the names match the keys of the dataset's input dict.
input_rnn = keras.Input(shape=(TIME_INTERVAL, NUM_FEATURES_RNN), name='data')
# shape must be a tuple: "(1)" is just the integer 1.
sID = keras.Input(shape=(1,), name='stockID')
flat_rnn = layers.Flatten()(input_rnn)


#RNN Processing
rnn = layers.GRU(CELLS, activation='relu', return_sequences=True)(input_rnn)
rnn = layers.GRU(OUTPUT_RNN, activation='relu')(rnn)

#Flat RNN Processing
# Each Dense layer feeds the next. Previously the second layer was also wired
# to flat_rnn, which silently discarded the first layer.
frnn = layers.Dense(CELLS, activation='relu')(flat_rnn)
frnn = layers.Dense(128, activation='relu')(frnn)
frnn = layers.Dense(OUTPUT_FRNN, activation='relu')(frnn)

#StockID Embedding
# 127 embedding rows, i.e. stock IDs 0..126 are representable.
sembed = layers.Embedding(127, EMBEDDING_SIZE, input_length=1)(sID)
embed = layers.Flatten()(sembed)

#Concatenated Processing
combined = layers.concatenate([rnn, frnn, embed])
combined = layers.Dense(CELLS,activation='relu')(combined)
#combined = keras.layers.BatchNormalization()(combined)
# Chained as well: the second layer used to take `combined` again, which
# dropped the first of the two layers from the graph.
final = layers.Dense(CELLS, activation='relu')(combined)
final = layers.Dense(CELLS, activation='relu')(final)
final = layers.Dropout(.3)(final)
final = layers.Dense(128, activation='relu')(final)
final = layers.Dense(32, activation='relu')(final)
final = layers.Dense(1)(final)

model= keras.models.Model(inputs=[input_rnn, sID], outputs=final)

# NOTE(review): early stopping watches the training loss although validation
# data is passed to fit(); consider monitor='val_loss'.
callback = tf.keras.callbacks.EarlyStopping(monitor='loss', patience=6)
model.compile(optimizer=keras.optimizers.Adam(),
              loss=rmspe, 
              metrics=[r2score, keras.metrics.MeanSquaredError()])
    
model.summary()

In [107]:
# Render a diagram of the model's layer graph.
keras.utils.plot_model(model)

In [141]:
# Train for at most 30 epochs on the cached training batches, validating on the
# held-out stocks; `callback` is the EarlyStopping instance created with the model.
history = model.fit(x=train_rnn_data, validation_data=val_rnn_data,
                    epochs=30, verbose=1, callbacks=[callback])

In [142]:
# Training vs. validation loss per epoch (the loss is the custom rmspe).
pd.DataFrame(history.history)[['loss','val_loss']].plot()

In [143]:
# Training vs. validation value of the custom r2score metric per epoch.
pd.DataFrame(history.history)[['r2score','val_r2score']].plot()

In [172]:
def gen_test_dataset():
    """Yield the preprocessed model inputs for every test book file.

    Mirrors the training/validation generators, but runs
    `preprocess_rnn_data` in "test" mode, so each yielded element is only
    the `{'data': ..., 'stockID': ...}` dict (no targets).

    The earlier version assigned the result of `list.append` (always None)
    back to the accumulator and returned from inside the loop, so it
    produced None after the first file. It also built the trade path from
    the training directory.
    """
    for book_file in test_files_list:
        # Last path component, e.g. "stock_id=0".
        stock_id = tf.strings.split(book_file, "/")[-1]
        stock_id_num = tf.strings.split(stock_id, '=')[-1]
        # Test book files pair with the test trade directory.
        trade_file = trade_input_test_dir + stock_id
        yield preprocess_rnn_data(
            "test",
            book_file.numpy().decode('ascii'),
            trade_file.numpy().decode('ascii'),
            tf.strings.to_number(stock_id_num, tf.int32).numpy(),
        )

In [151]:
#Get Test Data
# Wrap gen_test_dataset in a tf.data.Dataset. The parentheses around the spec
# hold a single dict (no trailing comma), so each element is expected to be a
# {'data', 'stockID'} dict without a target.
# NOTE(review): a later cell rebinds `test_data` to the direct result of
# gen_test_dataset(), which replaces this Dataset.
test_data = tf.data.Dataset.from_generator(gen_test_dataset, 
                                           output_signature= (
                                             {'data':tf.TensorSpec(shape=(None,TIME_INTERVAL, NUM_FEATURES_RNN), dtype=tf.float64, name='data'),
                                                 'stockID':tf.TensorSpec(shape=(None,), dtype=tf.int16, name='stockID')}
                                           ))
#test_data=test_data.prefetch(buffer_size=5)
#test_data=test_data.unbatch()
#test_data=test_data.batch(1)

In [179]:
# Calls gen_test_dataset() directly and rebinds `test_data`, so the name no
# longer refers to a tf.data.Dataset built with from_generator.
test_data = gen_test_dataset()

In [158]:
# Manual check of "test" mode (no target merge). Note the inputs are the
# *training* book/trade files of stock 10, not files from the test directories.
# `tdata` is not referenced by the other visible cells.
tdata = preprocess_rnn_data("test",'/kaggle/input/optiver-realized-volatility-prediction/book_train.parquet/stock_id=10',
                '/kaggle/input/optiver-realized-volatility-prediction/trade_train.parquet/stock_id=10',
               10)

In [178]:
# Model.predict accepts generators and tf.data datasets directly;
# Model.predict_generator is deprecated in favour of it.
result = model.predict(test_data)

In [137]:
# Scratch prediction on one hand-written row of six values.
# NOTE(review): the model defined in this notebook takes two inputs ('data' with
# shape (TIME_INTERVAL, NUM_FEATURES_RNN) and 'stockID'); a single (1, 6) tensor
# does not match that - this cell looks left over from an earlier model version.
# Nothing is written to disk here.
result = model.predict(tf.convert_to_tensor([[2.941987e-04, 2.506079e-04, 1.000000e+00, 1.00000,1.000000e+00,1.000000e+00]]))