In [None]:

from cluster_features import ClusterFeatureAdder
from __future__ import annotations

from typing import List, Optional, Dict, Tuple

import numpy as np
import pandas as pd
from joblib import dump, load

from sklearn.base import BaseEstimator, TransformerMixin, RegressorMixin
from sklearn.cluster import KMeans
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.neighbors import KNeighborsRegressor
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVR

In [2]:
# 2. GLOBALS

# Paths to data (adjust to your folder structure)
EXEC_PATH = "./assignment 3/execs_from_fix.csv"
QUOTES_PATH = "./assignment 4/quotes_2025-09-10_small.csv.gz"

# Optional: limit to a subset of tickers during development
KEEP_TICKERS: Optional[List[str]] = [
    "AAPL", "MSFT", "NVDA",
    "BRK.B", "HXHX", "SFHG", "DIDIY",
    "INEO", "THH", "FMCC", "LKNCY", "IVFH"
]

# Market hours filter
MARKET_OPEN = pd.to_datetime("09:30").time()
MARKET_CLOSE = pd.to_datetime("16:00").time()

# Random seed for reproducibility
RANDOM_SEED = 42

# Feature and target names used for modeling
FEATURE_COLS = [
    "side",        # 1 = buy, other values = sell
    "order_qty",
    "limit_price",
    "bid_price",
    "ask_price",
    "bid_size",
    "ask_size",
]

TARGET_COL = "price_improvement"
GROUP_COL = "exchange"  # train one model per exchange


In [3]:
# 3. DATA LOADING FUNCTIONS

def load_quotes(path: str, tickers: Optional[List[str]]) -> pd.DataFrame:
    """Load quotes file in chunks, filter by tickers and market hours."""
    dtypes = {
        "ticker":    "string",
        "bid_price": "float32",
        "ask_price": "float32",
        "bid_size":  "int32",
        "ask_size":  "int32",
    }

    chunks: List[pd.DataFrame] = []
    chunk_size = 1_000_000

    for chunk in pd.read_csv(
        path,
        compression="gzip",
        dtype=dtypes,
        low_memory=False,
        chunksize=chunk_size,
    ):
        if tickers is not None:
            chunk = chunk[chunk["ticker"].isin(tickers)]

        if chunk.empty:
            continue

        # Convert timestamp to datetime
        chunk["sip_timestamp"] = pd.to_datetime(
            chunk["sip_timestamp"],
            unit="ns",
            errors="coerce",
        )

        # Keep only regular market hours
        t = chunk["sip_timestamp"].dt.time
        chunk = chunk[(t >= MARKET_OPEN) & (t <= MARKET_CLOSE)]

        if not chunk.empty:
            chunks.append(chunk)

    if chunks:
        quotes = pd.concat(chunks, ignore_index=True)
    else:
        quotes = pd.DataFrame(
            columns=[
                "ticker", "sip_timestamp",
                "bid_price", "ask_price",
                "bid_size", "ask_size",
                "bid_exchange", "ask_exchange",
            ]
        )

    return quotes


def load_executions(path: str, tickers: Optional[List[str]]) -> pd.DataFrame:
    """Load executions from execs_from_fix.csv and filter by tickers and market hours."""
    dtypes = {
        "order_id":         "string",
        "symbol":           "string",
        "side":             "int8",
        "order_qty":        "int32",
        "limit_price":      "float32",
        "execution_price":  "float32",
        "exchange":         "string",
    }

    df = pd.read_csv(
        path,
        dtype=dtypes,
        parse_dates=["order_time", "execution_time"],
        infer_datetime_format=True,
        low_memory=False,
    )

    if tickers is not None:
        df = df[df["symbol"].isin(tickers)]

    # Keep only regular market hours based on order time
    t = df["order_time"].dt.time
    df = df[(t >= MARKET_OPEN) & (t <= MARKET_CLOSE)]

    return df


def attach_quotes(executions: pd.DataFrame, quotes: pd.DataFrame) -> pd.DataFrame:
    """Attach the most recent quote at or before execution_time for each symbol."""
    # Sort by time first, then by symbol, as required by merge_asof
    executions.sort_values(["execution_time", "symbol"], inplace=True)
    quotes.sort_values(["sip_timestamp", "ticker"], inplace=True)

    merged = pd.merge_asof(
        executions,
        quotes,
        left_on="execution_time",
        right_on="sip_timestamp",
        left_by="symbol",
        right_by="ticker",
        direction="backward",
        allow_exact_matches=True,
    )

    return merged


In [None]:
# 4. FEATURE ENGINEERING
def add_price_improvement(df: pd.DataFrame) -> None:
    """Add price_improvement column in place.

    For buys (side == 1): improvement = ask_price - execution_price
    For sells:           improvement = execution_price - bid_price

    Positive values mean better-than-NBBO execution.
    """
    is_buy = df["side"] == 1

    ref_price = np.where(is_buy, df["ask_price"], df["bid_price"])
    improvement = np.where(
        is_buy,
        ref_price - df["execution_price"],   # buy: smaller exec price better
        df["execution_price"] - ref_price,   # sell: higher exec price better
    )

    df[TARGET_COL] = improvement.astype("float32")


def prepare_model_data(df: pd.DataFrame) -> pd.DataFrame:
    """Drop rows with missing values in features or target (in place)."""
    cols_needed = FEATURE_COLS + [TARGET_COL, GROUP_COL]
    df.dropna(subset=cols_needed, inplace=True)
    return df


In [5]:
# 5. PIPELINES & TRAINING LOGIC

def make_model_candidates(random_state: int = RANDOM_SEED) -> Dict[str, Tuple[Pipeline, List[Dict]]]:
    """Define candidate pipelines and their hyperparameter grids.

    Returns:
        A dict model_name -> (pipeline, param_grid_list)
        where param_grid_list is passed directly to GridSearchCV.
    """
    candidates: Dict[str, Tuple[Pipeline, List[Dict]]] = {}

    # 1) Linear regression baseline
    linear_pipe = Pipeline(
        steps=[
            ("scaler", StandardScaler()),
            ("model", LinearRegression()),
        ]
    )
    linear_grid = [{}]  # no hyperparameters to tune

    candidates["linear"] = (linear_pipe, linear_grid)

    # 2) Random Forest regressor
    rf_pipe = Pipeline(
        steps=[
            ("scaler", StandardScaler()),
            ("model", RandomForestRegressor(
                random_state=random_state,
                n_jobs=-1,
            )),
        ]
    )
    rf_grid = [{
        "model__n_estimators": [100, 300],
        "model__max_depth": [10, None],
        "model__min_samples_leaf": [1, 5],
    }]
    candidates["random_forest"] = (rf_pipe, rf_grid)

    # 3) Support Vector Regressor (SVM)
    svr_pipe = Pipeline(
        steps=[
            ("scaler", StandardScaler()),
            ("model", SVR()),
        ]
    )
    svr_grid = [{
        "model__C": [0.1, 1.0, 10.0],
        "model__epsilon": [0.01, 0.1],
        "model__gamma": ["scale", "auto"],
        "model__kernel": ["rbf"],
    }]
    candidates["svr"] = (svr_pipe, svr_grid)

    # 4) Cluster-based Random Forest: add k-means cluster as feature
    cluster_rf_pipe = Pipeline(
        steps=[
            ("scaler", StandardScaler()),
            ("cluster", ClusterFeatureAdder()),
            ("model", RandomForestRegressor(
                random_state=random_state,
                n_jobs=-1,
            )),
        ]
    )
    cluster_rf_grid = [{
        "cluster__n_clusters": [3, 5, 10],
        "model__n_estimators": [100, 200],
        "model__max_depth": [10, None],
    }]
    candidates["cluster_rf"] = (cluster_rf_pipe, cluster_rf_grid)

    return candidates


def train_best_models_per_exchange(
    df: pd.DataFrame,
    test_size: float = 0.2,
    random_state: int = RANDOM_SEED,
) -> Tuple[Dict[str, RegressorMixin], pd.DataFrame]:
    """Train multiple models per exchange and select the best by out-of-sample R².

    Returns:
        models: dict exchange -> best fitted pipeline
        results: DataFrame with per-exchange, per-model metrics
                 (R² train/test, MSE train/test, n_train, n_test)
    """
    candidates = make_model_candidates(random_state=random_state)
    models: Dict[str, RegressorMixin] = {}
    results_rows: List[Dict] = []

    for exch, d in df.groupby(GROUP_COL):
        # Drop rows with missing features/target for this exchange
        d_local = d.dropna(subset=FEATURE_COLS + [TARGET_COL])
        if len(d_local) < 50:
            # Not enough data to train robust models
            continue

        X = d_local[FEATURE_COLS].astype("float32")
        y = d_local[TARGET_COL].astype("float32")

        X_train, X_test, y_train, y_test = train_test_split(
            X,
            y,
            test_size=test_size,
            random_state=random_state,
        )

        best_model_name: Optional[str] = None
        best_model: Optional[RegressorMixin] = None
        best_r2_test: float = float("-inf")

        for model_name, (pipe, param_grid) in candidates.items():
            grid = GridSearchCV(
                estimator=pipe,
                param_grid=param_grid,
                cv=3,
                n_jobs=-1,
                scoring="r2",
            )
            grid.fit(X_train, y_train)

            best_estimator = grid.best_estimator_

            # In-sample and out-of-sample predictions
            y_train_pred = best_estimator.predict(X_train)
            y_test_pred = best_estimator.predict(X_test)

            r2_train = r2_score(y_train, y_train_pred)
            r2_test = r2_score(y_test, y_test_pred)
            mse_train = mean_squared_error(y_train, y_train_pred)
            mse_test = mean_squared_error(y_test, y_test_pred)

            results_rows.append(
                {
                    "exchange": exch,
                    "model": model_name,
                    "r2_train": r2_train,
                    "r2_test": r2_test,
                    "mse_train": mse_train,
                    "mse_test": mse_test,
                    "n_train": len(X_train),
                    "n_test": len(X_test),
                }
            )

            # Select best model for this exchange by highest test R²
            if r2_test > best_r2_test:
                best_r2_test = r2_test
                best_model_name = model_name
                best_model = best_estimator

        if best_model is not None:
            models[exch] = best_model
            print(
                f"Best model for exchange {exch}: {best_model_name} "
                f"(R² test = {best_r2_test:.3f})"
            )

    results_df = pd.DataFrame(results_rows)
    return models, results_df


In [6]:
# 6. EXECUTION: LOAD DATA, TRAIN MODELS, REPORT METRICS, SAVE JOBLIB

# 6.1 Load raw data
quotes = load_quotes(QUOTES_PATH, KEEP_TICKERS)
executions = load_executions(EXEC_PATH, KEEP_TICKERS)

print(f"Quotes shape: {quotes.shape}")
print(f"Executions shape: {executions.shape}")

# 6.2 Merge executions with quotes and create features
df_features = attach_quotes(executions, quotes)
print(f"Merged features shape (before PI): {df_features.shape}")

add_price_improvement(df_features)
prepare_model_data(df_features)

print(f"Merged features shape (after cleaning): {df_features.shape}")

# 6.3 Train best models per exchange and get metrics
models, results_df = train_best_models_per_exchange(df_features)

print("\nPer-exchange, per-model performance summary:")
display(results_df.sort_values(["exchange", "r2_test"], ascending=[True, False]))

# 6.4 Save the chosen models (one per exchange) for use in somewhat_smart_order_router.py
dump(models, "per_exchange_price_improvement_models.joblib")
print("\nSaved models to per_exchange_price_improvement_models.joblib")


  df = pd.read_csv(


Quotes shape: (2274181, 8)
Executions shape: (1161, 9)
Merged features shape (before PI): (1161, 17)
Merged features shape (after cleaning): (1142, 18)
Best model for exchange ID1516: random_forest (R² test = 0.636)
Best model for exchange ID29608: cluster_rf (R² test = 0.459)

Per-exchange, per-model performance summary:


Unnamed: 0,exchange,model,r2_train,r2_test,mse_train,mse_test,n_train,n_test
1,ID1516,random_forest,0.975748,0.636028,0.001025,0.005908,860,215
3,ID1516,cluster_rf,0.974194,0.635866,0.001091,0.005911,860,215
0,ID1516,linear,0.458215,0.281625,0.022904,0.011661,860,215
2,ID1516,svr,0.232991,-0.056724,0.032425,0.017153,860,215
7,ID29608,cluster_rf,0.896729,0.459261,0.006365,0.033375,48,13
5,ID29608,random_forest,0.541618,0.334971,0.028253,0.041047,48,13
6,ID29608,svr,0.178064,0.054188,0.050661,0.058377,48,13
4,ID29608,linear,0.768948,-1.619523,0.014241,0.161682,48,13



Saved models to per_exchange_price_improvement_models.joblib


In [14]:
from importlib import reload
import somewhat_smart_order_router as ssor

ssor = reload(ssor)

row = df_features[df_features["exchange"].isin(["ID1516", "ID29608"])].sample(1, random_state=1).iloc[0]
side_char = "B" if row["side"] == 1 else "S"

router_args = {
    "symbol":      str(row["symbol"]),
    "side":        side_char,
    "quantity":    int(row["order_qty"]),
    "limit_price": float(row["limit_price"]),
    "bid_price":   float(row["bid_price"]),
    "ask_price":   float(row["ask_price"]),
    "bid_size":    int(row["bid_size"]),
    "ask_size":    int(row["ask_size"]),
}

exchange, predicted_pi = ssor.best_price_improvement(**router_args)

print("Recommended exchange:", exchange)
print("Predicted price improvement:", predicted_pi)
print("Actual exchange:", row["exchange"])
print("Actual price_improvement:", row["price_improvement"])


Recommended exchange: ID29608
Predicted price improvement: 0.34085572719573975
Actual exchange: ID1516
Actual price_improvement: 0.0023956299


In [12]:
import pytest

In [13]:
!pytest test_somewhat_smart_order_router.py

platform win32 -- Python 3.13.7, pytest-8.4.2, pluggy-1.6.0
rootdir: c:\Users\gonza\Documents\UChicago\Quarters\Fall I\FINM 32400 - Python for Financial Data Science\assignment4_order_router
plugins: anyio-4.11.0
collected 3 items

test_somewhat_smart_order_router.py [31mF[0m[31mF[0m[31mF[0m[31m                                  [100%][0m

[31m[1m______________________________ test_normal_order ______________________________[0m

    [0m[94mdef[39;49;00m[90m [39;49;00m[92mtest_normal_order[39;49;00m():[90m[39;49;00m
    [90m    [39;49;00m[33m"""Tests that a normal buy order returns a valid exchange"""[39;49;00m[90m[39;49;00m
>       exchange, price_improvement = best_price_improvement([90m[39;49;00m
            symbol=[33m"[39;49;00m[33mAAPL[39;49;00m[33m"[39;49;00m,[90m[39;49;00m
            side=[33m"[39;49;00m[33mB[39;49;00m[33m"[39;49;00m,[90m[39;49;00m
            quantity=[94m100[39;49;00m,[90m[39;49;00m
            limit_price=[94m1