# Test Cascade NAS deployed on the XEM7310 Artix-7 or XEM6310 Spartan-6 board using the OKAERTool

This notebook is a simple example of how to test the Cascade NAS model deployed on the XEM7310 Artix-7 or XEM6310 Spartan-6 board using the OKAERTool (deployed on the same FPGA board too). The pyOKAERTool is a Python package that provides a simple interface to interact with the OKAERTool IP block. The OKAERTool IP an open-source hardware platform that can be used to deploy and test AER based systems on the edge.

## Monitoring the CNAS outputs
The okaertool is deployed on the FPGA where a stereo 64 channels Cascade NAS is deployed. The script below initializes the OKAERTool and configure the settings to pyNAVIS tool. This tool is used to visualize the AER events received from the Neuromorphic Auditory Sensor (NAS).

In [None]:
import sys
import os
import time

# Add the parent directory to the path to import pyOKAERTool (only if the package is not installed)
sys.path.insert(0, os.path.abspath('..'))
import pyOKAERTool as okt
from pyNAVIS import *
import os

# Define bitfile path
bitfile_path = '../../bitfiles/CNAS_okaertool_XEM6310.bit'
# bitfile_path = None  # Set to None if no .bit file is to be used

# Validate the existence of the .bit file
if bitfile_path is None:
    None
elif not os.path.exists(bitfile_path):
    print(f"El archivo .bit no existe en la ruta especificada: {bitfile_path}")
    sys.exit(1)

# Create a new intance of the OkaerTool class and initialize it
okaer = okt.Okaertool(bit_file=bitfile_path)
okaer.init()

# Create a new instance of the PyNAVIS class
settings = MainSettings(num_channels=64, mono_stereo=1, on_off_both=1, address_size=4, ts_tick=0.01, bin_size=10000)

### Configure Cascade NAS
The NAS sensor needs to be configure before testing. The configuration is done using the `set_config` method of the OKAERTool class. The configuration parameters are passed as a list of values. The parameters are:
- PDM2Spikes_DEFAULT_parameter
- I2S2Spikes_DEFAULT_parameter
- CASCADE_FILTER_DEFAULT_parameter


In [None]:
PDM2Spikes_DEFAULT_parameter = [0x0005, 0x0006, 0x734B, 0x39C8]
I2S2Spikes_DEFAULT_parameter = [0x000F]

### Load the config file exported from pyOpenNAS
The configuration parameters are loaded from a config file exported from the pyOpenNAS tool. The config file is a text file that contains the configuration parameters in a specific format. This file has to be parsed to extract the parameters and save them in a list.

In [None]:
import re
import os

config_file_path = './CFBank_64_20_22000.vhd'

def _tok_to_int(tok):
    tok = tok.strip().rstrip(',').strip()
    if tok.lower().startswith('x"') and tok.endswith('"'):
        return int(tok[2:-1], 16)
    if tok.lower().startswith('0x'):
        return int(tok, 16)
    m = re.match(r'16#([0-9A-Fa-f]+)#', tok)
    if m:
        return int(m.group(1), 16)
    if tok.isdigit():
        return int(tok, 10)
    raise ValueError(f"Unrecognized token: {tok!r}")

def parse_cascade_vhd(path):
    if not os.path.exists(path):
        raise FileNotFoundError(path)
    text = open(path, 'r', encoding='utf-8', errors='ignore').read()

    # Find successive groups of the four parameters in the file order
    pattern = re.compile(
        r'FREQ_DIV\s*=>\s*(?P<f>[^,\n;]+)\s*,\s*'
        r'SPIKES_DIV_FB\s*=>\s*(?P<fb>[^,\n;]+)\s*,\s*'
        r'SPIKES_DIV_OUT\s*=>\s*(?P<out>[^,\n;]+)\s*,\s*'
        r'SPIKES_DIV_BPF\s*=>\s*(?P<bpf>[^,\n;]+)',
        re.IGNORECASE | re.DOTALL
    )

    values = []
    for m in pattern.finditer(text):
        f = _tok_to_int(m.group('f'))
        fb = _tok_to_int(m.group('fb'))
        out = _tok_to_int(m.group('out'))
        bpf = _tok_to_int(m.group('bpf'))
        values.extend([f, fb, out, bpf])

    return values

CASCADE_FILTER_DEFAULT_parameter = parse_cascade_vhd(config_file_path)

# quick validation / pretty print
filters = len(CASCADE_FILTER_DEFAULT_parameter) // 4
print(f"Parsed {filters} filters ({len(CASCADE_FILTER_DEFAULT_parameter)} values).")
print("CASCADE_FILTER_DEFAULT_parameter = [")
for v in CASCADE_FILTER_DEFAULT_parameter:
    # print as hex literal (4 hex digits minimum)
    width = max(2, (v.bit_length() + 3) // 4)
    print(f"    0x{v:0{width}X},")
print("]")

El siguiente código configura el NAS con los parámetros cargados en las listas PDM2Spikes_DEFAULT_parameter, I2S2Spikes_DEFAULT_parameter y CASCADE_FILTER_DEFAULT_parameter.

In [None]:
# Reset the OkaerTool
okaer.reset_board(mode='internal')

# Configure the PDM2Spikes (left and right) for both NAS
register_address = 0x0000
okaer.logger.info("Configuring PDM2Spikes modules")
# Left cochlea
okaer.logger.info("Left cochlea")
for value in PDM2Spikes_DEFAULT_parameter:
    okaer.set_config('port_a', register_address, value)
    # okaer.set_config('port_b', register_address, value)
    register_address += 1
# Right cochlea
okaer.logger.info("Right cochlea")
for value in PDM2Spikes_DEFAULT_parameter:
    okaer.set_config('port_a', register_address, value)
    # okaer.set_config('port_b', register_address, value)
    register_address += 1

register_address = 0x08
okaer.logger.info("Configuring I2S2Spikes modules")
# Configure I2S2Spikes modules for both NAS
for value in I2S2Spikes_DEFAULT_parameter:
    okaer.set_config('port_a', register_address, value)
    # okaer.set_config('port_b', register_address, value)

# Configure the filters for CASCADE NAS
okaer.logger.info("Configuring filters for Cascade NAS")
# Left cochlea
register_address = 0x09
okaer.logger.info("Left cochlea")
for value in CASCADE_FILTER_DEFAULT_parameter:
    okaer.set_config('port_a', register_address, value)
    register_address += 1
    # # Config only 32 filters
    # if register_address >= 0x09 + 32*4:
    #     break
# Right cochlea
register_address = 0x010D
okaer.logger.info("Right cochlea")
for value in CASCADE_FILTER_DEFAULT_parameter:
    okaer.set_config('port_a', register_address, value)
    register_address += 1
    # # Config only 32 filters
    # if register_address >= 0x010D + 32*4:
    #     break

### Monitor de NAS outputs
The script below monitors the output of the NAS.

In [None]:
# Monitor the inputs
MAX_INPUTS = 3
INPUTS = ['port_a'] # Monitor only port_a where the CNAS outputs are sent. port_b is not used in this configuration
MAX_SPIKES = 100000
DURATION = 5 # in seconds
USB_TRANSFER_LENGTH = 64 * 1024
# Set USB transfer length and number of buffers
okaer.USB_TRANSFER_LENGTH = USB_TRANSFER_LENGTH
MONITOR_MODE_TEST = 'live'  # 'duration', 'num_spikes', 'live'

# Reset the okaerTool board before monitoring to ensure a clean state
okaer.reset_board(mode='internal')

match MONITOR_MODE_TEST:
    case 'duration':
        okaer.logger.info("Monitoring for a duration of %d seconds", DURATION)
        spikes = okaer.monitor(inputs=INPUTS, duration=DURATION)
    case 'num_spikes':
        okaer.logger.info("Monitoring until %d spikes are collected", MAX_SPIKES)
        spikes = okaer.monitor(inputs=INPUTS, max_spikes=MAX_SPIKES)
    case 'live':
        okaer.monitor(inputs=INPUTS, live=True)
        start = time.time()
        spikes = None
        while (time.time() - start) < DURATION:
            # time.sleep(0.01)
            partial_spikes = okaer.get_live_spikes()  # Obtener spikes sin detener
            if partial_spikes is not None:
                if spikes is None:
                    spikes = partial_spikes
                else:
                    [spikes[i].addresses.extend(partial_spikes[i].addresses) for i in range(MAX_INPUTS)]
                    [spikes[i].timestamps.extend(partial_spikes[i].timestamps) for i in range(MAX_INPUTS)]
                    okaer.logger.debug("Collected partial spikes during live monitoring")
            
        final_spikes = okaer.stop_monitor()  # Detener y obtener todos los spikes
        [spikes[i].addresses.extend(final_spikes[i].addresses) for i in range(MAX_INPUTS)]
        [spikes[i].timestamps.extend(final_spikes[i].timestamps) for i in range(MAX_INPUTS)]
        okaer.logger.info("Monitoring in live mode for a duration of %d seconds", DURATION)
    case _:
        okaer.logger.error("Invalid MONITOR_MODE_TEST: %s", MONITOR_MODE_TEST)
        sys.exit(1)

# Check if spikes is not None. If so, finish the script
if spikes is None:
    okaer.logger.error("No spikes were recorded. Exiting the script.")
    sys.exit(1)
    
# Print the number of spikes for each input
for i in range(MAX_INPUTS):
    okaer.logger.info("Input %d: %d spikes", i, spikes[i].get_num_spikes())

# Create pyNAVIS spike_file only if there are spikes for a specific input
okaer.logger.info("Creating spike files for all selected inputs")
spike_files = []
for i in range(MAX_INPUTS):
    if spikes[i].get_num_spikes() > 0:
        spike_files.append(SpikesFile(addresses=spikes[i].addresses, timestamps=spikes[i].timestamps))

# Add this cell BEFORE plotting to verify data integrity
import numpy as np

okaer.logger.info("=== DATA INTEGRITY CHECK ===")

# IMPORTANT: Timestamps are in 10ns ticks (hardware clock period)
TIMESTAMP_TICK_US = 0.01  # Each tick = 10ns = 0.01 microseconds

for i in range(len(spike_files)):
    if len(spike_files[i].timestamps) == 0:
        okaer.logger.warning(f"Input {INPUTS[i]}: No spikes recorded")
        continue
    
    timestamps = np.array(spike_files[i].timestamps)
    addresses = np.array(spike_files[i].addresses)
    
    okaer.logger.info(f"\--- Input {INPUTS[i]} ---")
    okaer.logger.info(f"Total spikes: {len(timestamps)}")
    okaer.logger.info(f"Timestamp range (ticks): {timestamps.min()} - {timestamps.max()}")
    okaer.logger.info(f"Timestamp range (µs): {timestamps.min() * TIMESTAMP_TICK_US:.2f} - {timestamps.max() * TIMESTAMP_TICK_US:.2f}")
    okaer.logger.info(f"Duration (ms): {(timestamps.max() - timestamps.min()) * TIMESTAMP_TICK_US / 1000:.2f}")
    okaer.logger.info(f"Address range: {addresses.min()} - {addresses.max()}")
    
    # Check for timestamp ordering
    if not np.all(timestamps[:-1] <= timestamps[1:]):
        okaer.logger.error(f"Timestamps are NOT in ascending order!")
        bad_idx = np.where(timestamps[:-1] > timestamps[1:])[0][0]
        okaer.logger.error(f"First violation at index {bad_idx}: {timestamps[bad_idx]} > {timestamps[bad_idx+1]}")
    else:
        okaer.logger.info("✓ Timestamps are in ascending order")
    
    # Check for negative timestamps
    if np.any(timestamps < 0):
        okaer.logger.error(f"Found {np.sum(timestamps < 0)} negative timestamps!")
    else:
        okaer.logger.info("✓ No negative timestamps")
    
    # Check timestamp deltas (should be reasonable for audio events)
    if len(timestamps) > 1:
        deltas = np.diff(timestamps)
        mean_delta_ns = np.mean(deltas) * 10  # Convert ticks to nanoseconds
        median_delta_ns = np.median(deltas) * 10
        max_delta_ns = np.max(deltas) * 10
        
        okaer.logger.info(f"Timestamp deltas (ns): mean={mean_delta_ns:.1f}, median={median_delta_ns:.1f}, max={max_delta_ns:.1f}")
        
        # Expected delta for sequential addresses (e.g., ~24 ticks = 240ns)
        if addresses.max() - addresses.min() > 200:  # If we have many addresses
            okaer.logger.info(f"Expected delta for sequential scan: ~240ns (24 ticks @ 10ns)")
    
    # Calculate event rate
    duration_s = (timestamps.max() - timestamps.min()) * TIMESTAMP_TICK_US / 1e6  # Convert to seconds
    if duration_s > 0:
        event_rate = len(timestamps) / duration_s
        okaer.logger.info(f"Event rate: {event_rate:.0f} spikes/sec")
        
        # Check if rate is reasonable (typical audio: 1k-1M events/sec)
        if event_rate > 10_000_000:
            okaer.logger.warning(f"Event rate seems very high: {event_rate:.0f} spikes/sec")
        elif event_rate < 100:
            okaer.logger.warning(f"Event rate seems very low: {event_rate:.0f} spikes/sec")
        else:
            okaer.logger.info("✓ Event rate within reasonable range")
    
    # Check address distribution
    unique_addrs = np.unique(addresses)
    okaer.logger.info(f"Unique addresses: {len(unique_addrs)}")
    okaer.logger.info(f"Address range: {addresses.min()} to {addresses.max()}")
    
    # Check if addresses are sequential (as expected after reset)
    if len(unique_addrs) > 10:
        expected_sequential = np.arange(addresses.min(), addresses.max() + 1)
        if np.array_equal(np.sort(unique_addrs), expected_sequential):
            okaer.logger.info("✓ Addresses are sequential (as expected after reset)")
        else:
            missing = set(expected_sequential) - set(unique_addrs)
            if missing:
                okaer.logger.info(f"Some addresses missing: {sorted(missing)[:10]}...")
    
    # Show distribution for most active addresses
    addr_counts = np.bincount(addresses.astype(int))
    top_10_indices = np.argsort(addr_counts)[-10:][::-1]
    top_10_counts = addr_counts[top_10_indices]
    okaer.logger.info(f"Top 10 addresses by count:")
    for addr, count in zip(top_10_indices, top_10_counts):
        if count > 0:
            okaer.logger.info(f"  Address {addr}: {count} events")

okaer.logger.info("=== END DATA INTEGRITY CHECK ===")

for i in range(len(spike_files)):
    okaer.logger.info("Plotting the spikegram for input %s", INPUTS[i])
    Plots.spikegram(spike_files[i], settings)

for i in range(len(spike_files)):
    okaer.logger.info("Plotting the sonogram for input %s", INPUTS[i])
    Plots.sonogram(spike_files[i], settings)

for i in range(len(spike_files)):
    okaer.logger.info("Plotting the histogram for input %s", INPUTS[i])
    Plots.histogram(spike_files[i], settings)

for i in range(len(spike_files)):
    okaer.logger.info("Plotting the average activity for input %s", INPUTS[i])
    Plots.average_activity(spike_files[i], settings)


### Bypassing the NAS output to OKAERTool output data bus
The NAS output can be bypassed to the OKAERTool output data bus using the `bypass` command of the OKAERTool class. This is useful to test the NAS output without the OKAERTool processing. The script below bypasses the NAS output to the OKAERTool output data bus.

In [None]:
# okaer.reset_board(mode='both')

# Monitor the inputs
INPUTS = ['port_a']

# Bypass the inputs to the outputs
okaer.bypass(inputs=INPUTS)