In [1]:
import copy
import json
import matplotlib.pyplot as plt
import numpy as np
import pickle
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.metrics import ConfusionMatrixDisplay
import time

# - Rockpool imports
from rockpool.nn.modules.jax import LinearJax
from rockpool.nn.combinators import Sequential
from rockpool.timeseries import TSEvent
from rockpool.devices.dynapse import (
    DynapSim,
    mapper,
    autoencoder_quantization,
    config_from_specification,
    find_dynapse_boards,
    DynapseSamna,
    dynapsim_net_from_config,
)

import sys
sys.path.append("../training-pipeline/lib")
from data_loading import load_data

## Data Loading and Model Restoration

In [72]:
# Configuration
model_path = "../nni_experiments/dynapse2/final/dynapse2_4cw"

def load_data_from_model_path(model_path):
    # Obtain training metadata
    training_metadata = None
    with open(f"{model_path}/training_metadata.json") as f:
        training_metadata = json.load(f)

    # Load dataset
    input_params = training_metadata["input_params"]

    train_dl, val_dl, test_dl = load_data(**input_params)
    return train_dl, val_dl, test_dl, input_params

def load_network_from_path(model_path, input_params):
    # Load simulation parameters
    sim_params = np.load(f"{model_path}/model_sim_params.npy", allow_pickle=True)
    layer_params = sim_params.item()["1_DynapSim"]

    # Obtain pretrained layer weights
    opt_params =  np.load(f"{model_path}/model_params.npy", allow_pickle=True)
    w_in_opt = opt_params.item()["0_LinearJax"]["weight"]
    w_rec_opt = opt_params.item()["1_DynapSim"]["w_rec"]

    # Build network
    n_input_channels = input_params["n_channels"]
    n_output_channels = len(input_params["enabled_classes"]) if input_params["enabled_classes"] != None else input_params["n_classes"]

    net = Sequential(
        LinearJax((n_input_channels, n_output_channels), has_bias=False, weight=w_in_opt),
        DynapSim(
            (n_output_channels, n_output_channels),
            has_rec=True,
            w_rec = w_rec_opt,
            **layer_params,
        )
    )

    print(f"Built network: \n{net}")

    return net,n_input_channels,n_output_channels

train_dl, val_dl, test_dl, input_params = load_data_from_model_path(model_path)

## Model Evaluation

In [None]:
# Load network from model path
net,_,_ = load_network_from_path(model_path, input_params)

# Evaluate network on the full test set
ds = test_dl.dataset
net = net.reset_state()
output, _, _ = net(ds.x.numpy())
m = np.sum(output, axis=1)
preds = np.argmax(m, axis=1)
print(f"Final accuracy: {np.round(np.mean(np.array(preds == ds.y.numpy()))*100,decimals=2)}%")

# Calculate confusion matrix
cm = confusion_matrix(ds.y.numpy(), preds)
disp = ConfusionMatrixDisplay(cm)
disp.plot()
plt.show()

# Print the classification report
print(classification_report(ds.y.numpy(), preds))

In [None]:
# Plot the network activity for a specific sample
ds = test_dl.dataset
i = 0
output, _, _ = net(ds.x.numpy()[i])

spks = np.sum(output,axis=1)[0]
print(f"Neural Activity (spikes per neuron): {spks}")
print(f"Expected Label: {int(ds.y.numpy()[i])}")
print(f"Predicted Label: {spks.argmax()}")

TSEvent.from_raster(
            output[0],
            dt=1e-3,
        ).plot(marker="|", s=8)
plt.plot(0, ds.y.numpy()[i], '>', ms=20)
plt.tight_layout()
plt.show()

## Quantization Tuning and Hardware Config Generation

In [None]:
# Load network from model path
net,n_input_channels,n_output_channels = load_network_from_path(model_path, input_params)

# Convert network to spec
spec = mapper(net.as_graph())
spec_Q = copy.deepcopy(spec)

# Use quantization tuned specification object, if available
try:
    with open(f"{model_path}/dynapse2_qtuned_params", "rb") as f:
        params_Q = pickle.load(f)
        spec_Q.update(params_Q)
    print("Model tuning applied")
except FileNotFoundError:
    # Quantize the parameters
    spec_Q.update(autoencoder_quantization(**spec_Q))
    print("No model tuning available")

# Convert spec to DYNAP-SE2 configuration
config = config_from_specification(**spec_Q)

# Print the most significative parameters from hardware configuration object
enabled_params = ["SYAM_W0_P","SYAM_W1_P","SYAM_W2_P","SYAM_W3_P","DEAM_ITAU_P","DEAM_ETAU_P","DEAM_IGAIN_P","DEAM_EGAIN_P","SOIF_LEAK_N","SOIF_SPKTHR_P","SOIF_GAIN_N","SOIF_REFR_N","SOIF_DC_P"]

params = []
for (k,v) in config["config"].chips[0].cores[0].parameters.items():
    if k in enabled_params:
        params.append((k,v))

for param in sorted(params):
    print(f"{param[0]} -> (Coarse:{param[1].coarse_value}, Fine:{param[1].fine_value})")

# Build DYNAPSim from the config
net_quantized = dynapsim_net_from_config(**config)

## Quantized Network Analysis

In [None]:
# Display the quantized network weights
plt.imshow(np.array(spec['weights_in']).T)
plt.title('$W_{in}$')
plt.show()

plt.imshow(np.array(spec['weights_rec']).T)
plt.title('$W_{rec}$')
plt.show()

In [None]:
# Evaluate the DYNAPSim network on the full test set
ds = test_dl.dataset
output, _, _ = net_quantized(ds.x.numpy())
m = np.sum(output, axis=1)
preds = np.argmax(m, axis=1)
print(f"Final accuracy: {np.round(np.mean(np.array(preds == ds.y.numpy()))*100,decimals=2)}%")

# Calculate confusion matrix
cm = confusion_matrix(ds.y.numpy(), preds)
disp = ConfusionMatrixDisplay(cm)
disp.plot()
plt.show()

# Print the classification report
print(classification_report(ds.y.numpy(), preds))

In [None]:
# Plot the DYNAPSim network activity for a specific sample
ds = test_dl.dataset
i = 0

plt.figure(figsize=(15,5))
plt.subplot(1,3,1)
TSEvent.from_raster(
            ds.x[i].numpy(),
            dt=1e-3,
        ).plot(marker="|", s=8,linewidths=1)

output, _, _ = net_quantized(ds.x[i].numpy())
spks = np.sum(output,axis=1)[0]
pred = np.argmax(spks)
print(f"Readout layer activity (spikes): {spks}")
print(f"Label: {int(ds.y[i].numpy())}")
print(f"Prediction: {pred}")

plt.subplot(1,3,2)
TSEvent.from_raster(
            output[0],
            dt=1e-3,
        ).plot(marker="|", s=8)

plt.tight_layout()
plt.show()

## DYNAP-SE2 Hardware Deploy

In [None]:
# Search for connected DYNAP-SE2
se2_devices = find_dynapse_boards()
if len(se2_devices) < 0:
    raise "No devices found..."

# Try to find the best IScale parameter value, to account for temperature changes
best_config = None
min_act = 1e10
max_spks = 0
for i_scale_value in range(1,30,2):

    spec_Q = copy(spec_Q)

    # Scale the weights to account for temperature discrepancy
    spec_Q["Iscale"] *= i_scale_value

    # Convert spec to DYNAP-SE2 configuration
    config = config_from_specification(**spec_Q)

    # Test activity
    se2 = DynapseSamna(se2_devices[0], **config,dt=1e-3)
    se2.discharge_capacitors()
    time.sleep(1)
    output, _, _ = se2(ds.x[1].numpy(), record=False)
    spks = np.sum(output[:,0:n_output_channels],axis=0)
    
    print(f"Test iscale_value:{i_scale_value} -> Spikes {spks}")
    label = int(ds.y[1].numpy())
    if len(spks) >= n_output_channels and (spks[label]> max_spks or spks[label] == max_spks and sum(spks) < min_act):
        max_spks = spks[label]
        min_act = sum(spks)
        best_config = config
        print(f"New best: {spks}")
    

# Print the most significative parameters from hardware configuration object
print("\nParameters:")
enabled_params = ["SYAM_W0_P","SYAM_W1_P","SYAM_W2_P","SYAM_W3_P","DEAM_ITAU_P","DEAM_ETAU_P","DEAM_IGAIN_P","DEAM_EGAIN_P","SOIF_LEAK_N","SOIF_SPKTHR_P","SOIF_GAIN_N","SOIF_REFR_N","SOIF_DC_P"]

params = []
for (k,v) in best_config["config"].chips[0].cores[0].parameters.items():
    if k in enabled_params:
        params.append((k,v))

for param in sorted(params):
    print(f"{param[0]} -> (Coarse:{param[1].coarse_value}, Fine:{param[1].fine_value})")

# Configure the board with the found configuration
se2 = DynapseSamna(se2_devices[0], **best_config,dt=1e-3)

In [None]:
# Evaluate the network on the board on a reduced test subset
ds = test_dl.dataset
preds = []
activity = []
sample_subset = (100,120)
for i in range(*sample_subset):
    time.sleep(1)
    se2.discharge_capacitors()
    time.sleep(1)
    plt.figure(figsize=(15,5))
    plt.subplot(1,3,1)
    TSEvent.from_raster(
                ds.x[i].numpy(),
                dt=1e-3,
            ).plot(marker="|", s=8,linewidths=1)

    output, state, rec = se2(ds.x[i].numpy(), record=False)

    spks = np.sum(output[:,0:n_output_channels],axis=0)
    pred = None
    if output.shape[1] > 0:
        pred = np.argmax(spks)
    preds.append(pred)
    activity.append(output)
    print(f"Readout layer activity (spikes): {spks}")
    print(f"Label: {int(ds.y[i].numpy())}")
    print(f"Prediction: {pred}")

    plt.subplot(1,3,2)
    TSEvent.from_raster(
                output[:,0:n_output_channels],
                dt=1e-3,
            ).plot(marker="|", s=8)
    
    plt.tight_layout()
    plt.show()

preds = np.array(preds)
print(f"Final accuracy: {np.mean(preds[preds != None]==ds.y.numpy()[sample_subset[0]:sample_subset[1]][preds!=None])*100}%")

# Store activity for future usage
with open(f"{model_path}/recorded_activity.pkl", 'wb') as f:
    pickle.dump(activity, f, pickle.HIGHEST_PROTOCOL)
    
# Print the classification report
print(ds.y.numpy()[sample_subset[0]:sample_subset[1]][preds!=None])
print(preds[preds != None])
print(classification_report(ds.y.numpy()[sample_subset[0]:sample_subset[1]][preds!=None], preds[preds != None]))