# 1. Factor Engineering

In [25]:
import numpy as np
import plotly.express as px

import pandas as pd
import polars as pl
import glob
import os

list_book_train = glob.glob(
    'data/optiver-realized-volatility-prediction/book_train.parquet/*')
list_trade_train = glob.glob(
    'data/optiver-realized-volatility-prediction/trade_train.parquet/*')
train_data = pl.read_csv(
    'data/optiver-realized-volatility-prediction/train.csv')

In [None]:
def raise_if_nan(df: pl.DataFrame):
    nan_counts = df.select([pl.col(col).is_nan().sum().alias(col) for col in df.columns])
    total_nan = nan_counts.to_series().sum()
    if total_nan > 0:
        raise ValueError("DataFrame contains NaN values.")

## 1.1 Feature Generation
### 1.1.1 Book Features
We first processes raw order book data and extracts meaningful features for modeling. The following table is the summary of the feature we generated from the original datasets.

| Feature Name            | Formula                                                                                                                                         | Reason for Inclusion                                                                                         |
| ----------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------ |
| **`wap1/2`**              | $ \frac{\text{ask\_price}_{1/2} \cdot \text{bid\_size}_{1/2} + \text{bid\_price}_{1/2} \cdot \text{ask\_size}_{1/2}}{\text{ask\_size}_{1/2} + \text{bid\_size}_{1/2}}$ | Weighted Average Price at depth 1/2. This smooths out the impact of individual order prices using size as weight. It approximates the "fair" market price better than simple mid-price. Including wap2 captures more of the limit order book’s shape. 
| **`bid_size_diff`**     | $\text{bid\_size}_1 - \text{bid\_size}_2$                                                                                      | Captures the change in demand depth between level 1 and level 2. Large differences may imply lower resilience in the order book.                  |
| **`ask_size_diff`**     | $\text{ask\_size}_1-\text{ask\_size}_2$                                                                                | Measures ask-side change. Can signal a lack of liquidity on the sell side or increased selling pressure.                              |
| **`price_spread`**      | $ \frac{\text{ask\_price}_1}{\text{bid\_price}_1} - 1$                                                                                |A measure of transaction cost and short-term illiquidity.       |
| **`order_imbalance_1/2`** | $\frac{\text{bid\_size1/2} - \text{ask\_size1/2}}{\text{bid\_size1/2} + \text{ask\_size1/2}}$                                                     | Measures demand-supply pressure at level 1/2. Positive values imply buying pressure, negative values suggest selling pressure.              
| **`depth_ratio`**       | $\frac{\text{bid\_size}_1 + \text{bid\_size}_2}{\text{ask\_size}_1 + \text{ask\_size}_2}$                                             | Compares total buy-side depth to sell-side depth. Values > 1 suggest buy-side dominance.                                                   |
| **`total_volume`**      | $ \text{bid\_size}_1 + \text{bid\_size}_2 + \text{ask\_size}_1 + \text{ask\_size}_2$                                                  | Captures overall liquidity in the limit order book. High volume typically corresponds to tighter spreads and lower volatility.             |
| **`wap_diff`**          | $ \text{wap1} - \text{wap2}$                                                                                                              | Measures how much prices diverge between the top and next levels of the book. Large values may indicate price pressure or volatility. Useful for detecting price trends or pressure. |
| **`log_return1/2`**       | $ \log\left( \frac{\text{wap1/2}_t}{\text{wap1/2}_{t-1}} \right)$                                                                        | Log return of using wap1/2. Used in volatility estimation and price dynamics modeling.                        |


In [21]:
output_dir_book_features = 'data/training_data/book_features'
os.makedirs(output_dir_book_features, exist_ok=True)

for file_path in list_book_train:
    stock_id = int(file_path.split('=')[1])
    df = pl.read_parquet(file_path)

    # Step 1: Add price-level and liquidity-related features
    df = df.with_columns([
        # wap1/2
        ((pl.col("ask_price1") * pl.col("bid_size1") + pl.col("bid_price1") * pl.col("ask_size1")) /
            (pl.col("ask_size1") + pl.col("bid_size1"))
         ).alias("wap1"),
        ((pl.col("ask_price2") * pl.col("bid_size2") + pl.col("bid_price2") * pl.col("ask_size2")) /
            (pl.col("ask_size2") + pl.col("bid_size2"))
         ).alias("wap2"),
        # bid_size_diff, ask_size_diff
        (pl.col("bid_size1") - pl.col("bid_size2")-1).alias("bid_size_diff"),
        (pl.col("ask_size1") / pl.col("ask_size2")-1).alias("ask_size_diff"),
        # price_spread
        (pl.col("ask_price1") / pl.col("bid_price1")-1).alias("price_spread"),
        # order_imbalance_1/2
        ((pl.col("bid_size1") - pl.col("ask_size1")) /
         (pl.col("bid_size1") + pl.col("ask_size1"))).alias("order_imbalance_1"),
        ((pl.col("bid_size2") - pl.col("ask_size2")) /
         (pl.col("bid_size2") + pl.col("ask_size2"))).alias("order_imbalance_2"),
        # depth_ratio
        ((pl.col("bid_size1")+pl.col("bid_size2")) /
         (pl.col("ask_size1") + pl.col("ask_size2"))).alias("depth_ratio"),
        # total_volume
        (pl.col("bid_size1")+pl.col("bid_size2")+pl.col("ask_size1") +
         pl.col("ask_size2")).alias("total_volume")
    ])

    # Step 2: Add dynamic (time-based) features
    df = df.with_columns([
        # wap_diff
        (pl.col("wap1")-pl.col("wap2")).alias("wap_diff"),
        # log_return1/2
        ((pl.col("wap1") / pl.col("wap1").shift(1)).log()).alias("log_return1"),
        ((pl.col("wap2") / pl.col("wap2").shift(1)).log()).alias("log_return2")
    ])

    # Step 3: Drop raw order book columns to reduce storage and redundancy
    df = df.drop([
        "ask_price1", "ask_price2", "ask_size1", "ask_size2",
        "bid_price1", "bid_price2", "bid_size1", "bid_size2"
    ])

    # Rows with null values for log_return1/2 are dropped. These nulls are a result of the `.diff()` operation within the calculation.
    df = df.drop_nulls(["log_return1", "log_return2"])
    raise_if_nan(df)

    # Step 4: Save processed features per stock
    file_name = f"stock_id={stock_id}"
    file_path_o = os.path.join(output_dir_book_features, file_name)
    df.write_parquet(file_path_o)

In [22]:
pl.read_parquet('data/training_data/book_features/stock_id=5').head()

time_id,seconds_in_bucket,wap1,wap2,bid_size_diff,ask_size_diff,price_spread,order_imbalance_1,order_imbalance_2,depth_ratio,total_volume,wap_diff,log_return1,log_return2
i16,i16,f64,f64,i32,f64,f32,f64,f64,f64,i32,f64,f64,f64
5,1,1.001327,1.000508,296,-0.636364,0.001599,0.974359,0.0,21.266667,334,0.000819,5e-06,0.0
5,3,1.001329,1.000508,316,-0.636364,0.001599,0.975904,0.0,22.6,354,0.000821,1e-06,0.0
5,4,1.001331,1.000508,366,-0.636364,0.001599,0.979058,0.0,25.933333,404,0.000823,3e-06,0.0
5,5,1.001321,1.000812,206,-0.636364,0.001572,0.965517,0.3125,16.6,264,0.000509,-1e-05,0.000304
5,6,1.001328,1.001023,267,-0.636364,0.001545,0.974359,0.568627,23.2,363,0.000306,7e-06,0.00021


### 1.1.2 10-Minute Time Window Book Features
This code performs 10min-time-interval aggregation on order book data. It groups data by time_id within specified time intervals and computes key features such as realized volatility, logarithmic price range, and various mean values related to prices, order imbalances, and volume. The aggregated features are then saved as Parquet files named by stock ID, enabling efficient downstream analysis and modeling. 

In [23]:
def aggregate_interval_book(df: pl.DataFrame, interval=None, interval_length=None) -> pl.DataFrame:
    if interval:
        start = interval * interval_length
        end = (interval + 1) * interval_length

        df_interval = df.filter((pl.col("seconds_in_bucket") >= start) & (
            pl.col("seconds_in_bucket") < end))
    else:
        df_interval = df
    df_agg = df_interval.group_by("time_id").agg(

        # realized_volatility
        (pl.col("log_return1").pow(2).sum()).sqrt().alias("realized_volatility1"),
        (pl.col("log_return2").pow(2).sum()).sqrt().alias("realized_volatility2"),
        
        # high_low_range
        (pl.col("wap1").max() / pl.col("wap1").min()).log().alias("high_low_range"),

        # simple means
        pl.col("wap1").mean().alias("wap1_mean"),
        pl.col("wap2").mean().alias("wap2_mean"),
        pl.col("wap_diff").mean().alias("wap_diff_mean"),
        pl.col("order_imbalance_1").mean().alias("order_imbalance_1_mean"),
        pl.col("order_imbalance_2").mean().alias("order_imbalance_2_mean"),
        pl.col("depth_ratio").mean().alias("depth_ratio_mean"),
        pl.col("total_volume").mean().alias("total_volume_mean"),
        pl.col("price_spread").mean().alias("price_spread_mean"),

        pl.col("bid_size_diff").mean().alias("bid_size_diff_mean"),
        pl.col("ask_size_diff").mean().alias("ask_size_diff_mean"))

    if interval != None:
        df_agg = df_agg.rename({col: f"{col}_{interval}" if col !=
                               "time_id" else "time_id" for col in df_agg.columns})
    return df_agg

In [24]:
list_book_feature= glob.glob('data/training_data/book_features/*')
output_dir_book_10min = 'data/training_data/book_features_10min'
os.makedirs(output_dir_book_10min, exist_ok=True)

for file_path in list_book_feature:
    df = pl.read_parquet(file_path)

    stock_id = int(file_path.split('=')[1])
    stats = aggregate_interval_book(df)
    raise_if_nan(stats)
    file_name = f"stock_id={stock_id}"
    file_path_o = os.path.join(output_dir_book_10min, file_name)

    stats.write_parquet(file_path_o)

In [16]:
pl.read_parquet('data/training_data/book_features_10min/stock_id=5').head()

time_id,realized_volatility1,realized_volatility2,high_low_range,wap1_mean,wap2_mean,wap_diff_mean,order_imbalance_1_mean,order_imbalance_2_mean,depth_ratio_mean,total_volume_mean,price_spread_mean,bid_size_diff_mean,ask_size_diff_mean
i16,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f32,f64,f64
32646,0.007767,0.010397,0.002783,0.99851,0.998529,-1.9e-05,-0.07288,0.018719,4.618142,261.519553,0.0013,11.96648,8.515984
1057,0.003417,0.007791,0.004666,1.00184,1.00187,-3e-05,0.081909,0.170616,3.744964,275.017778,0.000674,0.933333,16.639692
6961,0.006789,0.009901,0.002613,0.999476,0.999361,0.000116,0.11866,0.022966,7.526675,257.713018,0.00109,32.446746,20.148408
14449,0.002718,0.003452,0.001743,0.999671,0.999832,-0.000161,0.328045,0.508676,16.257335,156.810185,0.000488,5.12037,11.067204
8938,0.006302,0.010583,0.007024,0.99933,0.99915,0.000181,0.216479,-0.078864,8.752918,216.449231,0.001351,41.96,6.384912


### 1.1.3 150-Second Time Segment Book Features
This code processes order book data by dividing each trading period into fixed-length intervals (in this case, 4 intervals of 150 seconds each). For each stock file, it aggregates features separately for each time interval using the aggregate_interval_book function, and then merges these interval-based feature sets into a single DataFrame keyed by time_id. The resulting combined features capture finer-grained temporal dynamics within the trading period. Finally, the aggregated data is saved as Parquet files organized by stock ID.

In [17]:
list_book_features_10min = glob.glob(
    'data/training_data/book_features_10min/*')
output_dir_book_150sec = 'data/training_data/book_features_150sec'
os.makedirs(output_dir_book_150sec, exist_ok=True)

n_intervals = 4
interval_length = 150
for file_path in list_book_feature:
    stock_id = int(file_path.split('=')[1].split('.')[0])
    df = pl.read_parquet(file_path)

    merged_df = None
    for interval in range(n_intervals):
        df_agg = aggregate_interval_book(df, interval, interval_length)
        merged_df = df_agg if merged_df is None else merged_df.join(
            df_agg, on="time_id", how="left")
    raise_if_nan(merged_df)
    file_name = f"stock_id={stock_id}"
    file_path_o = os.path.join(output_dir_book_150sec, file_name)
    merged_df.write_parquet(file_path_o)

In [18]:
pl.read_parquet('data/training_data/book_features_150sec/stock_id=5').head()

time_id,realized_volatility1_0,realized_volatility2_0,high_low_range_0,wap1_mean_0,wap2_mean_0,wap_diff_mean_0,order_imbalance_1_mean_0,order_imbalance_2_mean_0,depth_ratio_mean_0,total_volume_mean_0,price_spread_mean_0,bid_size_diff_mean_0,ask_size_diff_mean_0,realized_volatility1_1,realized_volatility2_1,high_low_range_1,wap1_mean_1,wap2_mean_1,wap_diff_mean_1,order_imbalance_1_mean_1,order_imbalance_2_mean_1,depth_ratio_mean_1,total_volume_mean_1,price_spread_mean_1,bid_size_diff_mean_1,ask_size_diff_mean_1,realized_volatility1_2,realized_volatility2_2,high_low_range_2,wap1_mean_2,wap2_mean_2,wap_diff_mean_2,order_imbalance_1_mean_2,order_imbalance_2_mean_2,depth_ratio_mean_2,total_volume_mean_2,price_spread_mean_2,bid_size_diff_mean_2,ask_size_diff_mean_2,realized_volatility1_3,realized_volatility2_3,high_low_range_3,wap1_mean_3,wap2_mean_3,wap_diff_mean_3,order_imbalance_1_mean_3,order_imbalance_2_mean_3,depth_ratio_mean_3,total_volume_mean_3,price_spread_mean_3,bid_size_diff_mean_3,ask_size_diff_mean_3
i16,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f32,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f32,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f32,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f32,f64,f64
30547,0.006015,0.008576,0.00404,0.999129,0.999,0.000128,0.215959,0.121593,8.336699,296.849765,0.001229,-17.981221,10.679583,0.004141,0.004387,0.002749,0.999521,0.999645,-0.000124,-0.149258,0.060261,1.373644,391.237288,0.001213,-67.508475,6.871591,0.00311,0.002826,0.002848,0.999084,0.999033,5.1e-05,0.117405,0.25692,9.407518,204.137255,0.001213,-19.235294,1.377748,0.00264,0.005645,0.001527,0.99819,0.997606,0.000584,0.569069,-0.122372,9.852087,273.727273,0.001335,30.681818,1.17684
5392,0.004102,0.005832,0.004712,0.996065,0.996041,2.4e-05,-0.333597,-0.33047,1.668498,242.944853,0.000585,-37.220588,5.630982,0.001576,0.003042,0.001538,0.99515,0.995156,-6e-06,-0.460082,-0.371156,1.698817,207.507042,0.00063,-28.732394,7.330091,0.002074,0.003149,0.001633,0.995306,0.995424,-0.000118,-0.261058,0.116803,3.118714,283.716418,0.000635,-61.41791,9.35995,0.001011,0.001545,0.001509,0.995001,0.994866,0.000135,-0.396953,-0.597733,0.938764,237.282609,0.000463,-11.478261,-0.180047
9343,0.006852,0.008016,0.005926,0.996283,0.996348,-6.5e-05,-0.086474,0.123769,12.706524,161.668394,0.000671,-26.541451,5.714986,0.002872,0.004387,0.005588,0.996119,0.996047,7.2e-05,-0.117447,-0.202635,2.183346,132.661017,0.000656,-14.220339,3.370797,0.001797,0.00278,0.001795,0.994126,0.994148,-2.3e-05,-0.588484,-0.367322,2.905517,157.494624,0.000732,-18.419355,4.95948,0.002422,0.003006,0.002415,0.995686,0.995829,-0.000143,0.127995,0.462921,24.365892,149.030769,0.000681,-14.261538,5.995368
13276,0.003559,0.004024,0.001621,0.999992,0.999828,0.000164,-0.305966,-0.571466,0.632861,277.314815,0.000677,34.808642,16.296238,0.000959,0.001546,0.000653,1.000159,0.999992,0.000167,-0.458335,-0.731747,0.307994,245.409091,0.0006,45.568182,21.856908,0.001043,0.001212,0.001219,0.99965,0.999605,4.5e-05,-0.495523,-0.587422,0.70244,204.023256,0.000455,21.348837,8.035399,0.001072,0.001007,0.001201,0.999747,0.999244,0.000503,0.097511,-0.654429,1.055237,168.111111,0.001051,63.444444,5.457412
18483,0.009602,0.009498,0.004285,1.003473,1.003471,2e-06,0.078312,-0.00153,2.079671,254.763636,0.001309,26.486364,9.833753,0.002614,0.003352,0.00156,1.002805,1.002739,6.6e-05,0.14277,0.038894,1.507675,363.73913,0.001424,46.0,3.18549,0.003051,0.00511,0.002508,1.002967,1.00288,8.7e-05,0.14606,0.032599,1.269478,217.241935,0.001423,30.016129,5.916149,0.003932,0.003798,0.002703,1.004749,1.004891,-0.000142,0.012923,0.151611,3.877646,202.971429,0.001203,8.671429,22.712517


### 1.1.4 Trade Features 
This code processes trade data for stocks, computing additional features such as average trade size per order and the logarithmic return of trade prices. The logarithmic return is calculated as the log of the ratio between the current price and the previous price. Rows with null values resulting from this calculation (due to missing previous prices) are removed to ensure data quality. The cleaned and enriched DataFrame is then saved as Parquet files, organized by stock ID.

In [27]:
output_dir_trade_features = 'data/training_data/trade_features'
os.makedirs(output_dir_trade_features, exist_ok=True)

for file_path in list_trade_train:
    stock_id = int(file_path.split('=')[1])
    df = pl.read_parquet(file_path)

    df = df.with_columns([
        # size_per_order
        (pl.col("size")/pl.col("order_count")).alias("size_per_order"),
        # trade_log_return
         ((pl.col("price") / pl.col("price").shift(1)).log()).alias("trade_log_return")
    ])

    # Rows with null values for trade_log_return are dropped. These nulls are a result of the `.diff()` operation within the calculation.
    df = df.drop_nulls(["trade_log_return"])
    raise_if_nan(df)
    file_name = f"stock_id={stock_id}"
    file_path_o = os.path.join(output_dir_trade_features, file_name)
    df.write_parquet(file_path_o)

In [28]:
pl.read_parquet('data/training_data/trade_features/stock_id=5').head()

time_id,seconds_in_bucket,price,size,order_count,size_per_order,trade_log_return
i16,i16,f32,i32,i16,f64,f32
5,14,1.001395,6,4,1.5,0.000304
5,15,1.001639,2,2,1.0,0.000244
5,25,1.001375,1,1,1.0,-0.000264
5,31,1.00128,2,1,2.0,-9.5e-05
5,38,1.001009,4,1,4.0,-0.000271


### 1.1.5 10-Minute Time Window Trade Features
This code aggregates trade data into 10-minute intervals by grouping on time_id. For each interval, it calculates various statistical features, including realized volatility from squared log returns, price range, mean price, mean order size, total traded size, and standard deviation of trade sizes. The processed features are saved to separate Parquet files for each stock, enabling efficient downstream analysis.

In [52]:
list_trade_feature = glob.glob('data/training_data/trade_features/*')
output_dir_trade_10min = 'data/training_data/trade_features_10min'
os.makedirs(output_dir_trade_10min, exist_ok=True)

for file_path in list_trade_feature:
    df = pl.read_parquet(file_path)

    stock_id = int(file_path.split('=')[1])

    stats = df.group_by("time_id").agg(

        # trade_realized_volatility
        (pl.col("trade_log_return").pow(2).sum()).sqrt().alias(
            "trade_realized_volatility"),

        # trade_price_range
        (pl.col("price").max() / pl.col("price").min()
         ).log().alias("trade_price_range"),

        # means
        pl.col("price").mean().alias("trade_price_mean"),
        pl.col("size_per_order").mean().alias("size_per_order_mean"),
        pl.col("order_count").mean().alias("order_count_mean"),

        # sums
        pl.col("size").sum().alias("total_size"),
        pl.col("order_count").sum().alias("total_order"),

        # std
        pl.col("size").std().alias("size_std"))
    stats = stats.drop_nulls(["size_std"])
    raise_if_nan(stats)
    file_name = f"stock_id={stock_id}"
    file_path_o = os.path.join(output_dir_trade_10min, file_name)

    stats.write_parquet(file_path_o)

In [53]:
pl.read_parquet('data/training_data/trade_features_10min/stock_id=5')

time_id,trade_realized_volatility,trade_price_range,trade_price_mean,size_per_order_mean,order_count_mean,total_size,total_order,size_std
i16,f32,f32,f32,f64,f64,i32,i64,f64
31616,0.005685,0.006168,0.999802,20.183201,3.62963,2667,98,135.384732
19034,0.001793,0.003602,1.000157,9.846065,2.75,1121,66,81.35776
5648,0.00262,0.00382,1.001939,7.303922,2.294118,491,39,46.120606
12728,0.00577,0.001369,1.000697,18.029984,3.157895,1091,60,85.751784
8664,0.004336,0.003202,1.000435,8.2861,2.5,1204,65,145.031519
…,…,…,…,…,…,…,…,…
23836,0.003161,0.001728,0.999713,32.433729,3.076923,2432,80,151.024033
1688,0.005305,0.006925,0.99995,12.74,2.566667,1482,77,68.649334
5728,0.00785,0.017071,0.994098,21.780793,7.705882,6251,262,248.185309
4037,0.004132,0.00689,1.002741,19.911834,4.97619,6432,209,434.57608
