# **Neuromorphic Human Activity Recognition (NeHAR) task**

In this notebook we benchmark SNN-based models on the Neuromorphic Human Activity Recognition (NeHAR) task, using IMU sensor data acquired from a commercial smartwatch.

Human Activity Recognition (HAR) is a time-dependent task that has applications in various aspects of human life, from healthcare to sports, safety, and smart environments. In this task, we present a comparative analysis of different SNN-based models designed for classifying raw signals (Accelerometer and Gyroscope) collected in the Wireless Sensor Data Mining (WISDM) dataset.

The WISDM dataset consists of data from 51 subjects performing 18 activities. It collects signals from both the accelerometer and the gyroscope of a smartphone and a smartwatch. Each activity is recorded for 3 minutes at an acquisition rate of 20 Hz. The classes are balanced: each activity contributes approximately 5.3% to 5.8% of the roughly 15.63 million samples in total.
From the whole smartwatch dataset, we selected a subset of general hand-oriented activities for our analysis: (1) dribbling in basketball, (2) playing catch with a tennis ball, (3) typing, (4) writing, (5) clapping, (6) brushing teeth, and (7) folding clothes. We divided the signals into non-overlapping temporal windows with a length of 2 seconds (40 samples at 20 Hz). These temporal windows serve as the input to the benchmarked models.
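
The windowing step described above can be sketched as follows. This is only an illustration on synthetic data, not the preprocessing code used to build the dataset: the helper name `split_into_windows` and the random `recording` array are assumptions made for the example.

```python
import numpy as np

def split_into_windows(signal, window_len):
    """Split a (time, channels) array into non-overlapping windows.

    Samples that do not fill a complete window at the end are dropped.
    """
    n_windows = signal.shape[0] // window_len
    trimmed = signal[: n_windows * window_len]
    return trimmed.reshape(n_windows, window_len, signal.shape[1])

sampling_rate_hz = 20   # acquisition rate stated for WISDM
window_seconds = 2      # window length used in this notebook
window_len = sampling_rate_hz * window_seconds  # 40 samples per window

# Synthetic stand-in for one 3-minute recording with 6 IMU channels
rng = np.random.default_rng(0)
recording = rng.standard_normal((3 * 60 * sampling_rate_hz, 6))

windows = split_into_windows(recording, window_len)
print(windows.shape)  # (number of windows, samples per window, channels)
```

Each window has 40 time steps and 6 channels, which matches the `num_steps = 40` and the 6 input channels used by the networks in this notebook.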

---

Reference paper: Fra, V., Forno, E., Pignari, R., Stewart, T. C., Macii, E., & Urgese, G. (2022).
***Human activity recognition: suitability of a neuromorphic approach for on-edge AIoT applications***. *Neuromorphic Computing and Engineering*, 2(1), 014006.
DOI ***10.1088/2634-4386/ac4c38***



## Environment set-up

### Install packages in the Google Colab runtime

In [None]:
%%capture
!pip install gdown
!pip install hyperopt
!pip install matplotlib
!pip install neurobench
!pip install numpy
!pip install pandas
!pip install scikit-learn
!pip install scipy
!pip install seaborn
!pip install snntorch
!pip install torch
!pip install tqdm

### Basic import



In [None]:
import gdown
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import pickle as pkl
import random
#from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import TensorDataset, DataLoader
from tqdm import tqdm

from neurobench.models import SNNTorchModel
from neurobench.postprocessing.postprocessor import aggregate, choose_max_count
from neurobench.benchmarks import Benchmark

import torch.nn as nn
import snntorch as snn
from snntorch import surrogate
from snntorch import functional as SF
from snntorch import utils
import copy


### Utility functions and general settings

In [None]:
def create_directory(
    directory_path
    ):
    """
    Muller-Cleve, Simon F.; Istituto Italiano di Tecnologia - IIT; Event-driven perception in robotics - EDPR; Genova, Italy.
    """
    if os.path.exists(directory_path):
        return None
    else:
        try:
            os.makedirs(directory_path)
        except OSError:
            return None
        return directory_path


In [None]:
use_seed = True

if use_seed:
    seed = 42
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)
else:
    seed = None

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

## Import HAR dataset

In [None]:
### Link to the folder with data
folder_link = "https://drive.google.com/drive/folders/15TSYpE5QSzjOoqvOn8f7Wf59MwhqQEkB"

In [None]:
%%capture
if not os.path.isdir("./data"):  # download only if ./data is not already present
  ! gdown $folder_link -O ./data --folder

In [None]:
ds_train = torch.load("./data/watch_subset2_40_ds_train.pt", map_location=device)
ds_val = torch.load("./data/watch_subset2_40_ds_val.pt", map_location=device)
ds_test = torch.load("./data/watch_subset2_40_ds_test.pt", map_location=device)

act_map = {
    'A': 'walking',
    'B': 'jogging',
    'C': 'stairs',
    'D': 'sitting',
    'E': 'standing',
    'M': 'kicking',
    'P': 'dribbling',
    'O': 'catch',
    'F': 'typing',
    'Q': 'writing',
    'R': 'clapping',
    'G': 'teeth',
    'S': 'folding',
    'J': 'pasta',
    'H': 'soup',
    'L': 'sandwich',
    'I': 'chips',
    'K': 'drinking',
}

labels_mapping = {k:act_map[k] for k in list(act_map.keys())[6:13] if k in act_map}
labels_activity = list(act_map.values())[6:13]

In [None]:
# Extract a random sample from the training set
random_sample = next(iter(DataLoader(ds_train, batch_size=1, shuffle=True)))
random_data = random_sample[0]
random_label = random_sample[1]

In [None]:
# Randomly select a channel from IMU data
rnd_ch = np.random.randint(0,6)
print("Selected sample: {}".format(labels_activity[random_label]))
print("Selected channel: {}\n".format(rnd_ch))

random_ch = random_data[0,:,rnd_ch]
print("IMU values in time for the selected sample and channel:")
random_ch

In [None]:
# Plot the random (original) example
plt.figure(figsize=(8,4.5))
plt.plot(random_ch.cpu().numpy())
plt.xlabel("Time step (20 Hz sampling)")
plt.ylabel("IMU data (a.u.)")
plt.title("Activity: {}".format(labels_activity[random_label]))
plt.show()

In [None]:
print("Training set \t ---data---\n \tnumber of samples: {}\n \tsample shape: {}".format(
    len(ds_train),next(iter(DataLoader(ds_train, batch_size=1, shuffle=False)))[0].shape))
print("Training set \t ---labels---\n \tnumber of labels: {}\n \tlabel shape: {}".format(
    len(ds_train),next(iter(DataLoader(ds_train, batch_size=1, shuffle=False)))[1].shape))
print("\n")
print("Validation set \t ---data---\n \tnumber of samples: {}\n \tshape: {}".format(
    len(ds_val),next(iter(DataLoader(ds_val, batch_size=1, shuffle=False)))[0].shape))
print("Validation set \t ---labels---\n \tnumber of labels: {}\n \tlabel shape: {}".format(
    len(ds_val),next(iter(DataLoader(ds_val, batch_size=1, shuffle=False)))[1].shape))
print("\n")
print("Test set \t ---data---\n \tnumber of samples: {}\n \tshape: {}".format(
    len(ds_test),next(iter(DataLoader(ds_test, batch_size=1, shuffle=False)))[0].shape))
print("Test set \t ---labels---\n \tnumber of labels: {}\n \tlabel shape: {}".format(
    len(ds_test),next(iter(DataLoader(ds_test, batch_size=1, shuffle=False)))[1].shape))

## Feedforward SNN

Training, validation, and test of a fully-connected feedforward SNN (FFSNN)

*Adapted from: V. Fra et al.; "Neuromorphic Human Activity Recognition through LIF-based neurons"; Brain-Inspired Computing Workshop 2023, Modena (Italy)*


Neurobench Metrics extraction
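
The networks in this notebook are built from leaky integrate-and-fire (LIF) neurons. As a rough intuition for what `beta` and `threshold` control, the sketch below simulates a single first-order leaky neuron driven by a constant input current. It is a simplified illustration that assumes reset by subtraction; it is not the snnTorch implementation, and the function name `lif_run` and the input values are chosen only for this example.

```python
def lif_run(currents, beta, threshold):
    """Simulate one leaky integrate-and-fire neuron over a list of input currents.

    Assumed simplified dynamics: the membrane potential decays by `beta`,
    integrates the input, emits a spike when it exceeds `threshold`, and is
    then reduced by `threshold` (reset by subtraction).
    """
    mem = 0.0
    spikes = []
    for cur in currents:
        mem = beta * mem + cur                 # leak and integrate
        spk = 1 if mem > threshold else 0      # fire if above threshold
        mem -= spk * threshold                 # soft reset after a spike
        spikes.append(spk)
    return spikes

# Illustrative values only: constant input of 0.4 for 10 time steps
spikes = lif_run([0.4] * 10, beta=0.7, threshold=0.7)
print(spikes)
```

A larger `beta` makes the neuron retain more of its past potential, so it fires more often for the same input; a larger `threshold` has the opposite effect.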

In [None]:
def training_loop(
    dataset,
    batch_size,
    net,
    optimizer,
    loss_fn,
    device):
    """
    Fra, Vittorio; Politecnico di Torino; EDA Group; Torino, Italy.
    """

    train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True, drop_last=False)

    batch_loss = []
    batch_acc = []

    for data, labels in tqdm(train_loader):

      data = data.to(device)#.swapaxes(1, 0)
      labels = labels.to(device)

      net.train()
      rec = net.single_forward(data)
      spk_rec = rec[0]

      # Training loss
      loss_val = loss_fn(spk_rec, labels)
      batch_loss.append(loss_val.detach().cpu().item())

      # Training accuracy
      act_total_out = torch.sum(spk_rec, 0)  # sum over time
      _, neuron_max_act_total_out = torch.max(act_total_out, 1)  # argmax over output units to compare to labels
      batch_acc.append(np.mean((neuron_max_act_total_out == labels).detach().cpu().numpy()))

      # Gradient calculation + weight update
      optimizer.zero_grad()
      loss_val.backward()
      optimizer.step()

    epoch_loss = np.mean(batch_loss)
    epoch_acc = np.mean(batch_acc)

    return [epoch_loss, epoch_acc]


def val_test_loop(
    dataset,
    batch_size,
    net,
    loss_fn,
    device,
    shuffle=True,
    label_probabilities=False,
    return_spikes=False):
    """
    Fra, Vittorio; Politecnico di Torino; EDA Group; Torino, Italy.
    """

    with torch.no_grad():
      net.eval()

      loader = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, drop_last=False)

      batch_loss = []
      batch_acc = []

      for data, labels in tqdm(loader):
          data = data.to(device)#.swapaxes(1, 0)
          labels = labels.to(device)

          rec = net.single_forward(data)
          spk_out = rec[0]

          # Loss
          loss_val = loss_fn(spk_out, labels)
          batch_loss.append(loss_val.detach().cpu().item())

          # Accuracy
          act_total_out = torch.sum(spk_out, 0)  # sum over time
          _, neuron_max_act_total_out = torch.max(act_total_out, 1)  # argmax over output units to compare to labels
          batch_acc.append(np.mean((neuron_max_act_total_out == labels).detach().cpu().numpy()))

      # NOTE: the probabilities and spikes returned below refer to the last batch only
      if label_probabilities:
          log_softmax_fn = nn.LogSoftmax(dim=-1)
          log_p_y = log_softmax_fn(act_total_out)
          if return_spikes:
            return [np.mean(batch_loss), np.mean(batch_acc)], torch.exp(log_p_y), spk_out.detach().cpu().numpy()
          else:
            return [np.mean(batch_loss), np.mean(batch_acc)], torch.exp(log_p_y)
      else:
        if return_spikes:
          return [np.mean(batch_loss), np.mean(batch_acc)], spk_out.detach().cpu().numpy()
        else:
          return [np.mean(batch_loss), np.mean(batch_acc)]

In [None]:
settings= {"enc_pop": 32,
           "nb_hidden": 250,
           "beta_hid": 0.7,
           "beta_out": 0.65,
           "beta_enc": 0.2,
           "alpha_hid": 0.55,
           "alpha_out": 0.9,
           "thr_enc": 0.5,
           "thr_hid": 0.7,
           "thr_out": 0.9,
           "lr": 0.0001,
           "batch_size": 64
           }

In [None]:
network_results = []

In [None]:
### Network structure (input data --> encoding -> hidden -> output)
input_enc = 6
output_enc = int(settings["enc_pop"])
num_hidden = int(settings["nb_hidden"])
num_outputs = 7

num_steps = 40

class Net(nn.Module):
  def __init__(self):
    super().__init__()

    ##### Initialize layers #####
    ### Encoding layer
    self.enc = nn.Linear(input_enc, output_enc)
    self.s_enc = snn.Leaky(beta=settings['beta_enc'], threshold=settings['thr_enc'])
    ### Hidden layer
    self.fc1 = nn.Linear(output_enc, num_hidden)
    self.s1 = snn.Synaptic(beta=settings['beta_hid'], alpha=settings['alpha_hid'], threshold=settings['thr_hid'])
    ### Output layer
    self.fc2 = nn.Linear(num_hidden, num_outputs)
    self.s2 = snn.Synaptic(beta=settings['beta_out'], alpha=settings['alpha_out'], threshold=settings['thr_out'])
    self.mem_enc = self.s_enc.init_leaky()
    ### Hidden layer
    self.syn1, self.mem1 = self.s1.init_synaptic()
    ### Output layer
    self.syn2, self.mem2 = self.s2.init_synaptic()

  def single_forward(self, x):

    ### Encoding layer
    mem_enc = self.s_enc.init_leaky()
    ### Hidden layer
    syn1, mem1 = self.s1.init_synaptic()
    ### Output layer
    syn2, mem2 = self.s2.init_synaptic()

    # Record the final layer
    spk2_rec = []
    syn2_rec = []
    mem2_rec = []

    for step in range(num_steps):
      ### Encoding layer
      cur_enc = self.enc(x[:,step])
      spk_enc, mem_enc = self.s_enc(cur_enc, mem_enc)
      ### Hidden layer
      cur1 = self.fc1(spk_enc)
      spk1, syn1, mem1 = self.s1(cur1, syn1, mem1)
      ### Output layer
      cur2 = self.fc2(spk1)
      spk2, syn2, mem2 = self.s2(cur2, syn2, mem2)

      spk2_rec.append(spk2)
      syn2_rec.append(syn2)
      mem2_rec.append(mem2)

    return torch.stack(spk2_rec, dim=0), torch.stack(syn2_rec, dim=0), torch.stack(mem2_rec, dim=0)

  def forward(self, x):
       ### Encoding layer
      cur_enc = self.enc(x)
      spk_enc, self.mem_enc = self.s_enc(cur_enc, self.mem_enc)
      ### Hidden layer
      cur1 = self.fc1(spk_enc)
      spk1, self.syn1, self.mem1 = self.s1(cur1, self.syn1, self.mem1)
      ### Output layer
      cur2 = self.fc2(spk1)
      spk2, self.syn2, self.mem2 = self.s2(cur2, self.syn2, self.mem2)

      return spk2, self.mem2

  def reset(self):
    self.mem_enc = self.s_enc.init_leaky()
    ### Hidden layer
    self.syn1, self.mem1 = self.s1.init_synaptic()
    ### Output layer
    self.syn2, self.mem2 = self.s2.init_synaptic()


net = Net().to(device)

In [None]:
### Set the loss function
loss_fn = SF.ce_count_loss()

### Set the optimizer
optimizer = torch.optim.Adam(net.parameters(), lr=settings['lr'], betas=(0.9, 0.999))

### Set the batch size
batch_size = settings["batch_size"]

#### Training (with validation)

In [None]:
num_epochs = 100

In [None]:
training_results = []
validation_results = []

for epoch in range(num_epochs):

  train_loss, train_acc = training_loop(ds_train, batch_size, net, optimizer, loss_fn, device)
  val_loss, val_acc = val_test_loop(ds_val, batch_size, net, loss_fn, device)

  training_results.append([train_loss, train_acc])
  validation_results.append([val_loss, val_acc])

  print("Epoch {}/{}: \n\ttraining loss: {} \n\tvalidation loss: {} \n\ttraining accuracy: {}% \n\tvalidation accuracy: {}%".format(epoch+1, num_epochs, training_results[-1][0], validation_results[-1][0], np.round(training_results[-1][1]*100,4), np.round(validation_results[-1][1]*100,4)))

#### Test

In [None]:
test_results, lbl_probs, spk_out = val_test_loop(ds_test, batch_size, net, loss_fn, device, label_probabilities=True, return_spikes=True)

print("\nTest accuracy: {}%".format(np.round(test_results[1]*100,4)))

#### Save model

In [None]:
create_directory('model_data')
torch.save(net.state_dict(), './model_data/HAR_FFSNN.pth')

#### Neurobench Metrics

In [None]:
net = Net().to(device)
net.load_state_dict(torch.load('./model_data/HAR_FFSNN.pth', map_location=device))

model = SNNTorchModel(net)
test_set_loader = DataLoader(ds_test, batch_size=settings["batch_size"], shuffle=True, drop_last=False)
postprocessors = [choose_max_count]

static_metrics = ["model_size"]
workload_metrics = ["classification_accuracy"]

benchmark = Benchmark(model, test_set_loader, [], postprocessors, [static_metrics, workload_metrics])
results = benchmark.run()
print(results)

results = [results[key] for key in results.keys()]
results.insert(0, 'FFSNN')

network_results.append(copy.copy(results))


#### Single-sample inference
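
When `val_test_loop` is called with `label_probabilities=True`, the output spikes are summed over time and passed through a softmax, so the class with the most spikes receives the highest probability. The sketch below reproduces that decoding on randomly generated spikes with NumPy. The spike tensor is synthetic and the variable names are illustrative; only the shape convention (time, batch, classes) follows the code in this notebook.

```python
import numpy as np

rng = np.random.default_rng(0)
num_steps, batch, num_classes = 40, 1, 7

# Synthetic output spikes with shape (time, batch, classes)
spk_out = (rng.random((num_steps, batch, num_classes)) < 0.2).astype(np.float32)

# Spike-count decoding: sum over time, then pick the most active output neuron
spike_counts = spk_out.sum(axis=0)          # (batch, classes)
predicted = spike_counts.argmax(axis=1)     # (batch,)

# Softmax over the spike counts (shifted by the max for numerical stability)
shifted = spike_counts - spike_counts.max(axis=1, keepdims=True)
probs = np.exp(shifted) / np.exp(shifted).sum(axis=1, keepdims=True)

print("spike counts:", spike_counts[0])
print("predicted class index:", predicted[0])
print("probabilities:", np.round(probs[0], 3))
```

Because the softmax is applied to raw spike counts, a difference of a few spikes between two classes already produces a strongly peaked distribution.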

In [None]:
single_sample = next(iter(DataLoader(ds_test, batch_size=1, shuffle=True)))
print("Randomly selected sample: {}".format(labels_activity[single_sample[1].cpu()[0]]))

In [None]:
### Inference
_, lbl_probs, spk_out = val_test_loop(TensorDataset(single_sample[0],single_sample[1]), 1, net, loss_fn, device, label_probabilities=True, return_spikes=True)

### Plot output spiking activity
spk_out = np.moveaxis(spk_out,1,2)
spk_out = np.squeeze(spk_out, axis=-1)
# spk_out now has shape (num_steps, num_outputs)
aer = []
for num,el in enumerate(spk_out):
  addr = np.where(el)[0].tolist()
  if len(addr) > 0:
    for ii in addr:
      aer.append([num,ii])
aer = np.array(aer)
plt.scatter(aer[:,0], aer[:,1], s=1)
plt.xlabel("Time step (a.u.)")
plt.ylabel("Neuron")
plt.title("Spiking output activity (activity: {}, prediction: {})".format(labels_activity[single_sample[1].cpu()[0]],labels_activity[np.argmax(lbl_probs.cpu())]))
plt.ylim(-0.5,6.5)
plt.xlim((-0.5,num_steps+0.5))
plt.yticks(range(7),labels_activity)
plt.show()

print("\nLabels probabilities:")
for num,el in enumerate(labels_activity):
  print("\t{} \n\t\t{}%".format(el,np.round(lbl_probs.cpu().numpy()[0][num]*100,2)))

## Recurrent SNN

Training, validation, and test of a fully-connected recurrent SNN (RSNN)

*Adapted from: V. Fra et al.; "Neuromorphic Human Activity Recognition through LIF-based neurons"; Brain-Inspired Computing Workshop 2023, Modena (Italy)*


Neurobench Metrics extraction
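
In the RSNN defined in this section, each of the 6 IMU channels feeds its own encoding population: the scalar value of a channel is replicated `pop_size` times and projected onto `output_pop` neurons, and the 6 population outputs are then concatenated. The sketch below traces the shapes of that step for one time step using NumPy. The random `weights` and `biases` are hypothetical stand-ins for the per-channel linear layers, and the batch size of 4 is arbitrary.

```python
import numpy as np

rng = np.random.default_rng(0)
batch, input_channels, pop_size, output_pop = 4, 6, 5, 32

# One time step of IMU data: (batch, channels)
x_step = rng.standard_normal((batch, input_channels))

# Hypothetical stand-ins for the per-channel linear layers (pop_size -> output_pop)
weights = [rng.standard_normal((pop_size, output_pop)) for _ in range(input_channels)]
biases = [rng.standard_normal(output_pop) for _ in range(input_channels)]

cur_pops_enc = np.empty((input_channels, batch, output_pop))
for ch in range(input_channels):
    # Replicate the channel value pop_size times: (batch,) -> (batch, pop_size)
    replicated = np.tile(x_step[:, ch], (pop_size, 1)).swapaxes(1, 0)
    cur_pops_enc[ch] = replicated @ weights[ch] + biases[ch]

# Concatenate the populations: (channels, batch, pop) -> (batch, channels * pop)
flat = cur_pops_enc.transpose(1, 0, 2).reshape(batch, input_channels * output_pop)
print(replicated.shape, cur_pops_enc.shape, flat.shape)
```

With the settings used below (`output_pop = 32`), the recurrent layer therefore receives 6 x 32 = 192 inputs per time step.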

In [None]:
settings = {"neurons_per_pop": 5.0,
            "output_pop": 32.0,
            "nb_hidden": 250.0,
            "alpha_hid": 0.55,
            "alpha_out": 0.9,
            "beta_hid": 0.7,
            "beta_out": 0.65,
            "beta_enc": 0.2,
            "lr": 0.0001,
            "slope": 15.0,
            "batch_size": 64.0
            }

In [None]:
### Network structure (input data --> encoding -> hidden -> output)
input_channels = 6
pop_size = int(settings["neurons_per_pop"]) # --> each channel value is replicated pop_size times before entering its encoding population
output_pop = int(settings["output_pop"]) # --> number of encoding neurons per input channel
output_enc = output_pop*input_channels # --> total number of neurons in the encoding layer
num_hidden = int(settings["nb_hidden"])
num_outputs = 7

num_steps = 40

### Surrogate gradient setting
spike_grad = surrogate.fast_sigmoid(slope=int(settings["slope"]))


class Net(nn.Module):

    def __init__(self):

        super().__init__()

        ##### Define layers #####
        ### Encoding layer with populations
        self.pop_size = pop_size
        self.enc_pops = []
        self.lif_enc_pops = []
        for ii in range(input_channels):
            self.enc_pops.append(nn.Linear(pop_size, output_pop).to(device))
            self.lif_enc_pops.append(snn.Leaky(beta=settings["beta_enc"], learn_beta=True, learn_threshold=True).to(device))
        self.enc_pops = nn.ModuleList(self.enc_pops)
        self.lif_enc_pops = nn.ModuleList(self.lif_enc_pops)
        ### Recurrent layer
        self.fc1 = nn.Linear(output_enc, num_hidden)
        self.lif1 = snn.RSynaptic(alpha=settings["alpha_hid"], beta=settings["beta_hid"], learn_alpha=True, learn_beta=True, learn_threshold=True, linear_features=num_hidden, spike_grad=spike_grad)
        ### Output layer
        self.fc2 = nn.Linear(num_hidden, num_outputs)
        self.lif2 = snn.Synaptic(alpha=settings["alpha_out"], beta=settings["beta_out"], learn_alpha=True, learn_beta=True, learn_threshold=True)

        # NOTE that this is actually redundant outside of NeuroBench benchmarking
        ##### Initialize hidden states at t=0 #####
        ### Encoding layer with populations
        # Membrane potentials must start from a defined value (torch.empty returns uninitialized memory)
        self.mem_pops_enc = torch.zeros((input_channels,int(settings["batch_size"]),output_pop), dtype=torch.float, device=device)
        self.spk_pops_enc = torch.empty((input_channels,int(settings["batch_size"]),output_pop), dtype=torch.float, device=device)
        self.cur_pops_enc = torch.empty((input_channels,int(settings["batch_size"]),output_pop), dtype=torch.float, device=device)
        ### Recurrent layer
        self.spk1, self.syn1, self.mem1 = self.lif1.init_rsynaptic()
        ### Output layer
        self.syn2, self.mem2 = self.lif2.init_synaptic()


    def single_forward(self, x):

        x = x.swapaxes(1,0)

        ##### Initialize hidden states at t=0 #####
        ### Encoding layer with populations
        # Membrane potentials must start from a defined value (torch.empty returns uninitialized memory)
        mem_pops_enc = torch.zeros((input_channels,x.shape[1],output_pop), dtype=torch.float, device=device)
        spk_pops_enc = torch.empty((input_channels,x.shape[1],output_pop), dtype=torch.float, device=device)
        cur_pops_enc = torch.empty((input_channels,x.shape[1],output_pop), dtype=torch.float, device=device)
        ### Recurrent layer
        spk1, syn1, mem1 = self.lif1.init_rsynaptic()
        ### Output layer
        syn2, mem2 = self.lif2.init_synaptic()

        # Record the final layer
        spk2_rec = []
        syn2_rec = []
        mem2_rec = []

        for step in range(num_steps):
            ### Encoding layer with populations
            for num,el in enumerate(self.enc_pops):
                cur_pops_enc[num] = el(torch.tile(x[step,:,num],(self.pop_size,1)).swapaxes(1,0))
            for num,el in enumerate(self.lif_enc_pops):
                spk_pops_enc[num], mem_pops_enc[num] = el(cur_pops_enc[num], mem_pops_enc[num])
            spk_enc = spk_pops_enc.clone().permute(1, 0, 2).reshape((x.shape[1],input_channels*output_pop)).requires_grad_(True)
            ### Recurrent layer
            cur1 = self.fc1(spk_enc)
            spk1, syn1, mem1 = self.lif1(cur1, spk1, syn1, mem1)
            ### Output layer
            cur2 = self.fc2(spk1)
            spk2, syn2, mem2 = self.lif2(cur2, syn2, mem2)

            spk2_rec.append(spk2)
            syn2_rec.append(syn2)
            mem2_rec.append(mem2)

        return torch.stack(spk2_rec, dim=0), torch.stack(syn2_rec, dim=0), torch.stack(mem2_rec, dim=0)


    def forward(self, x):

        ### Encoding layer with populations
        for num,el in enumerate(self.enc_pops):
            self.cur_pops_enc[num] = el(torch.tile(x[:,num],(self.pop_size,1)).swapaxes(1,0))
        for num,el in enumerate(self.lif_enc_pops):
            self.spk_pops_enc[num], self.mem_pops_enc[num] = el(self.cur_pops_enc[num], self.mem_pops_enc[num])
        spk_enc = self.spk_pops_enc.clone().permute(1, 0, 2).reshape((x.shape[0],input_channels*output_pop)).requires_grad_(True)
        ### Recurrent layer
        cur1 = self.fc1(spk_enc)
        self.spk1, self.syn1, self.mem1 = self.lif1(cur1, self.spk1, self.syn1, self.mem1)
        ### Output layer
        cur2 = self.fc2(self.spk1)
        spk2, self.syn2, self.mem2 = self.lif2(cur2, self.syn2, self.mem2)

        return spk2, self.mem2

    # def reset(self, x):
    #     ### Encoding layer with populations
    #     self.mem_pops_enc = torch.empty((input_channels,x.shape[1],output_pop), dtype=torch.float, device=device)
    #     self.spk_pops_enc = torch.empty((input_channels,x.shape[1],output_pop), dtype=torch.float, device=device)
    #     self.cur_pops_enc = torch.empty((input_channels,x.shape[1],output_pop), dtype=torch.float, device=device)
    #     ### Hidden layer
    #     self.spk1, self.syn1, self.mem1 = self.lif1.init_rsynaptic()
    #     ### Output layer
    #     self.syn2, self.mem2 = self.lif2.init_synaptic()


net = Net().to(device)

In [None]:
### Set the loss function
loss_fn = SF.ce_count_loss()

### Set the optimizer
optimizer = torch.optim.Adam(net.parameters(), lr=settings['lr'], betas=(0.9, 0.999))

### Set the batch size
batch_size = int(settings["batch_size"])

#### Training (with validation)

In [None]:
num_epochs = 100

In [None]:
training_results = []
validation_results = []

for epoch in range(num_epochs):

  train_loss, train_acc = training_loop(ds_train, batch_size, net, optimizer, loss_fn, device)
  val_loss, val_acc = val_test_loop(ds_val, batch_size, net, loss_fn, device)

  training_results.append([train_loss, train_acc])
  validation_results.append([val_loss, val_acc])

  print("Epoch {}/{}: \n\ttraining loss: {} \n\tvalidation loss: {} \n\ttraining accuracy: {}% \n\tvalidation accuracy: {}%".format(epoch+1, num_epochs, training_results[-1][0], validation_results[-1][0], np.round(training_results[-1][1]*100,4), np.round(validation_results[-1][1]*100,4)))

#### Test

In [None]:
test_results, lbl_probs, spk_out = val_test_loop(ds_test, batch_size, net, loss_fn, device, label_probabilities=True, return_spikes=True)

print("\nTest accuracy: {}%".format(np.round(test_results[1]*100,4)))

#### Save model

In [None]:
create_directory('model_data')
torch.save(net.state_dict(), './model_data/HAR_RSNN.pth')

#### Neurobench Metrics

In [None]:
net = Net().to(device)
net.load_state_dict(torch.load('./model_data/HAR_RSNN.pth', map_location=device))

model = SNNTorchModel(net)
test_set_loader = DataLoader(ds_test, batch_size=int(settings["batch_size"]), shuffle=True, drop_last=False)
postprocessors = [choose_max_count]

static_metrics = ["model_size"]
workload_metrics = ["classification_accuracy"]

benchmark = Benchmark(model, test_set_loader, [], postprocessors, [static_metrics, workload_metrics])
results = benchmark.run()

results = [results[key] for key in results.keys()]
results.insert(0, 'RSNN')

network_results.append(copy.copy(results))

#### Single-sample inference

In [None]:
single_sample = next(iter(DataLoader(ds_test, batch_size=1, shuffle=True)))
print("Randomly selected sample: {}".format(labels_activity[single_sample[1].cpu()[0]]))

In [None]:
### Inference
_, lbl_probs, spk_out = val_test_loop(TensorDataset(single_sample[0],single_sample[1]), 1, net, loss_fn, device, label_probabilities=True, return_spikes=True)

### Plot output spiking activity
spk_out = np.moveaxis(spk_out,1,2)
spk_out = np.squeeze(spk_out, axis=-1)
# spk_out now has shape (num_steps, num_outputs)
aer = []
for num,el in enumerate(spk_out):
  addr = np.where(el)[0].tolist()
  if len(addr) > 0:
    for ii in addr:
      aer.append([num,ii])
aer = np.array(aer)
plt.scatter(aer[:,0], aer[:,1], s=1)
plt.xlabel("Time step (a.u.)")
plt.ylabel("Neuron")
plt.title("Spiking output activity (activity: {}, prediction: {})".format(labels_activity[single_sample[1].cpu()[0]],labels_activity[np.argmax(lbl_probs.cpu())]))
plt.ylim(-0.5,6.5)
plt.xlim((-0.5,num_steps+0.5))
plt.yticks(range(7),labels_activity)
plt.show()

print("\nLabels probabilities:")
for num,el in enumerate(labels_activity):
  print("\t{} \n\t\t{}%".format(el,np.round(lbl_probs.cpu().numpy()[0][num]*100,2)))

## Convolutional SNN

Training, validation, and test of a convolutional SNN (CSNN)

Neurobench Metrics extraction
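
The CSNN below processes one time step at a time, reshaping each step to `(batch, 6, 1)` before the first convolution. The sketch that follows traces the tensor shapes through the layers using only arithmetic, as a sanity check of the layer sizes in `settings`. It assumes that `nn.MaxPool2d` treats the 3D convolution output as an unbatched `(C, H, W)` input and therefore halves both the channel and the length dimensions, which is consistent with `conv_2_in_fts` being 100. The helper names are illustrative, and the spiking layers are omitted because they do not change shapes.

```python
def conv1d_out_len(length, kernel_size, padding=0, stride=1, dilation=1):
    """Output length of a 1D convolution (standard formula)."""
    return (length + 2 * padding - dilation * (kernel_size - 1) - 1) // stride + 1

def pool_out(size, kernel_size):
    """Output size of non-overlapping max pooling (stride equal to kernel size)."""
    return (size - kernel_size) // kernel_size + 1

batch = 256
trace = {"input": (batch, 6, 1)}

# conv1: 6 -> 200 channels, kernel_size=2, padding=4
channels, length = 200, conv1d_out_len(1, kernel_size=2, padding=4)
trace["conv1"] = (batch, channels, length)

# max1: pooling window 2 applied to both the channel and the length axis (assumption above)
channels, length = pool_out(channels, 2), pool_out(length, 2)
trace["max1"] = (batch, channels, length)

# conv2: 100 -> 256 channels, kernel_size=1
channels, length = 256, conv1d_out_len(length, kernel_size=1)
trace["conv2"] = (batch, channels, length)

# max2: same pooling as max1
channels, length = pool_out(channels, 2), pool_out(length, 2)
trace["max2"] = (batch, channels, length)

# flatten, then linear layer to the 7 output classes
trace["flatten"] = (batch, channels * length)
trace["linear"] = (batch, 7)

for name, shape in trace.items():
    print(f"{name:8s} {shape}")
```

The flattened size equals `conv_2_out_fts` (256), which is why the final linear layer can use `settings['conv_2_out_fts']` as its input dimension.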

In [None]:
settings = {
    'conv_1_out_fts': 200,
    'conv_1_kernel_size': 2,
    'maxpool_1_fts': 2,
    'conv_1_pad': 4,
    'leaky_1_beta': 0.4,
    'leaky_1_thr': 0.002,
    'conv_2_in_fts': 100,
    'conv_2_out_fts': 256,
    'conv_2_kernel_size': 1,
    'maxpool_2_fts': 2,
    'leaky_2_beta': 0.3,
    'leaky_2_thr': 0.001,
    'leaky_3_beta': 0.5,
    'leaky_3_thr': 0.001,
    'lr': 1.e-3,
    'batch_size': 256,
}

In [None]:
input_enc = 6
num_outputs = 7
num_steps = 40

class Net(nn.Module):
    def __init__(self):
        super().__init__()

        self.conv1 = nn.Conv1d(input_enc, settings['conv_1_out_fts'], kernel_size=settings['conv_1_kernel_size'], padding=settings['conv_1_pad'])
        # MaxPool2d on the 3D Conv1d output pools over channels and length: (B, 200, 8) -> (B, 100, 4)
        self.max1 = nn.MaxPool2d(settings['maxpool_1_fts'])
        self.leaky1 = snn.Leaky(beta=settings['leaky_1_beta'], init_hidden=True, threshold=settings['leaky_1_thr'])
        self.conv2 = nn.Conv1d(settings['conv_2_in_fts'], settings['conv_2_out_fts'], kernel_size=settings['conv_2_kernel_size'])
        self.max2 = nn.MaxPool2d(settings['maxpool_2_fts'])
        self.leaky2 = snn.Leaky(beta=settings['leaky_2_beta'], init_hidden=True, threshold=settings['leaky_2_thr'])
        self.flatten = nn.Flatten()
        self.linear = nn.Linear(settings['conv_2_out_fts'], num_outputs)
        self.leaky3 = snn.Leaky(beta=settings['leaky_3_beta'], output=True, init_hidden=True, threshold=settings['leaky_3_thr'])


    def forward(self, input):

        x = self.conv1(input.reshape(input.shape[0], input.shape[1], 1))
        x = self.max1(x)
        x = self.leaky1(x)
        x = self.conv2(x)
        x = self.max2(x)
        x = self.leaky2(x)
        x = self.flatten(x)
        x = self.linear(x)
        spk_out, mem_out = self.leaky3(x)
        return spk_out, mem_out

    def single_forward(self, input):
        mem_rec = []
        spk_rec = []

        self.leaky1.init_leaky()
        self.leaky2.init_leaky()
        self.leaky3.init_leaky()
        utils.reset(self)
        for step in range(num_steps):

            new_input = input[:, step, :]
            x = self.conv1(new_input.reshape(new_input.shape[0], new_input.shape[1], 1))
            x = self.max1(x)
            x = self.leaky1(x)
            x = self.conv2(x)
            x = self.max2(x)
            x = self.leaky2(x)
            x = self.flatten(x)
            x = self.linear(x)
            spk_out, mem_out = self.leaky3(x)

            spk_rec.append(spk_out)
            mem_rec.append(mem_out)

        return torch.stack(spk_rec), torch.stack(mem_rec)

    def reset(self):
      self.leaky1.init_leaky()
      self.leaky2.init_leaky()
      self.leaky3.init_leaky()

    def frequency(self):
      acquisition_rate = 20
      temporal_window = num_steps/acquisition_rate
      return 1/temporal_window


net = Net().to(device)

In [None]:
### Set the loss function
loss_fn = SF.ce_count_loss()

### Set the optimizer
optimizer = torch.optim.Adam(net.parameters(), lr=settings['lr'], betas=(0.9, 0.999))

### Set the batch size
batch_size = settings["batch_size"]

#### Training (with validation)

In [None]:
num_epochs = 100

training_results = []
validation_results = []

for epoch in range(num_epochs):

  train_loss, train_acc = training_loop(ds_train, batch_size, net, optimizer, loss_fn, device)
  val_loss, val_acc = val_test_loop(ds_val, batch_size, net, loss_fn, device)

  training_results.append([train_loss, train_acc])
  validation_results.append([val_loss, val_acc])

  print("Epoch {}/{}: \n\ttraining loss: {} \n\tvalidation loss: {} \n\ttraining accuracy: {}% \n\tvalidation accuracy: {}%".format(epoch+1, num_epochs, training_results[-1][0], validation_results[-1][0], np.round(training_results[-1][1]*100,4), np.round(validation_results[-1][1]*100,4)))

#### Test

In [None]:
test_results, lbl_probs, spk_out = val_test_loop(ds_test, batch_size, net, loss_fn, device, label_probabilities=True, return_spikes=True)

print("\nTest accuracy: {}%".format(np.round(test_results[1]*100,4)))

#### Save model

In [None]:
create_directory('model_data')
torch.save(net.state_dict(), './model_data/HAR_SCNN.pth')

#### Neurobench Metrics

In [None]:
net = Net().to(device)
net.load_state_dict(torch.load('./model_data/HAR_SCNN.pth', map_location=device))

model = SNNTorchModel(net)
test_set_loader = DataLoader(ds_test, batch_size=settings["batch_size"], shuffle=True, drop_last=False)
postprocessors = [choose_max_count]

static_metrics = ["model_size"]
workload_metrics = ["classification_accuracy"]

benchmark = Benchmark(model, test_set_loader, [], postprocessors, [static_metrics, workload_metrics])
results = benchmark.run()

results = [results[key] for key in results.keys()]
results.insert(0, 'SCNN')

network_results.append(copy.copy(results))

## Networks Results

In [None]:
pd.DataFrame(network_results, columns=["Model Name", "Model Size", "Classification Accuracy"])