# Pattern Matching Kernel Test

This notebook tests the pattern matching kernel that searches for malicious patterns in UDP packets.

The kernel:
- Receives UDP packets from the network
- Searches for 256 predefined patterns in a sliding window
- Returns match results: each byte in the output indicates if a pattern was found at that position (pattern_id+1) or no match (0)

Test setup:
1. Program FPGA with the pattern matching design
2. Configure network layer and socket table
3. Send test data containing known patterns from host to FPGA
4. Verify FPGA correctly identifies pattern locations

In [1]:
# Hot-reset the FPGA before programming. This line is active, so the reset runs on every
# "Run All"; comment it out to skip the reset when the card is programming and behaving normally.
# NOTE(review): the device ID is hard-coded for one machine — adjust it for your card.
!xbutil reset --device 0000:02:00.1 --force

Performing 'HOT Reset' on '0000:02:00.1'
Are you sure you wish to proceed? [Y/n]: Y (Force override)
Successfully reset Device[0000:02:00.1]


In [2]:
import pynq
import numpy as np
import vnx_utils
import os
import re
import time

## Check Available Devices

In [3]:
# Print every device PYNQ has discovered, one per line, prefixed with its index.
for index, device in enumerate(pynq.Device.devices):
    print("{}) {}".format(index, device.name))

0) xilinx_u55c_gen3x16_xdma_base_3


## Program FPGA

In [5]:
# Load the bitstream (.xclbin) and build the overlay object that exposes its IPs.
ol = pynq.Overlay("../benchmark_project.intf0.xilinx_u55c_gen3x16_xdma_3_202210_1/vnx_benchmark_project_if0.xclbin")

# Print available IPs
print("Available IPs:", list(ol.ip_dict.keys()))

# Chain: MM2S -> Throughput -> Project Kernel -> S2MM
# NOTE(review): the recorded IP list shows only krnl_mm2s_0, krnl_s2mm_0 and
# stream_throughput_0 — confirm where the project kernel sits in this chain.
krnl_mm2s = ol.krnl_mm2s_0
krnl_s2mm = ol.krnl_s2mm_0
stream_throughput = ol.stream_throughput_0

# Memory bank used as the allocation target for the throughput-monitor buffer.
mem_bank = ol.HBM0
    
print(f"Using Memory Bank: {mem_bank}")

Available IPs: ['krnl_mm2s_0', 'krnl_s2mm_0', 'stream_throughput_0']
Using Memory Bank: <pynq.pl_server.xrt_device.XrtMemory object at 0x7fdc5185fdf0>


## Define Test Patterns

The first cell below sets the kernel data width; the second parses the actual patterns from `patterns.h`, which are then used for testing.

In [6]:

# Width of the kernel's data path in bits; the other constants in this cell derive from it.
KERNEL_DWIDTH_BITS = 32

# Number of bytes carried by one beat of the data path.
BYTES_PER_BEAT = KERNEL_DWIDTH_BITS // 8

# Supported widths, each mapped to the unsigned NumPy dtype of the same size.
# The dtype is used later to view the output buffer one beat per element.
DTYPE_MAP = {bits: getattr(np, f"uint{bits}") for bits in (8, 16, 32, 64)}

# Stop immediately on a width that has no matching NumPy dtype.
if KERNEL_DWIDTH_BITS not in DTYPE_MAP:
    raise ValueError(f"Unsupported DWIDTH: {KERNEL_DWIDTH_BITS}. Please use 8, 16, 32, or 64.")

KERNEL_DTYPE = DTYPE_MAP[KERNEL_DWIDTH_BITS]

print(f"Configuration: DWIDTH={KERNEL_DWIDTH_BITS} bits ({BYTES_PER_BEAT} bytes/beat)")
print(f"Output View: {KERNEL_DTYPE}")

Configuration: DWIDTH=32 bits (4 bytes/beat)
Output View: <class 'numpy.uint32'>


In [7]:

def parse_patterns_header(file_path):
    """
    Parses C++ header 'patterns.h' to extract pattern data and IDs.

    Four array definitions are located by name:
      - PATTERN_DATA[..][..]     : byte values, one brace-delimited row per group
      - PATTERN_LENGTHS[..][..]  : length of each pattern within its row
      - PATTERN_OFFSETS[..][..]  : start index of each pattern within its row
      - NUM_PATTERNS_MATRIX[..]  : number of patterns stored in each row

    Pattern p of row n is PATTERN_DATA[n][offset : offset + length]. Its global
    ID is p plus the total number of patterns in all earlier rows.

    Args:
        file_path: Path to the header file.

    Returns:
        { global_pattern_id : list of byte values }. An empty dict is returned
        (after printing an error) when the file is missing or any of the four
        arrays cannot be found. Entries whose length is not positive or whose
        slice would run past the end of the row are skipped.
    """
    patterns_db = {}

    # Resolve absolute path if needed, or use relative
    if not os.path.exists(file_path):
        print(f"Error: {file_path} not found.")
        print("Please check the path to patterns.h relative to this notebook.")
        return {}

    with open(file_path, 'r') as f:
        content = f.read()

    # Remove C/C++ comments up front. A comment between two values would otherwise
    # be glued onto the next token, making that value unparseable and silently
    # shifting every later index in the row.
    content = re.sub(r'/\*.*?\*/|//[^\n]*', '', content, flags=re.DOTALL)

    re_data = re.search(r'const unsigned char PATTERN_DATA\[\d+\]\[\d+\]\s*=\s*\{(.*?)\};', content, re.DOTALL)
    re_len = re.search(r'const int PATTERN_LENGTHS\[\d+\]\[\d+\]\s*=\s*\{(.*?)\};', content, re.DOTALL)
    re_counts = re.search(r'const int NUM_PATTERNS_MATRIX\[\d+\]\s*=\s*\{([^}]+)\};', content)
    re_offsets = re.search(r'const int PATTERN_OFFSETS\[\d+\]\[\d+\]\s*=\s*\{(.*?)\};', content, re.DOTALL)

    if not (re_data and re_len and re_counts and re_offsets):
        print("Error: Could not parse array structures in patterns.h. Check file format.")
        return {}

    # Helper to clean and split C-style array strings
    def parse_c_array(raw_str):
        # Split by the closing brace of each row '},'
        matrix = []
        for row in raw_str.split('},'):
            clean_row = row.replace('{', '').replace('}', '').strip()
            if not clean_row:
                continue
            items = []
            for token in clean_row.split(','):
                token = token.strip()
                if not token:
                    continue
                try:
                    # Accept both '0x' and '0X' hex prefixes; everything else is decimal.
                    is_hex = token.lower().startswith('0x')
                    items.append(int(token, 16) if is_hex else int(token))
                except ValueError:
                    # Best effort: tokens that are not plain integers are skipped.
                    continue
            matrix.append(items)
        return matrix

    data_matrix = parse_c_array(re_data.group(1))
    len_matrix = parse_c_array(re_len.group(1))
    offset_matrix = parse_c_array(re_offsets.group(1))

    # Ignore empty tokens so a trailing comma ("{2, 1,}") does not crash int().
    count_tokens = [tok.strip() for tok in re_counts.group(1).split(',')]
    counts = [int(tok) for tok in count_tokens if tok]

    # Calculate Global ID Offsets (cumulative sum of counts)
    global_id_offsets = [0] * len(counts)
    for i in range(1, len(counts)):
        global_id_offsets[i] = global_id_offsets[i-1] + counts[i-1]

    # Build Dictionary
    for n in range(min(len(counts), len(data_matrix))):
        for p in range(counts[n]):
            # Safety check for indices to prevent out-of-bounds
            if n >= len(len_matrix) or p >= len(len_matrix[n]):
                continue
            if n >= len(offset_matrix) or p >= len(offset_matrix[n]):
                continue

            p_len = len_matrix[n][p]
            p_start = offset_matrix[n][p]

            # Verify data bounds
            if p_len > 0 and (p_start + p_len) <= len(data_matrix[n]):
                # Extract the pattern bytes
                patterns_db[p + global_id_offsets[n]] = data_matrix[n][p_start : p_start + p_len]

    return patterns_db

# Load patterns
patterns_path = "../Project_kernels_HLS/src/patterns.h" 
patterns_map = parse_patterns_header(patterns_path)

# The parser reports problems by printing and returning an empty dict. Stop here in
# that case: without patterns the stream would contain no injections and the
# hardware run would verify nothing.
if not patterns_map:
    raise RuntimeError(f"No patterns could be loaded from {patterns_path}.")

print(f"Successfully parsed {len(patterns_map)} patterns.")

Successfully parsed 2662 patterns.


## Create Test Data with Patterns

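This cell generates a zero-filled stream and injects randomly chosen known patterns at known offsets, recording the beat in which each pattern ends so the kernel output can be verified.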

In [8]:
def generate_parametric_stream(total_bytes, patterns, num_injections=50, seed=None, bytes_per_beat=None):
    """
    Build a zero-filled byte stream with known patterns injected at known positions.

    Injection starts at byte 128. After each pattern a gap of 64 zero bytes is
    left before the next one. Injection stops after `num_injections` attempts,
    when the write position reaches the last 512 bytes of the stream, or when
    the chosen pattern would not fit in the remaining space.

    Args:
        total_bytes: Length of the generated stream in bytes.
        patterns: Mapping { pattern_id : sequence of byte values }.
        num_injections: Maximum number of patterns to inject.
        seed: Optional seed for the random pattern choice; pass a fixed value
            to get a reproducible stream. None draws fresh entropy.
        bytes_per_beat: Bytes per kernel beat used to turn a byte index into a
            beat index. Defaults to the notebook-level BYTES_PER_BEAT.

    Returns:
        (data, expected): `data` is a np.uint8 array of length `total_bytes`;
        `expected` maps the beat containing the LAST byte of each injected
        pattern to that pattern's ID (with the key's original type).
    """
    # Initialize with Zeros
    data = np.zeros(total_bytes, dtype=np.uint8)
    expected = {}

    if not patterns:
        return data, expected

    if bytes_per_beat is None:
        bytes_per_beat = BYTES_PER_BEAT

    rng = np.random.default_rng(seed)
    keys = list(patterns.keys())

    # Start offset
    current_idx = 128

    for _ in range(num_injections):
        # Keep the last 512 bytes of the stream free of injections.
        if current_idx >= (total_bytes - 512):
            break

        # Index into `keys` so the ID keeps its original type instead of a NumPy scalar.
        pid = keys[int(rng.integers(len(keys)))]
        pat = patterns[pid]
        p_len = len(pat)

        # An empty pattern has no last byte, so there is no beat to record.
        if p_len == 0:
            continue

        # A pattern that overruns the buffer would make the slice assignment
        # fail with a shape mismatch; stop injecting instead.
        if current_idx + p_len > total_bytes:
            break

        # Inject Pattern
        data[current_idx : current_idx + p_len] = pat

        # --- PARAMETRIC BEAT CALCULATION ---
        # The expected detection beat is the one holding the pattern's final byte.
        end_idx = current_idx + p_len - 1
        expected[end_idx // bytes_per_beat] = pid

        # Leave a fixed 64-byte gap of zeros before the next injection.
        current_idx += p_len + 64

    return data, expected

# Setup
N_RATE = 1000     # passed to stream_throughput.start() as its last argument
N_SAMPLES = 100   # number of entries allocated in the throughput-monitor buffer
# Stream length in bytes (6,464,000 with the values above).
# NOTE(review): the factor 64 is presumably bytes per monitored cycle and the +1 one
# extra sampling window of margin — confirm against the throughput kernel.
STREAM_SIZE = 64 * N_RATE * (N_SAMPLES + 1)

print(f"Generating data for {KERNEL_DWIDTH_BITS}-bit Kernel...")
# input_data: the byte stream to send; expected_map: { beat : pattern_id } to verify against.
input_data, expected_map = generate_parametric_stream(STREAM_SIZE, patterns_map, num_injections=100)

Generating data for 32-bit Kernel...


## Buffer Allocation

This cell allocates the memory on the FPGA card.

In [9]:

# Allocate Buffers in FPGA Memory
# NOTE(review): the input/output buffers are allocated without an explicit target,
# unlike perf_buf below — confirm the default bank matches the kernels' connectivity.
print("Allocating buffers...")
input_buffer = pynq.allocate(shape=(STREAM_SIZE,), dtype=np.uint8)
output_buffer = pynq.allocate(shape=(STREAM_SIZE,), dtype=np.uint8)

# Throughput Monitor Struct
# One record per sample: four unsigned 32-bit counters.
sample_t = np.dtype([("cycles", "u4"), ("bytes", "u4"), ("ready_not_valid", "u4"), ("valid_not_ready", "u4")])
perf_buf = pynq.allocate((N_SAMPLES,), dtype=sample_t, target=mem_bank)

print("Populating input data...")
# Copy the input_data created in the previous cell into the PYNQ buffer
input_buffer[:] = input_data

# Sync data to the device (flush cache)
input_buffer.sync_to_device()



Allocating buffers...
Populating input data...


## Execution
Run the kernels and the throughput monitor

In [10]:
print("Starting kernels...")

# Destination ID handed to the MM2S kernel as its third argument.
dest_id = 0 

# Start order: throughput monitor, then the S2MM writer, then the MM2S reader.
# NOTE(review): presumably the consumers are started first so no data is missed
# once MM2S begins streaming — confirm.
perf_w = stream_throughput.start(perf_buf, N_SAMPLES, N_RATE)

s2mm_handle = ol.krnl_s2mm_0.start(output_buffer, STREAM_SIZE)

mm2s_handle = ol.krnl_mm2s_0.start(input_buffer, STREAM_SIZE, dest_id)

# Block until all three kernels report completion.
print("Waiting for completion...")
mm2s_handle.wait()
s2mm_handle.wait()
perf_w.wait()

print("Kernels finished.")

# Retrieve Results
output_buffer.sync_from_device()

# Host-side NumPy copy of the raw output bytes (used here only to report the size).
results = np.array(output_buffer)

print(f"Received {len(results)} bytes.")

Starting kernels...
Waiting for completion...
Kernels finished.
Received 6464000 bytes.


## Verify Results

In [11]:

# Bring the monitor samples and the kernel output back to the host, then view
# the output bytes as one element per kernel beat.
perf_buf.sync_from_device()
output_buffer.sync_from_device()
output_ids = output_buffer.view(KERNEL_DTYPE)

# --- Throughput Calculation ---
# Only the final monitor sample is read here.
# NOTE(review): the recorded output (4032 bytes / 999 cycles) suggests each sample
# covers a single window rather than the whole transfer — confirm.
total_bytes = perf_buf['bytes'][-1]
total_cycles = perf_buf['cycles'][-1]
freq_mhz = 300

duration = total_cycles / (freq_mhz * 1e6)
throughput = (total_bytes * 8) / (duration * 1e9)

print("\n--- Performance ---")
print(f"Transferred: {total_bytes} bytes")
print(f"Cycles:      {total_cycles}")
print(f"Throughput:  {throughput:.3f} Gbps")

# --- Pattern Matching Verification ---
print("\n--- Verification ---")

# An injection counts as matched when its ID shows up in the expected beat or in
# one of the following beats (10 beats in total), which allows for pipeline latency.
# The slice is clipped at the end of the output automatically.
window = 10
matches = 0
for beat, exp_id in expected_map.items():
    candidates = output_ids[beat : beat + window]
    if any(candidate == exp_id for candidate in candidates):
        matches += 1

# Every injection that was not matched is a miss.
misses = len(expected_map) - matches

print(f"Total Injections: {len(expected_map)}")
print(f"Matches Found:    {matches}")
print(f"Misses:           {misses}")

# Passing requires at least one injection and no misses.
if misses == 0 and matches > 0:
    print("TEST PASSED!")
else:
    print("TEST FAILED.")


--- Performance ---
Transferred: 4032 bytes
Cycles:      999
Throughput:  9.686 Gbps

--- Verification ---
Total Injections: 100
Matches Found:    100
Misses:           0
TEST PASSED!


## Debug

In [None]:
import numpy as np

def save_debug_report(filename, expected_map, output_buffer, dtype=None):
    """
    Write a plain-text report comparing expected detections with the kernel output.

    Section 1 lists every expected (beat, pattern ID) pair next to the value
    found at that beat. A value found within +/- 4 beats is marked
    "SHIFTED (offset)" and still counted as a miss. Section 2 lists every
    non-zero element of the output.

    Args:
        filename: Path of the text file to write (overwritten if it exists).
        expected_map: Mapping { beat : expected_pattern_id }.
        output_buffer: Output buffer; must provide sync_from_device() and view().
        dtype: Element type used to view the buffer, one element per beat.
            Defaults to the notebook-level KERNEL_DTYPE so beat indices agree
            with the ones in `expected_map` (the previous hard-coded np.uint16
            view put 32-bit beats at the wrong index).

    Returns:
        None. The report is written to `filename`.
    """
    print(f"Generating debug report: {filename} ...")

    if dtype is None:
        dtype = KERNEL_DTYPE

    # 1. Prepare Data
    output_buffer.sync_from_device()
    output_data = output_buffer.view(dtype)
    n_beats = len(output_data)

    with open(filename, "w") as f:
        # --- SECTION 1: EXPECTED VS ACTUAL ---
        f.write("=======================================================\n")
        f.write("SECTION 1: VERIFICATION (Expected vs Actual)\n")
        f.write("=======================================================\n")
        f.write(f"{'BEAT':<12} | {'EXPECTED ID':<12} | {'ACTUAL ID':<12} | {'STATUS'}\n")
        f.write("-" * 60 + "\n")

        matches = 0
        misses = 0

        # Sort by beat to keep it chronological
        for beat, exp_id in sorted(expected_map.items()):
            # -1 marks a beat that lies outside the output buffer.
            # Values are converted to plain ints so the column formatting
            # does not depend on NumPy scalar types.
            act_id = int(output_data[beat]) if beat < n_beats else -1

            # Check for exact match
            status = "MATCH" if act_id == exp_id else "MISS"

            # Check for near-miss (shifted by +/- 4 beats)
            if status == "MISS":
                for offset in range(-4, 5):
                    check_idx = beat + offset
                    if 0 <= check_idx < n_beats and output_data[check_idx] == exp_id:
                        status = f"SHIFTED ({offset:+d})"
                        break

            # Only an exact hit counts as a match; shifted hits stay misses.
            if status == "MATCH":
                matches += 1
            else:
                misses += 1

            f.write(f"{int(beat):<12} | {int(exp_id):<12} | {act_id:<12} | {status}\n")

        f.write("-" * 60 + "\n")
        f.write(f"SUMMARY: Matches: {matches}, Misses: {misses}, Total: {len(expected_map)}\n\n\n")

        # --- SECTION 2: RAW HARDWARE DETECTIONS ---
        f.write("=======================================================\n")
        f.write("SECTION 2: ALL HARDWARE DETECTIONS (Non-zero outputs)\n")
        f.write("=======================================================\n")
        f.write(f"{'BEAT':<12} | {'DETECTED ID'}\n")
        f.write("-" * 30 + "\n")

        # Scan entire buffer for any non-zero value
        hw_detections = np.nonzero(output_data)[0]

        if len(hw_detections) == 0:
            f.write("No patterns detected (Output is all zeros).\n")
        else:
            for beat in hw_detections:
                f.write(f"{int(beat):<12} | {int(output_data[beat])}\n")

    print(f"Report saved. Open '{filename}' to analyze.")

# Write the report for the last run to 'debug_results.txt' in the current working
# directory, using the injections recorded in expected_map and the kernel's output_buffer.
save_debug_report("debug_results.txt", expected_map, output_buffer)

## Cleanup

In [11]:
# Drop the notebook's references to the device buffers, then release the overlay.
# After this cell the buffers and `ol` can no longer be used; re-run from
# "Program FPGA" to start another test.
del output_buffer
del input_buffer
del perf_buf
ol.free()
print("Resources freed")

Resources freed


In [None]:
# Hot-reset the FPGA after the test run.
# NOTE(review): the device ID is hard-coded for one machine — adjust it for your card.
!xbutil reset --device 0000:02:00.1 --force

Performing 'HOT Reset' on '0000:02:00.1'
Are you sure you wish to proceed? [Y/n]: Y (Force override)
