In [None]:
# Mount Google Drive (Colab-specific) at /content/gdrive/ so the competition
# archive stored under "My Drive" can be unzipped in the next cell.
# NOTE(review): force_remount=True presumably forces a fresh mount even when the
# drive is already mounted (e.g. on re-run) — confirm against google.colab docs.
from google.colab import drive
drive.mount('/content/gdrive/', force_remount=True)

Mounted at /content/gdrive/


In [None]:
!unzip gdrive/My\ Drive/optiver-realized-volatility-prediction.zip > /dev/null

In [None]:
import pandas as pd
import numpy as np

def read_train_test(path='/content/optiver-realized-volatility-prediction/train.csv',
                    frac=0.7, random_state=42):
    """Load train.csv and split it, stock by stock, into train and held-out parts.

    Within every ``stock_id`` a fraction ``frac`` of its rows is sampled into the
    training part; the remaining rows of that stock form the held-out part.

    Args:
        path: location of the competition's train.csv.
        frac: fraction of each stock's rows placed in the training part.
        random_state: seed for the sampling. A single RandomState is shared
            across all stocks, so the split is reproducible for a given seed and
            successive calls (e.g. the one inside agg_features) return the same
            split. Pass None for a fresh, unseeded split.

    Returns:
        (train, test) DataFrames with an added ``row_id`` column of the form
        ``"<stock_id>-<time_id>"``, used to merge with book and trade features.
    """
    data = pd.read_csv(path)
    # One generator for all stocks: reproducible, yet stocks are not all
    # sampled at the same positions as they would be with a per-call int seed.
    rng = np.random.RandomState(random_state)

    # Collect the pieces and concatenate once (growing a frame in a loop is quadratic).
    train_parts, test_parts = [], []
    for _, group in data.groupby('stock_id', sort=False):
        sampled = group.sample(frac=frac, random_state=rng)
        train_parts.append(sampled)
        test_parts.append(group.drop(sampled.index))

    train = pd.concat(train_parts) if train_parts else data.iloc[0:0].copy()
    test = pd.concat(test_parts) if test_parts else data.iloc[0:0].copy()

    # Create a key to merge with book and trade data
    train['row_id'] = train['stock_id'].astype(str) + '-' + train['time_id'].astype(str)
    test['row_id'] = test['stock_id'].astype(str) + '-' + test['time_id'].astype(str)

    return train, test

In [None]:
# Draw the per-stock train / held-out split.
train, test = read_train_test()

In [None]:
# Persist the raw split (before feature engineering) as UTF-8 CSV, without the index.
for split_frame, csv_name in ((train, 'train_org.csv'), (test, 'test_org.csv')):
    split_frame.to_csv(csv_name, encoding='utf-8', index=False)

In [None]:
# Inspect the held-out split produced by read_train_test().
test


Unnamed: 0,stock_id,time_id,target,row_id
2,0,16,0.002168,0-16
3,0,31,0.002195,0-31
6,0,97,0.009388,0-97
7,0,103,0.004120,0-103
16,0,169,0.003365,0-169
...,...,...,...,...
428923,126,32739,0.002529,126-32739
428924,126,32746,0.009973,126-32746
428926,126,32750,0.003350,126-32750
428927,126,32751,0.003461,126-32751


In [None]:
# Inspect the training split produced by read_train_test().
train

Unnamed: 0,stock_id,time_id,target,row_id
2346,0,19386,0.001140,0-19386
726,0,5916,0.004280,0-5916
630,0,5173,0.005571,0-5173
1949,0,16147,0.004431,0-16147
261,0,2119,0.003451,0-2119
...,...,...,...,...
427348,126,18514,0.005363,126-18514
426382,126,10806,0.008255,126-10806
428883,126,32330,0.003487,126-32330
426480,126,11655,0.006074,126-11655


In [None]:
import numpy as np
from typing import Any, Dict, List, Tuple
#Functions to define weighted average prices
def _wap(bid_price, ask_price, bid_size, ask_size, crossed=True):
    """Size-weighted average of a bid and an ask price.

    With ``crossed=True`` each price is weighted by the *opposite* side's size
    (the usual WAP); otherwise each price is weighted by its own side's size.
    """
    if crossed:
        numerator = bid_price * ask_size + ask_price * bid_size
    else:
        numerator = bid_price * bid_size + ask_price * ask_size
    return numerator / (ask_size + bid_size)


def calc_wap1(df):
    """Level-1 WAP: prices weighted by the opposite side's level-1 size."""
    return _wap(df["bid_price1"], df["ask_price1"], df["bid_size1"], df["ask_size1"])


def calc_wap2(df):
    """Level-2 WAP: prices weighted by the opposite side's level-2 size."""
    return _wap(df["bid_price2"], df["ask_price2"], df["bid_size2"], df["ask_size2"])


def calc_wap3(df):
    """Level-1 average with each price weighted by its own side's size."""
    return _wap(df["bid_price1"], df["ask_price1"], df["bid_size1"], df["ask_size1"], crossed=False)


def calc_wap4(df):
    """Level-2 average with each price weighted by its own side's size."""
    return _wap(df["bid_price2"], df["ask_price2"], df["bid_size2"], df["ask_size2"], crossed=False)

def log_return(series):
    """Consecutive differences of log(series); the first element is NaN."""
    log_series = np.log(series)
    return log_series.diff()


def realised_volatility(series):
    """Square root of the sum of squared values (NaNs are skipped for a pandas Series)."""
    squared = series ** 2
    return np.sqrt(np.sum(squared))


def count_unique(series):
    """Number of distinct values in ``series``."""
    return np.unique(series).size


In [None]:
def book_preprocessor(file_path):
    """Aggregate one stock's order-book snapshots into per-time_id features.

    Reads the parquet partition at ``file_path`` and derives:
    - weighted average prices wap1..wap4;
    - per-time_id log returns of each WAP;
    - spread and volume columns.
    These are then aggregated per ``time_id``. The realised volatility of the
    four log-return series is also computed over the trailing part of each
    bucket (``seconds_in_bucket >= 500/400/300/200/100``).

    Args:
        file_path: path of a ``.../stock_id=<id>`` parquet partition; the stock
            id is taken from the text after the last ``'='``.

    Returns:
        One row per time_id.
        - Aggregate columns are named ``<feature>_<agg>`` (e.g. ``wap1_sum``,
          ``log_return1_realised_volatility``).
        - Windowed columns carry an extra ``_<window>`` suffix.
        - A ``row_id`` column holds ``"<stock_id>-<time_id>"``.
    """
    df = pd.read_parquet(file_path)

    # calculating weighted average prices
    df['wap1'] = calc_wap1(df)
    df['wap2'] = calc_wap2(df)
    df['wap3'] = calc_wap3(df)
    df['wap4'] = calc_wap4(df)

    # Log returns within each time_id. transform() returns a result aligned with
    # df's index. groupby().apply() may prepend the group key to the index
    # (group_keys=True in pandas >= 2.0), which breaks this column assignment.
    for level in range(1, 5):
        df[f'log_return{level}'] = df.groupby('time_id')[f'wap{level}'].transform(log_return)

    # Absolute gap between the level-1 and level-2 WAPs.
    df['wap_balance'] = abs(df['wap1'] - df['wap2'])

    # Relative spreads and price differences between book levels.
    df['price_spread1'] = (df['ask_price1'] - df['bid_price1']) / (df['ask_price1'] + df['bid_price1'])
    df['price_spread2'] = (df['ask_price2'] - df['bid_price2']) / (df['ask_price2'] + df['bid_price2'])
    df['bid_spread'] = df['bid_price1'] - df['bid_price2']
    df['ask_spread'] = df['ask_price1'] - df['ask_price2']
    df['bid_ask_spread'] = df['bid_spread'] - df['ask_spread']
    # Volumes use both book levels on each side (bid_size1 + bid_size2, ask_size1 + ask_size2).
    df['total_volume'] = df['bid_size1'] + df['bid_size2'] + df['ask_size1'] + df['ask_size2']
    df['volume_imbalance'] = abs((df['bid_size1'] + df['bid_size2']) - (df['ask_size1'] + df['ask_size2']))

    # Aggregations over the whole bucket.
    feature_dict = {
        'wap1': [np.sum, np.std],
        'wap2': [np.sum, np.std],
        'wap3': [np.sum, np.std],
        'wap4': [np.sum, np.std],
        'log_return1': [realised_volatility],
        'log_return2': [realised_volatility],
        'log_return3': [realised_volatility],
        'log_return4': [realised_volatility],
        'wap_balance': [np.sum, np.max],
        'price_spread1': [np.sum, np.max],
        'price_spread2': [np.sum, np.max],
        'bid_spread': [np.sum, np.max],
        'ask_spread': [np.sum, np.max],
        'bid_ask_spread': [np.sum, np.max],
        'total_volume': [np.sum, np.max],
        'volume_imbalance': [np.sum, np.max]
    }

    # Aggregations repeated over each trailing window.
    feature_dict_time = {
        'log_return1': [realised_volatility],
        'log_return2': [realised_volatility],
        'log_return3': [realised_volatility],
        'log_return4': [realised_volatility]
    }

    def get_stats_window(feature_dict, seconds_in_bucket, suffix=False):
        # Aggregate rows with seconds_in_bucket >= the given start, per time_id.
        df_feature = df[df['seconds_in_bucket'] >= seconds_in_bucket].groupby('time_id').agg(feature_dict).reset_index()
        # Flatten ('col', 'agg') column pairs to 'col_agg' (time_id becomes 'time_id_').
        df_feature.columns = ['_'.join(col) for col in df_feature.columns]
        # Suffix windowed columns with their start second to keep them distinct.
        if suffix:
            df_feature = df_feature.add_suffix('_' + str(seconds_in_bucket))
        return df_feature

    df_feature = get_stats_window(feature_dict, seconds_in_bucket=0, suffix=False)
    for window in (500, 400, 300, 200, 100):
        df_window = get_stats_window(feature_dict_time, seconds_in_bucket=window, suffix=True)
        df_feature = df_feature.merge(df_window, how='left', left_on='time_id_',
                                      right_on=f'time_id__{window}')
        # The window's own time_id key duplicates 'time_id_'.
        df_feature = df_feature.drop(columns=[f'time_id__{window}'])

    stock_id = str(file_path).rsplit('=', 1)[-1]
    df_feature['row_id'] = stock_id + '-' + df_feature['time_id_'].astype(str)
    df_feature = df_feature.drop(columns=['time_id_'])
    return df_feature

In [None]:
def trade_preprocessor(file_path):
    """Aggregate one stock's trades into per-time_id features.

    Reads the parquet partition at ``file_path``.
    - Computes per-time_id log returns of the traded price and the traded
      amount (price * size).
    - Aggregates them per ``time_id``, both over the whole bucket and over
      trailing windows (``seconds_in_bucket >= 500/400/300/200/100``).
    - Adds per-time_id price/size shape statistics (tendency, counts
      above/below the mean, up/down moves, median absolute deviation from the
      mean, mean square, inter-quartile range).

    Args:
        file_path: path of a ``.../stock_id=<id>`` parquet partition; the stock
            id is taken from the text after the last ``'='``.

    Returns:
        One row per time_id. Every feature column is prefixed with ``trade_``,
        and a ``row_id`` column holds ``"<stock_id>-<time_id>"``.
    """
    df = pd.read_parquet(file_path)
    # Log returns within each time_id. transform() keeps the result aligned with
    # df's index; groupby().apply() may prepend the group key to the index in
    # pandas >= 2.0, which breaks this assignment.
    df['log_return'] = df.groupby('time_id')['price'].transform(log_return)
    df['amount'] = df['price'] * df['size']

    # Aggregations over the whole bucket.
    feature_dict = {
        'log_return': [realised_volatility],
        'seconds_in_bucket': [count_unique],
        'size': [np.sum, np.max, np.min],
        'order_count': [np.sum, np.max],
        'amount': [np.sum, np.max, np.min]
    }

    # Aggregations repeated over each trailing window.
    feature_dict_time = {
        'log_return': [realised_volatility],
        'seconds_in_bucket': [count_unique],
        'size': [np.sum],
        'order_count': [np.sum]
    }

    def get_stats_window(feature_dict, seconds_in_bucket, suffix=False):
        # Aggregate rows with seconds_in_bucket >= the given start, per time_id.
        df_feature = df[df['seconds_in_bucket'] >= seconds_in_bucket].groupby('time_id').agg(feature_dict).reset_index()
        # Flatten ('col', 'agg') column pairs to 'col_agg' (time_id becomes 'time_id_').
        df_feature.columns = ['_'.join(col) for col in df_feature.columns]
        # Suffix windowed columns with their start second to keep them distinct.
        if suffix:
            df_feature = df_feature.add_suffix('_' + str(seconds_in_bucket))
        return df_feature

    def tendency(price, vol):
        # Size-weighted sum of the percentage price change between consecutive trades.
        df_diff = np.diff(price)
        val = (df_diff / price[1:]) * 100
        return np.sum(val * vol[1:])

    df_feature = get_stats_window(feature_dict, seconds_in_bucket=0, suffix=False)

    # Per-time_id shape statistics. Iterating the groupby is a single pass over
    # df, instead of re-filtering the whole frame once per time_id.
    stat_columns = ['time_id_', 'tendency', 'f_max', 'f_min', 'df_max', 'df_min',
                    'abs_diff_p', 'energy_p', 'inter_quartile_p',
                    'abs_diff_v', 'energy_v', 'inter_quartile_v']
    list_features = []
    for time_id_n, df_id in df.groupby('time_id', sort=False):
        price = df_id['price'].values
        size = df_id['size'].values
        price_mean = np.mean(price)
        size_mean = np.mean(size)
        price_diff = np.diff(price)
        list_features.append({
            'time_id_': time_id_n,
            'tendency': tendency(price, size),
            'f_max': np.sum(price > price_mean),
            'f_min': np.sum(price < price_mean),
            'df_max': np.sum(price_diff > 0),
            'df_min': np.sum(price_diff < 0),
            'abs_diff_p': np.median(np.abs(price - price_mean)),
            'energy_p': np.mean(price ** 2),
            'inter_quartile_p': np.percentile(price, 75) - np.percentile(price, 25),
            'abs_diff_v': np.median(np.abs(size - size_mean)),
            'energy_v': np.mean(size ** 2),
            'inter_quartile_v': np.percentile(size, 75) - np.percentile(size, 25),
        })

    # Merge on the shared 'time_id_' key so no duplicate time_id column leaks
    # into the output (it previously surfaced as a 'trade_time_id' feature).
    df_list = pd.DataFrame(list_features, columns=stat_columns)
    df_feature = df_feature.merge(df_list, how='left', on='time_id_')

    for window in (500, 400, 300, 200, 100):
        df_window = get_stats_window(feature_dict_time, seconds_in_bucket=window, suffix=True)
        df_feature = df_feature.merge(df_window, how='left', left_on='time_id_',
                                      right_on=f'time_id__{window}')
        # The window's own time_id key duplicates 'time_id_'.
        df_feature = df_feature.drop(columns=[f'time_id__{window}'])

    df_feature = df_feature.add_prefix('trade_')
    stock_id = str(file_path).rsplit('=', 1)[-1]
    df_feature['row_id'] = stock_id + '-' + df_feature['trade_time_id_'].astype(str)
    df_feature = df_feature.drop(columns=['trade_time_id_'])

    return df_feature

In [None]:
from joblib import Parallel,delayed
def preprocessor(list_stock_ids, is_train=True, n_jobs=-1,
                 data_dir='/content/optiver-realized-volatility-prediction/'):
    """Build merged book + trade features for each stock id, in parallel.

    Args:
        list_stock_ids: stock ids whose parquet partitions are processed.
        is_train: kept for backward compatibility; it does not change which
            files are read. Both the training and the held-out split come from
            train.csv, so their features always live in the ``*_train.parquet``
            partitions.
        n_jobs: joblib worker count. -1 uses all cores; the previous
            ``n_jobs=None`` ran the loop sequentially.
        data_dir: directory containing ``book_train.parquet`` and
            ``trade_train.parquet``.

    Returns:
        One DataFrame with a row per (stock_id, time_id), keyed by ``row_id``.
        Stocks appear in ``list_stock_ids`` order.
    """
    def for_joblib(stock_id):
        # Book features left-joined with trade features for one stock.
        file_path_book = data_dir + "book_train.parquet/stock_id=" + str(stock_id)
        file_path_trade = data_dir + "trade_train.parquet/stock_id=" + str(stock_id)
        book_df = book_preprocessor(file_path_book)
        trade_df = trade_preprocessor(file_path_trade)
        return pd.merge(book_df, trade_df, on='row_id', how='left')

    # Parallel returns results in input order, so the concatenation is deterministic.
    frames = Parallel(n_jobs=n_jobs, verbose=1)(
        delayed(for_joblib)(stock_id) for stock_id in list_stock_ids
    )
    return pd.concat(frames, ignore_index=True)

In [None]:
def get_time_stock(df):
    """Attach per-stock and per-time_id summary statistics of volatility features.

    For each column in ``vol_cols`` the mean, std, max and min are computed
    once grouped by stock_id (suffix ``_stock``) and once grouped by time_id
    (suffix ``_time``). They are then left-merged onto every row. The input
    frame is not modified; a new frame is returned.
    """
    vol_cols = ['log_return1_realised_volatility', 'log_return2_realised_volatility', 'log_return1_realised_volatility_400', 'log_return2_realised_volatility_400', 
                'log_return1_realised_volatility_300', 'log_return2_realised_volatility_300', 'log_return1_realised_volatility_200', 'log_return2_realised_volatility_200', 
                'trade_log_return_realised_volatility', 'trade_log_return_realised_volatility_400', 'trade_log_return_realised_volatility_300', 'trade_log_return_realised_volatility_200']

    def summarise(key, tag):
        # Group statistics of vol_cols by `key`, flattened to '<col>_<stat>_<tag>'.
        stats = df.groupby([key])[vol_cols].agg(['mean', 'std', 'max', 'min', ]).reset_index()
        stats.columns = ['_'.join(col) for col in stats.columns]
        return stats.add_suffix('_' + tag)

    per_stock = summarise('stock_id', 'stock')
    per_time = summarise('time_id', 'time')

    merged = (
        df.merge(per_stock, how='left', left_on=['stock_id'], right_on=['stock_id__stock'])
          .merge(per_time, how='left', left_on=['time_id'], right_on=['time_id__time'])
    )
    # The grouping keys of the summaries duplicate stock_id / time_id.
    return merged.drop(columns=['stock_id__stock', 'time_id__time'])

In [None]:

# replace by order sum (tau)
def add_tau_feature(
    train: pd.DataFrame, test: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Add inverse-square-root "tau" features to ``train`` and ``test`` in place.

    - ``size_tau*``: sqrt(1 / number of distinct trade seconds), for the whole
      bucket and for the 400/300/200 windows.
    - ``size_tau2*``: sqrt(scale / total trade order count), with scales
      1, 0.33, 0.5 and 0.66 for the unsuffixed, _400, _300 and _200 variants.
    - ``size_tau2_d``: size_tau2_400 - size_tau2.

    Both frames are mutated and then returned.
    """
    # (new column, distinct-trade-seconds column)
    seconds_specs = (
        ("size_tau", "trade_seconds_in_bucket_count_unique"),
        ("size_tau_400", "trade_seconds_in_bucket_count_unique_400"),
        ("size_tau_300", "trade_seconds_in_bucket_count_unique_300"),
        ("size_tau_200", "trade_seconds_in_bucket_count_unique_200"),
    )
    # (new column, numerator applied to the whole-bucket order count)
    order_count_specs = (
        ("size_tau2", 1),
        ("size_tau2_400", 0.33),
        ("size_tau2_300", 0.5),
        ("size_tau2_200", 0.66),
    )

    for frame in (train, test):
        for new_col, src_col in seconds_specs:
            frame[new_col] = np.sqrt(1 / frame[src_col])
        for new_col, numerator in order_count_specs:
            frame[new_col] = np.sqrt(numerator / frame["trade_order_count_sum"])
        # delta tau
        frame["size_tau2_d"] = frame["size_tau2_400"] - frame["size_tau2"]

    return train, test

In [None]:
from sklearn.cluster import KMeans
from tqdm import tqdm
def create_agg_features(
    train: pd.DataFrame,
    test: pd.DataFrame,
    train_p: pd.DataFrame,
    n_clusters: int = 7,
    selected_clusters: Tuple[int, ...] = (0, 1, 3, 4, 6),
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Add cluster-average features to train and test.

    1. Stocks are clustered with KMeans on the correlation matrix of their
       targets (``train_p`` pivoted to time_id x stock_id).
    2. For every cluster, the numeric feature columns are averaged per time_id
       over the cluster's stocks (NaNs skipped).
    3. A subset of those averages (the ``prefix`` list, restricted to the
       clusters in ``selected_clusters``) is merged back onto every row by
       time_id.
    4. Remaining missing values in both frames are filled with train column means.

    Args:
        train, test: feature frames with stock_id, time_id, row_id and numeric
            feature columns; train also has ``target``.
        train_p: frame with stock_id, time_id and target, used only for clustering.
        n_clusters: number of KMeans clusters.
        selected_clusters: cluster labels whose averages are kept.
            NOTE(review): KMeans labels are arbitrary, so this choice is only
            meaningful for the particular fitted clustering — confirm it is
            stable across library versions.

    Returns:
        (train_m, test_m) with extra ``<feature>_<k>c1`` columns.
    """
    # Cluster stocks by how their targets co-move across time_ids.
    target_wide = train_p.pivot(index="time_id", columns="stock_id", values="target")
    corr = target_wide.corr()
    kmeans = KMeans(n_clusters=n_clusters, random_state=0).fit(corr.values)
    # Stock ids in each cluster, indexed by cluster label.
    indexes = [corr.index[kmeans.labels_ == n].tolist() for n in range(n_clusters)]

    def cluster_means(frame: pd.DataFrame) -> pd.DataFrame:
        # Long frame: per (time_id, cluster) mean of every numeric column, with
        # stock_id replaced by the cluster tag "<k>c1". numeric_only=True skips
        # text columns such as row_id, which newer pandas refuses to average.
        parts = []
        for n, ind in enumerate(indexes):
            cluster = frame.loc[frame["stock_id"].isin(ind)]
            means = cluster.groupby("time_id").mean(numeric_only=True)
            parts.append(means.assign(stock_id=f"{n}c1"))
        return pd.concat(parts).reset_index()

    def to_wide(long_df: pd.DataFrame) -> pd.DataFrame:
        # One column per (feature, cluster) pair, named "<feature>_<k>c1".
        wide = long_df.pivot(index="time_id", columns="stock_id")
        wide.columns = ["_".join(col) for col in wide.columns.tolist()]
        return wide.reset_index()

    mat1 = to_wide(cluster_means(train).drop(columns=["target"]))
    mat2 = to_wide(cluster_means(test))

    prefix = [
        "log_return1_realised_volatility",
        "total_volume_sum",
        "trade_size_sum",
        "trade_order_count_sum",
        # book_preprocessor names the level-1 relative spread "price_spread1";
        # a bare "price_spread_sum" prefix matches no column.
        "price_spread1_sum",
        "bid_spread_sum",
        "ask_spread_sum",
        "volume_imbalance_sum",
        "bid_ask_spread_sum",
        "size_tau2",
    ]
    cluster_pattern = "|".join(str(k) for k in selected_clusters)
    selected_cols = mat1.filter(
        regex="|".join(f"^{x}.({cluster_pattern})c1" for x in prefix)
    ).columns.tolist()
    selected_cols.append("time_id")
    train_m = pd.merge(train, mat1[selected_cols], how="left", on="time_id")
    # reindex: a cluster absent from test yields all-NaN columns (filled below)
    # instead of a KeyError.
    test_m = pd.merge(test, mat2.reindex(columns=selected_cols), how="left", on="time_id")

    # filling missing values with train means
    features = [
        col
        for col in train_m.columns.tolist()
        if col not in ["time_id", "target", "row_id"]
    ]
    train_means = train_m[features].mean()
    train_m[features] = train_m[features].fillna(train_means)
    test_m[features] = test_m[features].fillna(train_means)

    return train_m, test_m

In [None]:
import os
import gc

def agg_features(train=None, test=None):
    """Run the full feature pipeline and return the augmented split.

    Args:
        train, test: optional split as returned by read_train_test(). If either
            is omitted, read_train_test() is called to draw the split, as the
            original zero-argument call did. Passing the split explicitly keeps
            it identical to one already saved to disk.

    Returns:
        (train, test, train_df): the feature-augmented split and the raw
        per-(stock_id, time_id) book + trade features both were merged with.
    """
    if train is None or test is None:
        train, test = read_train_test()
    # Untouched split (stock_id, time_id, target), used for target-correlation clustering.
    train_p = train

    # Every stock in either part; train's order first.
    stock_ids = pd.unique(pd.concat([train['stock_id'], test['stock_id']]))
    print(stock_ids)
    # Book/trade features for every (stock_id, time_id) in the training parquet
    # files. The held-out split is drawn from the same files, so both reuse them.
    train_df = preprocessor(stock_ids, is_train=True)

    train = get_time_stock(train.merge(train_df, on=['row_id'], how='left'))
    test = get_time_stock(test.merge(train_df, on=['row_id'], how='left'))

    print(f"Before Train Features: {train.shape}")
    print(f"Before Test Features: {test.shape}")
    train, test = add_tau_feature(train, test)
    print(f"After tau Train Features: {train.shape}")
    print(f"After tau Test Features: {test.shape}")
    train, test = create_agg_features(train, test, train_p)
    print(f"After Train Features: {train.shape}")
    print(f"After Test Features: {test.shape}")

    return train, test, train_df

In [None]:
# Run the whole feature pipeline; df_total holds the raw per-(stock, time_id) features.
train, test, df_total = agg_features()

[  0   1   2   3   4   5   6   7   8   9  10  11  13  14  15  16  17  18
  19  20  21  22  23  26  27  28  29  30  31  32  33  34  35  36  37  38
  39  40  41  42  43  44  46  47  48  50  51  52  53  55  56  58  59  60
  61  62  63  64  66  67  68  69  70  72  73  74  75  76  77  78  80  81
  82  83  84  85  86  87  88  89  90  93  94  95  96  97  98  99 100 101
 102 103 104 105 107 108 109 110 111 112 113 114 115 116 118 119 120 122
 123 124 125 126]


[Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=1)]: Done 112 out of 112 | elapsed: 54.9min finished


Before Train Features: (300251, 190)
Before Test Features: (128681, 190)
Before Train Features: (300251, 199)
Before Test Features: (128681, 199)


100%|██████████| 7/7 [00:00<00:00, 1774.13it/s]
100%|██████████| 7/7 [00:01<00:00,  4.75it/s]
100%|██████████| 1365/1365 [00:00<00:00, 1187559.63it/s]
100%|██████████| 1372/1372 [00:00<00:00, 900843.00it/s]
100%|██████████| 10/10 [00:00<00:00, 107546.26it/s]


After Train Features: (300251, 244)
After Test Features: (128681, 244)


In [None]:
# Persist the feature-augmented split as UTF-8 CSV, without the index.
for final_frame, csv_name in ((train, 'train_final.csv'), (test, 'test_final.csv')):
    final_frame.to_csv(csv_name, encoding='utf-8', index=False)

In [None]:
# Shape of the held-out split as (rows, columns); later cells use length[0]
# (its row count) as a chunk size for slicing train.
length=test.shape

In [None]:
# Number of rows in the held-out split.
length[0]

128681

In [None]:
# First chunk of train: as many rows (by position) as the held-out split.
train1 = train.iloc[:length[0]]

In [None]:
# Second chunk of train: the next length[0] rows by position.
train2 = train.iloc[length[0]:2 * length[0]]