In [2]:
%%capture
!apt-get install libglu1-mesa
!pip install sinabs tonic

To remove later cell:

- every neuron 16bit, every synapse 8bit. Follow the following table for layer:
https://sinabs.readthedocs.io/en/v2.0.0/speck/overview.html

# LOAD DATA

In [7]:
#### LOAD DATA ####

import tonic
from tonic.transforms import ToFrame
from tonic.datasets import nmnist
from tonic import transforms
import os
import numpy as np

root = "/"

transform = transforms.Compose([
    transforms.ToFrame(sensor_size=tonic.datasets.DVSGesture.sensor_size, n_time_bins=16, include_incomplete=True),
    lambda x: x.astype(np.float32),
])

testset = tonic.datasets.DVSGesture(save_to=os.path.join(root, "tutorials/data"), train=False, transform=transform)
trainset = tonic.datasets.DVSGesture(save_to=os.path.join(root, "tutorials/data"), train=True, transform=transform)

events, label = trainset[0]
events[0].shape

(2, 128, 128)

# DEFINE MODEL

In [8]:
#### DEFINE MODEL ###

import torch
import torch.nn as nn
from typing import List
import sinabs
import sinabs.layers as sl

class DVSGestureNet(nn.Module):
    def __init__(self, channels=128, *args, **kwargs):
        super().__init__()

        conv = []
        for i in range(5):
            if conv.__len__() == 0:
                in_channels = 2
                stride=2
            else:
                in_channels = channels
                stride=1

            conv.append(nn.Conv2d(in_channels, channels, kernel_size=3, padding=1, stride=stride))
            conv.append(nn.BatchNorm2d(channels))
            conv.append(sl.IAFSqueeze(*args, **kwargs))
            if i != 0:
              conv.append(sl.SumPool2d(2, 2))


        self.conv_fc = nn.Sequential(
            *conv,

            nn.Flatten(),
            nn.Dropout(0.5),
            nn.Linear(channels * 4 * 4, 512),
            sl.IAFSqueeze(*args, **kwargs),

            nn.Dropout(0.5),
            nn.Linear(512, 110),
            sl.IAFSqueeze(*args, **kwargs),
            nn.Linear(110,11),
            #sl.SumPool2d((10,1), stride=(10,1)),
            sl.IAFSqueeze(*args, **kwargs),

        )

    def forward(self, x: torch.Tensor):
        return self.conv_fc(x)

    def return_sequential(self):
      return self.conv_fc

In [9]:
net = DVSGestureNet(batch_size=1, channels=8)

In [10]:
from torchsummary import summary
net.return_sequential()
#summary(net, (2,128,128))
x = torch.randn(1,2,128,128)
net(x)

tensor([[0., 0., -0., -0., 0., 0., 0., 0., -0., 0., 0.]],
       grad_fn=<ViewBackward0>)

# TRAIN MODEL

In [19]:
from torch.utils.data import DataLoader
from torch.optim import SGD, Adam
from torch.nn import CrossEntropyLoss
from tqdm import tqdm

epochs = 1
lr = 1e-3
batch_size = 4
num_workers = 4
n_time_steps=16
device = "cuda:0" # "cpu"
shuffle = True

snn_train_dataloader = DataLoader(trainset, batch_size=batch_size, num_workers=num_workers, drop_last=True, shuffle=True)
snn_test_dataloader = DataLoader(testset, batch_size=batch_size, num_workers=num_workers, drop_last=True, shuffle=False)

net = net.to(device=device)

optimizer = Adam(params=net.parameters(), lr=lr)
criterion = CrossEntropyLoss()

for e in range(epochs):

    # train
    train_p_bar = tqdm(snn_train_dataloader)
    for data, label in train_p_bar:
        # reshape the input from [Batch, Time, Channel, Height, Width] into [Batch*Time, Channel, Height, Width]
        data = data.reshape(-1, 2, 128, 128).to(dtype=torch.float, device=device)
        label = label.to(dtype=torch.long, device=device)
        # forward
        optimizer.zero_grad()
        output = net(data)
        # reshape the output from [Batch*Time,num_classes] into [Batch, Time, num_classes]
        output = output.reshape(batch_size, n_time_steps, -1)
        # accumulate all time-steps output for final prediction
        output = output.sum(dim=1)
        loss = criterion(output, label)
        #print(loss.device)
        # backward
        loss.backward()
        optimizer.step()

        # detach the neuron states and activations from current computation graph(necessary)
        for layer in net.modules():
            if isinstance(layer, sl.StatefulLayer):
                for name, buffer in layer.named_buffers():
                    buffer.detach_()

        # set progressing bar
        train_p_bar.set_description(f"Epoch {e} - BPTT Training Loss: {round(loss.item(), 4)}")

    # validate
    correct_predictions = []
    with torch.no_grad():
        test_p_bar = tqdm(snn_test_dataloader)
        for data, label in test_p_bar:
            # reshape the input from [Batch, Time, Channel, Height, Width] into [Batch*Time, Channel, Height, Width]
            data = data.reshape(-1, 2, 34, 34).to(dtype=torch.float, device=device)
            label = label.to(dtype=torch.long, device=device)
            # forward
            output = net(data)
            # reshape the output from [Batch*Time,num_classes] into [Batch, Time, num_classes]
            output = output.reshape(batch_size, n_time_steps, -1)
            # accumulate all time-steps output for final prediction
            output = output.sum(dim=1)
            # calculate accuracy
            pred = output.argmax(dim=1, keepdim=True)
            # compute the total correct predictions
            correct_predictions.append(pred.eq(label.view_as(pred)))
            # set progressing bar
            test_p_bar.set_description(f"Epoch {e} - BPTT Testing Model...")

        correct_predictions = torch.cat(correct_predictions)
        print(f"Epoch {e} - BPTT accuracy: {correct_predictions.sum().item()/(len(correct_predictions))*100}%")

  self.pid = os.fork()
  self.pid = os.fork()
  0%|          | 0/269 [00:03<?, ?it/s]


RuntimeError: Trying to backward through the graph a second time (or directly access saved tensors after they have already been freed). Saved intermediate values of the graph are freed when you call .backward() or autograd.grad(). Specify retain_graph=True if you need to backward through the graph a second time or if you need to access saved tensors after calling backward.

# DEPLOY MODEL ON SPECK

In [None]:
import numpy as np
import torch
from torch.utils.data import DataLoader
from torch.optim import SGD
from tqdm.notebook import tqdm
from torch.nn import CrossEntropyLoss

In [None]:
from sinabs.backend.dynapcnn import io
print(io.device_types)

{'speck': 'speck', 'speck2b': 'Speck2bTestboard', 'speck2devkit': 'Speck2DevKit', 'speck2btiny': 'Speck2bDevKitTiny', 'speck2e': 'Speck2eTestBoard', 'speck2edevkit': 'Speck2eDevKit', 'speck2fmodule': 'Speck2fModuleDevKit', 'speck2fdevkit': 'Speck2fDevKit', 'dynapse1devkit': 'Dynapse1DevKit', 'davis346': 'Davis 346', 'davis240': 'Davis 240', 'dvxplorer': 'DVXplorer', 'pollendevkit': 'PollenDevKit', 'dynapcnndevkit': 'DynapcnnDevKit', 'dynapse2': 'DYNAP-SE2 DevBoard', 'dynapse2_stack': 'DYNAP-SE2 Stack'}


In [None]:
from sinabs.backend.dynapcnn import DynapcnnNetwork

# cpu_snn = snn_convert.to(device="cpu")
cpu_snn = net.return_sequential().to(device="cpu")
dynapcnn = DynapcnnNetwork(snn=cpu_snn, input_shape=(2, 128, 128), discretize=True, dvs_input=False)
devkit_name = "speck2edevkit"

# use the `to` method of DynapcnnNetwork to deploy the SNN to the devkit
dynapcnn.to(device=devkit_name, chip_layers_ordering="auto",monitor_layers=[-1])
print(f"The SNN is deployed on the core: {dynapcnn.chip_layers_ordering}")

Network is valid


KeyError: 'speck2edevkit:0'