# LOBSTER Data Engineering + EDA + Feature/Label Pipeline

This notebook discovers LOBSTER message/orderbook pairs, validates alignment,
computes microstructure features and labels, runs EDA, and writes artifacts
to `results/`. It is parameterized via environment variables and `config.yaml`.


In [None]:
from __future__ import annotations

import json
import os
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterator, List, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import yaml


In [None]:
@dataclass
class Config:
    num_levels: int = 10
    topk: int = 5
    horizons_events: List[int] | None = None
    export_format: str = "parquet"
    chunk_rows: int = 500000
    output_dir: str = "results"
    plotting: bool = True

    @staticmethod
    def load(path: Path) -> "Config":
        if not path.exists():
            raise FileNotFoundError(f"Config not found: {path}")
        with path.open("r", encoding="utf-8") as handle:
            data = yaml.safe_load(handle) or {}
        horizons = data.get("horizons_events", [1, 5, 10])
        return Config(
            num_levels=int(data.get("num_levels", 10)),
            topk=int(data.get("topk", 5)),
            horizons_events=[int(h) for h in horizons],
            export_format=str(data.get("export_format", "parquet")),
            chunk_rows=int(data.get("chunk_rows", 500000)),
            output_dir=str(data.get("output_dir", "results")),
            plotting=bool(data.get("plotting", True)),
        )


In [None]:
PAIR = Tuple[Path, Path, str, str]
MESSAGE_RE = re.compile(r"(?P<ticker>[A-Za-z]+)_(?P<date>\d{4}-\d{2}-\d{2}).*message.*\.csv$")
ORDERBOOK_RE = re.compile(r"(?P<ticker>[A-Za-z]+)_(?P<date>\d{4}-\d{2}-\d{2}).*orderbook.*\.csv$")

def _infer_from_filename(path: Path) -> Tuple[str, str] | None:
    match = MESSAGE_RE.match(path.name) or ORDERBOOK_RE.match(path.name)
    if not match:
        return None
    return match.group("ticker"), match.group("date")

def _pair_from_directory(directory: Path) -> Tuple[Path, Path] | None:
    message = directory / "message.csv"
    orderbook = directory / "orderbook.csv"
    if message.exists() and orderbook.exists():
        return message, orderbook
    return None

def discover_pairs(data_root: Path, repo_root: Path) -> List[PAIR]:
    pairs: Dict[Tuple[str, str], Tuple[Path, Path]] = {}
    if data_root.exists():
        for subdir in data_root.rglob("*"):
            if not subdir.is_dir():
                continue
            found = _pair_from_directory(subdir)
            if found:
                ticker = subdir.parent.name
                date = subdir.name
                pairs[(ticker, date)] = found

    if not pairs:
        for path in repo_root.rglob("*.csv"):
            if "message" in path.name:
                inferred = _infer_from_filename(path)
                if not inferred:
                    continue
                ticker, date = inferred
                orderbook_candidate = path.with_name(path.name.replace("message", "orderbook"))
                if orderbook_candidate.exists():
                    pairs[(ticker, date)] = (path, orderbook_candidate)

    return [
        (message, orderbook, ticker, date)
        for (ticker, date), (message, orderbook) in sorted(pairs.items())
    ]


In [None]:
MESSAGE_COLUMNS = ["time_sec", "event_type", "order_id", "size", "price", "direction"]
MESSAGE_DTYPES = {
    "time_sec": "float64",
    "event_type": "int8",
    "order_id": "int64",
    "size": "int64",
    "price": "int64",
    "direction": "int8",
}

def _orderbook_columns(num_levels: int) -> list[str]:
    cols = []
    for level in range(1, num_levels + 1):
        cols.extend([
            f"ask_price_{level}",
            f"ask_size_{level}",
            f"bid_price_{level}",
            f"bid_size_{level}",
        ])
    return cols

def _orderbook_dtypes(num_levels: int) -> dict[str, str]:
    dtypes: dict[str, str] = {}
    for level in range(1, num_levels + 1):
        dtypes[f"ask_price_{level}"] = "int64"
        dtypes[f"ask_size_{level}"] = "int64"
        dtypes[f"bid_price_{level}"] = "int64"
        dtypes[f"bid_size_{level}"] = "int64"
    return dtypes

def load_full(message_path: Path, orderbook_path: Path, num_levels: int) -> pd.DataFrame:
    message = pd.read_csv(
        message_path,
        header=None,
        names=MESSAGE_COLUMNS,
        dtype=MESSAGE_DTYPES,
    )
    orderbook = pd.read_csv(
        orderbook_path,
        header=None,
        names=_orderbook_columns(num_levels),
        dtype=_orderbook_dtypes(num_levels),
    )
    if len(message) != len(orderbook):
        raise ValueError("Message and orderbook rows are misaligned.")
    df = pd.concat([message, orderbook], axis=1)
    df["time_ns"] = np.round(df["time_sec"] * 1e9).astype("int64")
    return df

def iter_load(
    message_path: Path,
    orderbook_path: Path,
    num_levels: int,
    chunk_rows: int,
) -> Iterator[pd.DataFrame]:
    message_iter = pd.read_csv(
        message_path,
        header=None,
        names=MESSAGE_COLUMNS,
        dtype=MESSAGE_DTYPES,
        chunksize=chunk_rows,
    )
    orderbook_iter = pd.read_csv(
        orderbook_path,
        header=None,
        names=_orderbook_columns(num_levels),
        dtype=_orderbook_dtypes(num_levels),
        chunksize=chunk_rows,
    )
    for message_chunk, orderbook_chunk in zip(message_iter, orderbook_iter):
        if len(message_chunk) != len(orderbook_chunk):
            raise ValueError("Chunk size mismatch between message and orderbook.")
        df = pd.concat([message_chunk, orderbook_chunk], axis=1)
        df["time_ns"] = np.round(df["time_sec"] * 1e9).astype("int64")
        yield df

def load_lobster(
    message_path: Path,
    orderbook_path: Path,
    num_levels: int,
    chunk_rows: int | None = None,
) -> Tuple[pd.DataFrame | None, Iterator[pd.DataFrame] | None]:
    if chunk_rows:
        return None, iter_load(message_path, orderbook_path, num_levels, chunk_rows)
    return load_full(message_path, orderbook_path, num_levels), None


In [None]:
DUMMY_BID = -9999999999
DUMMY_ASK = 9999999999

def _level_columns(prefix: str, num_levels: int) -> List[str]:
    return [f"{prefix}_{level}" for level in range(1, num_levels + 1)]

def add_halt_flags(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["is_halt"] = (df["event_type"] == 7).astype("int8")
    return df

def sanity_checks(df: pd.DataFrame, num_levels: int) -> Dict[str, int]:
    best_bid = df["bid_price_1"].replace(DUMMY_BID, np.nan)
    best_ask = df["ask_price_1"].replace(DUMMY_ASK, np.nan)
    spread_ok = (best_bid < best_ask) | best_bid.isna() | best_ask.isna()

    bid_sizes = df[_level_columns("bid_size", num_levels)]
    ask_sizes = df[_level_columns("ask_size", num_levels)]
    sizes_ok = (bid_sizes >= 0).all(axis=None) and (ask_sizes >= 0).all(axis=None)

    bid_prices = df[_level_columns("bid_price", num_levels)].replace(DUMMY_BID, np.nan)
    ask_prices = df[_level_columns("ask_price", num_levels)].replace(DUMMY_ASK, np.nan)

    bid_monotonic = (
        np.nan_to_num(bid_prices.values, nan=-np.inf)[:, :-1]
        >= np.nan_to_num(bid_prices.values, nan=-np.inf)[:, 1:]
    ).all()
    ask_monotonic = (
        np.nan_to_num(ask_prices.values, nan=np.inf)[:, :-1]
        <= np.nan_to_num(ask_prices.values, nan=np.inf)[:, 1:]
    ).all()

    return {
        "rows": int(len(df)),
        "spread_violations": int((~spread_ok).sum()),
        "sizes_non_negative": int(sizes_ok),
        "bid_monotonic": int(bid_monotonic),
        "ask_monotonic": int(ask_monotonic),
        "halt_rows": int((df["event_type"] == 7).sum()),
    }

def write_sanity(summary: Dict[str, int], out_path: Path) -> None:
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", encoding="utf-8") as handle:
        json.dump(summary, handle, indent=2)


In [None]:
def _level_arrays(df: pd.DataFrame, prefix: str, num_levels: int) -> np.ndarray:
    cols = [f"{prefix}_{level}" for level in range(1, num_levels + 1)]
    return df[cols].to_numpy()

def _mask_dummy_prices(prices: np.ndarray, side: str) -> np.ndarray:
    if side == "bid":
        return np.where(prices == DUMMY_BID, np.nan, prices)
    return np.where(prices == DUMMY_ASK, np.nan, prices)

def _topk_depth(sizes: np.ndarray, k: int) -> np.ndarray:
    return np.nansum(sizes[:, :k], axis=1)

def _rolling_event_rate(time_ns: np.ndarray, window_ns: int = int(1e9)) -> np.ndarray:
    rates = np.zeros_like(time_ns, dtype="float64")
    left = 0
    for idx, t in enumerate(time_ns):
        while time_ns[left] < t - window_ns:
            left += 1
        window_count = idx - left + 1
        rates[idx] = window_count / (window_ns / 1e9)
    return rates

def _rolling_sum(series: pd.Series, window: int) -> pd.Series:
    return series.rolling(window=window, min_periods=1).sum()

def _rolling_std(series: pd.Series, window: int) -> pd.Series:
    return series.rolling(window=window, min_periods=2).std().fillna(0.0)

def compute_features(df: pd.DataFrame, num_levels: int, topk: int) -> pd.DataFrame:
    bid_prices = _mask_dummy_prices(_level_arrays(df, "bid_price", num_levels), "bid")
    ask_prices = _mask_dummy_prices(_level_arrays(df, "ask_price", num_levels), "ask")
    bid_sizes = _level_arrays(df, "bid_size", num_levels)
    ask_sizes = _level_arrays(df, "ask_size", num_levels)

    best_bid = bid_prices[:, 0]
    best_ask = ask_prices[:, 0]
    mid = (best_bid + best_ask) / 2.0
    spread = best_ask - best_bid

    level1_imb = (bid_sizes[:, 0] - ask_sizes[:, 0]) / (
        bid_sizes[:, 0] + ask_sizes[:, 0]
    )
    bid_depth_k = _topk_depth(bid_sizes, topk)
    ask_depth_k = _topk_depth(ask_sizes, topk)
    topk_imb = (bid_depth_k - ask_depth_k) / (bid_depth_k + ask_depth_k)

    microprice = (
        best_bid * ask_sizes[:, 0] + best_ask * bid_sizes[:, 0]
    ) / (bid_sizes[:, 0] + ask_sizes[:, 0])

    time_ns = df["time_ns"].to_numpy()
    event_rate = _rolling_event_rate(time_ns)

    sign = np.select(
        [df["event_type"].isin([1]), df["event_type"].isin([2, 3, 4, 5])],
        [1, -1],
        default=0,
    )
    signed_flow = df["direction"] * df["size"] * sign
    order_flow_imb = _rolling_sum(signed_flow, window=100)

    mid_series = pd.Series(mid)
    mid_ret = mid_series.pct_change().fillna(0.0)
    realized_vol = _rolling_std(mid_ret, window=100)

    features = df.copy()
    features["best_bid"] = best_bid
    features["best_ask"] = best_ask
    features["mid"] = mid
    features["spread"] = spread
    features["level1_imbalance"] = level1_imb
    features["topk_imbalance"] = topk_imb
    features["depth_topk_bid"] = bid_depth_k
    features["depth_topk_ask"] = ask_depth_k
    features["depth_total_topk"] = bid_depth_k + ask_depth_k
    features["microprice"] = microprice
    features["event_rate"] = event_rate
    features["order_flow_imbalance"] = order_flow_imb
    features["realized_volatility"] = realized_vol
    return features


In [None]:
def generate_labels(df: pd.DataFrame, horizons: List[int]) -> pd.DataFrame:
    labeled = df.copy()
    for horizon in horizons:
        future_mid = labeled["mid"].shift(-horizon)
        raw_move = future_mid - labeled["mid"]
        threshold = np.maximum(labeled["spread"] / 2.0, 1)
        label = np.where(raw_move > threshold, 1, np.where(raw_move < -threshold, -1, 0))
        labeled[f"raw_move_H{horizon}"] = raw_move
        labeled[f"label_H{horizon}"] = label.astype("int8")
    return labeled


In [None]:
def _save_plot(fig: plt.Figure, out_path: Path) -> None:
    out_path.parent.mkdir(parents=True, exist_ok=True)
    fig.tight_layout()
    fig.savefig(out_path, dpi=150)
    plt.close(fig)

def run_eda(df: pd.DataFrame, out_dir: Path, ticker: str, date: str) -> Dict[str, float]:
    stats: Dict[str, float] = {}
    time_sec = df["time_ns"] / 1e9

    fig, ax = plt.subplots()
    ax.plot(time_sec, df["mid"], linewidth=0.8)
    ax.set_title("Midprice over time")
    ax.set_xlabel("Time (sec)")
    ax.set_ylabel("Midprice")
    _save_plot(fig, out_dir / f"midprice_{ticker}_{date}.png")

    fig, ax = plt.subplots()
    ax.plot(time_sec, df["spread"], linewidth=0.8)
    ax.set_title("Spread over time")
    ax.set_xlabel("Time (sec)")
    ax.set_ylabel("Spread")
    _save_plot(fig, out_dir / f"spread_{ticker}_{date}.png")

    fig, ax = plt.subplots()
    ax.hist(df["spread"].dropna(), bins=50)
    ax.set_title("Spread distribution")
    ax.set_xlabel("Spread")
    ax.set_ylabel("Count")
    _save_plot(fig, out_dir / f"spread_dist_{ticker}_{date}.png")

    fig, ax = plt.subplots()
    event_counts = df["event_type"].value_counts().sort_index()
    ax.bar(event_counts.index.astype(str), event_counts.values)
    ax.set_title("Event type distribution")
    ax.set_xlabel("Event type")
    ax.set_ylabel("Count")
    _save_plot(fig, out_dir / f"event_types_{ticker}_{date}.png")

    fig, ax = plt.subplots()
    seconds = (df["time_ns"] // 1_000_000_000).astype(int)
    events_per_sec = seconds.value_counts().sort_index()
    ax.plot(events_per_sec.index, events_per_sec.values)
    ax.set_title("Events per second")
    ax.set_xlabel("Second")
    ax.set_ylabel("Events")
    _save_plot(fig, out_dir / f"events_per_sec_{ticker}_{date}.png")

    fig, ax = plt.subplots()
    bins = pd.qcut(df["topk_imbalance"].fillna(0.0), q=10, duplicates="drop")
    grouped = df.groupby(bins)["raw_move_H1"].mean()
    ax.plot(grouped.index.astype(str), grouped.values, marker="o")
    ax.set_title("Imbalance vs future return")
    ax.set_xlabel("Imbalance decile")
    ax.set_ylabel("Mean future move (H1)")
    ax.tick_params(axis="x", rotation=45)
    _save_plot(fig, out_dir / f"imbalance_future_{ticker}_{date}.png")

    fig, ax = plt.subplots()
    ax.scatter(df["realized_volatility"], df["spread"], s=3, alpha=0.5)
    ax.set_title("Volatility vs spread")
    ax.set_xlabel("Realized volatility")
    ax.set_ylabel("Spread")
    _save_plot(fig, out_dir / f"vol_vs_spread_{ticker}_{date}.png")

    fig, ax = plt.subplots()
    is_cancel = df["event_type"].isin([2, 3]).astype(int)
    is_submit = (df["event_type"] == 1).astype(int)
    ratio = (
        is_cancel.groupby(seconds).sum() / is_submit.groupby(seconds).sum().replace(0, np.nan)
    )
    ax.plot(ratio.index, ratio.values)
    ax.set_title("Cancellation/submission ratio")
    ax.set_xlabel("Second")
    ax.set_ylabel("Cancel/Submit")
    _save_plot(fig, out_dir / f"cancel_submit_{ticker}_{date}.png")

    spread_q = df["spread"].quantile([0.33, 0.66])
    vol_q = df["realized_volatility"].quantile([0.33, 0.66])

    spread_regime = pd.cut(
        df["spread"],
        bins=[-np.inf, spread_q.iloc[0], spread_q.iloc[1], np.inf],
        labels=["tight", "normal", "wide"],
    )
    vol_regime = pd.cut(
        df["realized_volatility"],
        bins=[-np.inf, vol_q.iloc[0], vol_q.iloc[1], np.inf],
        labels=["low", "normal", "high"],
    )
    regime_counts = df.groupby([spread_regime, vol_regime]).size().rename("count")

    stats["rows"] = float(len(df))
    stats["time_start"] = float(time_sec.min())
    stats["time_end"] = float(time_sec.max())
    stats["avg_spread"] = float(df["spread"].mean())
    stats["avg_mid"] = float(df["mid"].mean())
    stats["regime_counts"] = regime_counts.to_dict()
    return stats


In [None]:
def write_report(
    out_path: Path,
    ticker: str,
    date: str,
    stats: Dict[str, float],
    sanity: Dict[str, int],
    horizons: list[int],
) -> None:
    out_path.parent.mkdir(parents=True, exist_ok=True)
    plot_prefix = f"{ticker}_{date}"
    plot_files = [
        f"plots/midprice_{plot_prefix}.png",
        f"plots/spread_{plot_prefix}.png",
        f"plots/spread_dist_{plot_prefix}.png",
        f"plots/event_types_{plot_prefix}.png",
        f"plots/events_per_sec_{plot_prefix}.png",
        f"plots/imbalance_future_{plot_prefix}.png",
        f"plots/vol_vs_spread_{plot_prefix}.png",
        f"plots/cancel_submit_{plot_prefix}.png",
    ]
    regime_counts = stats.get("regime_counts", {})
    regime_lines = "\n".join([f"- {k}: {v}" for k, v in regime_counts.items()])

    content = f"""# EDA Report: {ticker} {date}

## Dataset summary
- Rows: {int(stats.get('rows', 0))}
- Time range (sec): {stats.get('time_start', 0):.2f} to {stats.get('time_end', 0):.2f}
- Average spread: {stats.get('avg_spread', 0):.2f}
- Average mid: {stats.get('avg_mid', 0):.2f}

## Sanity checks
- Spread violations: {sanity.get('spread_violations', 0)}
- Non-negative sizes: {sanity.get('sizes_non_negative', 0)}
- Bid monotonic: {sanity.get('bid_monotonic', 0)}
- Ask monotonic: {sanity.get('ask_monotonic', 0)}
- Halt rows: {sanity.get('halt_rows', 0)}

## Key EDA findings
- Spread and midprice dynamics are shown in the plots below.
- Event type distribution highlights liquidity dynamics (submissions vs cancellations).
- Imbalance/future return plot provides a directional signal proxy at H=1 event horizon.
- Volatility vs spread highlights liquidity regimes.

## Regime segmentation counts
{regime_lines or '- Not available'}

## Plots
""" + "\n".join([f"- {plot}" for plot in plot_files]) + f"""

## How this feeds the project
1) Exported features include: best bid/ask, mid, spread, imbalance (L1/topK), depth, microprice,
   event rate, order flow imbalance, and realized volatility.
2) Recommended ML horizons: {', '.join(str(h) for h in horizons)} events, balancing fast reaction
   (short horizon) and stability (longer horizon).
3) Market making is typically most favorable in tight spread and low/normal volatility regimes.
"""

    out_path.write_text(content, encoding="utf-8")


In [None]:
def _export(df: pd.DataFrame, path: Path, export_format: str) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    if export_format == "parquet":
        df.to_parquet(path, index=False)
    else:
        df.to_csv(path, index=False)

def _process_pair(message_path: Path, orderbook_path: Path, ticker: str, date: str, cfg: Config) -> None:
    chunk_rows = cfg.chunk_rows if message_path.stat().st_size > 200 * 1024 * 1024 else None
    full_df, iterator = load_lobster(message_path, orderbook_path, cfg.num_levels, chunk_rows)
    if iterator is not None:
        frames = [chunk for chunk in iterator]
        df = pd.concat(frames, ignore_index=True)
    else:
        df = full_df
    df = add_halt_flags(df)

    sanity = sanity_checks(df, cfg.num_levels)
    sanity_path = Path(cfg.output_dir) / "sanity" / f"sanity_{ticker}_{date}.json"
    write_sanity(sanity, sanity_path)

    features = compute_features(df, cfg.num_levels, cfg.topk)
    labeled = generate_labels(features, cfg.horizons_events)

    processed_path = Path(cfg.output_dir) / "processed" / f"processed_{ticker}_{date}.{cfg.export_format}"
    _export(labeled, processed_path, cfg.export_format)

    for horizon in cfg.horizons_events:
        cols = [
            "time_ns",
            "event_type",
            "order_id",
            "size",
            "price",
            "direction",
            "best_bid",
            "best_ask",
            "mid",
            "spread",
            "is_halt",
            "level1_imbalance",
            "topk_imbalance",
            "depth_topk_bid",
            "depth_topk_ask",
            "depth_total_topk",
            "microprice",
            "event_rate",
            "order_flow_imbalance",
            "realized_volatility",
            f"raw_move_H{horizon}",
            f"label_H{horizon}",
        ]
        feature_df = labeled[cols]
        feature_path = (
            Path(cfg.output_dir)
            / "features"
            / f"features_H{horizon}_{ticker}_{date}.{cfg.export_format}"
        )
        _export(feature_df, feature_path, cfg.export_format)

    if cfg.plotting:
        plot_dir = Path(cfg.output_dir) / "plots"
        stats = run_eda(labeled, plot_dir, ticker, date)
    else:
        stats = {}

    report_path = Path(cfg.output_dir) / "reports" / f"eda_report_{ticker}_{date}.md"
    write_report(report_path, ticker, date, stats, sanity, cfg.horizons_events)

def run_pipeline() -> None:
    data_root = Path(os.getenv("LOB_DATA_ROOT", "data/raw"))
    out_dir = os.getenv("LOB_OUT", None)
    ticker = os.getenv("LOB_TICKER", None)
    date = os.getenv("LOB_DATE", None)
    run_all = os.getenv("LOB_ALL", "0") == "1"
    export_format = os.getenv("LOB_FORMAT", None)
    no_plots = os.getenv("LOB_NO_PLOTS", "0") == "1"

    cfg = Config.load(Path("config.yaml"))
    if out_dir:
        cfg.output_dir = out_dir
    if export_format:
        cfg.export_format = export_format
    if no_plots:
        cfg.plotting = False

    pairs = discover_pairs(data_root, Path("."))
    if not pairs:
        raise SystemExit("No dataset pairs found.")

    if run_all:
        selected = pairs
    else:
        if not ticker or not date:
            raise SystemExit("Provide LOB_TICKER and LOB_DATE or set LOB_ALL=1.")
        selected = [
            pair
            for pair in pairs
            if pair[2].lower() == ticker.lower() and pair[3] == date
        ]
        if not selected:
            raise SystemExit("Requested ticker/date not found in discovered pairs.")

    for message_path, orderbook_path, ticker, date in selected:
        _process_pair(message_path, orderbook_path, ticker, date, cfg)


In [None]:
run_pipeline()
