<a href="https://colab.research.google.com/github/gbachu/rice-experiment-mnist.ipynb/blob/main/rice_experiment_mnist.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install spikingjelly

Collecting spikingjelly
  Downloading spikingjelly-0.0.0.0.14-py3-none-any.whl (437 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m437.6/437.6 kB[0m [31m7.7 MB/s[0m eta [36m0:00:00[0m
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch->spikingjelly)
  Using cached nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (23.7 MB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch->spikingjelly)
  Using cached nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (823 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch->spikingjelly)
  Using cached nvidia_cuda_cupti_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (14.1 MB)
Collecting nvidia-cudnn-cu12==8.9.2.26 (from torch->spikingjelly)
  Using cached nvidia_cudnn_cu12-8.9.2.26-py3-none-manylinux1_x86_64.whl (731.7 MB)
Collecting nvidia-cublas-cu12==12.1.3.1 (from torch->spikingjelly)
  Using cached nvidia_cublas_cu12-12.1.3.1-py3-none-manylinux1_x86_64.whl (410.6 MB)
Collecti

Quadratic & SpikingJellyQuadratic Classes

In [None]:
import torch
from torch import func
from torch import nn
import torch.nn.functional as F
from torch import Tensor

# Based on nn.Linear
# https://pytorch.org/docs/stable/_modules/torch/nn/modules/linear.html#Linear

class Quadratic(nn.Module):

    __constants__ = ['in_features', 'out_features']
    in_features: int
    out_features: int
    fc_1: nn.Linear
    fc_2: nn.Linear

    def __init__(self, in_features: int, out_features: int,
                 bias_1: bool = True, bias_2: bool = True,
                 device=None, dtype=None) -> None:
        factory_kwargs = {'device': device, 'dtype': dtype}
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.fc_1 = nn.Linear(in_features, out_features, bias_1)
        self.fc_2 = nn.Linear(in_features, out_features, bias_2)

    def reset_parameters(self) -> None:
        self.fc_1.reset_parameters()
        self.fc_2.reset_parameters()

    def forward(self, input: Tensor) -> Tensor:
        return self.fc_1(input) * (1 + self.fc_2(input))

    def extra_repr(self) -> str:
        return 'in_features={}, out_features={}, bias_1st={}, bias_2nd={}'.format(
            self.in_features, self.out_features,
            self.fc_1.bias is not None, self.fc_2.bias is not None
        )

In [None]:
import logging

import torch
import torch.nn as nn
import torch.nn.functional as F
import math
from spikingjelly.activation_based import base, functional
from torch import Tensor
from torch.nn.common_types import _size_any_t, _size_1_t, _size_2_t, _size_3_t
from typing import Optional, List, Tuple, Union
from typing import Callable
from torch.nn.modules.batchnorm import _BatchNorm

# Based on SpikingJelly's layer.Linear
class SpikingJellyQuadratic(Quadratic, base.StepModule):
    def __init__(self, in_features: int, out_features: int, bias: bool = True, step_mode='s') -> None:
        super().__init__(in_features, out_features, bias)
        self.step_mode = step_mode

Running Model

In [None]:
import os
import time
import argparse
import sys
import datetime

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as data
from torch.cuda import amp
from torch.utils.tensorboard import SummaryWriter
import torchvision
import matplotlib.pyplot as plt # Addded for "Plotting"
import numpy as np

from spikingjelly.activation_based import neuron, encoding, functional, surrogate, layer



class SNN(nn.Module):
    def __init__(self, tau):
        super().__init__()

        self.layer = nn.Sequential(
            layer.Flatten(),
            SpikingJellyQuadratic(28 * 28, 10, bias=False), # Replaced layer.Linear with SpikingJellyQuadratic
            neuron.LIFNode(tau=tau, surrogate_function=surrogate.ATan()),
            )

    def forward(self, x: torch.Tensor):
        return self.layer(x)

# Dictionary to Replace Parser
args = {
    "T": 10, # 10
    "device": 'cuda:0',
    "b": 64,
    "epochs": 100, # 100
    "j": 4,
    "data_dir": './datasets/MNIST',
    "out_dir": './logs',
    "amp": 'store_true',
    "opt": 'adam',
    "momentum": 0.9,
    "lr": 1e-3,
    "tau": 2.0
}

def main(params=args):

    #From parser
    print(args)
    net = SNN(tau=args.get("tau"))
    print(net)
    net.to(args.get("device"))

    # For Plotting
    x_values = np.array([])
    y_values = np.array([])


    train_dataset = torchvision.datasets.MNIST(
        root=str(args.get("data_dir")),
        train=True,
        transform=torchvision.transforms.ToTensor(),
        download=True
    )

    test_dataset = torchvision.datasets.MNIST(
        root=str(args.get("data_dir")),
        train=False,
        transform=torchvision.transforms.ToTensor(),
        download=True
    )

    train_data_loader = data.DataLoader(
        dataset=train_dataset,
        batch_size=args.get("b"),
        shuffle=True,
        drop_last=True,
        num_workers=args.get("j"),
        pin_memory=True
    )

    test_data_loader = data.DataLoader(
        dataset=test_dataset,
        batch_size=args.get("b"),
        shuffle=False,
        drop_last=False,
        num_workers=args.get("j"),
        pin_memory=True
    )

    scaler = None
    if args.get("amp"):
        scaler = amp.GradScaler()

    start_epoch = 0
    max_test_acc = -1

    optimizer = None
    if args.get("opt") == 'sgd':
        optimizer = torch.optim.SGD(net.parameters(), lr=args.get("lr"), momentum=args.get("momentum"))
    elif args.get("opt") == 'adam':
        optimizer = torch.optim.Adam(net.parameters(), lr=args.get("lr"))
    else:
        raise NotImplementedError(args.get("opt"))

    out_dir = os.path.join(str(args.get("out_dir")), f'T{str(args.get("T"))}_b{str(args.get("b"))}_{str(args.get("opt"))}_lr{str(args.get("lr"))}')
    out_dir += '__' + datetime.datetime.today().strftime('%Y-%m-%d-%H-%M')

    if args.get("amp"):
        out_dir += '_amp'

    os.makedirs(out_dir, exist_ok=True)

    with open(os.path.join(out_dir, 'args.txt'), 'w', encoding='utf-8') as args_txt:
        args_txt.write(str(args))

    writer = SummaryWriter(out_dir, purge_step=start_epoch)
    with open(os.path.join(out_dir, 'args.txt'), 'w', encoding='utf-8') as args_txt:
        args_txt.write(str(args))
        args_txt.write('\n')
        args_txt.write(' '.join(sys.argv))

    encoder = encoding.PoissonEncoder()

    for epoch in range(start_epoch, args.get("epochs")):
        start_time = time.time()
        net.train()
        train_loss = 0
        train_acc = 0
        train_samples = 0
        for img, label in train_data_loader:
            optimizer.zero_grad()
            img = img.to(args.get("device"))
            label = label.to(args.get("device"))
            label_onehot = F.one_hot(label, 10).float()

            if scaler is not None:
                with amp.autocast():
                    out_fr = 0.
                    for t in range(args.get("T")):
                        encoded_img = encoder(img)
                        out_fr += net(encoded_img)
                    out_fr = out_fr / args.get("T")
                    loss = F.mse_loss(out_fr, label_onehot)
                scaler.scale(loss).backward()
                scaler.step(optimizer)
                scaler.update()
            else:
                out_fr = 0.
                for t in range(args.T):
                    encoded_img = encoder(img)
                    out_fr += net(encoded_img)
                out_fr = out_fr / args.T
                loss = F.mse_loss(out_fr, label_onehot)
                loss.backward()
                optimizer.step()

            train_samples += label.numel()
            train_loss += loss.item() * label.numel()
            train_acc += (out_fr.argmax(1) == label).float().sum().item()

            functional.reset_net(net)

        train_time = time.time()
        train_speed = train_samples / (train_time - start_time)
        train_loss /= train_samples
        train_acc /= train_samples

        writer.add_scalar('train_loss', train_loss, epoch)
        writer.add_scalar('train_acc', train_acc, epoch)

        net.eval()
        test_loss = 0
        test_acc = 0
        test_samples = 0
        with torch.no_grad():
            for img, label in test_data_loader:
                img = img.to(args.get("device"))
                label = label.to(args.get("device"))
                label_onehot = F.one_hot(label, 10).float()
                out_fr = 0.
                for t in range(args.get("T")):
                    encoded_img = encoder(img)
                    out_fr += net(encoded_img)
                out_fr = out_fr / args.get("T")
                loss = F.mse_loss(out_fr, label_onehot)

                test_samples += label.numel()
                test_loss += loss.item() * label.numel()
                test_acc += (out_fr.argmax(1) == label).float().sum().item()
                functional.reset_net(net)
        test_time = time.time()
        test_speed = test_samples / (test_time - train_time)
        test_loss /= test_samples
        test_acc /= test_samples
        writer.add_scalar('test_loss', test_loss, epoch)
        writer.add_scalar('test_acc', test_acc, epoch)

        save_max = False
        if test_acc > max_test_acc:
            max_test_acc = test_acc
            save_max = True

        checkpoint = {
            'net': net.state_dict(),
            'optimizer': optimizer.state_dict(),
            'epoch': epoch,
            'max_test_acc': max_test_acc
        }

        if save_max:
            torch.save(checkpoint, os.path.join(out_dir, 'checkpoint_max.pth'))

        torch.save(checkpoint, os.path.join(out_dir, 'checkpoint_latest.pth'))

        print(args)
        print(out_dir)
        print(f'epoch ={epoch}, train_loss ={train_loss: .4f}, train_acc ={train_acc: .4f}, test_loss ={test_loss: .4f}, test_acc ={test_acc: .4f}, max_test_acc ={max_test_acc: .4f}')
        print(f'train speed ={train_speed: .4f} images/s, test speed ={test_speed: .4f} images/s')
        print(f'escape time = {(datetime.datetime.now() + datetime.timedelta(seconds=(time.time() - start_time) * (args.get("epochs") - epoch))).strftime("%Y-%m-%d %H:%M:%S")}\n')

        # For Plotting
        x_values = np.append(x_values, epoch)
        y_values = np.append(y_values, max_test_acc)


    net.eval()
    output_layer = net.layer[-1]
    output_layer.v_seq = []
    output_layer.s_seq = []
    def save_hook(m, x, y):
        m.v_seq.append(m.v.unsqueeze(0))
        m.s_seq.append(y.unsqueeze(0))

    output_layer.register_forward_hook(save_hook)


    with torch.no_grad():
        img, label = test_dataset[0]
        img = img.to(args.get("device"))
        out_fr = 0.
        for t in range(int(args.get("T"))):
            encoded_img = encoder(img)
            out_fr += net(encoded_img)
        out_spikes_counter_frequency = (out_fr / args.get("T")).cpu().numpy()
        print(f'Firing rate: {out_spikes_counter_frequency}')

        output_layer.v_seq = torch.cat(output_layer.v_seq)
        output_layer.s_seq = torch.cat(output_layer.s_seq)
        v_t_array = output_layer.v_seq.cpu().numpy().squeeze()  # v_t_array[i][j]
        np.save("v_t_array.npy",v_t_array)
        s_t_array = output_layer.s_seq.cpu().numpy().squeeze()  # s_t_array[i][j]
        np.save("s_t_array.npy",s_t_array)

    # Plotting
    print("X-Values: Epochs ", x_values)
    print("Y-Values: Max Test Accuracy ", y_values)
    plt.xlabel("Epochs")
    plt.ylabel("Max Test Accuracy")
    plt.plot(x_values, y_values)
    plt.show()

if __name__ == '__main__':
    main()

{'T': 10, 'device': 'cuda:0', 'b': 64, 'epochs': 100, 'j': 4, 'data_dir': './datasets/MNIST', 'out_dir': './logs', 'amp': 'store_true', 'opt': 'adam', 'momentum': 0.9, 'lr': 0.001, 'tau': 2.0}
SNN(
  (layer): Sequential(
    (0): Flatten(start_dim=1, end_dim=-1, step_mode=s)
    (1): SpikingJellyQuadratic(
      in_features=784, out_features=10, bias_1st=False, bias_2nd=True
      (fc_1): Linear(in_features=784, out_features=10, bias=False)
      (fc_2): Linear(in_features=784, out_features=10, bias=True)
    )
    (2): LIFNode(
      v_threshold=1.0, v_reset=0.0, detach_reset=False, step_mode=s, backend=torch, tau=2.0
      (surrogate_function): ATan(alpha=2.0, spiking=True)
    )
  )
)
{'T': 10, 'device': 'cuda:0', 'b': 64, 'epochs': 100, 'j': 4, 'data_dir': './datasets/MNIST', 'out_dir': './logs', 'amp': 'store_true', 'opt': 'adam', 'momentum': 0.9, 'lr': 0.001, 'tau': 2.0}
./logs/T10_b64_adam_lr0.001__2024-06-13-00-01_amp
epoch =0, train_loss = 0.0222, train_acc = 0.8815, test_loss

* Output with Linear (100 Epochs, 100 T): 2 hrs, 19 Mins, Max Test Accuracy: 0.9293
* Output with Quadratic (100 Epochs, 100 T): 10326.091 s (2 hrs, 52 Mins), Max Test Accuracy: 0.9445
* Output with Linear (100 Epochs, 10 T): 28.9 Minutes, Max Test Accuracy: 0.9256
* **Output with Quadratic (100 Epochs, 10 T): 33 Minutes, Max Test Accuracy: 0.9413**