### Can our model predict current volatility?  (forget future; first it should be capable of predicting current one with given features)

In [1]:
import os
import time
import multiprocessing
from multiprocessing import Pool

import pandas as pd
import matplotlib.pyplot as plt
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
import numpy as np
from torch.utils.tensorboard.writer import SummaryWriter

from optiver_features_handler import get_features_map_for_stock, get_row_id

In [2]:
DATA_DIRECTORY = os.path.join("..","input","optiver-realized-volatility-prediction")
OUTPUT_DIRECTORY = os.path.join("..","output")
MODEL_OUTPUT_DIRECTORY = os.path.join(OUTPUT_DIRECTORY,"models")
os.makedirs(OUTPUT_DIRECTORY,exist_ok=True)
os.makedirs(MODEL_OUTPUT_DIRECTORY,exist_ok=True)

In [7]:
data_interval_seconds = 5
data_intervals_count = int(600/data_interval_seconds)
class OptiverRealizedVolatilityDataset(Dataset):
    def __init__(self, data_directory, mode="train", lazy_load=True):
        """initializes Optiver Competition dataset
        `mode`: train|test
        `data_directory`: the datadirectory of the input data, where there are test.csv, train.csv, and parquet folders for trade_train.parquet and other relevant folders
        """
        print("INIT: OptiverRealizedVolatilityDataset")
        if mode.lower() not in ['train','test']:
            raise Exception("Invalid mode passed for Optiver dataset. Valid values:train|test")
        self.data_directory = data_directory
        self.mode = mode.lower()
        self.main_df = pd.read_csv(os.path.join(self.data_directory,f'{self.mode}.csv'))
#         if self.mode == 'train':
#             self.main_df['row_id'] = self.main_df.apply(lambda x: f"{x['stock_id']:.0f}-{x['time_id']:.0f}", axis=1)
        if self.mode == 'test':
            self.main_df['target'] = 0
        
        self.cache_stocks_done_set = set()
        # this is our final features lookup where we park all our features which can be addressed by row_id
        # which is individual train/test.csv row id using 'stock_id`-`time_id`
        self.cache_rowid_feature_map = {}
        row_id_series = self.main_df['stock_id'].astype(str) + "-" +self.main_df['time_id'].astype(str)
        targets = self.main_df['target'].tolist()
        self.stock_possible_timeids_list = {}
        for idx, row_id in enumerate(row_id_series.tolist()):
            stock_id = int(row_id.split('-')[0])
            time_id = int(row_id.split('-')[1])
            self.cache_rowid_feature_map[row_id] = {'target':targets[idx], 'stock_id':stock_id,'time_id':time_id,'row_id':row_id}
            
            # below code is to make sure what timeids we expect from stock data extractor
            # in case of missing parquet files we'll have to know the keys to fill default values into
            if stock_id not in self.stock_possible_timeids_list:
                self.stock_possible_timeids_list[stock_id] = []
            self.stock_possible_timeids_list[stock_id].append(time_id)
            
        
        if lazy_load == False:
            worker_data = []
            for gkey, gdf in self.main_df.groupby(['stock_id']):
                worker_data.append((self.data_directory, self.mode, gkey))
#             print("---------- CPU COUNG:", multiprocessing.cpu_count())
            # NOTE: this was hell of a hunt; this windows and pytorch and jupyter combination is too tedious
            #       make sure the function that we distribute don't call pytorch
            chunksize = multiprocessing.cpu_count() * 2
            processed = 0
            for worker_data_chunk in [worker_data[i * chunksize:(i + 1) * chunksize] for i in range((len(worker_data) + chunksize - 1) // chunksize )]:
                with Pool(multiprocessing.cpu_count()) as p:
                    
                    feature_set_list = p.starmap(get_features_map_for_stock, worker_data_chunk)
                    
                    for feature_map in feature_set_list:
                        for rowid, features_dict in feature_map.items():
                            for fkey,fval in features_dict.items():
                                self.cache_rowid_feature_map[rowid][fkey] = fval
                            self.cache_rowid_feature_map[rowid]  = OptiverRealizedVolatilityDataset.transform_to_01_realized_volatility_linear_data(self.cache_rowid_feature_map[rowid])
                        # udpate the indications that we've already fetched this stock and the lazy loader code won't fetch this again
                        self.cache_stocks_done_set.add(int(rowid.split('-')[0]))
                    
                    processed += chunksize
                    print(f"Processed and loaded {processed} stocks features.")
    
    def __cache_generate_features(self, main_stock_id, main_time_id):
            
            
            main_row_id = get_row_id(main_stock_id, main_time_id)
            if main_stock_id not in self.cache_stocks_done_set:
#                 trade_df = pd.read_parquet(os.path.join(self.data_directory, f"trade_{self.mode}.parquet", f"stock_id={stock_id}"))   
                # we'll combine the featureset with the bigger feature set of all stocks
                feature_map = get_features_map_for_stock(self.data_directory, self.mode, main_stock_id)
                # NOTE: sometime we might now have parquet files in that case we'll have 3 entried in .csv while only 1 gets returned in feature map
                # we need to cover for that disparity
                for time_id in self.stock_possible_timeids_list[main_stock_id]:
                    expected_row_id = get_row_id(main_stock_id, time_id)
                    if expected_row_id not in feature_map:
                        feature_map[expected_row_id] = {}
                for rowid, features_dict in feature_map.items():
                    for fkey,fval in features_dict.items():
                        self.cache_rowid_feature_map[rowid][fkey] = fval
                    self.cache_rowid_feature_map[rowid]  = OptiverRealizedVolatilityDataset.transform_to_01_realized_volatility_linear_data(self.cache_rowid_feature_map[rowid])
                self.cache_stocks_done_set.add(main_stock_id)
#             print(self.cache_rowid_feature_map[main_row_id])
#             print(torch.tensor([self.cache_rowid_feature_map[main_row_id].get('book_realized_volatility',0)]))
#             print(torch.tensor(self.cache_rowid_feature_map[main_row_id].get('log_return1_2s', [0]*(int(600/2)))))
#             print(torch.tensor(self.cache_rowid_feature_map.get('book_directional_volume1_2s', [0]*(int(600/2)))))
            return self.cache_rowid_feature_map[main_row_id]
        
    @staticmethod
    def transform_to_01_realized_volatility_linear_data(features_dict):
        return (
                {
                    'row_id':features_dict['row_id'],
#                     'book_realized_volatility':torch.tensor([features_dict.get('book_realized_volatility',0)]),
                    'logrett_xs':torch.tensor(features_dict.get('logrett_xs', [0]*(int(600/data_interval_seconds)))),
                    'trade_volume_xs':torch.tensor(features_dict.get('trade_volume_xs', [0]*(int(600/data_interval_seconds)))),
                    'trade_ordercount_xs':torch.tensor(features_dict.get('trade_ordercount_xs', [0]*(int(600/data_interval_seconds)))),
                    'logret1_xs':torch.tensor(features_dict.get('logret1_xs', [0]*(int(600/data_interval_seconds)))),
                    'logret2_xs':torch.tensor(features_dict.get('logret2_xs', [0]*(int(600/data_interval_seconds)))),
                    'book_dirvolume_xs':torch.tensor(features_dict.get('book_dirvolume_xs', [0]*(int(600/data_interval_seconds)))),
#                     'askp2_1s':torch.tensor(features_dict.get('askp2_1s', [0]*(int(600/1)))),
#                     'book_directional_volume1_1s':torch.tensor(features_dict.get('book_directional_volume1_1s', [0]*(int(600/1)))) 
                },
                torch.tensor([features_dict['target']])
#                 [features_dict['target']]
        )
    
    def __len__(self):
        return len(self.main_df)
    
    def __getitem__(self, idx):
        #TODO: handle for num_workers more than 0
        #      using https://pytorch.org/docs/stable/data.html
        #      using torch.util.data.get_worker_info()
        if torch.is_tensor(idx):
            idx = idx.tolist()
        stock_id = self.main_df.at[idx, 'stock_id']
        time_id = self.main_df.at[idx, 'time_id']
        x,y = self.__cache_generate_features(stock_id,time_id)
#         x, y = self.__transform_to_01_realized_volatility_linear_data(features_dict)
        return x,y

In [6]:
if __name__=="__main__":
    dataset = OptiverRealizedVolatilityDataset(DATA_DIRECTORY, mode="train", lazy_load=False)

INIT: OptiverRealizedVolatilityDataset
Processed and loaded 32 stocks features.
Processed and loaded 64 stocks features.
Processed and loaded 96 stocks features.
Processed and loaded 128 stocks features.


In [None]:
# for x in range(0,9):
#     print(dataset[x])
# dataset[10000] #[0]['bidp1_1s']
dataset[10000]

In [87]:
dataloader_train = DataLoader(dataset, batch_size=4096,shuffle=True, num_workers=0, pin_memory=True)
sizes = set()
for train_batch_idx, (feature_dict, feature_y) in enumerate(dataloader_train):
    sizes.add(f"{feature_dict['logrett_xs'].size()}")
        
        
#         print(val)
#         input()
#     print(x)
#     input()

In [88]:
sizes

{'torch.Size([2948, 120])', 'torch.Size([4096, 120])'}

### Learnings about model CNN input
- it's better to use multiple channel for logreturn1 and logreturn2 than stacking it and using as one channel
- 2 channels input for CNN is better than stacking it(dim 2, which is logret1_t1, logret2_t1, logret1_t2, logret2_t2...) and using it as one channel

In [31]:
use_cuda = torch.cuda.is_available()
# use_cuda = False
device = torch.device("cuda:0" if use_cuda else "cpu")
torch.backends.cudnn.benchmark = True
model = None

In [108]:
class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.flatten = nn.Flatten()
        self.cnn_stack = nn.Sequential(
            nn.Conv1d(6, 12, kernel_size=6, stride=2, padding=0),
            nn.ReLU(),
#             nn.Dropout(0.1),
            nn.Conv1d(12, 12, kernel_size=6, stride=2, padding=0),
            nn.ReLU(),
            nn.Conv1d(12, 12, kernel_size=6, stride=2, padding=0),
            nn.ReLU(),
            nn.Dropout(0.1),
#             nn.Dropout(0.1),
#             nn.Conv1d(8, 8, kernel_size=4, stride=2, padding=0), 
#             nn.ReLU(),
#             nn.Dropout(0.1),
        )
        self.linear_stack = nn.Sequential(
            nn.LazyLinear(512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, 128),
            nn.ReLU(),
#             nn.Dropout(0.3),
#             nn.Linear(256, 64),
#             nn.ReLU(),
            nn.Linear(128, 1)
        )
#         self.basic_stack = nn.Sequential(
#             nn.Linear(int(600/2)*1,512),
#             nn.ReLU(),
#             nn.Dropout(0.4),
#             nn.Linear(512,1024),
#             nn.ReLU(),
#             nn.Dropout(0.4),
# #             nn.Linear(2048,1024),
# #             nn.ReLU(),
# #             nn.Dropout(),
#             nn.Linear(1024,512),
#             nn.ReLU(),
#             nn.Dropout(0.3),
#             nn.Linear(512,128),
#             nn.ReLU(),
#             nn.Dropout(0.2),
#             nn.Linear(128,128),
#             nn.ReLU(),
#             nn.Linear(128,1),
#         )
        
    def forward(self, feature_dict):
#         logits = self.basic_stack(x)
#         x = self.flatten(x)
        x = torch.cat([
            torch.nan_to_num(feature_dict['logrett_xs'])*1000, 
                           feature_dict['trade_volume_xs'],
                          feature_dict['trade_ordercount_xs'],
                             feature_dict['logret1_xs']*1000,
                             feature_dict['logret2_xs']*1000,
                             feature_dict['book_dirvolume_xs'],
                          ], 1)
#         x = torch.nan_to_num(feature_dict['logrett_xs']).type(torch.cuda.FloatTensor)
#         print(x)
#         input()
#         if torch.isnan(x).any():
# #             print(x)
#             print(feature_dict)
#             input()
        x = x.to(device)
        x = x.reshape(-1,6,data_intervals_count)
        
        logits = self.cnn_stack(x)
        logits = self.flatten(logits)
        logits = self.linear_stack(logits)
        return logits



def loss_fn_mse(y, pred):
    return torch.mean(torch.square((y-pred)))

def loss_fn_mspe(y, pred):
    return torch.mean(torch.square((y-pred)/y))

def loss_fn_orig(y, pred):
    return torch.sqrt(torch.mean(torch.square((y-pred)/y)))

In [136]:
class VolatilityLSTM(nn.Module):
    def __init__(self):
        super(VolatilityLSTM, self).__init__()
        # 16 hidden layer size, 3 reapeted blocks of LSTM, 4 input size
        self.input_size = 6
        self.hidden_size = 64
        self.repeated_lstm_cells = 1
        self.rnn = nn.LSTM(self.input_size, self.hidden_size, self.repeated_lstm_cells, batch_first=True, dropout=0)
        self.linear_stack = nn.Sequential(
            nn.Linear(self.hidden_size*self.repeated_lstm_cells, 64),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 1)
        )
    def forward(self, feature_dict):
        x = torch.stack([
            torch.nan_to_num(feature_dict['logrett_xs'])*1000, 
                           feature_dict['trade_volume_xs'],
                          feature_dict['trade_ordercount_xs'],
                             feature_dict['logret1_xs']*1000,
                             feature_dict['logret2_xs']*1000,
                             feature_dict['book_dirvolume_xs'],
                          ], dim=2)
        x = x.to(device)
        x = x.reshape(-1,data_intervals_count,6)
#         input("GOT FEATURES")
        h_0 = torch.rand(self.repeated_lstm_cells, x.size(0), self.hidden_size) #hidden state
        c_0 = torch.rand(self.repeated_lstm_cells, x.size(0), self.hidden_size) #internal state
        h_0 = h_0.to(device)
        c_0 = c_0.to(device)
#         print(h_0.device, c_0.device, x.device)
        output, (hn, cn) = self.rnn(x, (h_0, c_0)) #lstm with input, hidden, and internal state
#         input("---- output got")
        hn = hn.reshape(-1, self.hidden_size*self.repeated_lstm_cells) #reshaping the data for Dense layer next
#         input("---- hn got")
        out = self.linear_stack(hn)
#         input("--- out got")
        return out
        

#### analyze the initial weights (or change them)

In [96]:
# # @torch.no_grad()
# def init_weights(m):
# #     print(m)
#     if type(m) == nn.Linear:
# #         m.weight.fill_(1.0)
#         torch.nn.init.xavier_uniform_(m.weight,gain=10)
#         m.bias.data.uniform_(-1,1)
# #     elif type(m) == nn.ReLU:
# #         print(m.data)
#     else:
#         print(type(m))
# #         print(m.weight)
# model.apply(init_weights)
# # for param in model.parameters():
# # #     print(param)
# #       print(param.data.size(), param.data)

### LEarning rate: our base line is 0.34 loss as that's what the optiver guys have when they use current 10 min realize vol and use it as target (copy to prediction). We create simplest neural network and work with learning rates to figure out what's best and when we see something in range of 0.35 then we've found good Learning rate
- #### SGD: 1e-7 works best
- #### ADAM: 1e-5, (NOTE: 1e-3 makes it behave dumb where some deep local minima gets stuck and produces constant output!)
- TODO: analyze that constant output phenomenon more

In [97]:
# learning_rate = 1e-4
# batch_size = 4096
# epochs = 100

# input_scaling = 1
# output_scaling = 1

# # optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)
# optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, betas=(0.9, 0.999), eps=1e-8)
# strategyname = "ret1_n_ret2"
# summary_writer = SummaryWriter(f'../output/training_tensorboard/{strategyname}_scaleIn{input_scaling}Out{output_scaling}_{learning_rate}_{batch_size}')

In [117]:
model = None

### Learnings about training
- (non scaling)logreturns input and volatility output; non scaled makes the model predict constant output with no variety(close to 0 std dev)
- scaling input rids of variety issue, 
- scaling output makes the model start with low rmse initially so there's less ground to cover and we can iterate over ideas rapidly due to less epochs needed to achieve

In [None]:
# model = None
strategyname = "tradebook6fet_basic1_rnn1"

training_configs = []
learning_rates_to_try = [1e-3, 1e-4]
batch_sizes_to_try = [32, 64, 512]#,10000, 128]
# input_scalings_to_try = [1000]
# output_scalings_to_try = [10000]
for learning_rate in learning_rates_to_try:
    for batch_size in batch_sizes_to_try:
            training_configs.append({
                'learning_rate':learning_rate,
                'batch_size':batch_size,
                'input_scaling':1000,
                'output_scaling':10000,
            })

epochs = 500
for training_config in training_configs:
    
    learning_rate = training_config['learning_rate']
    batch_size = training_config['batch_size']
    input_scaling = training_config['input_scaling']
    output_scaling = training_config['output_scaling']
    # TRAINING SETUP
    
    #refresh the model

    del model
    torch.cuda.empty_cache()
    model = VolatilityLSTM()
#     model = NeuralNetwork()
    model.to(device)
    
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, betas=(0.9, 0.999), eps=1e-8)
    
    STRATEGY_NAME_WITH_ATTRS = f"{strategyname}_{learning_rate}_{batch_size}"
    summary_writer = SummaryWriter(f'../output/training_tensorboard/{STRATEGY_NAME_WITH_ATTRS}')
    
    # TRAINING SETUP DONE
    
    print("DEVICE:", device)
    dataset_size = len(dataset)
    train_size = int(0.8 * dataset_size)
    test_size = dataset_size - train_size
    train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])
    
    data_ohlc_sample_len = 1 # 1 for each of open high low close
    losses_train = []
    for t in range(epochs):
        print(f"Epoch {t+1}\n-------------------------------")
        print("----------", STRATEGY_NAME_WITH_ATTRS, "----------")

        dataloader_train = DataLoader(train_dataset, batch_size=batch_size,
                            shuffle=True, num_workers=0, pin_memory=True)
        model.train()
        
        for train_batch_idx, (Feature_X, feature_y) in enumerate(dataloader_train):

            

            y = feature_y * output_scaling 
            y = y.to(device)
            feature_y = feature_y.to(device)
            
            pred = model(Feature_X)
            pred.to(device)

            loss_orig = loss_fn_orig(y, pred)
#             print(Feature_X)
#             input()
#             print(loss_orig)
#             input()
            optimizer.zero_grad()
            loss_orig.backward()
            optimizer.step()


            losses_train.append(loss_orig.item())
#             print(X)
#             print(loss_orig.item())
# #             print()
#             input()
            # we want 5 spread out output per epoch
            if (t*int(train_size/batch_size) + train_batch_idx + 1) % int(train_size/5/batch_size) == 0:
                
                # NOTE: real loss is same as upscaled normalized loss as it's percentage loss (rmspe)
                prediction_variety = np.std((pred/output_scaling).reshape(-1).tolist()) * 100
                #NOTE: prediction variety is important as model sometimes predits a constant value! regardless of the input, then per batch variety is lowest(0 std dev)
#                 print("prediction variety",)
#                 print(pred.reshape(-1).tolist()[:7])
                
                summary_writer.add_scalar("Prediction Variety", prediction_variety, t*(train_size) + (train_batch_idx*batch_size))
                summary_writer.add_scalar("Training Loss", np.mean(losses_train), t*(train_size) + (train_batch_idx*batch_size))

                print("train:", np.mean(losses_train), f"[{train_batch_idx*batch_size:>5d}/{train_size:>5d}]")
                losses_train = []
                
        dataloader_test = DataLoader(test_dataset, batch_size=batch_size,
                                shuffle=True, num_workers=0, pin_memory=True)
        dataset_size = len(dataloader_test.dataset)
        model.eval()

        losses_test = []
        for _, (Feature_X, feature_y) in enumerate(dataloader_test):
            with torch.no_grad():

#                 X = torch.cat([Feature_X['logret1_1s']*input_scaling, 
#                            Feature_X['logret1_1s']*input_scaling,
#                           Feature_X['logret1_1s']*input_scaling,
#                           ], 1)

#                 X = X.reshape(-1,3,data_interval_len*data_ohlc_sample_len*1)
                
                y = feature_y * output_scaling

                
#                 X = X.to(device)
                y = y.to(device)
                pred = model(Feature_X)
                loss = loss_fn_orig(y, pred)
                losses_test.append(loss.item())


#                 summary_writer.add_scalar("Epoch Training Loss", np.mean(losses_train), (t+1)*train_size)
        summary_writer.add_scalar("Test Loss", np.mean(losses_test), t*(train_size) + (train_batch_idx*batch_size))
        print("train:", np.mean(losses_train), "test:", np.mean(losses_test), f"[{train_batch_idx*batch_size:>5d}/{train_size:>5d}]")
        losses_test = []
        if (t+1)%50==0:
            torch.save(model.state_dict(), os.path.join(MODEL_OUTPUT_DIRECTORY,f"{STRATEGY_NAME_WITH_ATTRS}_epoch_{t}_tloss_{loss:.4f}.pth"))
    
            

DEVICE: cuda:0
Epoch 1
-------------------------------
---------- tradebook6fet_basic1_rnn1_0.001_32 ----------


In [114]:
del model
torch.cuda.empty_cache()
import gc
gc.collect()

1633

In [None]:
torch.cuda.close()

In [None]:
torch.cuda.memory_allocated(device)/1024/1024/1024
# model.to("cpu")
# torch.cuda.memory_stats()

In [None]:
torch.cuda.init()