In [13]:
import heapq
from typing import List
from dataclasses import dataclass
import pandas as pd
import numpy as np

In [14]:
# Clock generator
def clock_gen():
    t = 0
    while True:
        t += 1
        yield t

clock = clock_gen()

# Order ID generator
def order_id_gen_func():
    oid = 0
    while True:
        oid += 1
        yield oid

order_id_gen = order_id_gen_func()

In [15]:
@dataclass
class Order:
    order_id: int
    side: str
    price: float
    qty: int
    remaining_qty: int
    timestamp: int
    status: str = "NEW"

@dataclass
class Trade:
    price: float
    qty: int
    buyer_id: int
    seller_id: int
    timestamp: int


In [16]:
class OrderBook:
    def __init__(self, snapshot_interval=1.0, depth_levels=5):
        self.bids = []   # max-heap: (-price, timestamp, order_id, order)
        self.asks = []   # min-heap: (price, timestamp, order_id, order)
        self.trades: List[Trade] = []
        self.order_lookup = {}
        self.tape: List[dict] = []          # Trade tape
        self.l1_snapshots: List[dict] = []  # Best bid/ask snapshots
        self.l2_snapshots: List[dict] = []  # Depth snapshots
        self.snapshot_interval = snapshot_interval
        self.depth_levels = depth_levels
        self.last_snapshot_time = 0

    # ----------------------------
    # Add Order
    # ----------------------------
    def add_order(self, side, price, qty):
        order = Order(
            order_id=next(order_id_gen),
            side=side,
            price=price,
            qty=qty,
            remaining_qty=qty,
            timestamp=next(clock)
        )
        self.order_lookup[order.order_id] = order
        self.match(order)
        if order.remaining_qty > 0 and price is not None:
            self._add_to_book(order)

    # ----------------------------
    # Add to Book
    # ----------------------------
    def _add_to_book(self, order: Order):
        if order.side.lower() == "buy":
            heapq.heappush(self.bids, (-order.price, order.timestamp, order.order_id, order))
        else:
            heapq.heappush(self.asks, (order.price, order.timestamp, order.order_id, order))

    # ----------------------------
    # L1 snapshot
    # ----------------------------
    def maybe_snapshot_l1(self, current_time):
        if current_time - self.last_snapshot_time >= self.snapshot_interval:
            best_bid = -self.bids[0][0] if self.bids else None
            best_ask = self.asks[0][0] if self.asks else None
            if best_bid is not None and best_ask is not None:
                mid_price = (best_bid + best_ask) / 2
                spread = best_ask - best_bid
            else:
                mid_price = None
                spread = None
            self.l1_snapshots.append({
                "timestamp": current_time,
                "best_bid": best_bid,
                "best_ask": best_ask,
                "spread": spread,
                "mid_price": mid_price
            })
            self.last_snapshot_time = current_time
            # Also take L2 snapshot at same time
            self.snapshot_l2(current_time)

    # ----------------------------
    # L2 snapshot (top N levels)
    # ----------------------------
    def snapshot_l2(self, current_time):
        # Top N bids
        bid_levels = []
        for p, _, _, o in sorted(self.bids, reverse=True)[:self.depth_levels]:
            bid_levels.append(( -p, o.remaining_qty))
        # Top N asks
        ask_levels = []
        for p, _, _, o in sorted(self.asks)[:self.depth_levels]:
            ask_levels.append((p, o.remaining_qty))
        self.l2_snapshots.append({
            "timestamp": current_time,
            "bid_levels": bid_levels,
            "ask_levels": ask_levels
        })

    # ----------------------------
    # Matching Engine
    # ----------------------------
    def match(self, incoming: Order):
        while incoming.remaining_qty > 0:
            # Advance clock & maybe snapshot L1/L2
            current_time = next(clock)
            self.maybe_snapshot_l1(current_time)

            # Buy side
            if incoming.side.lower() == "buy":
                if not self.asks:
                    break
                while self.asks:
                    best_price, _, _, resting = self.asks[0]
                    if resting.status == "CANCELLED":
                        heapq.heappop(self.asks)
                        continue
                    break
                if not self.asks or (incoming.price is not None and incoming.price < best_price):
                    break
            # Sell side
            else:
                if not self.bids:
                    break
                while self.bids:
                    best_price, _, _, resting = self.bids[0]
                    best_price = -best_price
                    if resting.status == "CANCELLED":
                        heapq.heappop(self.bids)
                        continue
                    break
                if not self.bids or (incoming.price is not None and incoming.price > best_price):
                    break

            # Execute trade
            executed_qty = min(incoming.remaining_qty, resting.remaining_qty)
            trade_price = resting.price

            buyer_id = incoming.order_id if incoming.side.lower() == "buy" else resting.order_id
            seller_id = resting.order_id if incoming.side.lower() == "buy" else incoming.order_id

            trade_timestamp = next(clock)

            # Record trade
            self.trades.append(
                Trade(price=trade_price, qty=executed_qty, buyer_id=buyer_id, seller_id=seller_id, timestamp=trade_timestamp)
            )

            # Append to tape
            self.tape.append({
                "timestamp": trade_timestamp,
                "price": trade_price,
                "size": executed_qty,
                "aggressor_side": incoming.side.upper()
            })

            # Update quantities
            incoming.remaining_qty -= executed_qty
            resting.remaining_qty -= executed_qty

            if resting.remaining_qty == 0:
                resting.status = "FILLED"
                if resting.side.lower() == "buy":
                    heapq.heappop(self.bids)
                else:
                    heapq.heappop(self.asks)
            else:
                resting.status = "PARTIALLY_FILLED"

        incoming.status = "FILLED" if incoming.remaining_qty == 0 else "PARTIALLY_FILLED"


In [17]:
# Initialize book
book = OrderBook(snapshot_interval=1.0, depth_levels=3)

# Add orders
book.add_order("buy", 100, 10)
book.add_order("sell", 101, 5)
book.add_order("buy", 102, 20)
book.add_order("sell", 102, 15)
book.add_order("buy", 103, 5)
book.add_order("sell", 104, 10)


In [18]:
print("Trade Tape:")
for row in book.tape:
    print(row)


Trade Tape:
{'timestamp': 7, 'price': 101, 'size': 5, 'aggressor_side': 'BUY'}
{'timestamp': 11, 'price': 102, 'size': 15, 'aggressor_side': 'SELL'}


In [19]:
print("L1 Snapshots:")
for snap in book.l1_snapshots:
    print(snap)


L1 Snapshots:
{'timestamp': 2, 'best_bid': None, 'best_ask': None, 'spread': None, 'mid_price': None}
{'timestamp': 4, 'best_bid': 100, 'best_ask': None, 'spread': None, 'mid_price': None}
{'timestamp': 6, 'best_bid': 100, 'best_ask': 101, 'spread': 1, 'mid_price': 100.5}
{'timestamp': 8, 'best_bid': 100, 'best_ask': None, 'spread': None, 'mid_price': None}
{'timestamp': 10, 'best_bid': 102, 'best_ask': None, 'spread': None, 'mid_price': None}
{'timestamp': 13, 'best_bid': 100, 'best_ask': None, 'spread': None, 'mid_price': None}
{'timestamp': 15, 'best_bid': 103, 'best_ask': None, 'spread': None, 'mid_price': None}


In [20]:
print("L2 Snapshots:")
for snap in book.l2_snapshots:
    print(snap)


L2 Snapshots:
{'timestamp': 2, 'bid_levels': [], 'ask_levels': []}
{'timestamp': 4, 'bid_levels': [(100, 10)], 'ask_levels': []}
{'timestamp': 6, 'bid_levels': [(100, 10)], 'ask_levels': [(101, 5)]}
{'timestamp': 8, 'bid_levels': [(100, 10)], 'ask_levels': []}
{'timestamp': 10, 'bid_levels': [(100, 10), (102, 15)], 'ask_levels': []}
{'timestamp': 13, 'bid_levels': [(100, 10)], 'ask_levels': []}
{'timestamp': 15, 'bid_levels': [(100, 10), (103, 5)], 'ask_levels': []}


In [21]:
# Convert tape to DataFrame
df_tape = pd.DataFrame(book.tape)
df_l1 = pd.DataFrame(book.l1_snapshots)

# VWAP
vwap = (df_tape['price'] * df_tape['size']).sum() / df_tape['size'].sum()
print("VWAP:", vwap)

# Spread statistics
spread_mean = df_l1['spread'].mean()
spread_min = df_l1['spread'].min()
spread_max = df_l1['spread'].max()
print(f"Spread -> mean: {spread_mean}, min: {spread_min}, max: {spread_max}")

# Mid-price volatility
df_l1['mid_price_return'] = df_l1['mid_price'].pct_change()
mid_vol = df_l1['mid_price_return'].std()
print("Mid-price Volatility:", mid_vol)


VWAP: 101.75
Spread -> mean: 1.0, min: 1.0, max: 1.0
Mid-price Volatility: 0.0


  df_l1['mid_price_return'] = df_l1['mid_price'].pct_change()
