In [1]:
import numpy as np
import pandas as pd
import warnings
from itertools import combinations
import time
import gc
import polars as pl
import joblib
from numba import njit, prange
import sys
warnings.filterwarnings('ignore')
pd.options.display.max_rows = 999

In [2]:
train = pd.read_csv('/kaggle/input/optiver-trading-at-the-close/train.csv')
flag = 'submission' #submission None

# Memory Reduction

In [4]:
def reduce_mem_usage(df):
    if flag == 'test':
        pass
    
    else:
        start_mem = df.memory_usage().sum() / 1024**2
        print(f'Memory usage of dataframe is {start_mem:.2f} MB')

        for col in df.columns:
            col_type = df[col].dtype

            if col_type != object:
                c_min = df[col].min()
                c_max = df[col].max()
                if str(col_type)[:3] == 'int':
                    if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                        df[col] = df[col].astype(np.int8)
                    elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                        df[col] = df[col].astype(np.int16)
                    elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                        df[col] = df[col].astype(np.int32)
                    elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                        df[col] = df[col].astype(np.int64)  
                else:
                    if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                        df[col] = df[col].astype(np.float16)
                    elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                        df[col] = df[col].astype(np.float32)
                    else:
                        df[col] = df[col].astype(np.float32)

        end_mem = df.memory_usage().sum() / 1024**2
        print(f'Memory usage after optimization is: {end_mem:.2f} MB')
        decrease = 100 * (start_mem - end_mem) / start_mem
        print(f'Decreased by {decrease:.2f}%')
    
    return df

# memory management; remove useless columns after each iteration

def memory_management(df, columns):
    df = df[df.columns[~df.columns.isin(columns)]]

    return df

# List of Insignificant Features

In [5]:
useless = [
'ask_price',
 'ask_price__1_ewm',
 'ask_price__2_ewm',
 'ask_price__3_ewm',
 'ask_price_bid_ask_spread_imb',
 'ask_price_bid_ask_spread_imb3',
 'ask_price_bid_price_bid_ask_spread_imb2',
 'ask_price_bid_price_imb',
 'ask_price_bid_price_imb3',
 'ask_price_mid_price_bid_ask_spread_imb2',
 'ask_price_mid_price_bid_price_imb2',
 'ask_price_mid_price_imb',
 'ask_price_mid_price_imb3',
 'ask_price_mid_price_wap_imb2',
 'ask_size__1_ewm',
 'ask_size__2_ewm',
 'ask_size__3_ewm',
 'ask_size__4_ewm',
 'bid_ask_ratio__1_ewm',
 'bid_ask_ratio__1_rolling_mean',
 'bid_ask_ratio__2_ewm',
 'bid_ask_ratio__3_ewm',
 'bid_ask_ratio__4_ewm',
 'bid_ask_spread',
 'bid_ask_spread_ratio',
 'bid_ask_spread_vols_ratio',
 'bid_plus_ask_sizes',
 'bid_price',
 'bid_price__1_ewm',
 'bid_price__2_ewm',
 'bid_price__3_ewm',
 'bid_price_bid_ask_spread_imb',
 'bid_price_bid_ask_spread_imb3',
 'bid_size__1_ewm',
 'bid_size__1_rolling_mean',
 'bid_size__2_ewm',
 'bid_size__3_ewm',
 'bid_size__4_ewm',
 'bid_size_ask_size_bid_ask_ratio_imb2',
 'bid_size_ask_size_bid_plus_ask_sizes_imb2',
 'bid_size_ask_size_imb',
 'bid_size_bid_ask_ratio_imb3',
 'bid_size_bid_plus_ask_sizes_imb',
 'bid_size_bid_plus_ask_sizes_imb3',
 'far_price',
 'far_price_ask_price_bid_ask_spread_imb2',
 'far_price_ask_price_imb',
 'far_price_ask_price_imb3',
 'far_price_ask_price_mid_price_imb2',
 'far_price_ask_price_wap_imb2',
 'far_price_bid_ask_spread_imb',
 'far_price_bid_ask_spread_imb3',
 'far_price_bid_price_bid_ask_spread_imb2',
 'far_price_bid_price_imb',
 'far_price_bid_price_imb3',
 'far_price_bid_price_wap_imb2',
 'far_price_mid_price_bid_ask_spread_imb2',
 'far_price_mid_price_bid_price_imb2',
 'far_price_mid_price_imb',
 'far_price_mid_price_imb3',
 'far_price_mid_price_wap_imb2',
 'far_price_near_price_ask_price_imb2',
 'far_price_near_price_bid_ask_spread_imb2',
 'far_price_near_price_bid_price_imb2',
 'far_price_near_price_imb',
 'far_price_near_price_imb3',
 'far_price_near_price_mid_price_imb2',
 'far_price_near_price_wap_imb2',
 'far_price_wap_bid_ask_spread_imb2',
 'far_price_wap_imb',
 'far_price_wap_imb3',
 'imbalance_buy_sell_flag__1_ewm',
 'imbalance_buy_sell_flag__1_rolling_mean',
 'imbalance_buy_sell_flag__2_ewm',
 'imbalance_buy_sell_flag__3_ewm',
 'imbalance_buy_sell_flag_pct_change_2',
 'imbalance_buy_sell_flag_pct_change_3',
 'imbalance_size_ask_size_bid_ask_ratio_imb2',
 'imbalance_size_ask_size_bid_plus_ask_sizes_imb2',
 'imbalance_size_ask_size_imb',
 'imbalance_size_bid_ask_ratio_imb',
 'imbalance_size_bid_ask_ratio_imb3',
 'imbalance_size_bid_plus_ask_sizes_bid_ask_ratio_imb2',
 'imbalance_size_bid_plus_ask_sizes_imb',
 'imbalance_size_bid_plus_ask_sizes_imb3',
 'imbalance_size_bid_size_ask_size_imb2',
 'imbalance_size_bid_size_bid_ask_ratio_imb2',
 'imbalance_size_bid_size_bid_plus_ask_sizes_imb2',
 'imbalance_size_bid_size_imb',
 'imbalance_size_bid_size_imb3',
 'imbalance_size_matched_size_ask_size_imb2',
 'imbalance_size_matched_size_bid_ask_ratio_imb2',
 'imbalance_size_matched_size_bid_plus_ask_sizes_imb2',
 'imbalance_size_matched_size_bid_size_imb2',
 'imbalance_size_matched_size_imb',
 'imbalance_size_matched_size_imb3',
 'liquidity_imbalance__1_ewm',
 'liquidity_imbalance__1_rolling_mean',
 'liquidity_imbalance__2_ewm',
 'liquidity_imbalance__3_ewm',
 'liquidity_imbalance__4_ewm',
 'log_bid_vol__1_ewm',
 'log_bid_vol__1_rolling_mean',
 'log_bid_vol__2_ewm',
 'log_bid_vol__3_ewm',
 'log_bid_vol__4_ewm',
 'log_far_price',
 'market_urgency__1_ewm',
 'market_urgency__1_rolling_mean',
 'market_urgency__2_ewm',
 'market_urgency__3_ewm',
 'market_urgency__4_ewm',
 'matched_imbalance',
 'matched_size_ask_size_imb',
 'matched_size_bid_ask_ratio_imb',
 'matched_size_bid_plus_ask_sizes_bid_ask_ratio_imb2',
 'matched_size_bid_plus_ask_sizes_imb',
 'matched_size_bid_plus_ask_sizes_imb3',
 'matched_size_bid_size_ask_size_imb2',
 'mid_price',
 'mid_price__1_ewm',
 'mid_price__2_ewm',
 'mid_price__3_ewm',
 'mid_price__4_ewm',
 'mid_price_bid_ask_spread_imb',
 'mid_price_bid_ask_spread_imb3',
 'mid_price_bid_price_bid_ask_spread_imb2',
 'mid_price_bid_price_imb',
 'mid_price_bid_price_imb3',
 'mid_price_wap_bid_ask_spread_imb2',
 'near_price',
 'near_price_ask_price_bid_ask_spread_imb2',
 'near_price_ask_price_bid_price_imb2',
 'near_price_ask_price_imb',
 'near_price_ask_price_imb3',
 'near_price_ask_price_mid_price_imb2',
 'near_price_ask_price_wap_imb2',
 'near_price_bid_ask_spread_imb',
 'near_price_bid_ask_spread_imb3',
 'near_price_bid_price_bid_ask_spread_imb2',
 'near_price_bid_price_imb',
 'near_price_bid_price_imb3',
 'near_price_bid_price_wap_imb2',
 'near_price_mid_price_bid_ask_spread_imb2',
 'near_price_mid_price_bid_price_imb2',
 'near_price_mid_price_imb',
 'near_price_mid_price_imb3',
 'near_price_mid_price_wap_imb2',
 'near_price_wap_bid_ask_spread_imb2',
 'near_price_wap_imb',
 'near_price_wap_imb3',
 'price_diff_auction_vs_non_auction__1_ewm',
 'price_diff_auction_vs_non_auction__1_rolling_mean',
 'price_diff_auction_vs_non_auction__2_ewm',
 'price_diff_auction_vs_non_auction__3_ewm',
 'price_diff_auction_vs_non_auction__4_ewm',
 'price_spread',
 'ref_mid_spread_ratio__1_ewm',
 'ref_mid_spread_ratio__2_ewm',
 'ref_mid_spread_ratio__3_ewm',
 'ref_mid_spread_ratio__4_ewm',
 'reference_price_ask_price_bid_ask_spread_imb2',
 'reference_price_ask_price_bid_price_imb2',
 'reference_price_ask_price_imb3',
 'reference_price_ask_price_mid_price_imb2',
 'reference_price_ask_price_wap_imb2',
 'reference_price_bid_ask_spread_imb',
 'reference_price_bid_ask_spread_imb3',
 'reference_price_bid_price_bid_ask_spread_imb2',
 'reference_price_bid_price_imb3',
 'reference_price_bid_price_wap_imb2',
 'reference_price_far_price_ask_price_imb2',
 'reference_price_far_price_bid_ask_spread_imb2',
 'reference_price_far_price_bid_price_imb2',
 'reference_price_far_price_imb',
 'reference_price_far_price_imb3',
 'reference_price_far_price_mid_price_imb2',
 'reference_price_far_price_wap_imb2',
 'reference_price_mid_price_bid_ask_spread_imb2',
 'reference_price_mid_price_bid_price_imb2',
 'reference_price_mid_price_imb',
 'reference_price_mid_price_imb3',
 'reference_price_mid_price_wap_imb2',
 'reference_price_near_price_ask_price_imb2',
 'reference_price_near_price_bid_ask_spread_imb2',
 'reference_price_near_price_bid_price_imb2',
 'reference_price_near_price_imb',
 'reference_price_near_price_imb3',
 'reference_price_near_price_mid_price_imb2',
 'reference_price_near_price_wap_imb2',
 'reference_price_wap_bid_ask_spread_imb2',
 'wap',
 'wap__2_ewm',
 'wap__3_ewm',
 'wap__4_ewm',
 'wap_bid_ask_spread_imb',
 'wap_bid_ask_spread_imb3'
 'rsi_11_reference_price',
 'rsi_3_reference_price',
 'rsi_4_reference_price',
 'rsi_4_wap',
 'rsi_6_reference_price',
 'rsi_7_reference_price',
 'rsi_8_reference_price'
]

# Feature Engineering

#### Polars is used for majority of feature generation due to its faster computation vs pandas

## Function To Generate Single Feature

In [6]:
# Grouped metrics
grouped_bid = train.groupby('stock_id')['bid_size']
grouped_ask = train.groupby('stock_id')['ask_size']

# Various Aggregations
global_feats = {
    'median_sizes' : grouped_bid.median() + grouped_ask.median(),
    'std_sizes' : grouped_bid.std() + grouped_ask.std(),
    'max_sizes' : grouped_bid.max() + grouped_ask.max(),
    'min_sizes' : grouped_bid.min() + grouped_ask.min(),
    'first_sizes' : grouped_bid.first() + grouped_ask.first(),
    'std_price' : train.groupby('stock_id')['bid_price'].std() + train.groupby('stock_id')['ask_price'].std()
}

# Function to Generate Single Features
def generate_single_features(train):
    
    # Remove Missing Data
    train.dropna(subset = ['reference_price'], inplace = True)
    
    # Fill NAs with 1 for log transformation
    train.fillna(1, inplace = True)
    
    # Define columns to add 1 to for log transformation
    col_add_one = ['imbalance_size', 'reference_price', 'matched_size', 
                   'far_price', 'near_price', 'bid_price', 
                   'bid_size', 'ask_price', 'ask_size']
    
    # Add 1 to those specified columns where those columns are 0 so that log1 = 0
    for col in col_add_one:
        train[col] = np.where(train[col] == 0, 1, train[col])
    
# Volume Features

    # Ratio in bid ask size
    train['bid_ask_ratio'] = train.bid_size / train.ask_size
    
    # Log bid vol
    train['log_bid_vol'] = np.log(train.bid_size)
      
    # Total_vol
    train['bid_plus_ask_sizes'] = train['bid_size'] + train['ask_size']
    
    # Mapping aggregated features
    for key, value in global_feats.items():
        train[key] = train['stock_id'].map(value.to_dict())
    
# Price Features

    # Bid ask spread
    train['bid_ask_spread'] = train.ask_price - train.bid_price

    # Bid ask spread ratio
    train['bid_ask_spread_ratio'] = (train.ask_price - train.bid_price) / train.ask_price
    
    # Mid price
    train['mid_price'] = (train.ask_price + train.bid_price) / 2
    
    # Reference mid price spread ratio
    train['ref_mid_spread_ratio'] = (train.mid_price - train.reference_price) / train.mid_price
    
    # Far and near price logs
    train['log_far_price'] = np.log(train.far_price)
    
    # Far and near price ratio
    train['far_near_ratio'] = train.far_price / train.near_price
    
    # Bid ask spread ratio
    train['far_near_spread_ratio'] = \
        np.where(train.far_price != 0,
                 (train.far_price - train.near_price) / train.far_price, 0
                )
    
    # Reference price vs WAP
    train['price_diff_auction_vs_non_auction'] = train['reference_price'] - train['wap']

# Mixed features
    
    # Bid ask vol ratio
    train['bid_ask_spread_vols_ratio'] = (train.bid_price - train.ask_price) / (train.bid_size + train.ask_size)
    
    # Ask Bid spread
    train['price_spread'] = train.ask_price - train.bid_price
    
    # Liquidity imbalance using bid and ask size
    train['liquidity_imbalance'] = (train.bid_size - train.ask_size) / (train.bid_size + train.ask_size)
    
    # Market urgency: combination of price spread and liquidity imbalance
    train['market_urgency'] = train.price_spread * train.liquidity_imbalance
    
    # Liquidity imbalance using imblaance size and matched size
    train['matched_imbalance'] = (train.imbalance_size - train.matched_size) / (train.matched_size + train.imbalance_size)
    
    return train

## Function To Generate Imbalance Features

In [7]:
# Function to compute triplet imbalance in parallel using Numba (Inspired From Other Kaggle Users)
@njit(parallel=True)
def compute_triplet_imbalance(df_values, comb_indices):
    num_rows = df_values.shape[0]
    num_combinations = len(comb_indices)
    imbalance_features = np.empty((num_rows, num_combinations))

    # Loop through all combinations of triplets
    for i in prange(num_combinations):
        a, b, c = comb_indices[i]
        
        # Loop through rows of the DataFrame
        for j in range(num_rows):
            max_val = max(df_values[j, a], df_values[j, b], df_values[j, c])
            min_val = min(df_values[j, a], df_values[j, b], df_values[j, c])
            mid_val = df_values[j, a] + df_values[j, b] + df_values[j, c] - min_val - max_val
            
            # Prevent division by zero
            if mid_val == min_val:
                imbalance_features[j, i] = np.nan
            else:
                imbalance_features[j, i] = (max_val - mid_val) / (mid_val - min_val)

    return imbalance_features

# Function to calculate triplet imbalance for given price data and a DataFrame
def calculate_triplet_imbalance_numba(price, df):
    # Convert DataFrame to numpy array for Numba compatibility
    df_values = df[price].values
    comb_indices = [(price.index(a), price.index(b), price.index(c)) for a, b, c in combinations(price, 3)]

    # Calculate the triplet imbalance using the Numba-optimized function
    features_array = compute_triplet_imbalance(df_values, comb_indices)

    # Create a DataFrame from the results
    columns = [f'{a}_{b}_{c}_imb2' for a, b, c in combinations(price, 3)]
    features = pd.DataFrame(features_array, columns=columns)

    return features

def generate_imb_features(train):
       
# Computing Imbalance Features
    prices = ['reference_price','far_price', 'near_price', 'ask_price', 
              'mid_price', 'bid_price', 'wap', 'bid_ask_spread']
    
    volume = ['imbalance_size', 'matched_size', 'bid_size', 
              'ask_size', 'bid_plus_ask_sizes', 'bid_ask_ratio']
    
    correl_columns = ['reference_price_ask_price_imb', 'reference_price_bid_price_imb', 'target',
               'far_price_ask_price_bid_price_imb2', 'far_near_spread_ratio', 'reference_price_far_price_near_price_imb2', 
                'liquidity_imbalance', 'market_urgency', 'price_diff_auction_vs_non_auction', 'log_bid_vol', 'bid_ask_ratio', 
              'bid_size', 'ask_size', 'ref_mid_spread_ratio', 'imbalance_buy_sell_flag', 'bid_price', 'ask_price', 'mid_price', 'reference_price']
    
    rolling_columns = ['imbalance_size', 'matched_size', 'reference_price', 'imbalance_buy_sell_flag']
    
    loop_remove_cols = [col for col in useless if (col not in prices and col not in volume and col not in correl_columns and col not in rolling_columns)]  
    
    for groups in [prices, volume]:
        
        #imb1
        for c in combinations(groups, 2):
            if f'{c[0]}_{c[1]}_imb' not in useless:
                train[f'{c[0]}_{c[1]}_imb'] = train.eval(f'({c[0]} - {c[1]})/({c[0]} + {c[1]})')
        #imb3
            if f'{c[0]}_{c[1]}_imb3' not in useless:
                train[f'{c[0]}_{c[1]}_imb3'] = (train[c[0]] / train[c[0]].sum()) / (train[c[1]] / train[c[1]].sum())
        
        #imb2
        triplet_feature = calculate_triplet_imbalance_numba(groups, train)
        train[triplet_feature.columns] = triplet_feature.values
        
        del triplet_feature
        
        train = memory_management(train, loop_remove_cols)
        
        reduce_mem_cols = [col for col in train.columns if (col not in prices and col not in volume and col not in correl_columns and col not in rolling_columns)]
        
        train[reduce_mem_cols] = reduce_mem_usage(train[reduce_mem_cols])
        
    return train

## Function To Generate Lag Features

#### Generate lag features using `shift`, `diff` and `pct_change`

In [8]:
def generate_lag_features(train):
                 
    pl_df = pl.from_pandas(train)
    
    rolling_columns = ['imbalance_size', 'matched_size', 'reference_price', 'imbalance_buy_sell_flag']
    windows = [1, 2, 3, 10]  

    # Need to groupby stock and date to prevent spillovers from EOD to the following day
    group = ['stock_id', 'date_id']
    
    for col in rolling_columns:
        for window in windows:

            expressions = [
                 pl.col(col).shift(window)
                            .over(group)
                            .alias(f'{col}_shift_{window}'),
                 pl.col(col).diff(window)
                            .over(group)
                            .alias(f'{col}_diff_{window}'),
                 pl.col(col).pct_change(window)
                            .over(group)
                            .alias(f'{col}_pct_change_{window}')   
            ]

            pl_df = pl_df.with_columns(expressions)
    
    
    train = pl_df.to_pandas()
   
    return train

## Function To Generate Rolling Features
#### Generate rolling features using rolling mean (`rolling_mean`) and expoential moving average (`ewm_mean`)

In [9]:
def generate_rolling_features(train):
    
    # Relevant Features
    keep = ['liquidity_imbalance__1_rolling_mean', 'liquidity_imbalance', 'liquidity_imbalance__2_rolling_mean', 'market_urgency__1_rolling_mean', 
            'market_urgency', 'liquidity_imbalance__3_rolling_mean', 'price_diff_auction_vs_non_auction__1_rolling_mean', 'price_diff_auction_vs_non_auction', 
            'market_urgency__2_rolling_mean', 'liquidity_imbalance__4_rolling_mean', 'log_bid_vol', 'log_bid_vol__1_rolling_mean', 'market_urgency__3_rolling_mean', 
            'price_diff_auction_vs_non_auction__2_rolling_mean', 'bid_ask_ratio__1_rolling_mean', 'bid_ask_ratio', 'market_urgency__4_rolling_mean', 'log_bid_vol__2_rolling_mean', 
            'price_diff_auction_vs_non_auction__3_rolling_mean', 'price_diff_auction_vs_non_auction__4_rolling_mean', 'bid_ask_ratio__2_rolling_mean', 'log_bid_vol__3_rolling_mean', 
            'bid_ask_ratio__3_rolling_mean', 'log_bid_vol__4_rolling_mean', 'bid_size', 'bid_size__1_rolling_mean', 'bid_ask_ratio__4_rolling_mean', 'ask_size__1_rolling_mean', 
            'ask_size', 'bid_size__2_rolling_mean', 'ref_mid_spread_ratio__1_rolling_mean', 'ref_mid_spread_ratio', 'wap__2_rolling_mean', 'ask_size__2_rolling_mean', 
            'ref_mid_spread_ratio__2_rolling_mean', 'bid_size__3_rolling_mean', 'ref_mid_spread_ratio__3_rolling_mean', 'wap__3_rolling_mean', 'ref_mid_spread_ratio__4_rolling_mean', 
            'ask_size__3_rolling_mean', 'bid_size__4_rolling_mean', 'imbalance_buy_sell_flag', 'imbalance_buy_sell_flag__1_rolling_mean', 'wap__4_rolling_mean', 'ask_size__4_rolling_mean', 
            'mid_price__2_rolling_mean', 'bid_price__2_rolling_mean', 'bid_price__1_rolling_mean', 'bid_price', 'ask_price__2_rolling_mean', 'ask_price__1_rolling_mean', 'ask_price', 
            'imbalance_buy_sell_flag__2_rolling_mean', 'mid_price__1_rolling_mean', 'mid_price', 'mid_price__3_rolling_mean', 'bid_price__3_rolling_mean', 'ask_price__3_rolling_mean', 
            'imbalance_buy_sell_flag__3_rolling_mean', 'mid_price__4_rolling_mean']
    
    # Convert from pandas
    pl_df = pl.from_pandas(train)

    # Prepare the operations
    column = [col for col in train.columns if col not in ['stock_id', 'seconds_in_bucket', 'date_id', 'target', 'row_id', 'time_id']]
    windows = list(range(1,5))

    # Need to groupby stock and date to prevent spillovers from EOD to the following day
    group = ['stock_id', 'date_id']
    types = ['rolling_mean', 'ewm']
    
    for col in column:
        for window in windows:
            for i in types:
                if f'{col}__{window}_{i}' in keep: # Only generate the required rolling features
                    
                    expressions = [
                         pl.col(col).rolling_mean(window)
                                    .over(group)
                                    .alias(f'{col}__{window}_rolling_mean'),
                         pl.col(col).ewm_mean(half_life = window)
                                    .over(group)
                                    .last()
                                    .alias(f'{col}__{window}_ewm')
                    ]

                    # run the operations
                    pl_df = pl_df.with_columns(expressions)

    # back to pandas
    train = pl_df.to_pandas()
    
    return train

## Technical Analysis
#### Generate technical analysis - RSI, Bollinger Bands, Averate True Range, MACD, Historical Volatility, Keltner Channel

### RSI
#### Relative Strength Index (RSI) is a momentum indicator that measures the magnitude of recent price changes to analyze overbought or oversold conditions

In [10]:
def generate_rsi(train):
    
    pl_df = pl.from_pandas(train)

    rsi_col = ['reference_price', 'far_price', 'near_price', 'matched_size', 'wap']
    windows = [3, 7, 14]
    group = ['stock_id', 'date_id']

    for col in rsi_col:
        for window in windows: 
            if f'rsi_{window}_{col}' not in useless:
              
                U = pl.when(pl.col(col).pct_change() >= 0).then(pl.col(col).pct_change()).otherwise(0.0).alias('U')
                V = pl.when(pl.col(col).pct_change() < 0).then((pl.col(col).pct_change()).abs()).otherwise(0.0).alias('V')

                pl_df = pl_df.with_columns([U, V])

                U = pl.col('U').rolling_mean(window).over(group)
                V = pl.col('V').rolling_mean(window).over(group)

                rsi_expression = (100 * (U / (U + V))).alias(f'rsi_{window}_{col}')
                pl_df = pl_df.with_columns(rsi_expression)

    pl_df = pl_df.drop(['U', 'V'])

    train = pl_df.to_pandas()
                               
    return train

### Bollinger Bands
#### Generate oversold or overbought signals; identify sharp, short-term price movements and potential entry and exit points

In [None]:
def generate_bbands(train):
    
    pl_df = pl.from_pandas(train)
    
    bband_col = ['reference_price', 'far_price', 'near_price', 'imbalance_size', 'matched_size', 'wap']
    window = 20
    multiplier = 2
    group = ['stock_id', 'date_id']
    
    for col in bband_col:
        
        expressions = [
            (
                pl.col(col).rolling_mean(window).over(group)
                + (multiplier * pl.col(col).rolling_std(window).over(group))
            ).alias(f'upper_bband_{col}'),
            (
                pl.col(col).rolling_mean(window).over(group)
                - (multiplier * pl.col(col).rolling_std(window).over(group))
            ).alias(f'lower_band_{col}')
                ]
        pl_df = pl_df.with_columns(expressions)

    # Computing Bollinger Band breakout (narrow bands) over different windows    
    windows = [3, 7, 14]
    for col in bband_col:
        for window in windows:
        
            expressions = [
                (pl.col(f'upper_bband_{col}') - pl.col(f'lower_band_{col}')).shift(window)
                                                                            .over(group)
                                                                            .alias(f'bband_breakout_{window}_{col}')
            ]
            
            pl_df = pl_df.with_columns(expressions)
        
    train = pl_df.to_pandas()
                               
    return train

### Moving average convergence/divergence
#### Momentum indicator that shows the relationship between two moving averages of a security's price

In [None]:
def generate_macd(train):
    
    pl_df = pl.from_pandas(train)
    
    column = ['reference_price', 'far_price', 'near_price', 'imbalance_size', 'matched_size', 'wap']
    group = ['stock_id', 'date_id']
    
    for col in column:
    
        expressions = [
             (pl.col(col).ewm_mean(span = 12)
                        .over(group)
                        .last()
                        -
             pl.col(col).ewm_mean(span = 26)
                        .over(group)
                        .last())
                        .alias(f'MACD_{col}'),
             pl.col(col).ewm_mean(span = 9)
                        .over(group)
                        .last()
                        .alias(f'Signal_{col}')
        ]    

        pl_df = pl_df.with_columns(expressions)

    train = pl_df.to_pandas()
                               
    return train

### Average True Range
#### Volatility indicator

In [None]:
def generate_atr(train):
    
    pl_df = pl.from_pandas(train)
    
    groups =  ['reference_price', 'far_price', 'near_price', 'wap', 'bid_price', 'ask_price']
    group = ['stock_id', 'date_id']
    window = 14
    
    for c in combinations(groups, 2):
    
        expressions = [
            (pl.col(c[0]) - pl.col(c[1]))
                    .fill_null(0)
                    .abs()
                    .rolling_mean(window)
                    .over(group)
                    .alias(f'{c[0]}_{c[1]}_atr_{window}'),
        ]

        pl_df = pl_df.with_columns(expressions)
        
    train = pl_df.to_pandas()
                               
    return train

### Historical Volatility

In [None]:
def generate_HV(train):
    
    pl_df = pl.from_pandas(train)
    
    column = ['reference_price', 'far_price', 'near_price', 'wap', 'ask_price', 'bid_price']
    group = ['stock_id', 'date_id']
    
    windows = [5, 10, 14]
    for col in column:
        for win in windows:
            expressions = [            
       pl.col(col).pct_change()
                  .over(group).alias('pct_c')
            ]    

            pl_df = pl_df.with_columns(expressions)
            
            expressions = [
                pl.col('pct_c')
                  .rolling_mean(win)
                  .over(group)
                  .std()
                  .alias(f'HV_{win}_{col}')
            ]
            
            pl_df = pl_df.with_columns(expressions)
            
    pl_df.drop('pct_c')
    train = pl_df.to_pandas()
    
    return train

### Keltner Channel
#### Tracks volatility using an asset's exponential moving average and average true range

In [None]:
def generate_keltner(train):
    
    pl_df = pl.from_pandas(train)
    
    window = 14
    multiplier = 2
    group = ['stock_id','date_id']
    
    expressions = [
        ((pl.col('ask_price') - pl.col('bid_price'))
                .rolling_mean(window).over(group) + pl.col(f'bid_price_ask_price_atr_{window}') * multiplier).alias(f'keltner_{window}_upper_bid_ask'),
        ((pl.col('ask_price') - pl.col('bid_price'))
                .rolling_mean(window).over(group) - pl.col(f'bid_price_ask_price_atr_{window}') * multiplier).alias(f'keltner_{window}_lower_bid_ask'),
        ((pl.col('reference_price') - pl.col('wap'))
                .fill_null(0).abs()
                .rolling_mean(window).over(group) + pl.col(f'reference_price_wap_atr_{window}') * multiplier).alias(f'keltner_{window}_upper_ref_wap'),
        ((pl.col('reference_price') - pl.col('wap'))
                .fill_null(0).abs()
                .rolling_mean(window).over(group) - pl.col(f'reference_price_wap_atr_{window}') * multiplier).alias(f'keltner_{window}_lower_ref_wap')        
    ]    

    pl_df = pl_df.with_columns(expressions)
    
    train = pl_df.to_pandas()
                               
    return train

## Function To Generate All Technical Analysis Features
`reduce_mem_usage` is applied after every function call to handle memory

In [None]:
def generate_ta(train):

    train = generate_rsi(train)
    train = generate_bbands(train)
    
    train = reduce_mem_usage(train) 
    
    train = generate_atr(train)    
    train = generate_macd(train)
    
    train = reduce_mem_usage(train)
    
    train = generate_HV(train)
    train = generate_keltner(train)
    
    train = reduce_mem_usage(train)
    
    return train

## Standardization

In [11]:
def standardize_by_group(df, by):
    
    columns = ['imbalance_size', 'matched_size', 'ref_mid_spread_ratio', 'far_near_ratio']
    
    groups = df.groupby(by)
    
    mean = groups[columns].transform('mean')
    std = groups[columns].transform('std')
    normalized = (df[columns] - mean) / std

    merged = df.merge(
        normalized,
        how='outer', 
        left_index = True, 
        right_index = True, 
        suffixes = ('_x', '_standardized')
        )
    
    merged = merged[[x for x in merged.columns.tolist() if '_x' not in x]]
    
    return merged

## Correlation

In [12]:
def correl(dictionary, cols, train):
    
    pl_df = pl.from_pandas(train)

    # Aggregates Data to get the mean as the market Data
    for col_name in cols:
        market_movement = pl_df.select(['date_id', 'seconds_in_bucket', col_name]).group_by(['date_id', 'seconds_in_bucket']).mean().rename({col_name : 'market'})
        stock_movement = pl_df.select(['stock_id', 'date_id', 'seconds_in_bucket', col_name]).pivot(columns = 'stock_id', values = col_name, index = ['date_id', 'seconds_in_bucket'])

        correl = (market_movement.join(stock_movement, how = 'left',
                left_on = ['date_id','seconds_in_bucket'],
                right_on = ['date_id','seconds_in_bucket']).drop(['date_id','seconds_in_bucket']))

        correl = correl.to_pandas()

        correlation = \
            correl['market'] \
                .iloc[:-1,] \
                    .reset_index() \
                        .rename(
                            columns = {'index' : 'stock_id', 
                                    'market' : f'correl_{col_name}'}
                            )

        del correl, market_movement
        
        dictionary[f'correl_{col_name}'] = correlation[f'correl_{col_name}'].to_dict()
        
    return dictionary

def correlation_features(train):
    
    correl_columns = ['reference_price_ask_price_imb', 'reference_price_bid_price_imb', 'target',
               'far_price_ask_price_bid_price_imb2', 'far_near_spread_ratio', 'reference_price_far_price_near_price_imb2', 
                'liquidity_imbalance', 'market_urgency', 'price_diff_auction_vs_non_auction', 'log_bid_vol', 'bid_ask_ratio', 
              'bid_size', 'ask_size', 'ref_mid_spread_ratio', 'imbalance_buy_sell_flag', 'bid_price', 'ask_price', 'mid_price', 'reference_price']
    
    correl_feats = {}

    correl_feats = correl(correl_feats, correl_columns, train)
    
    return correl_feats

def map_correl_feats(df, feats):
    for key, values in feats.items():

        df[key] = df['stock_id'].map(values)

    return df


## Generate All Features

In [13]:
def generate_all_features(train, correl_feats = None):
    
    train = generate_single_features(train)
    train = generate_rolling_features(train)
    stand_df = train[['stock_id','imbalance_size', 'matched_size', 'ref_mid_spread_ratio', 'far_near_ratio']]
    train = generate_imb_features(train)
    train = generate_lag_features(train)
    train = generate_ta(train)

    if flag != 'test':
        correl_feats = correlation_features(train)
        train = standardize_by_group(train, 'stock_id')
        
    train = map_correl_feats(train, correl_feats)

    train = memory_management(train, useless)
    
    train = train.replace([np.inf], 9999)
    train = train.replace([-np.inf], -9999)
    
    if flag != 'test':
        return train, stand_df, correl_feats
    elif flag == 'test':
        return train

train1, stand_df, correl_feats = generate_all_features(train)
print(len(train1.columns))

Memory usage of dataframe is 2637.42 MB
Memory usage after optimization is: 819.20 MB
Decreased by 68.94%
Memory usage of dataframe is 1578.46 MB
Memory usage after optimization is: 1088.94 MB
Decreased by 31.01%
Memory usage of dataframe is 6353.79 MB
Memory usage after optimization is: 2662.40 MB
Decreased by 58.10%
Memory usage of dataframe is 4540.56 MB
Memory usage after optimization is: 2972.09 MB
Decreased by 34.54%
Memory usage of dataframe is 5040.07 MB
Memory usage after optimization is: 3201.87 MB
Decreased by 36.47%
261


# Model Specification

In [14]:
from sklearn.model_selection import train_test_split, KFold
import lightgbm as lgb
from sklearn.metrics import mean_absolute_error as mae

In [15]:
X = train1.drop(columns = ['date_id', 'target', 'row_id', 'time_id'])
Y = train1['target']

date_ids = train1['date_id'].values

del train1

y_min = np.min(Y)
y_max = np.max(Y)

In [16]:
def importance(final_model):
    importance = pd.DataFrame({
        'feature_name': final_model.booster_.feature_name(),
        'importance_gain': final_model.booster_.feature_importance(importance_type='gain'),
        'importance_split': final_model.booster_.feature_importance(importance_type='split'),
    }).sort_values('importance_gain', ascending=False).reset_index(drop=True)

    importance['pct'] = importance.importance_gain / importance.importance_gain.sum()
    importance['running'] = importance.pct.cumsum()
    
    return importance

# Model Training

In [17]:
num_folds = 5
models = []

if flag != 'submission':
    lgb_params = {
        'objective': 'mae',
        'n_estimators': 8000,
        'num_leaves': 32,
        'max_depth': 122,
        'learning_rate': 0.008,
        'device': 'gpu',
        'n_jobs': 4,
        'verbosity': -1,
        'importance_type': 'gain'
    }

    feature_name = list(X.columns)

    # The total number of date_ids is 480, we split them into 5 folds
    fold_size = 480 // num_folds
    gap = 5
    
    scores = []

    for i in range(num_folds):
        start = i * fold_size
        end = start + fold_size

    # Purging
        if i < num_folds - 1:  # No need to purge after the last fold
            purged_start = end - 2
            purged_end = end + gap + 2
            train_indices = (date_ids >= start) & (date_ids < purged_start) | (date_ids > purged_end)
        else:
            train_indices = (date_ids >= start) & (date_ids < end)

        test_indices = (date_ids >= end) & (date_ids < end + fold_size)

        # End Purge

        # Train Test Split
        df_fold_train = X[train_indices]
        df_fold_train_target = Y[train_indices]
        df_fold_valid = X[test_indices]
        df_fold_valid_target = Y[test_indices]

        print(f'Fold {i+1} model Training')

        # Train a LightGBM model for the current fold
        lgb_model = lgb.LGBMRegressor(**lgb_params)
        lgb_model.fit(
            df_fold_train[feature_name],
            df_fold_train_target,
            eval_set=[(df_fold_valid[feature_name], df_fold_valid_target)],
            callbacks=[
                lgb.callback.early_stopping(stopping_rounds=100),
                lgb.callback.log_evaluation(period=100),
            ],
        )

        # Append the model to the list
        models.append(lgb_model)
        model_filename = f'/kaggle/working/lgbm_{i+1}.pkl'
        joblib.dump(lgb_model, model_filename)

        # Evaluate model performance on the validation set
        fold_predictions = lgb_model.predict(df_fold_valid[feature_name])
        fold_score = mae(fold_predictions, df_fold_valid_target)
        scores.append(fold_score)
        print(f'Fold {i+1} MAE: {fold_score}')

        col_importance = importance(lgb_model)
        print(col_importance[col_importance.running < 0.80])
        print(col_importance[col_importance.running > 0.95].feature_name.tolist())
        print('COMPLETELY uselesss features')
        print(col_importance[col_importance.pct == 0].feature_name.tolist())

        # Free up memory by deleting fold specific variables
        del df_fold_train, df_fold_train_target, df_fold_valid, df_fold_valid_target
        gc.collect()

    # Calculate the average best iteration from all regular folds
    average_best_iteration = int(np.mean([model.best_iteration_ for model in models]))

    # Update the lgb_params with the average best iteration
    final_model_params = lgb_params.copy()
    final_model_params['n_estimators'] = average_best_iteration

    print(f'Training final model with average best iteration: {average_best_iteration}')

    # Train the final model on the entire dataset
    final_model = lgb.LGBMRegressor(**final_model_params)
    final_model.fit(
        X[feature_name],
        Y,
        callbacks=[
            lgb.callback.log_evaluation(period=100),
        ],
    )

    # Append the final model to the list of models
    models.append(final_model)
    model_filename = f'/kaggle/working/lgbm_6.pkl'
    joblib.dump(final_model, model_filename)

    # Average scores for all models
    print(f'Average MAE across all folds: {np.mean(scores)}')

# Test Data Processing

In [18]:
def standardize_new_data(train, new, by):
    columns = ['imbalance_size', 'matched_size', 'ref_mid_spread_ratio', 'far_near_ratio']
    groups = train.groupby(by)
    means = groups[columns].mean().reset_index()
    means = means.rename(columns={c: c+'_mean' for c in means.columns if c not in ['stock_id']})
    stds = groups[columns].std().reset_index()
    stds = stds.rename(columns={c: c+'_std' for c in stds.columns if c not in ['stock_id']})

    feat = new \
        .merge(
            means, 
            how = 'inner', 
            left_on = 'stock_id', 
            right_on = 'stock_id'
        ) \
            .merge(
                stds, 
                how = 'inner', 
                left_on = 'stock_id', 
                right_on = 'stock_id'
            )
    
    for column in columns:
        col_mean = column + '_mean'
        col_std = column + '_std'
        new_col = column + '_standardized'
        feat[new_col] = (feat[column] - feat[col_mean]) / feat[col_std]

    return feat

# Competition Submission

In [19]:
if flag == 'submission':
    def zero_sum(prices, volumes):
        std_error = np.sqrt(volumes)
        step = np.sum(prices)/np.sum(std_error)
        out = prices-std_error*step

        return out

    import optiver2023
    env = optiver2023.make_env()
    iter_test = env.iter_test()

    flag = 'test'
    counter = 0
    keep_col = [x for x in X.columns.tolist() if '_x' not in x]
    
    models = []
    for i in range(num_folds + 1):
        model_filename = f'/kaggle/input/trained-models/lgbm_{i+1}.pkl'
        m = joblib.load(model_filename)
        models.append(m)

    # equal weights for each model
    model_weights = [1 / len(models)] * len(models)

    # placeholder DF to cache tests (for computing rolling features)
    cache = pd.DataFrame()

    qps = []


    for (test, revealed_targets, sample_prediction) in iter_test:
        
        test1 = test.copy()

        test = test.drop('currently_scored', axis=1)
        now_time = time.time()   
        cache = pd.concat([cache, test], ignore_index=True, axis=0)

        if counter > 0:
            cache = cache.groupby(['stock_id']).tail(21).sort_values(by=['date_id', 'seconds_in_bucket', 'stock_id']).reset_index(drop=True)

        test = generate_all_features(cache, correl_feats)[-len(test):]
        test = \
        standardize_new_data(
            stand_df, 
            test, 
            'stock_id'
        )

        test = test[keep_col]  

        predictions = np.zeros(len(test))

        for model, weight in zip(models, model_weights):
            
            predictions += weight * model.predict(test)

        predictions = zero_sum(predictions, test1['bid_size'] + test1['ask_size'])

        clipped_predictions = np.clip(predictions, y_min, y_max)

        sample_prediction['target'] = clipped_predictions

        env.predict(sample_prediction)
        counter += 1
        qps.append(time.time() - now_time)

        if counter % 10 == 0:
            print(f'{counter} queries per second: {np.mean(qps)}')

    time_cost = 1.146 * np.mean(qps)
    print(f'The code will take approximately {np.round(time_cost, 2)} hours')

This version of the API is not optimized and should not be used to estimate the runtime of your code on the hidden test set.
10 queries per second: 1.673547124862671
20 queries per second: 1.6412729263305663
30 queries per second: 1.6507878065109254
40 queries per second: 1.6520367562770844
50 queries per second: 1.657225294113159
60 queries per second: 1.6639484484990439
70 queries per second: 1.691105318069458
80 queries per second: 1.6985257685184478
90 queries per second: 1.6995767831802369
100 queries per second: 1.702619800567627
110 queries per second: 1.7015141833912242
120 queries per second: 1.7143314599990844
130 queries per second: 1.7237979558797982
140 queries per second: 1.7240573780877249
150 queries per second: 1.7218062957127889
160 queries per second: 1.720225267112255
The code will take approximately 1.97 hours
