In [7]:
# !pip install snntorch
# !pip install tonic

In [8]:
import urllib.request
import torch, torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader

import snntorch as snn
from snntorch import surrogate
from snntorch import functional as SF

import tonic
import tonic.transforms as transforms

from torch import randn_like

In [9]:
# Event -> frame preprocessing pipeline for NMNIST.
#
# The transform classes are referenced through the fully qualified
# `tonic.transforms` module on purpose: this cell rebinds the name
# `transforms` (the module alias from the import cell) to the composed
# pipeline. With the bare alias, a second run of the cell would call
# `.Compose` on the pipeline object and fail with AttributeError; with the
# qualified names the cell can be re-run any number of times.
sensor_size = tonic.datasets.NMNIST.sensor_size
transforms = tonic.transforms.Compose([
    tonic.transforms.Denoise(filter_time=10000),
    # n_time_bins is 100, the same value as config["num_steps"], which the
    # network uses as the number of frames it indexes per sample.
    tonic.transforms.ToFrame(sensor_size=sensor_size, n_time_bins=100),
])

In [10]:
# Directory where the NMNIST dataset is stored.
data_path = 'data/nmnist'

# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Only the test split is loaded; the training split is left disabled.
# nmnist_train = tonic.datasets.NMNIST(data_path, train=True, transform=transforms)
nmnist_test = tonic.datasets.NMNIST(
    data_path,
    train=False,
    transform=transforms,
)

In [11]:
# Hyperparameters shared by the network, the loss and the optimizer.
config = dict(
    # --- Training schedule ---
    num_epochs=100,       # Number of epochs to train for (per trial)
    batch_size=1024,      # Batch size
    seed=0,               # Random seed

    # --- Network parameters ---
    batch_norm=False,     # Whether or not to use batch normalization
    dropout=0.07,         # Dropout rate
    beta=0.97,            # Decay rate parameter (beta)
    threshold=2.5,        # Threshold parameter (theta)
    lr=3.0e-3,            # Initial learning rate
    slope=5.6,            # Slope value (k)

    # --- Fixed params ---
    num_steps=100,        # Number of timesteps to encode input for
    correct_rate=0.8,     # Correct rate
    incorrect_rate=0.2,   # Incorrect rate
    betas=(0.9, 0.999),   # Adam optimizer beta values
)

In [12]:
# Take the batch size from `config` so it is defined in exactly one place
# (it was previously repeated here as a literal 1024).
batch_size = config["batch_size"]

# Cache the transformed test samples on disk so repeated evaluations do not
# redo the preprocessing.
# NOTE(review): the cache lives under '/temp' (not '/tmp'); kept unchanged so
# an existing cache is still found - confirm this is the intended location.
# cached_train = nmnist_train if debug else tonic.DiskCachedDataset(nmnist_train, cache_path='/temp/nmnist/train')
cached_test = tonic.DiskCachedDataset(nmnist_test, cache_path='/temp/nmnist/test')

# trainloader = DataLoader(cached_train, shuffle=True, batch_size=batch_size, collate_fn=tonic.collation.PadTensors(batch_first=False))
# Evaluation does not benefit from shuffling; a fixed order keeps the batches
# identical between the repeated accuracy measurements further down.
# batch_first=False is what lets the network index the leading axis per step.
testloader = DataLoader(
    cached_test,
    shuffle=False,
    batch_size=batch_size,
    collate_fn=tonic.collation.PadTensors(batch_first=False),
)
# frames, target = next(iter(trainloader))

In [13]:
class Net(nn.Module):
    """Spiking CNN: two conv + leaky-integrate-and-fire stages and a linear readout.

    The layer attribute names and their creation order define the keys of the
    state dict that the checkpoints in this notebook are loaded into, so they
    should not be renamed or reordered casually.
    """

    def __init__(self, config):
        """Build the layers from the hyperparameter dictionary.

        Reads the keys "threshold", "slope", "beta", "num_steps",
        "batch_norm" and "dropout" from ``config``.
        """
        super().__init__()
        self.thr = config["threshold"]
        self.slope = config["slope"]
        self.beta = config["beta"]
        self.num_steps = config["num_steps"]
        self.batch_norm = config["batch_norm"]
        self.p1 = config["dropout"]
        # Surrogate gradient shared by all three spiking layers.
        self.spike_grad = surrogate.fast_sigmoid(self.slope)
        # self.init_net()

        # Initialize Layers
        self.conv1 = nn.Conv2d(2, 16, 5, bias=False)
        self.conv1_bn = nn.BatchNorm2d(16)
        self.lif1 = snn.Leaky(self.beta, threshold=self.thr, spike_grad=self.spike_grad)
        self.conv2 = nn.Conv2d(16, 32, 5, bias=False)
        # NOTE(review): conv2 produces 32 channels but this layer is built for
        # 64, so enabling config["batch_norm"] would fail in forward(). It is
        # left untouched because changing it alters the shapes of the
        # conv2_bn entries in the state dict - presumably the saved
        # checkpoints contain the 64-feature version; verify before fixing.
        self.conv2_bn = nn.BatchNorm2d(64)
        self.lif2 = snn.Leaky(self.beta, threshold=self.thr, spike_grad=self.spike_grad)
        # The readout expects 32 * 5 * 5 flattened features per sample
        # (presumably from 34x34 NMNIST frames - TODO confirm).
        self.fc1 = nn.Linear(32 * 5 * 5, 10, bias=False)
        self.lif3 = snn.Leaky(self.beta, threshold=self.thr, spike_grad=self.spike_grad)
        self.dropout = nn.Dropout(self.p1)

    def forward(self, x):
        """Run the network for ``self.num_steps`` steps.

        ``x`` is indexed along its first axis once per step, so it must hold
        at least ``self.num_steps`` entries there (assumed layout:
        time-major batch of frames - TODO confirm against the data loader).

        Returns a tuple ``(spikes, membranes)`` with the output layer's spikes
        and membrane potentials of every step stacked along dim 0.
        """
        # Initialize hidden states and outputs at t=0
        mem1 = self.lif1.init_leaky()
        mem2 = self.lif2.init_leaky()
        mem3 = self.lif3.init_leaky()

        # Record the final layer
        spk3_rec = []
        mem3_rec = []

        for step in range(self.num_steps):
            # Stage 1: conv -> 2x2 average pool -> (optional batch norm) -> LIF
            cur1 = F.avg_pool2d(self.conv1(x[step]), 2)
            if self.batch_norm:
                cur1 = self.conv1_bn(cur1)

            spk1, mem1 = self.lif1(cur1, mem1)
            # Stage 2: same structure, driven by the spikes of stage 1
            cur2 = F.avg_pool2d(self.conv2(spk1), 2)
            if self.batch_norm:
                cur2 = self.conv2_bn(cur2)

            spk2, mem2 = self.lif2(cur2, mem2)
            # Readout: flatten per sample, linear layer, dropout on the
            # resulting current, then the output LIF layer
            cur3 = self.dropout(self.fc1(spk2.flatten(1)))
            spk3, mem3 = self.lif3(cur3, mem3)
            spk3_rec.append(spk3)
            mem3_rec.append(mem3)

        return torch.stack(spk3_rec, dim=0), torch.stack(mem3_rec, dim=0)

# Instance used by the optimizer cell below; the evaluation cells build their
# own `model` instances from checkpoints.
net = Net(config).to(device)

In [14]:
# Adam over all parameters of `net`, with learning rate and beta values taken
# from `config`. The training loop is commented out in this notebook, so the
# cells shown here do not step this optimizer.
optimizer = torch.optim.Adam(
    net.parameters(),
    lr=config["lr"],
    betas=config["betas"],
)

# Spike-count MSE loss using the target rates from `config`.
criterion = SF.mse_count_loss(
    correct_rate=config["correct_rate"],
    incorrect_rate=config["incorrect_rate"],
)

In [15]:
# def train(config, net, trainloader, criterion, optimizer, device="cuda", scheduler=None):
#     """Complete one epoch of training."""

#     net.train()
#     loss_accum = []
#     i = 0
#     for data, labels in trainloader:
#         data, labels = data.to(device), labels.to(device)
#         # print(data.shape)
#         spk_rec, _ = net(data.permute(0, 1, 2, 3, 4))
#         loss = criterion(spk_rec, labels)
#         optimizer.zero_grad()
#         loss.backward()

#         optimizer.step()
#         loss_accum.append(loss.item() / config["num_steps"])

#     return loss_accum

def test(config, net, testloader, device="cuda"):
    """Calculate accuracy on full test set."""
    correct = 0
    total = 0
    with torch.no_grad():
        net.eval()
        for data in testloader:
            images, labels = data
            images, labels = images.to(device), labels.to(device)
            outputs, _ = net(images.permute(0, 1, 2, 3, 4))
            accuracy = SF.accuracy_rate(outputs, labels)
            total += labels.size(0)
            correct += accuracy * labels.size(0)

    return 100 * correct / total

In [16]:
# Rebuild the digital (FP32) network and restore the trained weights.
model = Net(config)
# map_location=device lets a checkpoint that was saved from a GPU run load on
# a CPU-only machine as well. weights_only=True restricts unpickling to
# tensors and plain containers, which is all a state dict needs, and avoids
# the torch.load FutureWarning about the unsafe default.
state_dict = torch.load('nmnist_fp32.pt', map_location=device, weights_only=True)
model.load_state_dict(state_dict)
model.to(device)
model.eval()
accuracy = test(config, model, testloader, device)
print(f"Original accuracy: {accuracy}")

  model.load_state_dict(torch.load(f'nmnist_fp32.pt')) # load the trained model here


Original accuracy: 98.05


Inference on analog hardware

In [5]:
# install aihwkit libraries first
# !pip install aihwkit

# !pip install -q condacolab
# import condacolab
# condacolab.install()
# !conda install -c conda-forge aihwkit-gpu

In [18]:
# aihwkit imports for analog inference. Some names (PCM noise model, drift
# compensation, calibration helpers) are only needed by the commented-out
# alternatives below.
from aihwkit.inference import (
    GlobalDriftCompensation,
    PCMLikeNoiseModel,
    ReRamWan2022NoiseModel,
)
from aihwkit.inference.calibration import (
    InputRangeCalibrationType,
    calibrate_input_ranges,
)
from aihwkit.nn.conversion import convert_to_analog
from aihwkit.simulator.configs import InferenceRPUConfig
# from aihwkit.simulator.presets import StandardHWATrainingPreset

# Inference-only RPU configuration.
rpu_config = InferenceRPUConfig()

# Device noise model: RRAM is active, the PCM variant is kept for reference.
# rpu_config.noise_model = PCMLikeNoiseModel(g_max=25.0)  # PCM noise model
rpu_config.noise_model = ReRamWan2022NoiseModel(g_max=80.0)  # RRAM noise model
# rpu_config.drift_compensation = GlobalDriftCompensation() #Enable only for PCM devices

# Tile mapping limits.
rpu_config.mapping.max_input_size = 256
rpu_config.mapping.max_output_size = 256

# Forward-pass non-idealities: output noise and input/output resolution.
rpu_config.forward.out_noise = 0.04
rpu_config.forward.inp_res = 2 ** 8
rpu_config.forward.out_res = 2 ** 8


In [None]:
# Times (in seconds) at which the drifted weights are evaluated.
# t_inferences = [0.0, 3600.0, 86400.0]  # Times to perform inference.
t_inferences = [1.0, 86400.0, 172800.0]
n_reps = 5  # Number of inference repetitions.

# Bind the converted network to its own name instead of overwriting `model`.
# The previous `model = convert_to_analog(model, ...)` fed the cell its own
# output, so running the cell a second time tried to convert an already
# converted model.
analog_model = convert_to_analog(model, rpu_config=rpu_config)
analog_model.eval()  # Determine the inference accuracy with the specified rpu configuration.
print("Evaluating imported model number.")

# Rows: inference times, columns: repetitions.
inference_accuracy_values = torch.zeros((len(t_inferences), n_reps))
for t_id, t in enumerate(t_inferences):
    for rep in range(n_reps):
        analog_model.drift_analog_weights(t)
        accuracy = test(config, analog_model, testloader, device)
        inference_accuracy_values[t_id, rep] = accuracy
    # Adjacent f-strings replace the backslash continuation inside the string
    # literal, which leaked the source indentation into the printed line.
    print(
        f"Test set accuracy (%) at t={t}s: "
        f"mean: {inference_accuracy_values[t_id].mean()}, "
        f"std: {inference_accuracy_values[t_id].std()}"
    )

Evaluating imported model number.
Test set accuracy (%) at t=1.0s: mean: 97.32600402832031,       std: 0.1372222900390625
Test set accuracy (%) at t=86400.0s: mean: 94.33600616455078,       std: 2.0802712440490723


In [None]:
import numpy as np
# Values swept by the evaluation loops below. Each entry, multiplied by 100
# and converted to an integer, forms the suffix of the checkpoint file that is
# loaded ('nmnist_fp32_<suffix>.pt'). Presumably these are the noise standard
# deviations used when training those checkpoints - TODO confirm.
noise_sd_list = np.array([0.10, 0.20, 0.30, 0.40, 0.50])

In [None]:
# Instantiate the RPU (Resistive Processing Unit) configuration used by the
# sweep cells below, with the RRAM noise model.
#
# NOTE(review): this replaces the earlier `rpu_config` and does not set the
# mapping sizes (256), forward.out_noise (0.04) or the input/output
# resolutions (2**8) that the earlier configuration set, so the sweep below
# runs with the library defaults for those fields. Confirm this difference
# from the baseline evaluation is intended.

rpu_config = InferenceRPUConfig()
rpu_config.noise_model = ReRamWan2022NoiseModel(g_max=80.0)
# rpu_config.drift_compensation = GlobalDriftCompensation()

In [None]:
# Times (in seconds) at which the drifted weights are evaluated.
# t_inferences = [0.0, 3600.0, 86400.0]  # Times to perform inference.
t_inferences = [1.0, 86400.0, 172800.0]
n_reps = 5  # Number of inference repetitions.

# check test accuracy after converting weights to analog with different times.
for model_idx, noise_sd in enumerate(noise_sd_list):
    # round() before int(): plain truncation is fragile for filename building
    # (e.g. 0.29 * 100 evaluates to 28.999999999999996 and would become 28).
    checkpoint_path = f"nmnist_fp32_{int(round(noise_sd * 100))}.pt"

    model = Net(config)
    # map_location keeps GPU-saved checkpoints loadable on CPU-only machines;
    # weights_only=True is sufficient for a state dict and avoids the
    # torch.load FutureWarning.
    model.load_state_dict(
        torch.load(checkpoint_path, map_location=device, weights_only=True)
    )
    model.to(device)
    accuracy = test(config, model, testloader, device)
    print(f"Original accuracy: {accuracy}")

    analog_model = convert_to_analog(model, rpu_config=rpu_config)
    analog_model.eval()  # Determine the inference accuracy with the specified rpu configuration.
    print(f"Evaluating imported model number {model_idx}.")

    # Rows: inference times, columns: repetitions.
    inference_accuracy_values = torch.zeros((len(t_inferences), n_reps))
    for t_id, t in enumerate(t_inferences):
        # `rep` (not `i`) so the repetition counter no longer shadows the
        # index of the model being evaluated.
        for rep in range(n_reps):
            analog_model.drift_analog_weights(t)
            accuracy = test(config, analog_model, testloader, device)
            inference_accuracy_values[t_id, rep] = accuracy
        # Adjacent f-strings replace the backslash continuation inside the
        # string literal, which leaked source indentation into the output.
        print(
            f"Test set accuracy (%) at t={t}s: "
            f"mean: {inference_accuracy_values[t_id].mean()}, "
            f"std: {inference_accuracy_values[t_id].std()}"
        )

  model.load_state_dict(torch.load(f'nmnist_fp32_' + str(int(noise_sd_list[i]*100)) + '.pt')) # load the trained model here


In [None]:
# NOTE(review): this cell runs the same checkpoint sweep as the cell before
# it; consider keeping only one of the two.

# Times (in seconds) at which the drifted weights are evaluated.
# t_inferences = [0.0, 3600.0, 86400.0]  # Times to perform inference.
t_inferences = [1.0, 86400.0, 172800.0]
n_reps = 5  # Number of inference repetitions.

# check test accuracy after converting weights to analog with different times.
for model_idx, noise_sd in enumerate(noise_sd_list):
    # round() before int(): plain truncation is fragile for filename building
    # (e.g. 0.29 * 100 evaluates to 28.999999999999996 and would become 28).
    checkpoint_path = f"nmnist_fp32_{int(round(noise_sd * 100))}.pt"

    model = Net(config)
    # map_location keeps GPU-saved checkpoints loadable on CPU-only machines;
    # weights_only=True is sufficient for a state dict and avoids the
    # torch.load FutureWarning.
    model.load_state_dict(
        torch.load(checkpoint_path, map_location=device, weights_only=True)
    )
    model.to(device)
    accuracy = test(config, model, testloader, device)
    print(f"Original accuracy: {accuracy}")

    analog_model = convert_to_analog(model, rpu_config=rpu_config)
    analog_model.eval()  # Determine the inference accuracy with the specified rpu configuration.
    print(f"Evaluating imported model number {model_idx}.")

    # Rows: inference times, columns: repetitions.
    inference_accuracy_values = torch.zeros((len(t_inferences), n_reps))
    for t_id, t in enumerate(t_inferences):
        # `rep` (not `i`) so the repetition counter no longer shadows the
        # index of the model being evaluated.
        for rep in range(n_reps):
            analog_model.drift_analog_weights(t)
            accuracy = test(config, analog_model, testloader, device)
            inference_accuracy_values[t_id, rep] = accuracy
        # Adjacent f-strings replace the backslash continuation inside the
        # string literal, which leaked source indentation into the output.
        print(
            f"Test set accuracy (%) at t={t}s: "
            f"mean: {inference_accuracy_values[t_id].mean()}, "
            f"std: {inference_accuracy_values[t_id].std()}"
        )

In [2]:
# Colab only: mount Google Drive at /content/drive.
# NOTE(review): this cell sits near the end of the notebook but its execution
# count (2) shows it was run before the model cells; the checkpoint files are
# opened by relative path, so it presumably has to run first - consider
# moving it to the top.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
cd /content/drive/MyDrive/Trained\ Models/SNNs/NMNIST

/content/drive/MyDrive/Trained Models/SNNs/NMNIST


In [4]:
ls

AnalogHWInferencesnnTorchNMNISTNoiseAware.ipynb
[0m[01;34mdata[0m/
nmnist_fp32_10.pt
nmnist_fp32_10step.pt
nmnist_fp32_20.pt
nmnist_fp32_30.pt
nmnist_fp32_40.pt
nmnist_fp32_50.pt
nmnist_fp32_5step.pt
nmnist_fp32.pt
RRAM_AnalogHWInferencesnnTorchNMNISTNoiseAware.ipynb


In [None]:
# Disconnect once finished
from google.colab import runtime
runtime.unassign()