In [1]:
import numpy as np
import pandas as pd
import glob
import os
import gc

from joblib import Parallel, delayed

In [2]:
SEED = 42
BUCKET_WINDOWS2 = [(0, 100), (100, 200), (200, 300), (300, 400), (400, 500), (500, 600)]

data_dir = '../input/optiver-realized-volatility-prediction'
BOOK_TRAbIN_PATH = '../input/optiver-realized-volatility-prediction/book_train.parquet'
TRADE_TRAIN_PATH = '../input/optiver-realized-volatility-prediction/trade_train.parquet'
BOOK_TEST_PATH = '../input/optiver-realized-volatility-prediction/book_test.parquet'
TRADE_TEST_PATH = '../input/optiver-realized-volatility-prediction/trade_test.parquet'

In [3]:
# Function to calculate first WAP
def calc_wap1(df):
    wap = (df['bid_price1'] * df['ask_size1'] + df['ask_price1'] * df['bid_size1']) / (df['bid_size1'] + df['ask_size1'])
    return wap

# Function to calculate second WAP
def calc_wap2(df):
    wap = (df['bid_price2'] * df['ask_size2'] + df['ask_price2'] * df['bid_size2']) / (df['bid_size2'] + df['ask_size2'])
    return wap

def calc_wap3(df):
    wap = (df['bid_price1'] * df['bid_size1'] + df['ask_price1'] * df['ask_size1']) / (df['bid_size1'] + df['ask_size1'])
    return wap

def calc_wap4(df):
    wap = (df['bid_price2'] * df['bid_size2'] + df['ask_price2'] * df['ask_size2']) / (df['bid_size2'] + df['ask_size2'])
    return wap

# Function to calculate the log of the return
# Remember that logb(x / y) = logb(x) - logb(y)
def log_return(series):
    return np.log(series).diff()

# Calculate the realized volatility
def realized_volatility(series):
    return np.sqrt(np.sum(series**2))

In [4]:
def read_train_test():
    train = pd.read_csv('../input/optiver-realized-volatility-prediction/train.csv')
    # Create a key to merge with book and trade data
    train['row_id'] = train['stock_id'].astype(str) + '-' + train['time_id'].astype(str)
    print(f'Our training set has {train.shape[0]} rows')
    return train
def book_preprocessor(file_path):
    df = pd.read_parquet(file_path)
    #Calculate Wap
    df['wap'] = (df['bid_price1'] * df['ask_size1'] +
                df['ask_price1'] * df['bid_size1']) / (df['bid_size1']+ df['ask_size1'])
    
    df.loc[:,'log_return'] = log_return(df['wap'])
    df = df[~df['log_return'].isnull()]
    df['realized_volatility'] = realized_volatility(df['log_return'])

In [19]:
file_p = '../input/optiver-realized-volatility-prediction/book_train.parquet/stock_id=0'

In [42]:
def realized_volatility_per_time_id(file_path, prediction_column_name):
    df_book_data = pd.read_parquet(file_path)
    df_book_data['wap'] =(df_book_data['bid_price1'] * df_book_data['ask_size1']+df_book_data['ask_price1'] * df_book_data['bid_size1'])  / (
                                      df_book_data['bid_size1']+ df_book_data[
                                  'ask_size1'])
    df_book_data['log_return'] = df_book_data.groupby(['time_id'])['wap'].apply(log_return)
    df_book_data = df_book_data[~df_book_data['log_return'].isnull()]
    df_realized_vol_per_stock =  pd.DataFrame(df_book_data.groupby(['time_id'])['log_return'].agg(realized_volatility)).reset_index()
    df_realized_vol_per_stock = df_realized_vol_per_stock.rename(columns = {'log_return':prediction_column_name})
    stock_id = file_path.split('=')[1]
    df_realized_vol_per_stock['row_id'] = df_realized_vol_per_stock['time_id'].apply(lambda x:f'{stock_id}-{x}')
    return df_realized_vol_per_stock[['row_id',prediction_column_name]]

In [51]:
# Funtion to make preprocessing function in parallel (for each stock id)
def preprocessor(list_stock_ids, is_train = True):
    
    # Parrallel for loop
    def for_joblib(stock_id):
        # Train
        if is_train:
            file_path_book = data_dir + "/book_train.parquet/stock_id=" + str(stock_id)
#             file_path_trade = data_dir + "/trade_train.parquet/stock_id=" + str(stock_id)
        # Test
        else:
            file_path_book = data_dir + "/book_test.parquet/stock_id=" + str(stock_id)
            file_path_trade = data_dir + "/trade_test.parquet/stock_id=" + str(stock_id)
    
        # Preprocess book and trade data and merge them
        df_tmp = realized_volatility_per_time_id(file_path_book,'realized_volatility')
        
        # Return the merge dataframe
        return df_tmp
    
    # Use parallel api to call paralle for loop
    df = Parallel(n_jobs = -1, verbose = 1)(delayed(for_joblib)(stock_id) for stock_id in list_stock_ids)
    # Concatenate all the dataframes that return from Parallel
    df = pd.concat(df, ignore_index = True)
    return df

In [52]:
def processor(list_stock_ids,is_train = True):
    df = []
    for stock_id in list_stock_ids:
        file_path_book = data_dir + "/book_train.parquet/stock_id=" + str(stock_id)
        df_tmp = book_preprocessor(file_path_book)
        df.append(df_tmp)
#     df = pd.concat(df, ignore_index = True)
    return df

In [55]:
train =read_train_test()
train_stock_ids = train['stock_id'].unique()
train_ = preprocessor(train_stock_ids, is_train = True)

Our training set has 428932 rows


[Parallel(n_jobs=-1)]: Using backend LokyBackend with 2 concurrent workers.
[Parallel(n_jobs=-1)]: Done  46 tasks      | elapsed:   45.2s
[Parallel(n_jobs=-1)]: Done 112 out of 112 | elapsed:  1.8min finished


In [56]:
train_

Unnamed: 0,row_id,realized_volatility
0,0-5,0.004499
1,0-11,0.001204
2,0-16,0.002369
3,0-31,0.002574
4,0-62,0.001894
...,...,...
428927,126-32751,0.003691
428928,126-32753,0.004104
428929,126-32758,0.003118
428930,126-32763,0.003661


In [57]:
train = train.merge(train_, on = ['row_id'], how = 'left')

In [58]:
train

Unnamed: 0,stock_id,time_id,target,row_id,realized_volatility
0,0,5,0.004136,0-5,0.004499
1,0,11,0.001445,0-11,0.001204
2,0,16,0.002168,0-16,0.002369
3,0,31,0.002195,0-31,0.002574
4,0,62,0.001747,0-62,0.001894
...,...,...,...,...,...
428927,126,32751,0.003461,126-32751,0.003691
428928,126,32753,0.003113,126-32753,0.004104
428929,126,32758,0.004070,126-32758,0.003118
428930,126,32763,0.003357,126-32763,0.003661


In [59]:
import pickle
f = open('Realized_vol.pkl','wb')
pickle.dump(train, f)
f.close()