# Step 4 / Point 3 — Feature Layer + Regime Labels

Loads a snapshot created by `01_extract_snapshot.ipynb` and builds a *pre-trade* feature layer.

Outputs:
- `features.parquet` (trade-level features)

Notes:
- We keep this pre-trade only (no leakage): features are computed using only data at/before the trade timestamp.


In [1]:
# Username whose snapshot is loaded; snapshot folders are matched as `<username>-*`.
username = "gabagool22"

# If None, auto-pick latest snapshot for this username.
# Set to a path string to pin a specific snapshot instead.
snapshot_dir = None  # e.g. "/Users/.../polybot/research/data/snapshots/gabagool22-..."

# Complete-set detection window (seconds).
# Trade timestamps are floored to buckets of this width when labelling regimes.
complete_set_window_seconds = 10

# Market trade feature windows.
# One `mt_return_{w}s` column is produced per entry of `return_windows_seconds`;
# `flow_window_seconds` drives the `mt_trades_/mt_volume_/mt_notional_{w}s` columns.
return_windows_seconds = [30, 300]  # 30s and 5m
flow_window_seconds = 60


In [2]:
import os
import sys
from pathlib import Path


def _find_repo_root(start: Path) -> Path:
    """Return the closest directory at or above ``start`` holding ``research/snapshot.py``.

    ``start`` itself is checked first, then each parent from nearest to farthest.

    Raises:
        RuntimeError: if neither ``start`` nor any ancestor contains the marker file.
    """
    marker = Path("research", "snapshot.py")
    lineage = (start, *start.parents)
    hit = next((folder for folder in lineage if folder.joinpath(marker).exists()), None)
    if hit is not None:
        return hit
    raise RuntimeError(
        "Cannot locate repo root (expected to find `research/snapshot.py`). "
        "Start Jupyter from the repo root or set REPO_ROOT=/path/to/polybot."
    )


# An explicit REPO_ROOT environment variable wins; otherwise walk up from the
# current working directory until the repo marker file is found.
repo_root_env = os.environ.get("REPO_ROOT")
if repo_root_env:
    repo_root = Path(repo_root_env)
else:
    repo_root = _find_repo_root(Path.cwd())
print("repo_root:", repo_root)

# All snapshots live under <repo_root>/research/data/snapshots.
snapshots_root = repo_root.joinpath("research", "data", "snapshots")
print("snapshots_root:", snapshots_root)


repo_root: /Users/antoniostano/programming/polybot
snapshots_root: /Users/antoniostano/programming/polybot/research/data/snapshots


In [3]:
import pandas as pd


def _pick_latest_snapshot_dir(root: Path, username: str) -> Path:
    """Return the most recently modified snapshot directory for ``username``.

    Candidates are the entries directly under ``root`` whose name matches
    ``{username}-*``. Only directories are considered, so a stray file that
    happens to share the prefix (an archive, a notes file, ...) is never
    returned as if it were a snapshot.

    NOTE(review): "latest" is decided by directory mtime, which changes whenever
    a file is created inside the directory (this notebook writes
    ``features.parquet`` into the snapshot). Pass ``snapshot_dir`` explicitly
    when the exact snapshot matters.

    Raises:
        RuntimeError: if no matching snapshot directory exists under ``root``.
    """
    candidates = sorted(p for p in root.glob(f"{username}-*") if p.is_dir())
    if not candidates:
        raise RuntimeError(f"No snapshots found under {root} for username={username}")
    # Sorting first makes mtime ties resolve deterministically (first name wins).
    return max(candidates, key=lambda p: p.stat().st_mtime)


# An explicitly configured `snapshot_dir` takes precedence over auto-discovery.
if snapshot_dir:
    snapshot_path = Path(snapshot_dir)
else:
    snapshot_path = _pick_latest_snapshot_dir(snapshots_root, username)
print("snapshot_path:", snapshot_path)

trades_path, market_trades_path, clob_tob_path = (
    snapshot_path / file_name
    for file_name in ("trades.parquet", "market_trades.parquet", "clob_tob.parquet")
)

# `trades` is mandatory; the two context tables are optional and fall back to
# an empty frame when their file is absent from the snapshot.
trades = pd.read_parquet(trades_path)
market_trades, clob_tob = (
    pd.read_parquet(optional_path) if optional_path.exists() else pd.DataFrame()
    for optional_path in (market_trades_path, clob_tob_path)
)

trades.shape, market_trades.shape, clob_tob.shape

snapshot_path: /Users/antoniostano/programming/polybot/research/data/snapshots/gabagool22-20251214T200720+0000


((7221, 44), (4027, 17), (4854, 24))

In [4]:
import numpy as np


def _to_utc(series: pd.Series) -> pd.Series:
    """Parse ``series`` as timestamps and return them as timezone-aware UTC values.

    Entries that cannot be parsed become ``NaT`` (``errors="coerce"``).
    clickhouse-connect returns tz-naive or tz-aware values depending on
    settings, so everything is normalised to UTC here.
    """
    return pd.to_datetime(series, utc=True, errors="coerce")


# Rebind each frame to a copy whose timestamp column(s) are normalised to UTC.
# The optional tables are only touched when they actually contain rows.
trades = trades.assign(ts=_to_utc(trades["ts"]))
if not market_trades.empty:
    market_trades = market_trades.assign(ts=_to_utc(market_trades["ts"]))
if not clob_tob.empty:
    clob_tob = clob_tob.assign(
        trade_at=_to_utc(clob_tob["trade_at"]),
        captured_at=_to_utc(clob_tob["captured_at"]),
    )

# Quick look at the columns the rest of the notebook relies on.
trades[
    [
        "ts",
        "market_slug",
        "token_id",
        "side",
        "outcome",
        "price",
        "size",
        "series",
        "exec_type",
        "seconds_to_end",
    ]
].head()

Unnamed: 0,ts,market_slug,token_id,side,outcome,price,size,series,exec_type,seconds_to_end
0,2025-12-14 11:45:37+00:00,eth-updown-15m-1765712700,4639703364689995573571175229848151106931398707...,BUY,Up,0.23,5.0,updown-15m,UNKNOWN,863
1,2025-12-14 11:45:37+00:00,btc-updown-15m-1765712700,5108999353819477633166653263465403529775473065...,BUY,Up,0.57,20.0,updown-15m,UNKNOWN,863
2,2025-12-14 11:45:37+00:00,btc-updown-15m-1765712700,5108999353819477633166653263465403529775473065...,BUY,Up,0.54,20.0,updown-15m,UNKNOWN,863
3,2025-12-14 11:45:37+00:00,btc-updown-15m-1765712700,9749576552879489345227050541994885550913688913...,BUY,Down,0.53,20.0,updown-15m,UNKNOWN,863
4,2025-12-14 11:45:39+00:00,btc-updown-15m-1765712700,5108999353819477633166653263465403529775473065...,BUY,Up,0.55,20.0,updown-15m,UNKNOWN,861


## Market-trade context features (pre-trade)

Compute, per user trade:
- last observed market trade price (same `token_id`)
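- price change over short windows (e.g. 30s, 5m): last observed price minus the last price observed at or before the window start (an absolute price difference, not a percentage return)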
- market trade count + volume in last 60s (liquidity/activity proxy)


In [5]:
features = trades.copy()

# Column names are derived from the config cell so that allocation, per-token
# assignment and the preview at the bottom can never drift apart.
mt_return_cols = {w: f"mt_return_{w}s" for w in return_windows_seconds}
mt_trades_col = f"mt_trades_{flow_window_seconds}s"
mt_volume_col = f"mt_volume_{flow_window_seconds}s"
mt_notional_col = f"mt_notional_{flow_window_seconds}s"

have_market_trades = not market_trades.empty
if have_market_trades:
    features = features.sort_values(["token_id", "ts"], kind="stable")

# Allocate the output columns unconditionally so the schema (and the preview
# below) is the same whether or not the snapshot contains market trades.
# `mt_last_ts` is allocated as tz-aware UTC up front: writing tz-aware values
# into a tz-naive datetime column is an incompatible-dtype assignment in pandas.
features["mt_last_price"] = np.nan
features["mt_last_ts"] = pd.Series(pd.NaT, index=features.index, dtype="datetime64[ns, UTC]")
features["mt_last_age_ms"] = np.nan
for return_col in mt_return_cols.values():
    features[return_col] = np.nan
features[mt_trades_col] = np.nan
features[mt_volume_col] = np.nan
features[mt_notional_col] = np.nan

if not have_market_trades:
    print("market_trades is empty; skipping market context features")
else:
    # Rows whose timestamp failed to parse (NaT) are excluded on both sides:
    # as int64 they would break the sorted order searchsorted relies on and
    # overflow the window arithmetic. Such user trades keep NaN features.
    market_trades_sorted = market_trades.dropna(subset=["ts"]).sort_values(
        ["token_id", "ts"], kind="stable"
    )
    ts_known = features["ts"].notna()
    nat_ns = np.datetime64("NaT").astype("int64")

    # One searchsorted pass per token. Tokens without any market trade are never
    # visited, so their feature columns stay NaN (not 0).
    for token_id, mt in market_trades_sorted.groupby("token_id", sort=False):
        idx = (features["token_id"] == token_id) & ts_known
        if not idx.any():
            continue

        # Timestamps as int64 nanoseconds so window offsets are plain integer math.
        mt_ts = mt["ts"].to_numpy(dtype="datetime64[ns]").astype("int64")
        mt_price = mt["price"].to_numpy(dtype="float64")
        mt_size = mt["size"].to_numpy(dtype="float64")

        # Running totals let any window sum be computed as a difference of two lookups.
        mt_cum_size = np.cumsum(mt_size)
        mt_cum_notional = np.cumsum(mt_price * mt_size)

        tt_ts = features.loc[idx, "ts"].to_numpy(dtype="datetime64[ns]").astype("int64")

        # Last observed market trade at/before the user trade.
        # NOTE(review): side="right" includes market trades stamped exactly at the
        # user trade's timestamp. If `market_trades` also contains the user's own
        # fills, that includes the trade itself — confirm this is acceptable for a
        # "pre-trade" feature.
        end = np.searchsorted(mt_ts, tt_ts, side="right")
        has_last = end > 0
        last_ix = np.maximum(end - 1, 0)  # clipped so the lookup is valid; masked by has_last

        last_price = np.where(has_last, mt_price[last_ix], np.nan)
        last_seen_ns = mt_ts[last_ix]

        features.loc[idx, "mt_last_price"] = last_price
        features.loc[idx, "mt_last_ts"] = pd.to_datetime(
            np.where(has_last, last_seen_ns, nat_ns), utc=True, errors="coerce"
        )
        features.loc[idx, "mt_last_age_ms"] = np.where(
            has_last, (tt_ts - last_seen_ns) / 1_000_000.0, np.nan
        )

        # "Return" over each window: last_price - price_at(t - window), i.e. an
        # absolute price difference. NaN when no market trade exists at/before
        # the window start.
        for w, return_col in mt_return_cols.items():
            start = np.searchsorted(mt_ts, tt_ts - int(w * 1e9), side="right")
            start_price = np.where(start > 0, mt_price[np.maximum(start - 1, 0)], np.nan)
            features.loc[idx, return_col] = last_price - start_price

        # Flow over the last `flow_window_seconds`: market trades in (t - window, t].
        start = np.searchsorted(mt_ts, tt_ts - int(flow_window_seconds * 1e9), side="right")
        has_prev = start > 0
        prev_ix = np.maximum(start - 1, 0)

        vol = np.where(has_last, mt_cum_size[last_ix], 0.0) - np.where(
            has_prev, mt_cum_size[prev_ix], 0.0
        )
        notional = np.where(has_last, mt_cum_notional[last_ix], 0.0) - np.where(
            has_prev, mt_cum_notional[prev_ix], 0.0
        )

        features.loc[idx, mt_trades_col] = end - start
        features.loc[idx, mt_volume_col] = vol
        features.loc[idx, mt_notional_col] = notional

# Preview uses the configured window names (first return window + flow window).
features[
    [
        "ts",
        "token_id",
        "mt_last_price",
        *list(mt_return_cols.values())[:1],
        mt_trades_col,
        mt_volume_col,
    ]
].head()

               '2025-12-14 18:29:39+00:00', '2025-12-14 18:29:39+00:00',
               '2025-12-14 18:29:39+00:00', '2025-12-14 18:29:39+00:00',
               '2025-12-14 18:29:39+00:00', '2025-12-14 18:29:39+00:00',
               '2025-12-14 18:29:39+00:00', '2025-12-14 18:29:39+00:00',
               '2025-12-14 18:29:39+00:00', '2025-12-14 18:32:39+00:00',
               '2025-12-14 18:32:39+00:00', '2025-12-14 18:32:39+00:00',
               '2025-12-14 18:32:39+00:00', '2025-12-14 18:32:39+00:00',
               '2025-12-14 18:32:39+00:00', '2025-12-14 18:32:39+00:00',
               '2025-12-14 18:32:39+00:00', '2025-12-14 18:35:37+00:00',
               '2025-12-14 18:35:37+00:00', '2025-12-14 18:37:07+00:00',
               '2025-12-14 18:37:07+00:00', '2025-12-14 18:37:09+00:00',
               '2025-12-14 18:37:09+00:00', '2025-12-14 18:37:09+00:00',
               '2025-12-14 18:37:09+00:00', '2025-12-14 18:41:39+00:00',
               '2025-12-14 18:41:39+00:00'],
      

Unnamed: 0,ts,token_id,mt_last_price,mt_return_30s,mt_trades_60s,mt_volume_60s
52,2025-12-14 12:00:21+00:00,1000225558155636032152044876624006520594352313...,,,,
68,2025-12-14 12:00:33+00:00,1000225558155636032152044876624006520594352313...,,,,
69,2025-12-14 12:00:33+00:00,1000225558155636032152044876624006520594352313...,,,,
70,2025-12-14 12:00:43+00:00,1000225558155636032152044876624006520594352313...,,,,
72,2025-12-14 12:00:43+00:00,1000225558155636032152044876624006520594352313...,,,,


## Regime labels: COMPLETE_SET vs DIRECTIONAL

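The tracked user often buys both sides within short windows in the same market. We label trades in such windows — where both `Up` and `Down` were bought in the same market within the same fixed time bucket — as a potential *complete-set arbitrage* regime; the remaining Up/Down buys are labelled directional.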


In [6]:
# Bucket width shared by detection and by the join back onto the trades.
# NOTE: buckets are fixed (timestamps floored to the window), not sliding, so two
# trades a fraction of a second apart can land in adjacent buckets.
bucket_freq = f"{int(complete_set_window_seconds)}s"

# Make the cell safe to re-run: drop what a previous execution merged in,
# otherwise the merge below would produce `_x`/`_y` suffixed duplicates and the
# column lookups after it would fail.
features = features.drop(columns=["complete_set_flag", "complete_set_shares"], errors="ignore")
features["bucket"] = features["ts"].dt.floor(bucket_freq)

# Only consider BUY trades for binary up/down markets
mask_ud_buy = (features["side"] == "BUY") & (features["outcome"].isin(["Up", "Down"]))
ud_buys = features.loc[mask_ud_buy, ["market_slug", "bucket", "outcome", "size"]]

# Shares bought per (market, bucket), split by outcome. Building both columns
# explicitly guarantees they exist even if one outcome (or every row) is absent.
by_bucket = (
    ud_buys.assign(
        up_shares=ud_buys["size"].where(ud_buys["outcome"] == "Up", 0.0),
        down_shares=ud_buys["size"].where(ud_buys["outcome"] == "Down", 0.0),
    )
    .groupby(["market_slug", "bucket"], as_index=False, observed=True)[["up_shares", "down_shares"]]
    .sum()
)

# A bucket is a complete set when both outcomes were bought; the matched
# quantity is the smaller of the two sides.
by_bucket["complete_set_flag"] = (by_bucket["up_shares"] > 0) & (by_bucket["down_shares"] > 0)
by_bucket["complete_set_shares"] = np.minimum(by_bucket["up_shares"], by_bucket["down_shares"])

# `by_bucket` has one row per (market_slug, bucket); `validate` turns any
# accidental row multiplication into an immediate error.
features = features.merge(
    by_bucket[["market_slug", "bucket", "complete_set_flag", "complete_set_shares"]],
    how="left",
    on=["market_slug", "bucket"],
    validate="many_to_one",
)

# Rows without a matching bucket come back as NaN. `.eq(True)` maps them to
# False and always yields a real bool column, without depending on the
# deprecated object-dtype downcasting of `fillna`.
features["complete_set_flag"] = features["complete_set_flag"].eq(True)
features["complete_set_shares"] = features["complete_set_shares"].fillna(0.0)

# Regime label (simple first pass). The masks are computed after the merge on
# purpose: the merge resets the index, so masks built earlier would not align.
is_ud = features["outcome"].isin(["Up", "Down"])
is_buy = features["side"] == "BUY"

features["regime"] = "OTHER"
features.loc[is_buy & is_ud, "regime"] = "DIRECTIONAL"
features.loc[is_buy & is_ud & features["complete_set_flag"], "regime"] = "COMPLETE_SET_ARBITRAGE"

features["regime"].value_counts(dropna=False)

regime
DIRECTIONAL               4556
COMPLETE_SET_ARBITRAGE    2665
Name: count, dtype: int64

## Quick sanity checks


In [7]:
# Keep only trades with a known realized PnL, then rank each
# (regime, exec_type) combination by its total PnL and show the top 20.
resolved = features.loc[features["realized_pnl"].notna()].copy()
pnl_by_regime_exec = resolved.groupby(["regime", "exec_type"], dropna=False).agg(
    trades=pd.NamedAgg(column="event_key", aggfunc="count"),
    pnl_usd=pd.NamedAgg(column="realized_pnl", aggfunc="sum"),
    avg_pnl=pd.NamedAgg(column="realized_pnl", aggfunc="mean"),
)
pnl_by_regime_exec.sort_values("pnl_usd", ascending=False).head(20)

Unnamed: 0_level_0,Unnamed: 1_level_0,trades,pnl_usd,avg_pnl
regime,exec_type,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
DIRECTIONAL,MAKER_LIKE,1063,1731.60215,1.628977
COMPLETE_SET_ARBITRAGE,MAKER_LIKE,560,1078.86989,1.926553
COMPLETE_SET_ARBITRAGE,INSIDE,368,280.611105,0.76253
DIRECTIONAL,UNKNOWN,455,264.2445,0.580757
COMPLETE_SET_ARBITRAGE,UNKNOWN,314,26.7024,0.085039
DIRECTIONAL,INSIDE,749,-188.766495,-0.252025
DIRECTIONAL,TAKER_LIKE,838,-1039.37571,-1.240305
COMPLETE_SET_ARBITRAGE,TAKER_LIKE,535,-1486.87426,-2.779204


In [8]:
# Persist the feature table inside the snapshot directory so downstream
# notebooks can load it; the DataFrame index is not written.
out_path = snapshot_path.joinpath("features.parquet")
features.to_parquet(out_path, index=False)
out_path

PosixPath('/Users/antoniostano/programming/polybot/research/data/snapshots/gabagool22-20251214T200720+0000/features.parquet')