In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os
import glob
from joblib import Parallel, delayed
import pandas as pd
import numpy as np
import scipy as sc
from sklearn.model_selection import KFold
import lightgbm as lgb
import warnings
warnings.filterwarnings('ignore')
pd.set_option('max_columns', 300)
from sklearn.model_selection import GroupKFold

In [None]:
SEED = 200
TRIALS =20

In [None]:
kind = "k"
n_fold = 5
fileName = f"{kind}{n_fold}"

In [None]:
# data directory
data_dir = '../input/optiver-realized-volatility-prediction/'

# Function to calculate first WAP
def calc_wap1(df):
    wap = (df['bid_price1'] * df['ask_size1'] + df['ask_price1'] * df['bid_size1']) / (df['bid_size1'] + df['ask_size1'])
    return wap

# Function to calculate second WAP
def calc_wap2(df):
    wap = (df['bid_price2'] * df['ask_size2'] + df['ask_price2'] * df['bid_size2']) / (df['bid_size2'] + df['ask_size2'])
    return wap

# Function to calculate the log of the return
# Remember that logb(x / y) = logb(x) - logb(y)
def log_return(series):
    return np.log(series).diff()

# Calculate the realized volatility
def realized_volatility(series):
    return np.sqrt(np.sum(series**2))

# Function to count unique elements of a series
def count_unique(series):
    return len(np.unique(series))

In [None]:
# Function to read our base train and test set
def read_train_test():
    train = pd.read_csv('../input/optiver-realized-volatility-prediction/train.csv')
    test = pd.read_csv('../input/optiver-realized-volatility-prediction/test.csv')
    # Create a key to merge with book and trade data
    train['row_id'] = train['stock_id'].astype(str) + '-' + train['time_id'].astype(str)
    test['row_id'] = test['stock_id'].astype(str) + '-' + test['time_id'].astype(str)
    print(f'Our training set has {train.shape[0]} rows')
    return train, test

In [None]:
# Function to preprocess book data (for each stock id)
def book_preprocessor(file_path):
    df = pd.read_parquet(file_path)
    # Calculate Wap
    df['wap1'] = calc_wap1(df)
    df['wap2'] = calc_wap2(df)
    # Calculate log returns
    df['log_return1'] = df.groupby(['time_id'])['wap1'].apply(log_return)
    df['log_return2'] = df.groupby(['time_id'])['wap2'].apply(log_return)
    
    df['wap_mean'] = (df['wap1'] + df['wap2']) / 2
    # Calculate wap balance
    df['wap_balance'] = abs(df['wap1'] - df['wap2'])
    # Calculate spread
    df['price_spread'] = (df['ask_price1'] - df['bid_price1']) / ((df['ask_price1'] + df['bid_price1']) / 2)
    df['bid_spread'] = df['bid_price1'] - df['bid_price2']
    df['ask_spread'] = df['ask_price1'] - df['ask_price2']
    df['total_volume'] = (df['ask_size1'] + df['ask_size2']) + (df['bid_size1'] + df['bid_size2'])
    df['volume_imbalance'] = abs((df['ask_size1'] + df['ask_size2']) - (df['bid_size1'] + df['bid_size2']))
    
    df['bas'] = (df[['ask_price1', 'ask_price2']].min(axis = 1)/ df[['bid_price1', 'bid_price2']].max(axis = 1) - 1) 
    df['h_spread_l1'] = df['ask_price1'] - df['bid_price1']
    df['h_spread_l2'] = df['ask_price2'] - df['bid_price2']
    
    
    df['log_return_bid_price1'] = np.log(df['bid_price1'].pct_change() + 1)
    df['log_return_ask_price1'] = np.log(df['ask_price1'].pct_change() + 1)
    df['log_return_bid_size1'] = np.log(df['bid_size1'].pct_change() + 1)
    df['log_return_ask_size1'] = np.log(df['ask_size1'].pct_change() + 1)
    df['log_ask_1_div_bid_1'] = np.log(df['ask_price1'] / df['bid_price1'])
    df['log_ask_1_div_bid_1_size'] = np.log(df['ask_size1'] / df['bid_size1'])
    df['log_return_bid_price2'] = np.log(df['bid_price2'].pct_change() + 1)
    df['log_return_ask_price2'] = np.log(df['ask_price2'].pct_change() + 1)
    df['log_return_bid_size2'] = np.log(df['bid_size2'].pct_change() + 1)
    df['log_return_ask_size2'] = np.log(df['ask_size2'].pct_change() + 1)
    df['log_ask_2_div_bid_2'] = np.log(df['ask_price2'] / df['bid_price2'])
    df['log_ask_2_div_bid_2_size'] = np.log(df['ask_size2'] / df['bid_size2'])
    
    
    # Dict for aggregations
    create_feature_dict = {
        # 测试1
        'wap1': [np.mean, np.max, np.sum],#np.sum, 上次测试的结果
        'wap2': [np.mean, np.max, np.sum],#np.sum, 上次测试的结果
        # 如果np.mean, np.max 加入 sum
        #测试2
        'log_return1': [realized_volatility], # np.sum, np.mean,np.std
        'log_return2': [realized_volatility], # np.sum, np.mean,np.std
        'log_return_bid_price1':[realized_volatility], # np.sum, np.mean,np.std
        'log_return_ask_price1':[realized_volatility], # np.sum, np.mean,np.std
        'log_return_bid_size1':[realized_volatility], # np.sum, np.mean,np.std
        'log_return_ask_size1':[realized_volatility], # np.sum, np.mean,np.std
        'log_ask_1_div_bid_1':[realized_volatility], # np.sum, np.mean,np.std
        'log_ask_1_div_bid_1_size':[realized_volatility], # np.sum, np.mean,np.std
        'log_return_bid_price2':[realized_volatility], # np.sum, np.mean,np.std
        'log_return_ask_price2':[realized_volatility], # np.sum, np.mean,np.std
        'log_return_bid_size2':[realized_volatility], # np.sum, np.mean,np.std
        'log_return_ask_size2':[realized_volatility], # np.sum, np.mean,np.std
        'log_ask_2_div_bid_2':[realized_volatility], # np.sum, np.mean,np.std
        'log_ask_2_div_bid_2_size':[realized_volatility], # np.sum, np.mean,np.std
        #测试3
        'wap_balance': [np.mean, np.max, np.std, np.sum], #上次测试的结果, np.sum
        'wap_mean': [np.mean, np.max, np.std, np.sum],#上次测试的结果, np.sum
        'price_spread':[np.mean, np.max, np.std, np.sum],#上次测试的结果, np.sum
        'bid_spread':[np.mean, np.max, np.std, np.sum],#上次测试的结果, np.sum
        'ask_spread':[np.mean, np.max, np.std, np.sum],#上次测试的结果, np.sum
        'total_volume':[np.mean, np.max, np.std, np.sum],#上次测试的结果, np.sum
        'volume_imbalance':[np.mean, np.max, np.std, np.sum],#上次测试的结果, np.sum
        'bas':[np.mean, np.max, np.std, np.sum],#上次测试的结果, np.sum
        'h_spread_l1':[np.mean, np.max, np.std, np.sum],#上次测试的结果, np.sum
        'h_spread_l2':[np.mean, np.max, np.std, np.sum],#上次测试的结果, np.sum
                       }
    
    # Function to get group stats for different windows (seconds in bucket)
    def get_stats_window(seconds_in_bucket, add_suffix = False):
        # Group by the window
        df_feature = df[df['seconds_in_bucket'] >= seconds_in_bucket].groupby(['time_id']).agg(create_feature_dict).reset_index()
        # Rename columns joining suffix
        df_feature.columns = ['_'.join(col) for col in df_feature.columns]
        # Add a suffix to differentiate windows
        if add_suffix:
            df_feature = df_feature.add_suffix('_' + str(seconds_in_bucket))
        return df_feature
    
    # Get the stats for different windows
    #根据上周测试还可以加入其他时间段
    df_feature = get_stats_window(seconds_in_bucket = 0, add_suffix = False)
    df_feature_450 = get_stats_window(seconds_in_bucket = 450, add_suffix = True)
    df_feature_300 = get_stats_window(seconds_in_bucket = 300, add_suffix = True)
    df_feature_150 = get_stats_window(seconds_in_bucket = 150, add_suffix = True)
    
    # Merge all
    df_feature = df_feature.merge(df_feature_450, how = 'left', left_on = 'time_id_', right_on = 'time_id__450')
    df_feature = df_feature.merge(df_feature_300, how = 'left', left_on = 'time_id_', right_on = 'time_id__300')
    df_feature = df_feature.merge(df_feature_150, how = 'left', left_on = 'time_id_', right_on = 'time_id__150')
    # Drop unnecesary time_ids
    df_feature.drop(['time_id__450', 'time_id__300', 'time_id__150'], axis = 1, inplace = True)
    
    # Create row_id so we can merge
    stock_id = file_path.split('=')[1]
    df_feature['row_id'] = df_feature['time_id_'].apply(lambda x: f'{stock_id}-{x}')
    df_feature.drop(['time_id_'], axis = 1, inplace = True)
    return df_feature

In [None]:
# Function to preprocess trade data (for each stock id)
def trade_preprocessor(file_path):
    df = pd.read_parquet(file_path)
    df['log_return'] = df.groupby('time_id')['price'].apply(log_return)
    
    # Dict for aggregations
    #根据上周测试调整
    create_feature_dict = {
        'log_return':[realized_volatility],
        'seconds_in_bucket':[count_unique],
        'size':[np.sum],
        'order_count':[np.mean],
    }
    
    # Function to get group stats for different windows (seconds in bucket)
    def get_stats_window(seconds_in_bucket, add_suffix = False):
        # Group by the window
        df_feature = df[df['seconds_in_bucket'] >= seconds_in_bucket].groupby(['time_id']).agg(create_feature_dict).reset_index()
        # Rename columns joining suffix
        df_feature.columns = ['_'.join(col) for col in df_feature.columns]
        # Add a suffix to differentiate windows
        if add_suffix:
            df_feature = df_feature.add_suffix('_' + str(seconds_in_bucket))
        return df_feature
    
    # Get the stats for different windows
    #根据上周测试还可以加入其他时间段
    df_feature = get_stats_window(seconds_in_bucket = 0, add_suffix = False)
    df_feature_450 = get_stats_window(seconds_in_bucket = 450, add_suffix = True)
    df_feature_300 = get_stats_window(seconds_in_bucket = 300, add_suffix = True)
    df_feature_150 = get_stats_window(seconds_in_bucket = 150, add_suffix = True)

    # Merge all
    df_feature = df_feature.merge(df_feature_450, how = 'left', left_on = 'time_id_', right_on = 'time_id__450')
    df_feature = df_feature.merge(df_feature_300, how = 'left', left_on = 'time_id_', right_on = 'time_id__300')
    df_feature = df_feature.merge(df_feature_150, how = 'left', left_on = 'time_id_', right_on = 'time_id__150')
    # Drop unnecesary time_ids
    df_feature.drop(['time_id__450', 'time_id__300', 'time_id__150'], axis = 1, inplace = True)
    
    df_feature = df_feature.add_prefix('trade_')
    stock_id = file_path.split('=')[1]
    df_feature['row_id'] = df_feature['trade_time_id_'].apply(lambda x:f'{stock_id}-{x}')
    df_feature.drop(['trade_time_id_'], axis = 1, inplace = True)
    return df_feature

In [None]:
# Function to get group stats for the stock_id and time_id
def get_time_stock(df):
    # Get realized volatility columns
    vol_cols = ['log_return1_realized_volatility', 'log_return2_realized_volatility', 'log_return1_realized_volatility_450', 'log_return2_realized_volatility_450', 
                'log_return1_realized_volatility_300', 'log_return2_realized_volatility_300', 'log_return1_realized_volatility_150', 'log_return2_realized_volatility_150', 
                'trade_log_return_realized_volatility', 'trade_log_return_realized_volatility_450', 'trade_log_return_realized_volatility_300', 'trade_log_return_realized_volatility_150']

    # Group by the stock id
    df_stock_id = df.groupby(['stock_id'])[vol_cols].agg(['mean', 'std', 'max', 'min', ]).reset_index()
    # Rename columns joining suffix
    df_stock_id.columns = ['_'.join(col) for col in df_stock_id.columns]
    df_stock_id = df_stock_id.add_suffix('_' + 'stock')

    # Group by the stock id
    df_time_id = df.groupby(['time_id'])[vol_cols].agg(['mean', 'std', 'max', 'min', ]).reset_index()
    # Rename columns joining suffix
    df_time_id.columns = ['_'.join(col) for col in df_time_id.columns]
    df_time_id = df_time_id.add_suffix('_' + 'time')
    
    # Merge with original dataframe
    df = df.merge(df_stock_id, how = 'left', left_on = ['stock_id'], right_on = ['stock_id__stock'])
    df = df.merge(df_time_id, how = 'left', left_on = ['time_id'], right_on = ['time_id__time'])
    df.drop(['stock_id__stock', 'time_id__time'], axis = 1, inplace = True)
    return df

In [None]:
# Funtion to make preprocessing function in parallel (for each stock id)
def preprocessor(list_stock_ids, is_train = True):
    
    # Parrallel for loop
    def for_joblib(stock_id):
        # Train
        if is_train:
            file_path_book = data_dir + "book_train.parquet/stock_id=" + str(stock_id)
            file_path_trade = data_dir + "trade_train.parquet/stock_id=" + str(stock_id)
        # Test
        else:
            file_path_book = data_dir + "book_test.parquet/stock_id=" + str(stock_id)
            file_path_trade = data_dir + "trade_test.parquet/stock_id=" + str(stock_id)
    
        # Preprocess book and trade data and merge them
        df_tmp = pd.merge(book_preprocessor(file_path_book), trade_preprocessor(file_path_trade), on = 'row_id', how = 'left')
        
        # Return the merge dataframe
        return df_tmp
    
    # Use parallel api to call paralle for loop
    df = Parallel(n_jobs = -1, verbose = 1)(delayed(for_joblib)(stock_id) for stock_id in list_stock_ids)
    # Concatenate all the dataframes that return from Parallel
    df = pd.concat(df, ignore_index = True)
    return df

In [None]:
# Read train and test
train, test = read_train_test()

# Get unique stock ids 
train_stock_ids = train['stock_id'].unique()
# Preprocess them using Parallel and our single stock id functions
train_ = preprocessor(train_stock_ids, is_train = True)
train = train.merge(train_, on = ['row_id'], how = 'left')

# Get unique stock ids 
test_stock_ids = test['stock_id'].unique()
# Preprocess them using Parallel and our single stock id functions
test_ = preprocessor(test_stock_ids, is_train = False)
test = test.merge(test_, on = ['row_id'], how = 'left')

# Get group stats of time_id and stock_id
train = get_time_stock(train)
test = get_time_stock(test)

In [None]:
# Function to calculate the root mean squared percentage error
def rmspe(y_true, y_pred):
    return np.sqrt(np.mean(np.square((y_true - y_pred) / y_true)))

# Function to early stop with root mean squared percentage error
def feval_rmspe(y_pred, lgb_train):
    y_true = lgb_train.get_label()
    return 'RMSPE', rmspe(y_true, y_pred), False


In [None]:
# Split features and target
x = train.drop(['row_id', 'target', 'time_id'], axis = 1)
y = train['target']
x_test = test.drop(['row_id', 'time_id'], axis = 1)
    # Transform stock id to a numeric value
x['stock_id'] = x['stock_id'].astype(int)
x_test['stock_id'] = x_test['stock_id'].astype(int)
    
 # Create out of folds array
oof_predictions = np.zeros(x.shape[0])
# Create test array to store predictions
test_predictions = np.zeros(x_test.shape[0])
groups=train['time_id']
# Create a KFold object
if kind == "k":
    kfold = KFold(n_splits = n_fold, random_state = SEED, shuffle = True) #上周一样
elif (kind == "g"):
    kfold = GroupKFold(n_splits = n_fold, random_state = SEED, shuffle = True)
    # Iterate through each fold
for fold, (trn_ind, val_ind) in enumerate(kfold.split(x)):
    x_train, x_val = x.iloc[trn_ind], x.iloc[val_ind]
    y_train, y_val = y.iloc[trn_ind], y.iloc[val_ind]

In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import r2_score
import os
import glob
from tqdm import tqdm
from joblib import Parallel, delayed
import gc

from sklearn.model_selection import train_test_split, KFold

from lightgbm import LGBMRegressor
from xgboost import XGBRegressor
import optuna
from optuna.samplers import TPESampler

In [None]:
def objective(trial):
    
    def rmspe(y_true, y_pred):
        return  (np.sqrt(np.mean(np.square((y_true - y_pred) / y_true))))
    
    valid = [(x_val, y_val)]
    
    param = {
        "device": "gpu",
        'boosting': 'gbdt',
        "metric": "rmse",
        "verbosity": -1,
        'learning_rate':trial.suggest_loguniform('learning_rate', 0.005, 0.5),
        "max_depth": -1,
        "lambda_l1": trial.suggest_float("lambda_l1", 1e-8, 10.0, log=True),
        "lambda_l2": trial.suggest_float("lambda_l2", 1e-8, 10.0, log=True),
        "num_leaves": trial.suggest_int("num_leaves", 2, 1000),
        "n_estimators": trial.suggest_int("n_estimators", 100, 4000),
        "min_data_in_leaf": trial.suggest_int("min_data_in_leaf", 100, 2000),
        "feature_fraction": trial.suggest_float("feature_fraction", 0.4, 1.0),
        'feature_fraction_bynode': trial.suggest_float("feature_fraction_bynode", 0.4, 1.0),
        'min_sum_hessian_in_leaf':trial.suggest_int("min_sum_hessian_in_leaf", 10, 100),
        "bagging_fraction": trial.suggest_float("bagging_fraction", 0.4, 1.0),
        "bagging_freq": trial.suggest_int("bagging_freq", 1, 7),
        'seed':SEED,
        'feature_fraction_seed': SEED,
        'bagging_seed': SEED,
        'drop_seed': SEED,
        'data_random_seed': SEED}

    pruning_callback = optuna.integration.LightGBMPruningCallback(trial, "rmse")
    model = LGBMRegressor(**param)
    
    model.fit(x_train, y_train, eval_set=valid, verbose=False, callbacks=[pruning_callback], early_stopping_rounds=100)

    preds = model.predict(x_val)
    
    rmspe = rmspe(y_val, preds)
    return rmspe

In [None]:
study = optuna.create_study(sampler=TPESampler(), direction='minimize', pruner=optuna.pruners.MedianPruner(n_warmup_steps=5))
study.optimize(objective, n_trials=TRIALS, gc_after_trial=True) # change n trials 

In [None]:
print('Number of finished trials:', len(study.trials))
print('Best trial:', study.best_trial.params)

In [None]:
optuna.visualization.plot_optimization_history(study)

In [None]:
optuna.visualization.plot_param_importances(study)

In [None]:
best_lgbmparams = study.best_params
best_lgbmparams

In [None]:
import json
with open(f"{fileName}X_{SEED}.json","w") as f:
    json.dump(best_lgbmparams,f)