#1 Answer

Choosing a model for temporary market impact, $g_t(x)$, is critical for any effective order execution system. A simple linear model, $g_t(x)=beta_tx$, is a significant oversimplification. It incorrectly assumes that each share in an order has the same marginal cost. This is not how it works. The structure of a limit order book, with its discrete price levels and finite liquidity, means that larger orders must go through multiple levels in the order book consuming liquidity at progressively worse prices. This results in a non-linear concave impact function where impact grows more slowly than order size.

Analysis of the three tickers(SOUN,FROG,CRWV) demonstrates that no single fixed formula can reliably model market impact across different stocks. The best approach is an empirical journey, starting with simple models and progressing to more powerful, data driven methods.

# Square Root Model and Its Limitations
A strong theoretical starting point is the Square Root Impact Model, g_t(x)=
sigma_t*sqrt(x).However, when validated against the three tickers, its limitations became clear, as measured by the R-squared value


* SOUN(High-Liquidity): The model failed completely. The analysis showed consistently negative R-squared values, meaning its predictions were worse than a simple average.
* FROG(Medium-Liquidity): The model was inconsistent. It achieved a positive average R-squared of 0.21 but the fit was highly volatile.


* CRWV (Illiquid): The model was unreliable, with a volatile fit and an average R-squared of approximately 0.33.

While a Piecewise Model offers an improvement by penalizing large orders more heavily, it still relies on the rigid square root assumption.






# The Superior Solution: A Gradient Boosting Model
This machine learning model learns the complex patterns of market impact directly from the data without being confined to a fixed formula.ts superiority was demonstrated by its exceptional performance across all three stocks when using all 10 levels of order book data as features:


*   $SOUN(High Liquidity): R^2 = 0.5043$
*   $FROG(Medium Liquidity): R^2= 0.6022$


*   $CRWV(Illiquid):: R^2= 0.4847$

The Gradient Boosting model succeeded because it leverages a rich set of features including the spread, liquidity imbalance, and the depth of the entire order book to make highly accurate, context aware predictions for each specific stock.






In [1]:
import os
import json
import random
import warnings

import numpy as np
import pandas as pd

warnings.filterwarnings("ignore", category=FutureWarning)

# Modeling
USE_LGBM = True
try:
    import lightgbm as lgb
except Exception:
    USE_LGBM = False
    from sklearn.ensemble import GradientBoostingRegressor as FallbackGBR

from sklearn.model_selection import GroupShuffleSplit
from sklearn.metrics import r2_score, mean_absolute_error

# -------------------------
# Utility: Seeding
# -------------------------
def set_seed(seed=22):
    np.random.seed(seed)
    random.seed(seed)
    try:
        import torch
        torch.manual_seed(seed)
    except Exception:
        pass


In [2]:
def calculate_true_impact(row, order_size):
    bid0 = row.get("bid_px_00")
    ask0 = row.get("ask_px_00")
    if pd.isna(bid0) or pd.isna(ask0):
        return np.nan

    mid = (bid0 + ask0) / 2.0
    remaining, total_cost, filled = order_size, 0.0, 0.0

    for i in range(10):
        price = row.get(f"ask_px_{i:02d}")
        size = row.get(f"ask_sz_{i:02d}")
        if price is None or size is None or pd.isna(price) or pd.isna(size) or size <= 0:
            continue

        fill = min(remaining, size)
        total_cost += fill * price
        filled += fill
        remaining -= fill
        if remaining <= 0:
            break

    if remaining > 0 or filled == 0:
        return np.nan

    exec_avg = total_cost / filled
    return (exec_avg - mid) / mid


In [3]:
def find_timestamp_column(df):
    candidates = ["timestamp", "time", "datetime", "date", "ts"]
    for c in df.columns:
        if c.lower() in candidates:
            return c
    for c in df.columns:
        try:
            pd.to_datetime(df[c].head(5))
            return c
        except Exception:
            pass
    return None

In [4]:
def create_feature_dataset(df, num_snapshots=10000, test_sizes=None,
                           ts_col=None, normalize=False, random_state=42):
    rng = np.random.default_rng(random_state)

    df = df.reset_index(drop=False).rename(columns={"index": "snapshot_id"})
    if ts_col is not None and ts_col in df.columns:
        try:
            df[ts_col] = pd.to_datetime(df[ts_col])
        except Exception:
            pass

    if test_sizes is None:
        test_sizes = [50, 100, 200, 500, 1_000, 2_000, 5_000, 10_000, 100_000]

    n = len(df)
    k = min(num_snapshots, n)
    sample_indices = rng.choice(n, size=k, replace=False)

    rows = []
    for idx in sample_indices:
        row = df.iloc[idx]
        bid0, ask0 = row.get("bid_px_00"), row.get("ask_px_00")
        if pd.isna(bid0) or pd.isna(ask0):
            continue

        mid = (bid0 + ask0) / 2.0
        ask_liq = sum(row.get(f"ask_sz_{i:02d}", 0) for i in range(10))
        bid_liq = sum(row.get(f"bid_sz_{i:02d}", 0) for i in range(10))
        depth = ask_liq + bid_liq + 1e-9

        for size in test_sizes:
            impact = calculate_true_impact(row, size)
            if pd.isna(impact):
                continue

            feat = {
                "snapshot_id": row["snapshot_id"],
                "order_size": size,
                "target_impact": impact,
                "mid": mid,
                "spread": ask0 - bid0,
                "liq_imbalance": (bid_liq - ask_liq) / depth,
            }
            if ts_col is not None and ts_col in df.columns:
                feat["timestamp"] = row[ts_col]

            for i in range(10):
                ap = row.get(f"ask_px_{i:02d}")
                asz = row.get(f"ask_sz_{i:02d}")
                bp = row.get(f"bid_px_{i:02d}")
                bsz = row.get(f"bid_sz_{i:02d}")

                if normalize:
                    feat[f"ask_px_{i:02d}"] = (ap / mid) if pd.notna(ap) else np.nan
                    feat[f"bid_px_{i:02d}"] = (bp / mid) if pd.notna(bp) else np.nan
                    feat[f"ask_sz_{i:02d}"] = (asz / depth) if pd.notna(asz) else np.nan
                    feat[f"bid_sz_{i:02d}"] = (bsz / depth) if pd.notna(bsz) else np.nan
                else:
                    feat[f"ask_px_{i:02d}"] = ap
                    feat[f"ask_sz_{i:02d}"] = asz
                    feat[f"bid_px_{i:02d}"] = bp
                    feat[f"bid_sz_{i:02d}"] = bsz

            rows.append(feat)

    feat_df = pd.DataFrame(rows).dropna()
    return feat_df

In [5]:
# -------------------------
# 4) Splits
# -------------------------
def group_aware_split(feature_df, test_size=0.2, random_state=42):
    assert "snapshot_id" in feature_df.columns
    X = feature_df.drop(columns=["target_impact"])
    y = feature_df["target_impact"]
    groups = feature_df["snapshot_id"]

    gss = GroupShuffleSplit(n_splits=1, test_size=test_size, random_state=random_state)
    tr_idx, te_idx = next(gss.split(X, y, groups))
    return X.iloc[tr_idx].copy(), X.iloc[te_idx].copy(), y.iloc[tr_idx].copy(), y.iloc[te_idx].copy()


def time_based_split(feature_df, test_frac=0.2):
    assert "timestamp" in feature_df.columns
    df_sorted = feature_df.sort_values("timestamp").reset_index(drop=True)
    cutoff = int((1.0 - test_frac) * len(df_sorted))
    train_df = df_sorted.iloc[:cutoff].copy()
    test_df = df_sorted.iloc[cutoff:].copy()

    X_train = train_df.drop(columns=["target_impact"])
    y_train = train_df["target_impact"]
    X_test = test_df.drop(columns=["target_impact"])
    y_test = test_df["target_impact"]
    return X_train, X_test, y_train, y_test

In [6]:
# -------------------------
# 5) Training / Evaluation
# -------------------------
def train_and_eval(feature_df, prefer_time_split=True, test_size=0.2,
                   random_state=22, monotone_on_order_size=False):

    if prefer_time_split and "timestamp" in feature_df.columns:
        X_train, X_test, y_train, y_test = time_based_split(feature_df, test_frac=test_size)
        split_used = "time-based"
    else:
        X_train, X_test, y_train, y_test = group_aware_split(feature_df, test_size=test_size, random_state=random_state)
        split_used = "group-aware"

    # Drop non-model columns
    for col in ["snapshot_id", "timestamp"]:
        if col in X_train.columns:
            X_train = X_train.drop(columns=[col])
            X_test = X_test.drop(columns=[col])

    # Train
    if USE_LGBM:
        if monotone_on_order_size and "order_size" in X_train.columns:
            mono = [0] * X_train.shape[1]
            mono[X_train.columns.get_loc("order_size")] = 1
            model = lgb.LGBMRegressor(random_state=random_state, monotone_constraints=mono)
        else:
            model = lgb.LGBMRegressor(random_state=random_state)

        model.fit(X_train, y_train)
        pred = model.predict(X_test)
        fi = pd.Series(model.feature_importances_, index=X_train.columns).sort_values(ascending=False)
    else:
        model = FallbackGBR(random_state=random_state)
        model.fit(X_train, y_train)
        pred = model.predict(X_test)
        fi = None

    r2 = r2_score(y_test, pred)
    mae = mean_absolute_error(y_test, pred)

    return {
        "split_used": split_used,
        "X_train_shape": X_train.shape,
        "X_test_shape": X_test.shape,
        "R2": r2,
        "MAE": mae,
        "feature_importance": fi
    }

In [8]:
files = [
    "/content/CRWV_2025-04-03 00:00:00+00:00.csv",
    "/content/FROG_2025-04-03 00:00:00+00:00.csv",
    "/content/SOUN_2025-04-03 00:00:00+00:00.csv"   # replace with actual filename/path
]

for f in files:
    print(f"\n=== Running on {os.path.basename(f)} ===")
    df = pd.read_csv(f)

    # detect timestamp column if present
    ts_col = find_timestamp_column(df)

    # build feature dataset
    feat_df = create_feature_dataset(df, num_snapshots=5000, ts_col=ts_col)

    if len(feat_df) < 200:
        print(f"Too few rows after feature creation: {len(feat_df)}")
        continue

    # train and evaluate
    results = train_and_eval(feat_df, prefer_time_split=True)
    print(json.dumps({
        "split_used": results["split_used"],
        "R2": results["R2"],
        "MAE": results["MAE"],
        "top_features": results["feature_importance"].head(10).to_dict()
                           if results["feature_importance"] is not None else None
    }, indent=2))


=== Running on CRWV_2025-04-03 00:00:00+00:00.csv ===
[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.003634 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 10493
[LightGBM] [Info] Number of data points in the train set: 22190, number of used features: 44
[LightGBM] [Info] Start training from score 0.002281
{
  "split_used": "time-based",
  "R2": 0.4846743637899744,
  "MAE": 0.0005728374570291515,
  "top_features": {
    "order_size": 421,
    "ask_sz_00": 316,
    "spread": 184,
    "ask_px_00": 126,
    "ask_sz_01": 126,
    "ask_px_09": 124,
    "ask_sz_02": 105,
    "ask_sz_03": 97,
    "mid": 93,
    "ask_sz_04": 89
  }
}

=== Running on FROG_2025-04-03 00:00:00+00:00.csv ===
[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.003294 seconds.
You can set `force_row_wise=true` to remove the overh

# 2 Answer
 The goal is to execute a total of S shares over N discrete trading periods (390 one minute intervals) in a way that minimizes the total cost from temporary market impact. We need to find an allocation vector X that defines the number of shares to trade in each period i.
The total cost, C(X), is the sum of the costs of each individual trade. The cost of a single trade is its size multiplied by its impact.the most accurate impact forecast is provided by our trained Gradient Boosting Model, GBM_predict. This model serves as our cost function.

$\min_{C(X)} = \sum_{i=1}^{N} \text{Cost}_i(x_i) = \sum_{i=1}^{N} x_i \cdot \text{GBM\_predict}(\text{features}_i, x_i)$

Gradient Boosting model is not interpretable we cannot use simple calculus to find the perfect answer all at once. Instead, we use a smart, step by step algorithm that makes good decisions in real time.algorithm works by intelligently adjusting its trading speed based on historic conditions which the model should be trained. At any given moment, it knows the simple average number of shares it needs to trade per minute to finish the order on time the baseline rate(S/N). It then uses our trained ML model to predict the cost of trading right now. If the model predicts that the current impact is lower than the day's average (meaning it's a cheap time to trade), it will trade more than the baseline rate. Conversely, if the model predicts a high impact (an expensive time), it will trade less, saving shares to execute later when conditions might be better. This simple logic of buy more when it's cheap, less when it's expensive allows the algorithm to naturally reduce costs, while safety checks ensure the full order is completed by the end of the day.
