# ITCH Parser on PYNQ-Z2 (DMA + AXI-Stream)

Test notebook for the ITCH parser with DMA streaming interface.

**Architecture:**
```
CPU → DDR buffer → DMA (M_AXIS_MM2S) → ITCH Parser (S00_AXIS) → Parsed Results → CPU reads via AXI-Lite
```

**Supported Message Types (9 total):**
| Type | Char | Length | Description |
|------|------|--------|-------------|
| 0 | 'A' | 36 | Add Order |
| 1 | 'X' | 23 | Cancel Order |
| 2 | 'D' | 9 | Delete Order |
| 3 | 'E' | 30 | Executed Order |
| 4 | 'U' | 27 | Replace Order |
| 5 | 'P' | 40 | Trade |
| 6 | 'F' | 40 | Add Order MPID |
| 7 | 'C' | 36 | Executed With Price |
| 8 | 'B' | 19 | Broken Trade |

In [None]:
from pynq import Overlay, allocate
import numpy as np
import struct
import time

# Load your bitstream (update filename to match yours)
overlay = Overlay('ITCH_parser.bit')
print("Bitstream loaded!")

# Show what's in the overlay
overlay?

In [None]:
# Get handles to DMA and ITCH IP
# NOTE: Check the exact names from overlay? output above
dma = overlay.axi_dma_0
itch = overlay.Itch_axi_stream_v1_0_0  # Update this name if different!

print(f"DMA: {dma}")
print(f"ITCH IP: {itch}")

In [None]:
# Register map
REG_RESERVED       = 0x00
REG_STATUS         = 0x04
REG_LATCHED_VALID  = 0x08
REG_LATCHED_TYPE   = 0x0C
REG_ORDER_REF_LO   = 0x10
REG_ORDER_REF_HI   = 0x14
REG_SIDE           = 0x18
REG_SHARES         = 0x1C
REG_PRICE          = 0x20
REG_NEW_ORDER_LO   = 0x24
REG_NEW_ORDER_HI   = 0x28
REG_TIMESTAMP_LO   = 0x2C
REG_TIMESTAMP_HI   = 0x30
REG_MISC_DATA_LO   = 0x34
REG_MISC_DATA_HI   = 0x38

# Message type mapping (matches decoder parsed_type values)
MSG_TYPES = {
    0: 'ADD',           # 'A' - add_order_decoder sets 4'd0
    1: 'CANCEL',        # 'X' - cancel_order_decoder sets 4'd1
    2: 'DELETE',        # 'D' - delete_order_decoder sets 4'd2
    3: 'EXEC',          # 'E' - executed_order_decoder sets 4'd3
    4: 'REPLACE',       # 'U' - replace_order_decoder sets 4'd4
    5: 'TRADE',         # 'P' - trade_decoder sets 4'd5
    6: 'ADD_MPID',      # 'F' - add_order_mpid_decoder sets 4'd6
    7: 'EXEC_PRICE',    # 'C' - executed_price_decoder sets 4'd7
    8: 'BROKEN',        # 'B' - broken_trade_decoder sets 4'd8
}

print("Register map defined.")

In [None]:
def send_message(data):
    """
    Send ITCH message bytes via DMA.
    """
    if isinstance(data, (list, tuple)):
        data = bytes(data)
    elif isinstance(data, bytearray):
        data = bytes(data)
    
    input_buffer = allocate(shape=(len(data),), dtype=np.uint8)
    for i, b in enumerate(data):
        input_buffer[i] = b
    
    dma.sendchannel.transfer(input_buffer)
    dma.sendchannel.wait()
    del input_buffer

def read_message():
    """Read parsed message from ITCH IP registers via AXI-Lite"""
    if not (itch.read(REG_LATCHED_VALID) & 1):
        return None
    
    msg_type = itch.read(REG_LATCHED_TYPE) & 0xF
    order_ref = (itch.read(REG_ORDER_REF_HI) << 32) | itch.read(REG_ORDER_REF_LO)
    
    return {
        'type': MSG_TYPES.get(msg_type, f'UNK({msg_type})'),
        'type_code': msg_type,
        'order_ref': order_ref,
        'side': 'SELL' if itch.read(REG_SIDE) & 1 else 'BUY',
        'shares': itch.read(REG_SHARES),
        'price': itch.read(REG_PRICE),
        'new_order_ref': (itch.read(REG_NEW_ORDER_HI) << 32) | itch.read(REG_NEW_ORDER_LO),
        'timestamp': ((itch.read(REG_TIMESTAMP_HI) & 0xFFFF) << 32) | itch.read(REG_TIMESTAMP_LO),
        'misc_data': (itch.read(REG_MISC_DATA_HI) << 32) | itch.read(REG_MISC_DATA_LO)
    }

def debug_registers():
    """Print all registers for debugging"""
    print(f"LATCHED_VALID:  {itch.read(REG_LATCHED_VALID)}")
    print(f"LATCHED_TYPE:   {itch.read(REG_LATCHED_TYPE)}")
    print(f"ORDER_REF_LO:   0x{itch.read(REG_ORDER_REF_LO):08X}")
    print(f"ORDER_REF_HI:   0x{itch.read(REG_ORDER_REF_HI):08X}")
    print(f"SIDE:           {itch.read(REG_SIDE)}")
    print(f"SHARES:         {itch.read(REG_SHARES)}")
    print(f"PRICE:          {itch.read(REG_PRICE)}")
    print(f"NEW_ORDER_LO:   0x{itch.read(REG_NEW_ORDER_LO):08X}")
    print(f"NEW_ORDER_HI:   0x{itch.read(REG_NEW_ORDER_HI):08X}")
    print(f"TIMESTAMP_LO:   0x{itch.read(REG_TIMESTAMP_LO):08X}")
    print(f"TIMESTAMP_HI:   0x{itch.read(REG_TIMESTAMP_HI):08X}")
    print(f"MISC_DATA_LO:   0x{itch.read(REG_MISC_DATA_LO):08X}")
    print(f"MISC_DATA_HI:   0x{itch.read(REG_MISC_DATA_HI):08X}")

print("Functions defined.")

In [None]:
# Payload generators for all 9 ITCH message types

def gen_add_order(order_ref, side, shares, symbol, price):
    """Add Order 'A' - 36 bytes"""
    p = bytearray(36)
    p[0] = ord('A')
    p[1:9] = struct.pack('>Q', order_ref)
    p[9] = ord('S') if side == 'S' else ord('B')
    p[10:14] = struct.pack('>I', shares)
    p[14:22] = symbol.ljust(8)[:8].encode()
    p[22:26] = struct.pack('>I', price)
    return bytes(p)

def gen_cancel_order(order_ref, shares):
    """Cancel Order 'X' - 23 bytes"""
    p = bytearray(23)
    p[0] = ord('X')
    p[1:9] = struct.pack('>Q', order_ref)
    p[9:13] = struct.pack('>I', shares)
    return bytes(p)

def gen_delete_order(order_ref):
    """Delete Order 'D' - 9 bytes"""
    p = bytearray(9)
    p[0] = ord('D')
    p[1:9] = struct.pack('>Q', order_ref)
    return bytes(p)

def gen_exec_order(timestamp, order_ref, shares, match_id):
    """Executed Order 'E' - 30 bytes"""
    p = bytearray(30)
    p[0] = ord('E')
    p[1:7] = struct.pack('>Q', timestamp)[2:8]  # 48-bit timestamp
    p[7:15] = struct.pack('>Q', order_ref)
    p[15:19] = struct.pack('>I', shares)
    p[19:27] = struct.pack('>Q', match_id)
    return bytes(p)

def gen_replace_order(old_ref, new_ref, shares, price):
    """Replace Order 'U' - 27 bytes"""
    p = bytearray(27)
    p[0] = ord('U')
    p[1:9] = struct.pack('>Q', old_ref)
    p[9:17] = struct.pack('>Q', new_ref)
    p[17:21] = struct.pack('>I', shares)
    p[21:25] = struct.pack('>I', price)
    return bytes(p)

def gen_trade(timestamp, order_ref, side, shares, symbol, price, match_id):
    """Trade 'P' - 40 bytes"""
    p = bytearray(40)
    p[0] = ord('P')
    p[1:7] = struct.pack('>Q', timestamp)[2:8]  # 48-bit timestamp
    p[7:15] = struct.pack('>Q', order_ref)
    p[15] = ord('S') if side == 'S' else ord('B')
    p[16:20] = struct.pack('>I', shares)
    p[20:28] = symbol.ljust(8)[:8].encode()
    p[28:32] = struct.pack('>I', price)
    p[32:40] = struct.pack('>Q', match_id)
    return bytes(p)

def gen_add_order_mpid(order_ref, side, shares, symbol, price, attribution):
    """Add Order MPID 'F' - 40 bytes"""
    p = bytearray(40)
    p[0] = ord('F')
    p[1:9] = struct.pack('>Q', order_ref)
    p[9] = ord('S') if side == 'S' else ord('B')
    p[10:14] = struct.pack('>I', shares)
    p[14:22] = symbol.ljust(8)[:8].encode()
    p[22:26] = struct.pack('>I', price)
    # p[26:30] = reserved (zeros)
    p[30:34] = attribution.ljust(4)[:4].encode()  # 4-char MPID
    return bytes(p)

def gen_exec_price(timestamp, order_ref, shares, match_id, printable, price):
    """Executed With Price 'C' - 36 bytes"""
    p = bytearray(36)
    p[0] = ord('C')
    p[1:7] = struct.pack('>Q', timestamp)[2:8]  # 48-bit timestamp
    p[7:15] = struct.pack('>Q', order_ref)
    p[15:19] = struct.pack('>I', shares)
    p[19:27] = struct.pack('>Q', match_id)
    p[27] = ord('Y') if printable else ord('N')
    p[28:32] = struct.pack('>I', price)
    return bytes(p)

def gen_broken_trade(timestamp, match_id):
    """Broken Trade 'B' - 19 bytes"""
    p = bytearray(19)
    p[0] = ord('B')
    p[1:7] = struct.pack('>Q', timestamp)[2:8]  # 48-bit timestamp
    p[7:15] = struct.pack('>Q', match_id)
    return bytes(p)

print("Payload generators defined (9 message types).")

---
## Test 1: Add Order ('A') - Type 0

In [None]:
print("=" * 60)
print("Test 1: ADD_ORDER ('A') - Type 0")
print("=" * 60)

order_ref = 0x123456789ABCDEF0
side = 'B'
shares = 100
symbol = 'AAPL'
price = 15000

msg_bytes = gen_add_order(order_ref, side, shares, symbol, price)
print(f"Sending ({len(msg_bytes)} bytes): {msg_bytes.hex()}")
print(f"Expected: order_ref=0x{order_ref:016X}, side={side}, shares={shares}, price={price}")

send_message(msg_bytes)
time.sleep(0.001)

msg = read_message()
if msg:
    print(f"\n✓ Result:")
    print(f"  Type: {msg['type']} (code={msg['type_code']})")
    print(f"  Order Ref: 0x{msg['order_ref']:016X}")
    print(f"  Side: {msg['side']}")
    print(f"  Shares: {msg['shares']}")
    print(f"  Price: {msg['price']}")
    print(f"  Misc Data (symbol): 0x{msg['misc_data']:016X}")
    
    # Verify
    assert msg['type'] == 'ADD', f"Type mismatch: {msg['type']}"
    assert msg['order_ref'] == order_ref, f"Order ref mismatch"
    print("\n✓ PASS")
else:
    print("\n✗ FAIL: No valid message!")

---
## Test 2: Cancel Order ('X') - Type 1

In [None]:
print("=" * 60)
print("Test 2: CANCEL_ORDER ('X') - Type 1")
print("=" * 60)

order_ref = 0xDEADBEEFCAFEBABE
shares = 50

msg_bytes = gen_cancel_order(order_ref, shares)
print(f"Sending ({len(msg_bytes)} bytes): {msg_bytes.hex()}")
print(f"Expected: order_ref=0x{order_ref:016X}, shares={shares}")

send_message(msg_bytes)
time.sleep(0.001)

msg = read_message()
if msg:
    print(f"\n✓ Result:")
    print(f"  Type: {msg['type']} (code={msg['type_code']})")
    print(f"  Order Ref: 0x{msg['order_ref']:016X}")
    print(f"  Shares: {msg['shares']}")
    
    assert msg['type'] == 'CANCEL', f"Type mismatch: {msg['type']}"
    assert msg['order_ref'] == order_ref, f"Order ref mismatch"
    assert msg['shares'] == shares, f"Shares mismatch"
    print("\n✓ PASS")
else:
    print("\n✗ FAIL: No valid message!")

---
## Test 3: Delete Order ('D') - Type 2

In [None]:
print("=" * 60)
print("Test 3: DELETE_ORDER ('D') - Type 2")
print("=" * 60)

order_ref = 0x0102030405060708

msg_bytes = gen_delete_order(order_ref)
print(f"Sending ({len(msg_bytes)} bytes): {msg_bytes.hex()}")
print(f"Expected: order_ref=0x{order_ref:016X}")

send_message(msg_bytes)
time.sleep(0.001)

msg = read_message()
if msg:
    print(f"\n✓ Result:")
    print(f"  Type: {msg['type']} (code={msg['type_code']})")
    print(f"  Order Ref: 0x{msg['order_ref']:016X}")
    
    assert msg['type'] == 'DELETE', f"Type mismatch: {msg['type']}"
    assert msg['order_ref'] == order_ref, f"Order ref mismatch"
    print("\n✓ PASS")
else:
    print("\n✗ FAIL: No valid message!")

---
## Test 4: Executed Order ('E') - Type 3

In [None]:
print("=" * 60)
print("Test 4: EXECUTED_ORDER ('E') - Type 3")
print("=" * 60)

timestamp = 0xAABBCCDDEEFF
order_ref = 0x1122334455667788
shares = 75
match_id = 0xFEDCBA9876543210

msg_bytes = gen_exec_order(timestamp, order_ref, shares, match_id)
print(f"Sending ({len(msg_bytes)} bytes): {msg_bytes.hex()}")
print(f"Expected: timestamp=0x{timestamp:012X}, order_ref=0x{order_ref:016X}, shares={shares}, match_id=0x{match_id:016X}")

send_message(msg_bytes)
time.sleep(0.001)

msg = read_message()
if msg:
    print(f"\n✓ Result:")
    print(f"  Type: {msg['type']} (code={msg['type_code']})")
    print(f"  Timestamp: 0x{msg['timestamp']:012X}")
    print(f"  Order Ref: 0x{msg['order_ref']:016X}")
    print(f"  Shares: {msg['shares']}")
    print(f"  Match ID (misc_data): 0x{msg['misc_data']:016X}")
    
    assert msg['type'] == 'EXEC', f"Type mismatch: {msg['type']}"
    assert msg['order_ref'] == order_ref, f"Order ref mismatch"
    assert msg['shares'] == shares, f"Shares mismatch"
    print("\n✓ PASS")
else:
    print("\n✗ FAIL: No valid message!")

---
## Test 5: Replace Order ('U') - Type 4

In [None]:
print("=" * 60)
print("Test 5: REPLACE_ORDER ('U') - Type 4")
print("=" * 60)

old_ref = 0x1111111111111111
new_ref = 0x2222222222222222
shares = 200
price = 25000

msg_bytes = gen_replace_order(old_ref, new_ref, shares, price)
print(f"Sending ({len(msg_bytes)} bytes): {msg_bytes.hex()}")
print(f"Expected: old_ref=0x{old_ref:016X}, new_ref=0x{new_ref:016X}, shares={shares}, price={price}")

send_message(msg_bytes)
time.sleep(0.001)

msg = read_message()
if msg:
    print(f"\n✓ Result:")
    print(f"  Type: {msg['type']} (code={msg['type_code']})")
    print(f"  Old Order Ref: 0x{msg['order_ref']:016X}")
    print(f"  New Order Ref: 0x{msg['new_order_ref']:016X}")
    print(f"  Shares: {msg['shares']}")
    print(f"  Price: {msg['price']}")
    
    assert msg['type'] == 'REPLACE', f"Type mismatch: {msg['type']}"
    assert msg['order_ref'] == old_ref, f"Old ref mismatch"
    assert msg['new_order_ref'] == new_ref, f"New ref mismatch"
    assert msg['shares'] == shares, f"Shares mismatch"
    assert msg['price'] == price, f"Price mismatch"
    print("\n✓ PASS")
else:
    print("\n✗ FAIL: No valid message!")

---
## Test 6: Trade ('P') - Type 5

In [None]:
print("=" * 60)
print("Test 6: TRADE ('P') - Type 5")
print("=" * 60)

timestamp = 0x123456789ABC
order_ref = 0xAABBCCDDEEFF0011
side = 'S'
shares = 500
symbol = 'MSFT'
price = 42000
match_id = 0x9988776655443322

msg_bytes = gen_trade(timestamp, order_ref, side, shares, symbol, price, match_id)
print(f"Sending ({len(msg_bytes)} bytes): {msg_bytes.hex()}")
print(f"Expected: timestamp=0x{timestamp:012X}, order_ref=0x{order_ref:016X}, side={side}, shares={shares}, price={price}")

send_message(msg_bytes)
time.sleep(0.001)

msg = read_message()
if msg:
    print(f"\n✓ Result:")
    print(f"  Type: {msg['type']} (code={msg['type_code']})")
    print(f"  Timestamp: 0x{msg['timestamp']:012X}")
    print(f"  Order Ref: 0x{msg['order_ref']:016X}")
    print(f"  Side: {msg['side']}")
    print(f"  Shares: {msg['shares']}")
    print(f"  Price: {msg['price']}")
    print(f"  Match ID (misc_data): 0x{msg['misc_data']:016X}")
    
    assert msg['type'] == 'TRADE', f"Type mismatch: {msg['type']}"
    assert msg['order_ref'] == order_ref, f"Order ref mismatch"
    assert msg['side'] == 'SELL', f"Side mismatch"
    assert msg['shares'] == shares, f"Shares mismatch"
    assert msg['price'] == price, f"Price mismatch"
    print("\n✓ PASS")
else:
    print("\n✗ FAIL: No valid message!")

---
## Test 7: Add Order MPID ('F') - Type 6

In [None]:
print("=" * 60)
print("Test 7: ADD_ORDER_MPID ('F') - Type 6")
print("=" * 60)

order_ref = 0xFEDCBA9876543210
side = 'B'
shares = 1000
symbol = 'GOOGL'
price = 280000
attribution = 'GSCO'  # Goldman Sachs MPID

msg_bytes = gen_add_order_mpid(order_ref, side, shares, symbol, price, attribution)
print(f"Sending ({len(msg_bytes)} bytes): {msg_bytes.hex()}")
print(f"Expected: order_ref=0x{order_ref:016X}, side={side}, shares={shares}, price={price}, mpid={attribution}")

send_message(msg_bytes)
time.sleep(0.001)

msg = read_message()
if msg:
    print(f"\n✓ Result:")
    print(f"  Type: {msg['type']} (code={msg['type_code']})")
    print(f"  Order Ref: 0x{msg['order_ref']:016X}")
    print(f"  Side: {msg['side']}")
    print(f"  Shares: {msg['shares']}")
    print(f"  Price: {msg['price']}")
    print(f"  Misc Data (symbol): 0x{msg['misc_data']:016X}")
    
    assert msg['type'] == 'ADD_MPID', f"Type mismatch: {msg['type']}"
    assert msg['order_ref'] == order_ref, f"Order ref mismatch"
    print("\n✓ PASS")
else:
    print("\n✗ FAIL: No valid message!")

---
## Test 8: Executed With Price ('C') - Type 7

In [None]:
print("=" * 60)
print("Test 8: EXEC_PRICE ('C') - Type 7")
print("=" * 60)

timestamp = 0x112233445566
order_ref = 0x5555555555555555
shares = 250
match_id = 0x6666666666666666
printable = True
price = 15075

msg_bytes = gen_exec_price(timestamp, order_ref, shares, match_id, printable, price)
print(f"Sending ({len(msg_bytes)} bytes): {msg_bytes.hex()}")
print(f"Expected: timestamp=0x{timestamp:012X}, order_ref=0x{order_ref:016X}, shares={shares}, price={price}")

send_message(msg_bytes)
time.sleep(0.001)

msg = read_message()
if msg:
    print(f"\n✓ Result:")
    print(f"  Type: {msg['type']} (code={msg['type_code']})")
    print(f"  Timestamp: 0x{msg['timestamp']:012X}")
    print(f"  Order Ref: 0x{msg['order_ref']:016X}")
    print(f"  Shares: {msg['shares']}")
    print(f"  Price: {msg['price']}")
    print(f"  Match ID (misc_data): 0x{msg['misc_data']:016X}")
    
    assert msg['type'] == 'EXEC_PRICE', f"Type mismatch: {msg['type']}"
    assert msg['order_ref'] == order_ref, f"Order ref mismatch"
    assert msg['shares'] == shares, f"Shares mismatch"
    assert msg['price'] == price, f"Price mismatch"
    print("\n✓ PASS")
else:
    print("\n✗ FAIL: No valid message!")

---
## Test 9: Broken Trade ('B') - Type 8

In [None]:
print("=" * 60)
print("Test 9: BROKEN_TRADE ('B') - Type 8")
print("=" * 60)

timestamp = 0xABCDEF123456
match_id = 0x7777777777777777

msg_bytes = gen_broken_trade(timestamp, match_id)
print(f"Sending ({len(msg_bytes)} bytes): {msg_bytes.hex()}")
print(f"Expected: timestamp=0x{timestamp:012X}, match_id=0x{match_id:016X}")

send_message(msg_bytes)
time.sleep(0.001)

msg = read_message()
if msg:
    print(f"\n✓ Result:")
    print(f"  Type: {msg['type']} (code={msg['type_code']})")
    print(f"  Timestamp: 0x{msg['timestamp']:012X}")
    print(f"  Match ID (misc_data): 0x{msg['misc_data']:016X}")
    
    assert msg['type'] == 'BROKEN', f"Type mismatch: {msg['type']}"
    assert msg['misc_data'] == match_id, f"Match ID mismatch"
    print("\n✓ PASS")
else:
    print("\n✗ FAIL: No valid message!")

---
## Test 10: Back-to-Back Messages

In [None]:
print("=" * 60)
print("Test 10: BACK-TO-BACK MESSAGES")
print("=" * 60)

stream = bytearray()
stream.extend(gen_add_order(0x1111, 'B', 100, 'GOOG', 5000))
stream.extend(gen_cancel_order(0x2222, 25))
stream.extend(gen_delete_order(0x3333))

print(f"Sending {len(stream)} bytes (3 messages: ADD → CANCEL → DELETE)")
send_message(stream)
time.sleep(0.01)

# Should show the LAST message (DELETE with order_ref=0x3333)
msg = read_message()
if msg:
    print(f"\nLast message latched:")
    print(f"  Type: {msg['type']} (code={msg['type_code']})")
    print(f"  Order Ref: 0x{msg['order_ref']:016X}")
    
    assert msg['type'] == 'DELETE', f"Expected DELETE, got {msg['type']}"
    assert msg['order_ref'] == 0x3333, f"Expected 0x3333, got 0x{msg['order_ref']:X}"
    print("\n✓ PASS")
else:
    print("\n✗ FAIL: No valid message!")

---
## Run All Tests Summary

In [None]:
def run_all_tests():
    """Run all 9 message type tests and report results"""
    results = []
    
    tests = [
        ('ADD',       gen_add_order(0x123456789ABCDEF0, 'B', 100, 'AAPL', 15000), 0),
        ('CANCEL',    gen_cancel_order(0xDEADBEEFCAFEBABE, 50), 1),
        ('DELETE',    gen_delete_order(0x0102030405060708), 2),
        ('EXEC',      gen_exec_order(0xAABBCCDDEEFF, 0x1122334455667788, 75, 0xFEDCBA9876543210), 3),
        ('REPLACE',   gen_replace_order(0x1111111111111111, 0x2222222222222222, 200, 25000), 4),
        ('TRADE',     gen_trade(0x123456789ABC, 0xAABBCCDDEEFF0011, 'S', 500, 'MSFT', 42000, 0x9988776655443322), 5),
        ('ADD_MPID',  gen_add_order_mpid(0xFEDCBA9876543210, 'B', 1000, 'GOOGL', 280000, 'GSCO'), 6),
        ('EXEC_PRICE', gen_exec_price(0x112233445566, 0x5555555555555555, 250, 0x6666666666666666, True, 15075), 7),
        ('BROKEN',    gen_broken_trade(0xABCDEF123456, 0x7777777777777777), 8),
    ]
    
    print("=" * 60)
    print("RUNNING ALL TESTS")
    print("=" * 60)
    
    for name, msg_bytes, expected_type in tests:
        send_message(msg_bytes)
        time.sleep(0.001)
        msg = read_message()
        
        if msg and msg['type_code'] == expected_type:
            results.append((name, 'PASS'))
            print(f"  ✓ {name:12} - PASS (type={msg['type_code']})")
        else:
            actual = msg['type_code'] if msg else 'None'
            results.append((name, 'FAIL'))
            print(f"  ✗ {name:12} - FAIL (expected={expected_type}, got={actual})")
    
    print("\n" + "=" * 60)
    passed = sum(1 for _, r in results if r == 'PASS')
    print(f"RESULTS: {passed}/{len(results)} tests passed")
    print("=" * 60)
    
    return results

# Run all tests
run_all_tests()

---
## Debug / Troubleshooting

In [None]:
# Debug: Print all registers
print("Current register state:")
debug_registers()

In [None]:
# Check DMA status
print("DMA Status:")
print(f"  Send channel idle: {dma.sendchannel.idle}")
print(f"  Send channel running: {dma.sendchannel.running}")