In [32]:
from IPython.core.display import display, HTML

import pandas as pd
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import glob
import os
import gc

from joblib import Parallel, delayed
from tqdm import tqdm
from sklearn import preprocessing, model_selection
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import QuantileTransformer
from sklearn.metrics import r2_score

import matplotlib.pyplot as plt
# import seaborn as sns
import numpy.matlib
from sklearn.model_selection import train_test_split
from sklearn.cluster import KMeans


path_submissions = '/'

target_name = 'target'
scores_folds = {}

from google.colab import drive
drive.mount('/content/drive')
GOOGLE_DRIVE_PATH_POST_MYDRIVE = 'final_project'
GOOGLE_DRIVE_PATH = os.path.join('/content', 'drive', 'MyDrive', GOOGLE_DRIVE_PATH_POST_MYDRIVE)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [7]:
def read_train_test():
    train = pd.read_csv(GOOGLE_DRIVE_PATH + '/optiver-realized-volatility-prediction/train.csv')

    train_data, temp_data = train_test_split(train, test_size=0.3, random_state=42, stratify=train['stock_id'])

    # Then split the temporary set into 2/3 validation and 1/3 test
    val_data, test_data = train_test_split(temp_data, test_size=0.333, random_state=42, stratify=temp_data['stock_id'])

    # Create a key to merge with book and trade data
    train_data.loc[:, 'row_id'] = train_data['stock_id'].astype(str) + '-' + train_data['time_id'].astype(str)
    val_data.loc[:, 'row_id'] = val_data['stock_id'].astype(str) + '-' + val_data['time_id'].astype(str)
    test_data.loc[:, 'row_id'] = test_data['stock_id'].astype(str) + '-' + test_data['time_id'].astype(str)

    print(f'Original training set has {train.shape[0]} rows')
    print(f'Training set has {train_data.shape[0]} rows')
    print(f'Validation set has {val_data.shape[0]} rows')
    print(f'Test set has {test_data.shape[0]} rows')

    return train_data, val_data, test_data

In [8]:
train, val, test = read_train_test()

Original training set has 428932 rows
Training set has 300252 rows
Validation set has 85829 rows
Test set has 42851 rows


In [34]:
# data directory
data_dir = GOOGLE_DRIVE_PATH + '/optiver-realized-volatility-prediction/'

def calc_wap1(df):
    # Function to calculate first WAP
    wap = (df['bid_price1'] * df['ask_size1'] + df['ask_price1'] * df['bid_size1']) / (df['bid_size1'] + df['ask_size1'])
    return wap

def calc_wap2(df):
    # Function to calculate second WAP
    wap = (df['bid_price2'] * df['ask_size2'] + df['ask_price2'] * df['bid_size2']) / (df['bid_size2'] + df['ask_size2'])
    return wap

def log_return(series):
    # Function to calculate the log of the return
    return np.log(series).diff()

def realized_volatility(series):
    # Calculate the realized volatility
    return np.sqrt(np.sum(series**2))

def count_unique(series):
    # Function to count unique elements of a series
    return len(np.unique(series))

def book_preprocessor(file_path):
    # Function to preprocess book data (for each stock id)

    df = pd.read_parquet(file_path)

    # Calculate Wap
    df['wap1'] = calc_wap1(df)
    df['wap2'] = calc_wap2(df)

    # Calculate log returns
    df['log_return1'] = df.groupby(['time_id'])['wap1'].apply(log_return).reset_index()["wap1"]
    df['log_return2'] = df.groupby(['time_id'])['wap2'].apply(log_return).reset_index()["wap2"]

    # Calculate wap balance
    df['wap_balance'] = abs(df['wap1'] - df['wap2'])

    # Calculate spread
    df['price_spread'] = (df['ask_price1'] - df['bid_price1']) / ((df['ask_price1'] + df['bid_price1']) / 2)
    df['price_spread2'] = (df['ask_price2'] - df['bid_price2']) / ((df['ask_price2'] + df['bid_price2']) / 2)
    df['bid_spread'] = df['bid_price1'] - df['bid_price2']
    df['ask_spread'] = df['ask_price1'] - df['ask_price2']
    df["bid_ask_spread"] = abs(df['bid_spread'] - df['ask_spread'])
    df['total_volume'] = (df['ask_size1'] + df['ask_size2']) + (df['bid_size1'] + df['bid_size2'])
    df['volume_imbalance'] = abs((df['ask_size1'] + df['ask_size2']) - (df['bid_size1'] + df['bid_size2']))

    # Dict for aggregations
    create_feature_dict = {
        'wap1': [np.sum, np.mean, np.std],
        'wap2': [np.sum, np.mean, np.std],
        'log_return1': [np.sum, realized_volatility, np.mean, np.std],
        'log_return2': [np.sum, realized_volatility, np.mean, np.std],
        'wap_balance': [np.sum, np.mean, np.std],
        'price_spread':[np.sum, np.mean, np.std],
        'price_spread2':[np.sum, np.mean, np.std],
        'bid_spread':[np.sum, np.mean, np.std],
        'ask_spread':[np.sum, np.mean, np.std],
        'total_volume':[np.sum, np.mean, np.std],
        'volume_imbalance':[np.sum, np.mean, np.std],
        "bid_ask_spread":[np.sum, np.mean, np.std],
    }

    def get_stats_window(seconds_in_bucket, add_suffix = False):
        # Function to get group stats for different windows (seconds in bucket)

        # Group by the window
        df_feature = df[df['seconds_in_bucket'] >= seconds_in_bucket].groupby(['time_id']).agg(create_feature_dict).reset_index()

        # Rename columns joining suffix
        df_feature.columns = ['_'.join(col) for col in df_feature.columns]

        # Add a suffix to differentiate windows
        if add_suffix:
            df_feature = df_feature.add_suffix('_' + str(seconds_in_bucket))
        return df_feature

    # Get the stats for different windows
    df_feature = get_stats_window(seconds_in_bucket = 0, add_suffix = False)
    df_feature_400 = get_stats_window(seconds_in_bucket = 400, add_suffix = True)
    df_feature_300 = get_stats_window(seconds_in_bucket = 300, add_suffix = True)
    df_feature_200 = get_stats_window(seconds_in_bucket = 200, add_suffix = True)

    # Merge all
    df_feature = df_feature.merge(df_feature_400, how = 'left', left_on = 'time_id_', right_on = 'time_id__400')
    df_feature = df_feature.merge(df_feature_300, how = 'left', left_on = 'time_id_', right_on = 'time_id__300')
    df_feature = df_feature.merge(df_feature_200, how = 'left', left_on = 'time_id_', right_on = 'time_id__200')

    # Drop unnecesary time_ids
    df_feature.drop(['time_id__400', 'time_id__300', 'time_id__200'], axis = 1, inplace = True)


    # Create row_id so we can merge
    stock_id = file_path.split('=')[1]
    df_feature['row_id'] = df_feature['time_id_'].apply(lambda x: f'{stock_id}-{x}')
    df_feature.drop(['time_id_'], axis = 1, inplace = True)

    return df_feature


def trade_preprocessor(file_path):
    # Function to preprocess trade data (for each stock id)

    df = pd.read_parquet(file_path)
    df['log_return'] = df.groupby('time_id')['price'].apply(log_return).reset_index()["price"]

    # Dict for aggregations
    create_feature_dict = {
        'log_return':[realized_volatility],
        'seconds_in_bucket':[count_unique],
        'size':[np.sum, realized_volatility, np.mean, np.std, np.max, np.min],
        'order_count':[np.mean,np.sum,np.max],
    }

    def get_stats_window(seconds_in_bucket, add_suffix = False):
        # Function to get group stats for different windows (seconds in bucket)

        # Group by the window
        df_feature = df[df['seconds_in_bucket'] >= seconds_in_bucket].groupby(['time_id']).agg(create_feature_dict).reset_index()

        # Rename columns joining suffix
        df_feature.columns = ['_'.join(col) for col in df_feature.columns]

        # Add a suffix to differentiate windows
        if add_suffix:
            df_feature = df_feature.add_suffix('_' + str(seconds_in_bucket))
        return df_feature

    # Get the stats for different windows
    df_feature = get_stats_window(seconds_in_bucket = 0, add_suffix = False)
    df_feature_400 = get_stats_window(seconds_in_bucket = 400, add_suffix = True)
    df_feature_300 = get_stats_window(seconds_in_bucket = 300, add_suffix = True)
    df_feature_200 = get_stats_window(seconds_in_bucket = 200, add_suffix = True)

    def tendency(price, vol):
        df_diff = np.diff(price)
        val = (df_diff/price[1:])*100
        power = np.sum(val*vol[1:])
        return(power)

    lis = []
    for n_time_id in df['time_id'].unique():
        df_id = df[df['time_id'] == n_time_id]
        tendencyV = tendency(df_id['price'].values, df_id['size'].values)
        f_max = np.sum(df_id['price'].values > np.mean(df_id['price'].values))
        f_min = np.sum(df_id['price'].values < np.mean(df_id['price'].values))
        df_max =  np.sum(np.diff(df_id['price'].values) > 0)
        df_min =  np.sum(np.diff(df_id['price'].values) < 0)
        abs_diff = np.median(np.abs( df_id['price'].values - np.mean(df_id['price'].values)))
        energy = np.mean(df_id['price'].values**2)
        iqr_p = np.percentile(df_id['price'].values,75) - np.percentile(df_id['price'].values,25)
        abs_diff_v = np.median(np.abs( df_id['size'].values - np.mean(df_id['size'].values)))
        energy_v = np.sum(df_id['size'].values**2)
        iqr_p_v = np.percentile(df_id['size'].values,75) - np.percentile(df_id['size'].values,25)

        lis.append({'time_id':n_time_id,'tendency':tendencyV,'f_max':f_max,'f_min':f_min,'df_max':df_max,'df_min':df_min,
                   'abs_diff':abs_diff,'energy':energy,'iqr_p':iqr_p,'abs_diff_v':abs_diff_v,'energy_v':energy_v,'iqr_p_v':iqr_p_v})

    df_lr = pd.DataFrame(lis)


    df_feature = df_feature.merge(df_lr, how = 'left', left_on = 'time_id_', right_on = 'time_id')

    # Merge all
    df_feature = df_feature.merge(df_feature_400, how = 'left', left_on = 'time_id_', right_on = 'time_id__400')
    df_feature = df_feature.merge(df_feature_300, how = 'left', left_on = 'time_id_', right_on = 'time_id__300')
    df_feature = df_feature.merge(df_feature_200, how = 'left', left_on = 'time_id_', right_on = 'time_id__200')

    # Drop unnecesary time_ids
    df_feature.drop(['time_id__400', 'time_id__300', 'time_id__200','time_id'], axis = 1, inplace = True)
    df_feature = df_feature.add_prefix('trade_')
    stock_id = file_path.split('=')[1]
    df_feature['row_id'] = df_feature['trade_time_id_'].apply(lambda x:f'{stock_id}-{x}')
    df_feature.drop(['trade_time_id_'], axis = 1, inplace = True)

    def order_sum(df, sec:str):
        new_col = 'size_tau' + sec
        bucket_col = 'trade_seconds_in_bucket_count_unique' + sec
        df[new_col] = np.sqrt(1/df[bucket_col])

        new_col2 = 'size_tau2' + sec
        order_col = 'trade_order_count_sum' + sec
        df[new_col2] = np.sqrt(1/df[order_col])

        if sec == '400_':
            df['size_tau2_d'] = df['size_tau2_400'] - df['size_tau2']



    for sec in ['','_200','_300','_400']:
        order_sum(df_feature, sec)

    df_feature['size_tau2_d'] = df_feature['size_tau2_400'] - df_feature['size_tau2']

    return df_feature


def get_time_stock(df):
    # Function to get group stats for the stock_id and time_id

    # Get realized volatility columns
    vol_cols = ['log_return1_realized_volatility', 'log_return2_realized_volatility', 'log_return1_realized_volatility_400', 'log_return2_realized_volatility_400',
                'log_return1_realized_volatility_300', 'log_return2_realized_volatility_300', 'log_return1_realized_volatility_200', 'log_return2_realized_volatility_200',
                'trade_log_return_realized_volatility', 'trade_log_return_realized_volatility_400', 'trade_log_return_realized_volatility_300', 'trade_log_return_realized_volatility_200']

    # Group by the stock id
    df_stock_id = df.groupby(['stock_id'])[vol_cols].agg(['mean', 'std', 'max', 'min', ]).reset_index()

    # Rename columns joining suffix
    df_stock_id.columns = ['_'.join(col) for col in df_stock_id.columns]
    df_stock_id = df_stock_id.add_suffix('_' + 'stock')

    # Group by the stock id
    df_time_id = df.groupby(['time_id'])[vol_cols].agg(['mean', 'std', 'max', 'min', ]).reset_index()

    # Rename columns joining suffix
    df_time_id.columns = ['_'.join(col) for col in df_time_id.columns]
    df_time_id = df_time_id.add_suffix('_' + 'time')

    # Merge with original dataframe
    df = df.merge(df_stock_id, how = 'left', left_on = ['stock_id'], right_on = ['stock_id__stock'])
    df = df.merge(df_time_id, how = 'left', left_on = ['time_id'], right_on = ['time_id__time'])
    df.drop(['stock_id__stock', 'time_id__time'], axis = 1, inplace = True)

    return df

def create_agg_features(train, val, test):

    corr = train[['stock_id', 'time_id', 'target']].pivot(index='time_id', columns='stock_id', values='target').corr()
    ids = corr.index
    kmeans = KMeans(n_clusters=7, random_state=0).fit(corr.values)
    l = []
    for n in range(7):
        l.append ( [ (x-1) for x in ( (ids+1)*(kmeans.labels_ == n)) if x > 0] )

    mat = []
    matTest = []
    matVal = []
    n = 0
    for ind in l:
        newDf = train.loc[train['stock_id'].isin(ind)]
        numerical_columns = newDf.select_dtypes(include=['number']).columns
        numerical_columns = [c for c in numerical_columns if c!='time_id']
        newDf = newDf.groupby(['time_id'])[numerical_columns].agg(np.nanmean)
        newDf.loc[:,'stock_id'] = str(n)+'c1'
        mat.append ( newDf )

        newDf = val.loc[val['stock_id'].isin(ind)]
        numerical_columns = newDf.select_dtypes(include=['number']).columns
        numerical_columns = [c for c in numerical_columns if c!='time_id']
        newDf = newDf.groupby(['time_id'])[numerical_columns].agg(np.nanmean)
        newDf.loc[:,'stock_id'] = str(n)+'c1'
        matVal.append ( newDf )

        newDf = test.loc[test['stock_id'].isin(ind) ]
        numerical_columns = newDf.select_dtypes(include=['number']).columns
        numerical_columns = [c for c in numerical_columns if c!='time_id']
        newDf = newDf.groupby(['time_id'])[numerical_columns].agg(np.nanmean)
        newDf.loc[:,'stock_id'] = str(n)+'c1'
        matTest.append ( newDf )

        n+=1

    mat1 = pd.concat(mat).reset_index()
    mat1.drop(columns=['target'],inplace=True)
    mat2 = pd.concat(matTest).reset_index()
    # mat2 = pd.concat([mat2,mat1.loc[mat1.time_id==5]])
    mat2.drop(columns=['target'],inplace=True)
    mat3 = pd.concat(matVal).reset_index()
    mat3.drop(columns=['target'],inplace=True)


    mat1 = mat1.pivot(index='time_id', columns='stock_id')
    mat1.columns = ["_".join(x) for x in mat1.columns.ravel()]
    mat1.reset_index(inplace=True)

    mat2 = mat2.pivot(index='time_id', columns='stock_id')
    mat2.columns = ["_".join(x) for x in mat2.columns.ravel()]
    mat2.reset_index(inplace=True)

    mat3 = mat3.pivot(index='time_id', columns='stock_id')
    mat3.columns = ["_".join(x) for x in mat3.columns.ravel()]
    mat3.reset_index(inplace=True)

    prefix = ['log_return1_realized_volatility', 'total_volume_mean', 'trade_size_mean', 'trade_order_count_mean','price_spread_mean','bid_spread_mean','ask_spread_mean',
              'volume_imbalance_mean', 'bid_ask_spread_mean','size_tau2']
    selected_cols=mat1.filter(regex='|'.join(f'^{x}.(0|1|3|4|6)c1' for x in prefix)).columns.tolist()
    selected_cols.append('time_id')

    train_m = pd.merge(train,mat1[selected_cols],how='left',on='time_id')
    test_m = pd.merge(test,mat2[selected_cols],how='left',on='time_id')
    val_m = pd.merge(val,mat3[selected_cols],how='left',on='time_id')

    # filling missing values with train means

    features = [col for col in train_m.columns.tolist() if col not in ['time_id','target','row_id']]
    train_m[features] = train_m[features].fillna(train_m[features].mean())
    test_m[features] = test_m[features].fillna(train_m[features].mean())
    val_m[features] = val_m[features].fillna(train_m[features].mean())

    return train_m, val_m, test_m


# def preprocessor(list_stock_ids, is_train = True):
def preprocessor(list_stock_ids):
    # Funtion to make preprocessing function in parallel (for each stock id)

    # Parrallel for loop
    def for_joblib(stock_id):
        # # Train
        # if is_train:
        #     file_path_book = data_dir + "book_train.parquet/stock_id=" + str(stock_id)
        #     file_path_trade = data_dir + "trade_train.parquet/stock_id=" + str(stock_id)
        # # Test
        # else:
        #     file_path_book = data_dir + "book_test.parquet/stock_id=" + str(stock_id)
        #     file_path_trade = data_dir + "trade_test.parquet/stock_id=" + str(stock_id)
        file_path_book = data_dir + "book_train.parquet/stock_id=" + str(stock_id)
        file_path_trade = data_dir + "trade_train.parquet/stock_id=" + str(stock_id)

        # Preprocess book and trade data and merge them
        df_tmp = pd.merge(book_preprocessor(file_path_book), trade_preprocessor(file_path_trade), on = 'row_id', how = 'left')
        print("Processed Data:", stock_id)
        # Return the merge dataframe
        return df_tmp

    # df_list = []
    # for stock_id in tqdm(list_stock_ids):
    #     df = for_joblib(stock_id)
    #     df_list.append(df)
    # df = pd.concat(df_list, ignore_index = True)

    # Use parallel api to call paralle for loop
    df = Parallel(n_jobs = -1, verbose = 1)(delayed(for_joblib)(stock_id) for stock_id in list_stock_ids)
    # Concatenate all the dataframes that return from Parallel
    df = pd.concat(df, ignore_index = True)

    return df

In [14]:
# Get unique stock ids
train_stock_ids = train['stock_id'].unique()

# Preprocess them using Parallel and our single stock id functions
data_total_ = preprocessor(train_stock_ids)
train = train.merge(data_total_, on = ['row_id'], how = 'left')

# Get unique stock ids
test_stock_ids = test['stock_id'].unique()

# Preprocess them using Parallel and our single stock id functions
# test_ = preprocessor(test_stock_ids, is_train = False)
test = test.merge(data_total_, on = ['row_id'], how = 'left')

# Get unique stock ids
val_stock_ids = val['stock_id'].unique()

# Preprocess them using Parallel and our single stock id functions
# test_ = preprocessor(test_stock_ids, is_train = False)
val = val.merge(data_total_, on = ['row_id'], how = 'left')

# Get group stats of time_id and stock_id
train = get_time_stock(train)
test = get_time_stock(test)
val = get_time_stock(val)

# Fill inf values
train.replace([np.inf, -np.inf], np.nan,inplace=True)
test.replace([np.inf, -np.inf], np.nan,inplace=True)
val.replace([np.inf, -np.inf], np.nan,inplace=True)

# Aggregating some features
# train, test = create_agg_features(train,test)

[Parallel(n_jobs=-1)]: Using backend LokyBackend with 2 concurrent workers.
[Parallel(n_jobs=-1)]: Done  46 tasks      | elapsed: 22.0min
[Parallel(n_jobs=-1)]: Done 112 out of 112 | elapsed: 52.7min finished


In [17]:
val.head(2)

Unnamed: 0,stock_id,time_id,target,row_id,wap1_sum,wap1_mean,wap1_std,wap2_sum,wap2_mean,wap2_std,...,trade_log_return_realized_volatility_400_max_time,trade_log_return_realized_volatility_400_min_time,trade_log_return_realized_volatility_300_mean_time,trade_log_return_realized_volatility_300_std_time,trade_log_return_realized_volatility_300_max_time,trade_log_return_realized_volatility_300_min_time,trade_log_return_realized_volatility_200_mean_time,trade_log_return_realized_volatility_200_std_time,trade_log_return_realized_volatility_200_max_time,trade_log_return_realized_volatility_200_min_time
0,33,13980,0.003329,33-13980,255.724294,0.998923,0.00068,255.766557,0.999088,0.000831,...,0.00274,0.000432,0.001428,0.000595,0.003681,0.000593,0.001706,0.000722,0.004412,0.00069
1,76,6342,0.002004,76-6342,410.315216,1.000769,0.0004,410.320251,1.000781,0.000402,...,0.001705,0.000386,0.001122,0.000342,0.001992,0.000598,0.001251,0.000354,0.00209,0.000661


In [36]:
train_final, val_fianl, test_final = create_agg_features(train,val,test)



In [37]:
train_final.shape

(300252, 366)

In [38]:
val_fianl.shape

(85829, 366)

In [39]:
test_final.shape

(42851, 366)

In [41]:
train_final.head()

Unnamed: 0,stock_id,time_id,target,row_id,wap1_sum,wap1_mean,wap1_std,wap2_sum,wap2_mean,wap2_std,...,trade_order_count_mean_0c1,trade_order_count_mean_1c1,trade_order_count_mean_3c1,trade_order_count_mean_4c1,trade_order_count_mean_6c1,size_tau2_0c1,size_tau2_1c1,size_tau2_3c1,size_tau2_4c1,size_tau2_6c1
0,100,10929,0.002864,100-10929,490.360657,1.000736,0.001006,490.353912,1.000722,0.001034,...,4.128936,7.942029,5.039572,7.366906,4.776723,0.046628,0.021359,0.053327,0.03125,0.04394
1,46,26186,0.001352,46-26186,446.197205,1.000442,0.000423,446.178711,1.000401,0.000486,...,3.231785,3.647059,3.680582,4.607656,3.6387,0.070433,0.0635,0.07941,0.052782,0.076122
2,22,6481,0.001956,22-6481,221.99736,0.999988,0.000669,221.993454,0.99997,0.000675,...,3.550267,5.267327,4.860561,3.311111,3.577503,0.097955,0.060186,0.101148,0.081923,0.071674
3,53,15853,0.003208,53-15853,366.397583,1.003829,0.001355,366.388428,1.003804,0.001385,...,3.008775,3.796272,3.209341,4.607656,3.835324,0.072803,0.06426,0.078718,0.052782,0.060858
4,3,8613,0.004516,3-8613,338.011383,1.000034,0.000739,338.007172,1.000021,0.000781,...,3.112151,2.508772,5.346279,4.607656,3.573627,0.06819,0.083624,0.072808,0.052782,0.073457


In [None]:
total_size = train.shape[0]
train_size = int(total_size * 0.7)
validation_size = int(total_size * 0.2)
test_size = total_size - train_size - validation_size
