In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/optiver-realized-volatility-prediction/sample_submission.csv
/kaggle/input/optiver-realized-volatility-prediction/train.csv
/kaggle/input/optiver-realized-volatility-prediction/test.csv
/kaggle/input/optiver-realized-volatility-prediction/trade_train.parquet/stock_id=97/888f813404d8417ca8d6b8aebd5f2951.parquet
/kaggle/input/optiver-realized-volatility-prediction/trade_train.parquet/stock_id=43/bb0efa57f511470e817880842e3e2afa.parquet
/kaggle/input/optiver-realized-volatility-prediction/trade_train.parquet/stock_id=21/1d8dc18ebfee47ffbb54b04e6afc0634.parquet
/kaggle/input/optiver-realized-volatility-prediction/trade_train.parquet/stock_id=72/60f62a03d8854605901dda072c84db39.parquet
/kaggle/input/optiver-realized-volatility-prediction/trade_train.parquet/stock_id=4/761268d671f9429abb29d9d2895e9bd2.parquet
/kaggle/input/optiver-realized-volatility-prediction/trade_train.parquet/stock_id=112/cd283097a5b54293ba400a19e811a7f9.parquet
/kaggle/input/optiver-realized-volatility-pr

In [2]:
import os
import glob
from joblib import Parallel, delayed
import pandas as pd
import numpy as np
import scipy as sc
from sklearn.model_selection import GroupKFold
from sklearn.model_selection import KFold
import lightgbm as lgb
import warnings
warnings.filterwarnings('ignore')
pd.set_option('max_columns', 300)

import matplotlib.pyplot as plt

In [3]:
# data directory
data_dir = '../input/optiver-realized-volatility-prediction/'

# Function to calculate first WAP
def calc_wap1(df):
    wap = (df['bid_price1'] * df['ask_size1'] + df['ask_price1'] * df['bid_size1']) / (df['bid_size1'] + df['ask_size1'])
    return wap

# Function to calculate second WAP
def calc_wap2(df):
    wap = (df['bid_price2'] * df['ask_size2'] + df['ask_price2'] * df['bid_size2']) / (df['bid_size2'] + df['ask_size2'])
    return wap

# Function to calculate the log of the return
# Remember that logb(x / y) = logb(x) - logb(y)
def log_return(series):
    return np.log(series).diff()

# Calculate the realized volatility
def realized_volatility(series):
    return np.sqrt(np.sum(series**2))

# Function to count unique elements of a series
def count_unique(series):
    return len(np.unique(series))

# Function to read our base train and test set
def read_train_test():
    train = pd.read_csv('../input/optiver-realized-volatility-prediction/train.csv')
    # Create a key to merge with book and trade data
    train['row_id'] = train['stock_id'].astype(str) + '-' + train['time_id'].astype(str)
    print(f'Our training set has {train.shape[0]} rows')
    return train

# Function to preprocess book data (for each stock id)
def book_preprocessor(file_path):
    df = pd.read_parquet(file_path)
    # Calculate Wap
    df['wap1'] = calc_wap1(df)
    df['wap2'] = calc_wap2(df)
    # Calculate log returns
    df['log_return1'] = df.groupby(['time_id'])['wap1'].apply(log_return)
    df['log_return2'] = df.groupby(['time_id'])['wap2'].apply(log_return)
    
    df['wap_mean'] = (df['wap1'] + df['wap2']) / 2
    # Calculate wap balance
    df['wap_balance'] = abs(df['wap1'] - df['wap2'])
    # Calculate spread
    df['price_spread'] = (df['ask_price1'] - df['bid_price1']) / ((df['ask_price1'] + df['bid_price1']) / 2)
    df['bid_spread'] = df['bid_price1'] - df['bid_price2']
    df['ask_spread'] = df['ask_price1'] - df['ask_price2']
    df['total_volume'] = (df['ask_size1'] + df['ask_size2']) + (df['bid_size1'] + df['bid_size2'])
    df['volume_imbalance'] = abs((df['ask_size1'] + df['ask_size2']) - (df['bid_size1'] + df['bid_size2']))
    
    df['bas'] = (df[['ask_price1', 'ask_price2']].min(axis = 1)/ df[['bid_price1', 'bid_price2']].max(axis = 1) - 1) 
    df['h_spread_l1'] = df['ask_price1'] - df['bid_price1']
    df['h_spread_l2'] = df['ask_price2'] - df['bid_price2']
    df['log_return_bid_price1'] = np.log(df['bid_price1'].pct_change() + 1)
    df['log_return_ask_price1'] = np.log(df['ask_price1'].pct_change() + 1)
    df['log_return_bid_size1'] = np.log(df['bid_size1'].pct_change() + 1)
    df['log_return_ask_size1'] = np.log(df['ask_size1'].pct_change() + 1)
    df['log_ask_1_div_bid_1'] = np.log(df['ask_price1'] / df['bid_price1'])
    df['log_ask_1_div_bid_1_size'] = np.log(df['ask_size1'] / df['bid_size1'])
    df['log_return_bid_price2'] = np.log(df['bid_price2'].pct_change() + 1)
    df['log_return_ask_price2'] = np.log(df['ask_price2'].pct_change() + 1)
    df['log_return_bid_size2'] = np.log(df['bid_size2'].pct_change() + 1)
    df['log_return_ask_size2'] = np.log(df['ask_size2'].pct_change() + 1)
    df['log_ask_2_div_bid_2'] = np.log(df['ask_price2'] / df['bid_price2'])
    df['log_ask_2_div_bid_2_size'] = np.log(df['ask_size2'] / df['bid_size2'])
    
    
    # Dict for aggregations
    create_feature_dict = {
        # 测试1
        'wap1': [np.mean,np.max,np.std],#np.sum, 上次测试的结果
        'wap2': [np.mean,np.max,np.std],#np.sum, 上次测试的结果
        # np.mean, np.max 加入 sum
        #测试2
        'log_return1': [realized_volatility,np.sum], # np.sum, np.mean,np.std
        'log_return2': [realized_volatility,np.sum], # np.sum, np.mean,np.std
        'log_return_bid_price1':[realized_volatility,np.sum], # np.sum, np.mean,np.std
        'log_return_ask_price1':[realized_volatility,np.sum], # np.sum, np.mean,np.std
        'log_return_bid_size1':[realized_volatility,np.sum], # np.sum, np.mean,np.std
        'log_return_ask_size1':[realized_volatility,np.sum], # np.sum, np.mean,np.std
        'log_ask_1_div_bid_1':[realized_volatility,np.sum], # np.sum, np.mean,np.std
        'log_ask_1_div_bid_1_size':[realized_volatility,np.sum], # np.sum, np.mean,np.std
        'log_return_bid_price2':[realized_volatility,np.sum], # np.sum, np.mean,np.std
        'log_return_ask_price2':[realized_volatility,np.sum], # np.sum, np.mean,np.std
        'log_return_bid_size2':[realized_volatility,np.sum], # np.sum, np.mean,np.std
        'log_return_ask_size2':[realized_volatility,np.sum], # np.sum, np.mean,np.std
        'log_ask_2_div_bid_2':[realized_volatility,np.sum], # np.sum, np.mean,np.std
        'log_ask_2_div_bid_2_size':[realized_volatility,np.sum], # np.sum, np.mean,np.std
        #测试3
        'wap_balance': [np.mean,np.max,np.std,np.sum], #上次测试的结果, np.sum
        'wap_mean': [np.mean,np.max,np.std,np.sum],#上次测试的结果, np.sum
        'price_spread':[np.mean,np.max,np.std,np.sum],#上次测试的结果, np.sum
        'bid_spread':[np.mean,np.max,np.std,np.sum],#上次测试的结果, np.sum
        'ask_spread':[np.mean,np.max,np.std,np.sum],#上次测试的结果, np.sum
        'total_volume':[np.mean,np.max,np.std,np.sum],#上次测试的结果, np.sum
        'volume_imbalance':[np.mean,np.max,np.std,np.sum],#上次测试的结果, np.sum
        'bas':[np.mean,np.max,np.std,np.sum],#上次测试的结果, np.sum
        'h_spread_l1':[np.mean,np.max,np.std,np.sum],#上次测试的结果, np.sum
        'h_spread_l2':[np.mean,np.max,np.std,np.sum],#上次测试的结果, np.sum
         #np.mean, np.max 加入 sum
                       }
    # Function to get group stats for different windows (seconds in bucket)
    def get_stats_window(seconds_in_bucket, add_suffix = False):
        # Group by the window
        df_feature = df[df['seconds_in_bucket'] >= seconds_in_bucket].groupby(['time_id']).agg(create_feature_dict).reset_index()
        # Rename columns joining suffix
        df_feature.columns = ['_'.join(col) for col in df_feature.columns]
        # Add a suffix to differentiate windows
        if add_suffix:
            df_feature = df_feature.add_suffix('_' + str(seconds_in_bucket))
        return df_feature
    
    # Get the stats for different windows
    df_feature = get_stats_window(seconds_in_bucket = 0, add_suffix = False)
    df_feature_450 = get_stats_window(seconds_in_bucket = 450, add_suffix = True)
    df_feature_300 = get_stats_window(seconds_in_bucket = 300, add_suffix = True)
    df_feature_150 = get_stats_window(seconds_in_bucket = 150, add_suffix = True)
    
    # Merge all
    df_feature = df_feature.merge(df_feature_450, how = 'left', left_on = 'time_id_', right_on = 'time_id__450')
    df_feature = df_feature.merge(df_feature_300, how = 'left', left_on = 'time_id_', right_on = 'time_id__300')
    df_feature = df_feature.merge(df_feature_150, how = 'left', left_on = 'time_id_', right_on = 'time_id__150')
    # Drop unnecesary time_ids
    df_feature.drop(['time_id__450', 'time_id__300', 'time_id__150'], axis = 1, inplace = True)
    
    # Create row_id so we can merge
    stock_id = file_path.split('=')[1]
    df_feature['row_id'] = df_feature['time_id_'].apply(lambda x: f'{stock_id}-{x}')
    df_feature.drop(['time_id_'], axis = 1, inplace = True)
    return df_feature

# Function to preprocess trade data (for each stock id)
def trade_preprocessor(file_path):
    df = pd.read_parquet(file_path)
    df['log_return'] = df.groupby('time_id')['price'].apply(log_return)
    
# Dict for aggregations
    create_feature_dict = {
        'log_return':[realized_volatility],
        'seconds_in_bucket':[count_unique],
        'size':[np.sum],
        'order_count':[np.mean],
    }
    
    
    # Function to get group stats for different windows (seconds in bucket)
    def get_stats_window(seconds_in_bucket, add_suffix = False):
        # Group by the window
        df_feature = df[df['seconds_in_bucket'] >= seconds_in_bucket].groupby(['time_id']).agg(create_feature_dict).reset_index()
        # Rename columns joining suffix
        df_feature.columns = ['_'.join(col) for col in df_feature.columns]
        # Add a suffix to differentiate windows
        if add_suffix:
            df_feature = df_feature.add_suffix('_' + str(seconds_in_bucket))
        return df_feature
    
    # Get the stats for different windows
    df_feature = get_stats_window(seconds_in_bucket = 0, add_suffix = False)
    df_feature_450 = get_stats_window(seconds_in_bucket = 450, add_suffix = True)
    df_feature_300 = get_stats_window(seconds_in_bucket = 300, add_suffix = True)
    df_feature_150 = get_stats_window(seconds_in_bucket = 150, add_suffix = True)

    # Merge all
    df_feature = df_feature.merge(df_feature_450, how = 'left', left_on = 'time_id_', right_on = 'time_id__450')
    df_feature = df_feature.merge(df_feature_300, how = 'left', left_on = 'time_id_', right_on = 'time_id__300')
    df_feature = df_feature.merge(df_feature_150, how = 'left', left_on = 'time_id_', right_on = 'time_id__150')
    # Drop unnecesary time_ids
    df_feature.drop(['time_id__450', 'time_id__300', 'time_id__150'], axis = 1, inplace = True)
    
    df_feature = df_feature.add_prefix('trade_')
    stock_id = file_path.split('=')[1]
    df_feature['row_id'] = df_feature['trade_time_id_'].apply(lambda x:f'{stock_id}-{x}')
    df_feature.drop(['trade_time_id_'], axis = 1, inplace = True)
    return df_feature

# Function to get group stats for the stock_id and time_id
def get_time_stock(df):
    # Get realized volatility columns
    vol_cols = ['log_return1_realized_volatility', 'log_return2_realized_volatility', 'log_return1_realized_volatility_450', 'log_return2_realized_volatility_450', 
                'log_return1_realized_volatility_300', 'log_return2_realized_volatility_300', 'log_return1_realized_volatility_150', 'log_return2_realized_volatility_150', 
                'trade_log_return_realized_volatility', 'trade_log_return_realized_volatility_450', 'trade_log_return_realized_volatility_300', 'trade_log_return_realized_volatility_150']

    # Group by the stock id
    df_stock_id = df.groupby(['stock_id'])[vol_cols].agg(['mean', 'std', 'max', 'min', ]).reset_index()
    # Rename columns joining suffix
    df_stock_id.columns = ['_'.join(col) for col in df_stock_id.columns]
    df_stock_id = df_stock_id.add_suffix('_' + 'stock')

    # Group by the time id
    df_time_id = df.groupby(['time_id'])[vol_cols].agg(['mean', 'std', 'max', 'min', ]).reset_index()
    # Rename columns joining suffix
    df_time_id.columns = ['_'.join(col) for col in df_time_id.columns]
    df_time_id = df_time_id.add_suffix('_' + 'time')
    
    # Merge with original dataframe
    df = df.merge(df_stock_id, how = 'left', left_on = ['stock_id'], right_on = ['stock_id__stock'])
    df = df.merge(df_time_id, how = 'left', left_on = ['time_id'], right_on = ['time_id__time'])
    df.drop(['stock_id__stock', 'time_id__time'], axis = 1, inplace = True)
    return df
    
# Funtion to make preprocessing function in parallel (for each stock id)
def preprocessor(list_stock_ids, is_train = True):
    
    # Parrallel for loop
    def for_joblib(stock_id):
        # Train
        if is_train:
            file_path_book = data_dir + "book_train.parquet/stock_id=" + str(stock_id)
            file_path_trade = data_dir + "trade_train.parquet/stock_id=" + str(stock_id)
        # Test
        else:
            file_path_book = data_dir + "book_test.parquet/stock_id=" + str(stock_id)
            file_path_trade = data_dir + "trade_test.parquet/stock_id=" + str(stock_id)
    
        # Preprocess book and trade data and merge them
        df_tmp = pd.merge(book_preprocessor(file_path_book), trade_preprocessor(file_path_trade), on = 'row_id', how = 'left')
        
        # Return the merge dataframe
        return df_tmp
    
    # Use parallel api to call paralle for loop
    df = Parallel(n_jobs = -1, verbose = 1)(delayed(for_joblib)(stock_id) for stock_id in list_stock_ids)
    # Concatenate all the dataframes that return from Parallel
    df = pd.concat(df, ignore_index = True)
    return df

In [4]:
# Read train and test
train = read_train_test()

# Get unique stock ids 
train_stock_ids = train['stock_id'].unique()
# Preprocess them using Parallel and our single stock id functions
train_ = preprocessor(train_stock_ids, is_train = True)
train = train.merge(train_, on = ['row_id'], how = 'left')

# Get group stats of time_id and stock_id
train = get_time_stock(train)

Our training set has 428932 rows


[Parallel(n_jobs=-1)]: Using backend LokyBackend with 2 concurrent workers.
[Parallel(n_jobs=-1)]: Done  46 tasks      | elapsed: 63.6min
[Parallel(n_jobs=-1)]: Done 112 out of 112 | elapsed: 154.2min finished


In [5]:
train.to_pickle('train.pkl')