In [41]:
from typing import Tuple
import numpy as np
import pandas as pd
import databento as db
from numba import njit, int64, int32, boolean
import math
import warnings
warnings.filterwarnings('ignore')

In [25]:
def db_preparing(path: str) -> pd.DataFrame:
    """Load a DBN file and keep only the columns used downstream.

    Parameters
    ----------
    path : str
        Location of the file handed to ``db.DBNStore.from_file``.

    Returns
    -------
    pd.DataFrame
        The columns ``ts_event``, ``action``, ``order_id``, ``price``,
        ``size``, ``flags`` and ``side`` (in that order), with ``ts_event``
        cast to ``int64``.
    """
    keep_columns = ['ts_event', 'action', 'order_id', 'price', 'size', 'flags', 'side']
    records = db.DBNStore.from_file(path).to_df()
    # Cast before selecting so the returned frame already carries integer timestamps.
    records['ts_event'] = records['ts_event'].astype('int64')
    return records[keep_columns]

In [43]:
# df_prepared = db_preparing(r'C:\Users\Эвелина Новикова\Downloads\xnas-itch-20250731.mbo.SPY.dbn.zst')
# df_prepared.head()

In [None]:
# Spacing between consecutive order-book snapshots, in nanoseconds (100 ms).
SNAP_INTERVAL_NS = 100_000_000
# Multiplier used to turn a float price into an integer number of 1e-6 price units.
PRICE_SCALE = 1_000_000
# Databento record-flag bit for top-of-book messages (F_TOB == 64, i.e. bit 6).
# Bit 0 is not the TOB bit, so a mask of `1 << 0` never matches a TOB record.
F_TOB_FLAG = 1 << 6
# Default number of price levels reported for each side of the book.
MAX_DEPTH_DEFAULT = 20


@njit
def _insert_top_k_price(top_prices, top_sizes, k, price, size):
    """Store ``(price, size)`` in the first free slot of the top-``k`` buffers.

    A slot counts as free when its price equals ``-1``. Nothing happens when
    ``price`` itself is ``-1`` or when none of the first ``k`` slots is free.
    Both arrays are modified in place; nothing is returned.
    """
    if price != -1:
        slot = 0
        while slot < k:
            if top_prices[slot] == -1:
                # First empty slot found: fill it and stop scanning.
                top_sizes[slot] = size
                top_prices[slot] = price
                break
            slot += 1

@njit
def _insert_descending(top_prices, top_sizes, k, price, size):
    """Insert ``(price, size)`` into buffers kept in descending price order.

    The first ``k`` entries of ``top_prices`` are scanned for the first slot
    that is empty (``-1``) or whose price is not greater than ``price``. The
    entries from that slot on are shifted one position towards the end (the
    entry at index ``k - 1`` is dropped) and the new pair is written there.
    A ``price`` of ``-1`` is ignored, and so is a price for which no such
    slot exists. Both arrays are modified in place.
    """
    if price == -1:
        return

    # Find the insertion point; `k` means "does not fit".
    pos = k
    for idx in range(k):
        if top_prices[idx] == -1 or top_prices[idx] <= price:
            pos = idx
            break
    if pos == k:
        return

    # Make room by moving the tail one slot down, last element first.
    for dst in range(k - 1, pos, -1):
        top_prices[dst] = top_prices[dst - 1]
        top_sizes[dst] = top_sizes[dst - 1]

    top_prices[pos] = price
    top_sizes[pos] = size

@njit
def _insert_ascending(top_prices, top_sizes, k, price, size):
    """Insert ``(price, size)`` into buffers kept in ascending price order.

    The first ``k`` entries of ``top_prices`` are scanned for the first slot
    that is empty (``-1``) or whose price is not smaller than ``price``. The
    entries from that slot on are shifted one position towards the end (the
    entry at index ``k - 1`` is dropped) and the new pair is written there.
    A ``price`` of ``-1`` is ignored, and so is a price for which no such
    slot exists. Both arrays are modified in place.
    """
    if price == -1:
        return

    # Find the insertion point; `k` means "does not fit".
    pos = k
    for idx in range(k):
        if top_prices[idx] == -1 or top_prices[idx] >= price:
            pos = idx
            break
    if pos == k:
        return

    # Make room by moving the tail one slot down, last element first.
    for dst in range(k - 1, pos, -1):
        top_prices[dst] = top_prices[dst - 1]
        top_sizes[dst] = top_sizes[dst - 1]

    top_prices[pos] = price
    top_sizes[pos] = size

In [19]:
@njit
def nb_add_order(order_slot_arr, order_active_arr, order_level_idx_arr, order_side_arr, order_size_arr,
                 level_sizes_b, level_counts_b, level_sizes_a, level_counts_a,
                 order_slot, level_idx, side, size, flags):
    """Register a new order and add its quantity to the matching price level.

    Parameters
    ----------
    order_slot_arr, order_active_arr, order_level_idx_arr, order_side_arr, order_size_arr
        Per-order state arrays indexed by ``order_slot``; updated in place.
    level_sizes_b, level_counts_b, level_sizes_a, level_counts_a
        Per-level aggregated size and order count for bids (``_b``) and asks
        (``_a``); updated in place.
    order_slot : int
        Index of the order in the per-order arrays. Negative slots are ignored,
        matching ``nb_cancel_order`` and ``nb_modify_order``.
    level_idx : int
        Index of the price level the order rests on.
    side : int
        ``1`` selects the bid arrays, any other value the ask arrays.
    size : int
        Quantity of the order.
    flags : int
        Record flags; when the ``F_TOB_FLAG`` bit is set the whole side is
        reset before the order is added.
    """
    if order_slot < 0:
        return

    if flags & F_TOB_FLAG:
        # The message replaces everything on its side: wipe the aggregated levels ...
        if side == 1:
            for i in range(level_sizes_b.shape[0]):
                level_sizes_b[i] = 0
                level_counts_b[i] = 0
        else:
            for i in range(level_sizes_a.shape[0]):
                level_sizes_a[i] = 0
                level_counts_a[i] = 0
        # ... and retire the orders that were backing them. Leaving them active
        # would let a later cancel/modify subtract their size from the rebuilt levels.
        for s in range(order_active_arr.shape[0]):
            if order_active_arr[s] != 0 and (order_side_arr[s] == 1) == (side == 1):
                order_active_arr[s] = 0
                order_size_arr[s] = 0

    # Record the order itself.
    order_slot_arr[order_slot] = order_slot
    order_active_arr[order_slot] = 1
    order_level_idx_arr[order_slot] = level_idx
    order_side_arr[order_slot] = side
    order_size_arr[order_slot] = size

    # Add its contribution to the aggregated level.
    if side == 1:
        level_sizes_b[level_idx] += size
        level_counts_b[level_idx] += 1
    else:
        level_sizes_a[level_idx] += size
        level_counts_a[level_idx] += 1

@njit
def nb_cancel_order(order_active_arr, order_level_idx_arr, order_side_arr, order_size_arr,
                    level_sizes_b, level_counts_b, level_sizes_a, level_counts_a,
                    order_slot, cancel_size):
    """Remove up to ``cancel_size`` from an active order and from its level.

    Negative slots and inactive orders are ignored. The cancelled quantity is
    capped at the order's current size. When the order reaches size zero it is
    marked inactive and the level's order count is decremented. Level sizes
    and counts never go below zero. All arrays are modified in place.
    """
    # Unknown slot or nothing resting there: nothing to cancel.
    if order_slot < 0 or order_active_arr[order_slot] == 0:
        return

    level = order_level_idx_arr[order_slot]
    is_bid = order_side_arr[order_slot] == 1
    resting = order_size_arr[order_slot]

    # Never cancel more than the order still holds.
    removed = resting if cancel_size > resting else cancel_size
    remaining = resting - removed
    order_size_arr[order_slot] = remaining

    fully_cancelled = remaining == 0
    if fully_cancelled:
        order_active_arr[order_slot] = 0

    if is_bid:
        level_sizes_b[level] = max(level_sizes_b[level] - removed, 0)
        if fully_cancelled:
            level_counts_b[level] = max(level_counts_b[level] - 1, 0)
    else:
        level_sizes_a[level] = max(level_sizes_a[level] - removed, 0)
        if fully_cancelled:
            level_counts_a[level] = max(level_counts_a[level] - 1, 0)

@njit
def nb_modify_order(order_active_arr, order_level_idx_arr, order_side_arr, order_size_arr,
                    level_sizes_b, level_counts_b, level_sizes_a, level_counts_a,
                    order_slot, new_level_idx, new_size):
    """Move an order to ``new_level_idx`` with quantity ``new_size``.

    Negative slots are ignored. For an active order its previous size and one
    order count are first removed from the old level (clamped at zero). For an
    inactive slot that removal is skipped and the slot is simply re-activated.
    In both cases the order is then recorded at the new level and its size is
    added there. The side is always the one already stored for the slot.

    NOTE(review): for a slot that was never added, the stored side is whatever
    the caller initialised ``order_side_arr`` with - confirm that is intended.
    """
    if order_slot < 0:
        return

    is_bid = order_side_arr[order_slot] == 1

    if order_active_arr[order_slot] == 0:
        # Nothing is resting for this slot, so there is no old level to unwind.
        order_active_arr[order_slot] = 1
    else:
        prev_level = order_level_idx_arr[order_slot]
        prev_size = order_size_arr[order_slot]
        if is_bid:
            level_sizes_b[prev_level] = max(level_sizes_b[prev_level] - prev_size, 0)
            level_counts_b[prev_level] = max(level_counts_b[prev_level] - 1, 0)
        else:
            level_sizes_a[prev_level] = max(level_sizes_a[prev_level] - prev_size, 0)
            level_counts_a[prev_level] = max(level_counts_a[prev_level] - 1, 0)

    # Record the new placement and credit the destination level.
    order_level_idx_arr[order_slot] = new_level_idx
    order_size_arr[order_slot] = new_size
    if is_bid:
        level_sizes_b[new_level_idx] += new_size
        level_counts_b[new_level_idx] += 1
    else:
        level_sizes_a[new_level_idx] += new_size
        level_counts_a[new_level_idx] += 1

@njit
def nb_clear_all(order_active_arr, order_size_arr, level_sizes_b, level_counts_b, level_sizes_a, level_counts_a):
    """Reset the whole book: every order inactive with size 0, every level empty.

    The number of orders is taken from ``order_active_arr`` and the number of
    levels from ``level_sizes_b``; the remaining arrays are indexed over the
    same ranges. All arrays are zeroed in place.
    """
    # Retire every tracked order.
    for slot in range(order_active_arr.shape[0]):
        order_size_arr[slot] = 0
        order_active_arr[slot] = 0

    # Empty both sides of the book.
    level_total = level_sizes_b.shape[0]
    for lvl in range(level_total):
        level_counts_a[lvl] = 0
        level_sizes_a[lvl] = 0
        level_counts_b[lvl] = 0
        level_sizes_b[lvl] = 0

@njit
def nb_get_top_k(level_prices, level_sizes, level_counts, n_levels, depth, out_prices, out_sizes, is_bid):
    """Fill ``out_prices`` / ``out_sizes`` with the best ``depth`` populated levels.

    The first ``depth`` output slots are reset to price ``-1`` and size ``0``.
    Every level among the first ``n_levels`` whose order count is positive is
    then offered to the outputs: in descending price order when ``is_bid`` is
    true, in ascending price order otherwise. Slots that stay unused keep the
    ``-1`` / ``0`` placeholders. The output arrays are modified in place.
    """
    # Start from an empty result.
    for slot in range(depth):
        out_sizes[slot] = 0
        out_prices[slot] = -1

    for lvl in range(n_levels):
        # Only levels that still hold at least one order are reported.
        if level_counts[lvl] > 0:
            level_price = level_prices[lvl]
            level_size = level_sizes[lvl]
            if is_bid:
                _insert_descending(out_prices, out_sizes, depth, level_price, level_size)
            else:
                _insert_ascending(out_prices, out_sizes, depth, level_price, level_size)


In [17]:
@njit
def _process_stream_core(msg_ts, msg_action, msg_order_slot, msg_level_idx, msg_size, msg_flags, msg_side,
                         level_prices, n_orders, n_levels, depth):
    # Compiled replay loop. `level_prices` is an explicit argument so the price
    # table is never captured as a compile-time constant.
    n_msgs = msg_ts.shape[0]
    if n_msgs == 0:
        return 0, np.empty(0, dtype=np.int64), np.empty((0, depth), dtype=np.int64), np.empty((0, depth), dtype=np.int64), np.empty((0, depth), dtype=np.int64), np.empty((0, depth), dtype=np.int64)

    first_ts = msg_ts[0]
    last_ts = msg_ts[n_msgs - 1]
    span = last_ts - first_ts
    if span < 0:
        # Unsorted input: keep the buffer size non-negative.
        span = 0
    max_snaps = int(span // SNAP_INTERVAL_NS) + 5

    # Output buffers, one row per snapshot.
    snaps_ts = np.zeros(max_snaps, dtype=np.int64)
    bids_prices = np.full((max_snaps, depth), -1, dtype=np.int64)
    bids_sizes = np.zeros((max_snaps, depth), dtype=np.int64)
    asks_prices = np.full((max_snaps, depth), -1, dtype=np.int64)
    asks_sizes = np.zeros((max_snaps, depth), dtype=np.int64)

    # Per-order state.
    order_active = np.zeros(n_orders, dtype=np.int32)
    order_slot_arr = np.full(n_orders, -1, dtype=np.int32)
    order_level_idx_arr = np.full(n_orders, -1, dtype=np.int32)
    order_side_arr = np.zeros(n_orders, dtype=np.int32)
    order_size_arr = np.zeros(n_orders, dtype=np.int64)

    # Per-level aggregates for bids (_b) and asks (_a).
    level_sizes_b = np.zeros(n_levels, dtype=np.int64)
    level_counts_b = np.zeros(n_levels, dtype=np.int32)
    level_sizes_a = np.zeros(n_levels, dtype=np.int64)
    level_counts_a = np.zeros(n_levels, dtype=np.int32)

    # Snapshot grid is anchored at the interval boundary at or before the first message.
    next_snapshot_ts = (first_ts // SNAP_INTERVAL_NS) * SNAP_INTERVAL_NS
    out_i = 0

    for i in range(n_msgs):
        act = msg_action[i]
        slot = msg_order_slot[i]
        lvl = msg_level_idx[i]
        size = msg_size[i]
        flags = msg_flags[i]
        side = msg_side[i]
        if act == 1:
            nb_add_order(order_slot_arr, order_active, order_level_idx_arr, order_side_arr, order_size_arr,
                         level_sizes_b, level_counts_b, level_sizes_a, level_counts_a,
                         slot, lvl, side, size, flags)
        elif act == 2:
            nb_cancel_order(order_active, order_level_idx_arr, order_side_arr, order_size_arr,
                            level_sizes_b, level_counts_b, level_sizes_a, level_counts_a,
                            slot, size)
        elif act == 3:
            nb_modify_order(order_active, order_level_idx_arr, order_side_arr, order_size_arr,
                            level_sizes_b, level_counts_b, level_sizes_a, level_counts_a,
                            slot, lvl, size)
        elif act == 4:
            nb_clear_all(order_active, order_size_arr, level_sizes_b, level_counts_b, level_sizes_a, level_counts_a)

        # The capacity check lives in the loop condition so that a full buffer
        # also stops snapshots for every later message, not just this one.
        while out_i < max_snaps and msg_ts[i] >= next_snapshot_ts:
            nb_get_top_k(level_prices, level_sizes_b, level_counts_b, n_levels, depth, bids_prices[out_i], bids_sizes[out_i], True)
            nb_get_top_k(level_prices, level_sizes_a, level_counts_a, n_levels, depth, asks_prices[out_i], asks_sizes[out_i], False)
            snaps_ts[out_i] = next_snapshot_ts
            next_snapshot_ts += SNAP_INTERVAL_NS
            out_i += 1

    return out_i, snaps_ts[:out_i].copy(), bids_prices[:out_i].copy(), bids_sizes[:out_i].copy(), asks_prices[:out_i].copy(), asks_sizes[:out_i].copy()


def process_stream_numba(msg_ts, msg_action, msg_order_slot, msg_level_idx, msg_price_int, msg_size, msg_flags, msg_side,
                         n_orders, n_levels, depth, level_prices=None):
    """Replay an order-message stream and sample the book on a fixed time grid.

    This is a thin Python wrapper around the compiled ``_process_stream_core``.
    The price table is looked up here, at call time, and handed to the compiled
    code as an argument. Numba treats global variables read inside a jitted
    function as compile-time constants, so reading ``level_prices_global`` from
    inside the compiled loop would keep using the table from the first run.

    Parameters
    ----------
    msg_ts : np.ndarray
        Message timestamps; snapshots are spaced ``SNAP_INTERVAL_NS`` apart.
    msg_action : np.ndarray
        Action code per message: 1 add, 2 cancel, 3 modify, 4 clear book;
        any other value leaves the book untouched.
    msg_order_slot, msg_level_idx : np.ndarray
        Order slot and price-level index per message.
    msg_price_int : np.ndarray
        Accepted for call compatibility; not used by the replay.
    msg_size, msg_flags, msg_side : np.ndarray
        Size, record flags and side (1 = bid, otherwise ask) per message.
    n_orders, n_levels : int
        Sizes of the per-order and per-level state arrays.
    depth : int
        Number of levels reported per side.
    level_prices : np.ndarray, optional
        Integer price of every level index. Defaults to the current value of
        the module-level ``level_prices_global``.

    Returns
    -------
    tuple
        ``(count, snaps_ts, bids_prices, bids_sizes, asks_prices, asks_sizes)``
        where the price/size arrays have shape ``(count, depth)`` and unused
        slots hold price ``-1`` and size ``0``.

    Raises
    ------
    ValueError
        If the price table has fewer than ``n_levels`` entries.

    NOTE(review): a snapshot is written after the message that reached its
    boundary has been applied, so the row stamped ``T`` already contains that
    message - confirm this is the intended convention.
    """
    if level_prices is None:
        level_prices = level_prices_global
    level_prices = np.ascontiguousarray(level_prices, dtype=np.int64)
    if level_prices.shape[0] < n_levels:
        raise ValueError("level price table has fewer entries than n_levels")
    return _process_stream_core(msg_ts, msg_action, msg_order_slot, msg_level_idx, msg_size, msg_flags, msg_side,
                                level_prices, n_orders, n_levels, depth)


# Module-level holder for the sorted unique integer price levels. It starts
# empty and is reassigned by prepare_messages_for_numba.
level_prices_global = np.empty(0, dtype=np.int64)

In [33]:
def prepare_messages_for_numba(df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, int, int]:
    """Convert a message frame into the flat arrays used by the replay loop.

    Parameters
    ----------
    df : pd.DataFrame
        Must contain the columns ``ts_event``, ``action``, ``order_id``,
        ``price``, ``size``, ``flags`` and ``side``.

    Returns
    -------
    tuple
        ``(msg_ts, msg_actions, msg_slots, level_idx, price_ints, msg_sizes,
        msg_flags, msg_sides, n_orders, n_levels)`` where

        * ``msg_actions`` maps ``"A"/"C"/"M"/"R"`` to ``1/2/3/4`` and anything
          else to ``0``;
        * ``msg_slots`` is the index of each order id among the sorted unique ids;
        * ``price_ints`` is ``price * PRICE_SCALE`` rounded to ``int64``, with
          ``-1`` for non-finite prices;
        * ``level_idx`` is the index of each price among the sorted unique prices;
        * ``msg_sides`` is ``1`` for ``"B"`` and ``0`` otherwise.

    Side effect: the module-level ``level_prices_global`` is replaced by the
    sorted unique integer prices.
    """
    global level_prices_global

    # Stable sort: messages that share a timestamp must keep their original
    # relative order, otherwise e.g. a cancel could be replayed before its add.
    ordered = df.sort_values("ts_event", kind="stable").reset_index(drop=True)

    # Integer prices. Non-finite prices become -1, the "no price" marker that
    # the top-k insert helpers skip, instead of an undefined float->int cast.
    price_floats = ordered["price"].to_numpy(dtype=float)
    has_price = np.isfinite(price_floats)
    price_ints = np.full(price_floats.shape[0], -1, dtype=np.int64)
    price_ints[has_price] = np.rint(price_floats[has_price] * PRICE_SCALE).astype(np.int64)

    # np.unique returns the sorted unique values plus, for each input element,
    # its index among them - exactly the level / slot numbering needed here.
    unique_prices, level_idx = np.unique(price_ints, return_inverse=True)
    level_idx = np.asarray(level_idx).reshape(-1).astype(np.int32)
    n_levels = unique_prices.shape[0]

    order_ids = ordered["order_id"].to_numpy(dtype=np.int64)
    unique_order_ids, slot_idx = np.unique(order_ids, return_inverse=True)
    msg_slots = np.asarray(slot_idx).reshape(-1).astype(np.int32)
    n_orders = unique_order_ids.shape[0]

    # Action letters -> integer codes; unlisted letters stay 0.
    actions = ordered["action"].astype(str).to_numpy()
    msg_actions = np.zeros(actions.shape[0], dtype=np.int32)
    for letter, code in (("A", 1), ("C", 2), ("M", 3), ("R", 4)):
        msg_actions[actions == letter] = code

    sides = ordered["side"].astype(str).to_numpy()
    msg_sides = (sides == "B").astype(np.int32)

    msg_sizes = ordered["size"].to_numpy(dtype=np.int64)
    msg_flags = ordered["flags"].to_numpy(dtype=np.int64)
    msg_ts = ordered["ts_event"].to_numpy(dtype=np.int64)

    level_prices_global = unique_prices.astype(np.int64)

    return msg_ts, msg_actions, msg_slots, level_idx, price_ints, msg_sizes, msg_flags, msg_sides, n_orders, n_levels


def run_pipeline(df: pd.DataFrame, depth: int = MAX_DEPTH_DEFAULT) -> pd.DataFrame:
    """Rebuild the order book from ``df`` and return periodic depth snapshots.

    Parameters
    ----------
    df : pd.DataFrame
        Message frame accepted by ``prepare_messages_for_numba``.
    depth : int
        Number of price levels reported per side.

    Returns
    -------
    pd.DataFrame
        One row per snapshot, indexed by ``timestamp`` (datetime built from
        the nanosecond snapshot time), with the columns ``timestamp_ns``,
        ``bids[d].price``, ``bids[d].amount`` for every ``d`` and then
        ``asks[d].price``, ``asks[d].amount`` for every ``d``. Prices are
        divided by ``PRICE_SCALE``; levels that are not populated are NaN.
        An empty frame is returned when no snapshot was produced.
    """
    (msg_ts, msg_actions, msg_slots, msg_level_idx, msg_price_ints,
     msg_sizes, msg_flags, msg_sides, n_orders, n_levels) = prepare_messages_for_numba(df)

    out = process_stream_numba(msg_ts, msg_actions, msg_slots, msg_level_idx, msg_price_ints,
                               msg_sizes, msg_flags, msg_sides, n_orders, n_levels, depth)
    snaps_count = out[0]
    if snaps_count == 0:
        return pd.DataFrame()

    snaps_ts = np.asarray(out[1], dtype=np.int64)

    # Build whole columns at once instead of one Python dict per snapshot row.
    columns = {"timestamp_ns": snaps_ts}
    for label, prices, sizes in (("bids", out[2], out[3]), ("asks", out[4], out[5])):
        missing = prices == -1  # -1 marks a depth slot with no level
        price_values = np.where(missing, np.nan, prices / PRICE_SCALE)
        for d in range(depth):
            columns[f"{label}[{d}].price"] = price_values[:, d]
            if missing[:, d].any():
                # A gap forces a float column so the missing rows can hold NaN.
                columns[f"{label}[{d}].amount"] = np.where(missing[:, d], np.nan, sizes[:, d].astype(np.float64))
            else:
                columns[f"{label}[{d}].amount"] = sizes[:, d]

    df_out = pd.DataFrame(columns)
    # int64 nanoseconds always convert, so no blanket try/except is needed here.
    df_out.index = pd.DatetimeIndex(pd.to_datetime(snaps_ts, unit="ns"), name="timestamp")
    return df_out

In [39]:
# result = run_pipeline(df_prepared, 20)
# result.head()