In [1]:
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import databento as db
from numba import njit
from dataclasses import dataclass
import warnings
warnings.filterwarnings('ignore')

In [2]:
# Number of price levels per book side written into every snapshot row.
DEPTH = 20

# Capacity of the per-side working arrays used while replaying the stream.
MAX_LEVELS = 2000

# Spacing between consecutive snapshots, in nanoseconds (100 ms).
SNAPSHOT_INTERVAL_NS = 100 * 1_000_000

In [3]:
def adapt_databento_df(path: str) -> pd.DataFrame:
    """Load a DBN file and reduce it to the numeric event table used downstream.

    Only rows whose ``action`` is ``"A"`` or ``"C"``, whose ``side`` is ``"B"``
    or ``"A"``, and whose ``price`` is not null are kept.

    Parameters
    ----------
    path : str
        Location of the DBN file, passed to ``db.DBNStore.from_file``.

    Returns
    -------
    pd.DataFrame
        Columns, in this order:

        * ``ts_event`` - int64 nanoseconds since the Unix epoch
        * ``price``    - float64
        * ``size``     - unchanged from the source frame
        * ``side``     - int8, 1 where the source value was ``"B"``, else 0
        * ``action``   - int8, 1 where the source value was ``"C"``, else 0

        The index of the source frame is preserved for the surviving rows.
    """
    raw = db.DBNStore.from_file(path).to_df()

    # One combined mask instead of three chained filters: a single pass, and
    # the explicit .copy() below means the column assignments never write
    # into a view of `raw`.
    keep = (
        raw["action"].isin(["A", "C"])
        & raw["side"].isin(["B", "A"])
        & raw["price"].notna()
    )
    events = raw.loc[keep, ["ts_event", "price", "size", "side", "action"]].copy()

    events["action"] = (events["action"] == "C").astype(np.int8)
    events["side"] = (events["side"] == "B").astype(np.int8)
    events["price"] = events["price"].astype(np.float64)

    # Series.view() is deprecated in recent pandas.  Going through a plain
    # datetime64[ns] ndarray works for tz-aware and naive columns alike and
    # always yields nanoseconds since the epoch.
    events["ts_event"] = (
        events["ts_event"].to_numpy(dtype="datetime64[ns]").astype(np.int64)
    )

    return events

In [7]:
def prepare_df(df: pd.DataFrame) -> pd.DataFrame:
    """Return the events ordered by ``ts_event`` with a fresh 0..n-1 index.

    A stable sort is used on purpose: the default quicksort may reorder rows
    that share the same timestamp, and the replay applies events in row
    order, so ties must keep their original relative order.

    Parameters
    ----------
    df : pd.DataFrame
        Event table containing a ``ts_event`` column.

    Returns
    -------
    pd.DataFrame
        A new, sorted frame; ``df`` itself is not modified.
    """
    return df.sort_values("ts_event", kind="mergesort").reset_index(drop=True)

In [9]:
def estimate_snapshots(ts: np.ndarray) -> int:
    """Number of snapshot slots to allocate for a timestamp array.

    Computed as the count of whole ``SNAPSHOT_INTERVAL_NS`` intervals between
    the first and the last element of ``ts``, plus one.  ``ts`` must be
    non-empty, since its first and last elements are read.
    """
    first_ts, last_ts = ts[0], ts[-1]
    whole_intervals = (last_ts - first_ts) // SNAPSHOT_INTERVAL_NS
    return int(whole_intervals + 1)

In [11]:
@njit
def process_stream(
    ts, price, size, side, action,
    out_ts, out_bp, out_bs, out_ap, out_as
):
    """Replay an event stream into per-price-level books and emit snapshots.

    Inputs are parallel 1-D arrays, one element per event, processed in
    array order:

    * ``ts``     - event time (same unit as ``SNAPSHOT_INTERVAL_NS``)
    * ``price``  - event price
    * ``size``   - event quantity
    * ``side``   - 1 for bid, anything else for ask
    * ``action`` - 0 adds quantity, anything else removes quantity

    Each side is kept as a price-sorted array of levels (bids descending,
    asks ascending) holding the aggregate quantity at that price.  An add
    increases the level's quantity (creating the level if needed); a removal
    decreases it and deletes the level once nothing is left.
    NOTE(review): this assumes every row describes one order and that the
    ``size`` of a removal is the quantity taken off the book - confirm
    against the feed documentation.

    The first event whose time reaches the next snapshot boundary triggers a
    snapshot of the top ``DEPTH`` levels per side, stamped with that event's
    time and taken after the event has been applied.  The boundary then jumps
    past that event, so a quiet period produces one snapshot rather than one
    per missed interval.  Missing levels are padded with the last available
    price on that side (0 if the side is empty) and quantity 0.

    Results are written into the ``out_*`` arrays.

    Returns
    -------
    int
        Number of snapshot rows written.
    """
    n_events = len(ts)
    capacity = len(out_ts)
    if n_events == 0:
        # Nothing to replay; also avoids reading ts[0] out of bounds.
        return 0

    bids_p = np.empty(MAX_LEVELS, np.float64)
    bids_s = np.zeros(MAX_LEVELS, np.int64)
    asks_p = np.empty(MAX_LEVELS, np.float64)
    asks_s = np.zeros(MAX_LEVELS, np.int64)

    nb = 0
    na = 0
    snap_i = 0
    next_snap = ts[0] + SNAPSHOT_INTERVAL_NS

    for i in range(n_events):
        p = price[i]
        s = np.int64(size[i])  # keep the arithmetic in signed 64-bit
        is_bid = side[i] == 1

        if is_bid:
            prices, sizes, n = bids_p, bids_s, nb
        else:
            prices, sizes, n = asks_p, asks_s, na

        # Linear scan for the level at price p, or the slot where it would
        # be inserted to keep the side sorted.
        pos = 0
        while pos < n:
            if prices[pos] == p:
                break
            if is_bid and prices[pos] < p:
                break
            if (not is_bid) and prices[pos] > p:
                break
            pos += 1
        found = pos < n and prices[pos] == p

        if action[i] == 0:
            if found:
                # Another order at an existing price: accumulate.
                sizes[pos] += s
            elif n < MAX_LEVELS:
                # New price level: shift the tail down and insert.
                for j in range(n, pos, -1):
                    prices[j] = prices[j - 1]
                    sizes[j] = sizes[j - 1]
                prices[pos] = p
                sizes[pos] = s
                n += 1
            # else: the side is full, the new level is dropped.
        elif found:
            sizes[pos] -= s
            if sizes[pos] <= 0:
                # Level exhausted: close the gap.
                for j in range(pos, n - 1):
                    prices[j] = prices[j + 1]
                    sizes[j] = sizes[j + 1]
                n -= 1

        if is_bid:
            nb = n
        else:
            na = n

        t = ts[i]
        if t >= next_snap:
            if snap_i >= capacity:
                # Output arrays are full; nothing more can be recorded.
                break

            out_ts[snap_i] = t

            last_price = bids_p[0] if nb > 0 else 0.0
            for k in range(DEPTH):
                if k < nb:
                    out_bp[snap_i, k] = bids_p[k]
                    out_bs[snap_i, k] = bids_s[k]
                    last_price = bids_p[k]
                else:
                    out_bp[snap_i, k] = last_price
                    out_bs[snap_i, k] = 0

            last_price = asks_p[0] if na > 0 else 0.0
            for k in range(DEPTH):
                if k < na:
                    out_ap[snap_i, k] = asks_p[k]
                    out_as[snap_i, k] = asks_s[k]
                    last_price = asks_p[k]
                else:
                    out_ap[snap_i, k] = last_price
                    out_as[snap_i, k] = 0

            snap_i += 1
            # Move the boundary strictly past this event.  Advancing by a
            # single interval would leave it in the past after a gap and
            # make every following event emit its own snapshot.
            missed = (t - next_snap) // SNAPSHOT_INTERVAL_NS + 1
            next_snap += missed * SNAPSHOT_INTERVAL_NS

    return snap_i

In [13]:
def build_lob_snapshots(df: pd.DataFrame):
    """Turn an event table into arrays of fixed-depth book snapshots.

    Parameters
    ----------
    df : pd.DataFrame
        Event table with ``ts_event``, ``price``, ``size``, ``side`` and
        ``action`` columns.  It is sorted by ``ts_event`` internally; the
        caller's frame is not modified.

    Returns
    -------
    tuple of np.ndarray
        ``(ts, bid_price, bid_size, ask_price, ask_size)``.  ``ts`` is 1-D
        int64; the other four have shape ``(n_snapshots, DEPTH)`` with
        float64 prices and int64 sizes.  All arrays have zero rows when
        ``df`` has no rows.
    """
    if len(df) == 0:
        # No events: return empty, correctly shaped outputs instead of
        # failing on the first/last timestamp lookup.
        return (
            np.empty(0, np.int64),
            np.empty((0, DEPTH), np.float64),
            np.empty((0, DEPTH), np.int64),
            np.empty((0, DEPTH), np.float64),
            np.empty((0, DEPTH), np.int64),
        )

    events = prepare_df(df)

    ts = events["ts_event"].to_numpy()
    price = events["price"].to_numpy()
    size = events["size"].to_numpy()
    side = events["side"].to_numpy()
    action = events["action"].to_numpy()

    # Output buffers are sized up front and trimmed to the number of rows
    # the replay reports as written.
    n_snaps = estimate_snapshots(ts)

    out_ts = np.empty(n_snaps, np.int64)
    out_bp = np.empty((n_snaps, DEPTH), np.float64)
    out_bs = np.empty((n_snaps, DEPTH), np.int64)
    out_ap = np.empty((n_snaps, DEPTH), np.float64)
    out_as = np.empty((n_snaps, DEPTH), np.int64)

    real_n = process_stream(
        ts, price, size, side, action,
        out_ts, out_bp, out_bs, out_ap, out_as
    )

    return (
        out_ts[:real_n],
        out_bp[:real_n],
        out_bs[:real_n],
        out_ap[:real_n],
        out_as[:real_n],
    )

In [17]:
def snapshots_to_dataframe(ts, bp, bs, ap, a_s, freq: str = "100ms"):
    """Put snapshot arrays on a regular time grid as a wide DataFrame.

    Each grid bin takes the values of the *last* snapshot that falls into it;
    bins without any snapshot repeat the previous bin.  A snapshot is a
    picture of the book at an instant, so summing or averaging several of
    them would double-count resting quantity and could mix prices and
    quantities coming from different snapshots.

    Parameters
    ----------
    ts : array-like of int
        Snapshot times in nanoseconds since the Unix epoch, length ``n``.
    bp, bs, ap, a_s : np.ndarray
        Bid prices, bid sizes, ask prices and ask sizes, each of shape
        ``(n, depth)``.
    freq : str, default "100ms"
        Width of the output grid, as a pandas offset alias.

    Returns
    -------
    pd.DataFrame
        Indexed by a ``DatetimeIndex`` named ``datetime``.  For every level
        ``k`` it has the columns ``bids[k].price``, ``bids[k].amount``,
        ``asks[k].price`` and ``asks[k].amount``, in that order, with the
        dtypes of the input arrays.
    """
    depth = bp.shape[1]

    columns = {}
    for k in range(depth):
        columns[f"bids[{k}].price"] = bp[:, k]
        columns[f"bids[{k}].amount"] = bs[:, k]
        columns[f"asks[{k}].price"] = ap[:, k]
        columns[f"asks[{k}].amount"] = a_s[:, k]

    index = pd.DatetimeIndex(
        pd.to_datetime(np.asarray(ts), unit="ns"), name="datetime"
    )
    snapshots = pd.DataFrame(columns, index=index)

    # One resample over the whole frame instead of one per column.
    lob = snapshots.resample(freq).last().ffill()

    # Empty bins introduce NaN, which turns integer columns into floats;
    # after the forward fill no NaN is left, so the input dtypes can be
    # restored.
    return lob.astype(snapshots.dtypes.to_dict())

In [19]:
def total_processing(
    dates,
    assets=('QQQ', 'SPY', 'FAST', 'MRVL', 'VRSK', 'VXX', 'ANSS'),
    data_dir='C:/Users/Эвелина Новикова/Downloads',
    out_dir='.',
):
    """Convert one DBN file per (asset, date) pair into a Parquet book table.

    For every combination the file
    ``{data_dir}/xnas-itch-{date}.mbo.{asset}.dbn.zst`` is loaded, replayed
    into snapshots, put on a regular time grid and written to
    ``{out_dir}/{asset}_{date}.parquet``.

    Parameters
    ----------
    dates : iterable
        Date tokens as they appear in the input file names.
    assets : iterable of str, optional
        Symbols to process.  Defaults to the original fixed list.
    data_dir : str, optional
        Directory holding the input files.  The default is a
        machine-specific path kept only for backward compatibility; pass
        your own location.
    out_dir : str, optional
        Directory receiving the Parquet files; created if missing.
        Defaults to the current working directory.

    Returns
    -------
    None
    """
    import os

    os.makedirs(out_dir, exist_ok=True)

    for asset in assets:
        for date in dates:
            source = f'{data_dir}/xnas-itch-{date}.mbo.{asset}.dbn.zst'
            df = adapt_databento_df(source)
            ts, bp, bs, ap, a_s = build_lob_snapshots(df)
            lob = snapshots_to_dataframe(ts, bp, bs, ap, a_s)

            lob_table = pa.Table.from_pandas(lob)
            pq.write_table(lob_table, os.path.join(out_dir, f'{asset}_{date}.parquet'))

In [1]:
# Example batch run, left disabled; uncomment both lines to process these dates.
# dates = [20250630, 20250701, 20250702, 20250703]
# total_processing(dates)