Using Heapq

In [1]:
import heapq
import numpy as np
import time
import random

class _HeapEntry(list):
    """Resting order ``[price_key, quantity, step]`` ordered by price, then arrival.

    A plain list would compare lexicographically, so two orders at the same
    price would be ranked by *quantity* and a later, smaller order could jump
    ahead of an earlier one.  Ordering on ``(price_key, step)`` instead gives
    price-time (FIFO) priority while keeping the same indexable layout.
    Only ``<`` is overridden because that is the comparison ``heapq`` relies on.
    """

    __slots__ = ()

    def __lt__(self, other):
        return (self[0], self[2]) < (other[0], other[2])


class HeapOrderBook:
    """Limit order book backed by two binary heaps with price-time priority.

    Attributes:
        bids: heap of ``[-price, quantity, step]`` entries; the price is
            negated so that Python's min-heap yields the highest bid first.
        asks: heap of ``[price, quantity, step]`` entries (lowest ask first).
        trades: list of ``{'price': ..., 'qty': ...}`` dicts, in execution order.
    """

    def __init__(self):
        self.bids = []  # Max-heap simulated via negated prices
        self.asks = []  # Min-heap
        self.trades = []

    def add_order(self, current_step, side, price, quantity):
        """Insert a limit order and immediately run the matching loop.

        Args:
            current_step: arrival index of the order; used as the tie-breaker
                between orders resting at the same price (earlier first).
            side: ``'buy'`` adds a bid; any other value is treated as a sell.
            price: limit price.
            quantity: order size (the matching loop assumes it is positive).
        """
        if side == 'buy':
            # Negate the price so the best (highest) bid sits at the heap root.
            entry = _HeapEntry((-price, quantity, current_step))
            heapq.heappush(self.bids, entry)
        else:
            entry = _HeapEntry((price, quantity, current_step))
            heapq.heappush(self.asks, entry)

        self.match()

    def match(self):
        """Cross the book while the best bid is at or above the best ask.

        Each iteration trades ``min(bid_qty, ask_qty)`` between the two heap
        roots, records the trade at the bid price, and pops whichever side
        was fully filled.
        """
        # bids[0][0] holds the negated price, so negate it back to compare.
        while self.bids and self.asks and -self.bids[0][0] >= self.asks[0][0]:
            best_bid = self.bids[0]  # mutable reference to the heap root
            best_ask = self.asks[0]

            trade_qty = min(best_bid[1], best_ask[1])
            # Trades are executed at the bid price.
            self.trades.append({'price': -best_bid[0], 'qty': trade_qty})

            # Quantity is not part of the ordering key, so editing it in
            # place cannot disturb the heap invariant.
            best_bid[1] -= trade_qty
            best_ask[1] -= trade_qty

            if best_bid[1] == 0:
                heapq.heappop(self.bids)
            if best_ask[1] == 0:
                heapq.heappop(self.asks)

    def get_l2_state(self, depth=5):
        """Return the best ``depth`` bid prices followed by the best ``depth`` ask prices.

        Prices are reported per resting order (orders sharing a price are not
        aggregated), best first, and each side is padded with ``0`` when it
        holds fewer than ``depth`` orders.

        Returns:
            ``numpy.ndarray`` of dtype ``float32`` and length ``2 * depth``.
        """
        # A heap's underlying list is not sorted, so slicing it would be
        # wrong; nsmallest over the price keys picks the best entries.
        best_bid_keys = heapq.nsmallest(depth, (order[0] for order in self.bids))
        bid_prices = [-key for key in best_bid_keys]  # undo the negation
        ask_prices = heapq.nsmallest(depth, (order[0] for order in self.asks))

        # Pad each side up to the requested depth.
        bid_prices += [0] * (depth - len(bid_prices))
        ask_prices += [0] * (depth - len(ask_prices))

        return np.array(bid_prices + ask_prices, dtype=np.float32)

10000 orders

In [2]:
# --- Helper to generate random orders ---
def generate_orders(n=10000):
    """Build ``n`` random limit orders as ``(step, side, price, qty)`` tuples.

    ``step`` is the order's index, ``side`` is ``'buy'`` or ``'sell'``,
    ``price`` is drawn uniformly from [90, 110] and rounded to 2 decimals,
    and ``qty`` is an integer from 1 to 10 inclusive.
    """
    def _random_order(step):
        # Draw order matters for reproducibility under a seed:
        # side first, then price, then quantity.
        side = 'sell' if random.random() <= 0.5 else 'buy'
        price = round(random.uniform(90, 110), 2)
        qty = random.randint(1, 10)
        return (step, side, price, qty)

    return [_random_order(step) for step in range(n)]

# --- Benchmark input ---
# A single random order stream, generated once and replayed into each book.
orders_data = generate_orders(n=10000)

# 1. Baseline: the original list-based order book.
# ListOrderBook is defined inline below so the benchmark is self-contained;
# it re-sorts the affected side of the book after every insertion.
class ListOrderBook:
    """Baseline order book that keeps each side as a fully sorted list.

    ``bids`` is sorted by price descending and ``asks`` by price ascending;
    every entry is a mutable ``[price, quantity, step]`` list.  ``trades`` is
    created empty and is not written to by this class.
    """

    def __init__(self):
        self.trades = []
        self.asks = []
        self.bids = []

    def add_order(self, current_step, side, price, quantity):
        """Append the order to its side, re-sort that side, then match.

        ``'buy'`` goes to the bids; any other ``side`` goes to the asks.
        """
        entry = [price, quantity, current_step]
        if side == 'buy':
            self.bids.append(entry)
            # Stable sort: equal prices keep their insertion order.
            self.bids = sorted(self.bids, key=lambda order: order[0], reverse=True)
        else:
            self.asks.append(entry)
            self.asks = sorted(self.asks, key=lambda order: order[0])
        self.match()

    def match(self):
        """Fill the front bid against the front ask while their prices cross."""
        while self.bids and self.asks:
            top_bid = self.bids[0]
            top_ask = self.asks[0]
            if not top_bid[0] >= top_ask[0]:
                break

            filled = min(top_bid[1], top_ask[1])
            top_bid[1] -= filled
            top_ask[1] -= filled

            # Drop whichever side has been completely filled.
            if top_bid[1] == 0:
                self.bids.pop(0)
            if top_ask[1] == 0:
                self.asks.pop(0)

print(f"--- Benchmarking {len(orders_data)} Orders ---")

# Both runs replay the same order stream.  time.perf_counter() is used
# because it is a monotonic, high-resolution clock intended for measuring
# short durations, whereas time.time() follows the (adjustable) wall clock.

# Measure List Implementation
list_book = ListOrderBook()
start_time = time.perf_counter()
for step, side, price, qty in orders_data:
    list_book.add_order(step, side, price, qty)
list_elapsed = time.perf_counter() - start_time  # stop the clock before formatting
print(f"List Implementation: {list_elapsed:.4f} seconds")

# Measure Heap Implementation
heap_book = HeapOrderBook()
start_time = time.perf_counter()
for step, side, price, qty in orders_data:
    heap_book.add_order(step, side, price, qty)
heap_elapsed = time.perf_counter() - start_time
print(f"Heap Implementation: {heap_elapsed:.4f} seconds")

--- Benchmarking 10000 Orders ---
List Implementation: 0.2909 seconds
Heap Implementation: 0.0217 seconds
