In [3]:
import torch
import torch.nn as nn
from spikingjelly.activation_based import layer, surrogate, neuron
from copy import deepcopy
from spikingjelly.datasets.n_mnist import NMNIST
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
import torch.nn.functional as F
from torch.cuda import amp
from spikingjelly.activation_based import functional
from torch import optim
 

In [1]:
from poisoned_dataset import PoisonedDataset
from datasets import get_dataset
import numpy as np
from spikingjelly.datasets.n_mnist import NMNIST

train_set = NMNIST("data/mnist", train=False, data_type='frame',
                           split_by='number', frames_number=16)


data = np.array([np.array(x[0])for x in train_set])
train_set = []
print(data.shape)
# clean_train_data = PoisonedDataset(train_data,0, mode='train', epsilon=0,
#                                  pos="top-left", attack_type="static", time_step=16,
#                                  trigger_size=0.1, dataname="mnist",
#                                  polarity=1, n_masks=2, least=True, most_polarity=True, frame_gap = 1)
# poisoned_train_data = PoisonedDataset(train_data,0, mode='train', epsilon=1,
#                                  pos="top-left", attack_type="static", time_step=16,
#                                  trigger_size=0.1, dataname="mnist",
#                                  polarity=1, n_masks=2, least=True, most_polarity=True, frame_gap = 1)

The directory [data/mnist\frames_number_16_split_by_number] already exists.
(10000, 16, 2, 34, 34)


In [2]:
def create_static_trigger(pos, polarity , data, size_width, size_height, width, height):
    # print("======>",data.shape)
    if pos == 'top-left':
        x_begin = 0
        x_end = size_width
        y_begin = 0
        y_end = size_height

    elif pos == 'top-right':
        x_begin = int(width - size_width)
        x_end = width
        y_begin = 0
        y_end = size_height

    elif pos == 'bottom-left':
        x_begin = 0
        x_end = size_width
        y_begin = int(height - size_height)
        y_end = height

    elif pos == 'bottom-right':
        x_begin = int(width - size_width)
        x_end = width
        y_begin = int(height - size_height)
        y_end = height

    elif pos == 'middle':
        x_begin = int((width - size_width) / 2)
        x_end = int((width + size_width) / 2)
        y_begin = int((height - size_height) / 2)
        y_end = int((height + size_height) / 2)

    elif pos == 'random':
        x_begin = np.random.randint(0, int(width-size_width))
        x_end = x_begin + size_width
        y_begin = np.random.randint(0, int(height - size_height))
        y_end = y_begin + size_height

    # The shape of the data is (N, T, C, H, W)
    if polarity == 0:
        data[:, :, :, y_begin:y_end, x_begin:x_end] = 0
    elif polarity == 1:
        data[:, :, 0, y_begin:y_end, x_begin:x_end] = 0
        data[:, :, 1, y_begin:y_end, x_begin:x_end] = 1
    elif polarity == 2:
        data[:, :, 0, y_begin:y_end, x_begin:x_end] = 1
        data[:, :, 1, y_begin:y_end, x_begin:x_end] = 0
    else:
        data[:, :, :, y_begin:y_end, x_begin:x_end] = 1

    return data

In [3]:
poison = create_static_trigger("top-left", 1 , data, 3, 3, 34, 34)
print(poison.shape)

(10000, 16, 2, 34, 34)


In [12]:
import torch
import torch.nn.functional as F

def ssim(img1, img2, C1=0.01**2, C2=0.03**2):
    """
    Compute SSIM (Structural Similarity Index) between two images.
    :param img1: First image (torch tensor)
    :param img2: Second image (torch tensor)
    :param C1: Stabilization constant 1
    :param C2: Stabilization constant 2
    :return: SSIM value
    """
    mu1 = F.avg_pool2d(img1, 3, 1, 0)
    mu2 = F.avg_pool2d(img2, 3, 1, 0)
    
    sigma1 = F.avg_pool2d(img1 * img1, 3, 1, 0) - mu1 * mu1
    sigma2 = F.avg_pool2d(img2 * img2, 3, 1, 0) - mu2 * mu2
    sigma12 = F.avg_pool2d(img1 * img2, 3, 1, 0) - mu1 * mu2

    ssim_map = ((2 * mu1 * mu2 + C1) * (2 * sigma12 + C2)) / ((mu1 * mu1 + mu2 * mu2 + C1) * (sigma1 + sigma2 + C2))
    return ssim_map.mean()

def calculate_average_ssim(data_clean, data_backdoored):
    """
    Calculate the average SSIM over all samples, frames, and polarities.
    :param data_clean: Clean data (numpy array of shape [N, T, C, H, W])
    :param data_backdoored: Backdoored data (numpy array of shape [N, T, C, H, W])
    :return: Average SSIM value
    """
    data_clean = torch.tensor(data_clean, dtype=torch.float32)
    data_backdoored = torch.tensor(data_backdoored, dtype=torch.float32)
    
    N, T, C, H, W = data_clean.shape
    total_ssim = 0.0
    count = 0
    
    for i in range(N):
        for t in range(T):
            for c in range(C):
                img_clean = data_clean[i, t, c].unsqueeze(0).unsqueeze(0)
                img_backdoored = data_backdoored[i, t, c].unsqueeze(0).unsqueeze(0)
                total_ssim += ssim(img_clean, img_backdoored)
                count += 1
                
    average_ssim = total_ssim / count
    return average_ssim


In [13]:
average_ssim_value = calculate_average_ssim(data, poison)
print("Average SSIM:", average_ssim_value)

Average SSIM: tensor(1.)


In [4]:
frames_number = 16
epochs=1
batch_size=64
lr=0.001
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
scaler = None

In [5]:

class MNISTNet(nn.Module):
    def __init__(self, channels=128, spiking_neuron: callable = None, **kwargs):
        super().__init__()

        self.conv_fc = nn.Sequential(
            layer.Conv2d(1, channels, kernel_size=3, padding=1, bias=False),
            layer.BatchNorm2d(channels),
            spiking_neuron(**deepcopy(kwargs)),
            layer.MaxPool2d(2, 2),

            layer.Conv2d(channels, channels, kernel_size=3,
                         padding=1, bias=False),
            layer.BatchNorm2d(channels),
            spiking_neuron(**deepcopy(kwargs)),
            layer.MaxPool2d(2, 2),

            layer.Flatten(),
            layer.Dropout(0.5),
            layer.Linear(channels * 7 * 7, 2048),
            spiking_neuron(**deepcopy(kwargs)),
            layer.Dropout(0.5),
            layer.Linear(2048, 100),
            spiking_neuron(**deepcopy(kwargs)),
            layer.VotingLayer()
        )

    def forward(self, x: torch.Tensor):
        return self.conv_fc(x)


class NMNISTNet(MNISTNet):
    def __init__(self, channels=128, spiking_neuron: callable = None, **kwargs):
        super().__init__(channels, spiking_neuron, **kwargs)
        self.conv_fc[0] = layer.Conv2d(
            2, channels, kernel_size=3, padding=1, bias=False)
        self.conv_fc[-6] = layer.Linear(channels * 8 * 8, 2048)

In [7]:
train_set = NMNIST("data/mnist", train=True, data_type='frame',
                           split_by='number', frames_number=frames_number)

test_set = NMNIST("data/mnist", train=False, data_type='frame',
                          split_by='number', frames_number=frames_number)

train_data_loader = DataLoader(
        dataset=train_set, batch_size=batch_size, shuffle=True, num_workers=1)
test_data_loader = DataLoader(
        dataset=test_set, batch_size=batch_size, shuffle=False, num_workers=1)

The directory [data/mnist\frames_number_16_split_by_number] already exists.
The directory [data/mnist\frames_number_16_split_by_number] already exists.


In [8]:
model = NMNISTNet(spiking_neuron=neuron.IFNode,surrogate_function=surrogate.ATan(),  detach_reset=True)

In [9]:

def train(model, train_loader, optimizer, criterion, device, scaler=None, scheduler=None):
    # Train the model
    model.train()
    train_loss = 0
    train_acc = 0
    train_samples = 0
    try:
        n_classes = len(train_loader.dataset.classes)
    except:
        n_classes = 10

    for frame, label in tqdm(train_loader):
        optimizer.zero_grad()
        frame = frame.to(device)
        frame = frame.transpose(0, 1)  # [N, T, C, H, W] -> [T, N, C, H, W]
        label = label.to(device)
        # If label is not one-hot,
        if len(label.shape) == 1:
            label = F.one_hot(label, n_classes).float()

        if scaler is not None:
            with amp.autocast():
                # Mean is important; (https://spikingjelly.readthedocs.io/zh_CN/latest/activation_based_en/conv_fashion_mnist.html)
                # we need to average the output in the time-step dimension to get the firing rates,
                # and then calculate the loss and accuracy by the firing rates
                out_fr = model(frame).mean(0)
                loss = criterion(out_fr, label)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
        else:
            out_fr = model(frame).mean(0)
            loss = criterion(out_fr, label)
            loss.backward()
            optimizer.step()

        label = label.argmax(1)
        train_samples += label.numel()
        train_loss += loss.item() * label.numel()
        train_acc += (out_fr.argmax(1) == label).float().sum().item()

        functional.reset_net(model)

    train_loss /= train_samples
    train_acc /= train_samples

    if scheduler is not None:
        scheduler.step()

    return train_loss, train_acc


def evaluate(model, test_loader, criterion, device):
    model.eval()
    test_loss = 0
    test_acc = 0
    test_samples = 0
    with torch.no_grad():
        for frame, label in tqdm(test_loader):
            frame = frame.to(device)
            # [N, T, C, H, W] -> [T, N, C, H, W]
            frame = frame.transpose(0, 1)
            label = label.to(device)
            # label_onehot = F.one_hot(label, 11).float()
            out_fr = model(frame).mean(0)
            loss = criterion(out_fr, label)

            label = label.argmax(1)
            test_samples += label.numel()
            test_loss += loss.item() * label.numel()
            test_acc += (out_fr.argmax(1) == label).float().sum().item()

            functional.reset_net(model)

    test_loss /= test_samples
    test_acc /= test_samples

    return test_loss, test_acc

In [11]:
list_train_loss = []
list_train_acc = []
list_test_loss = []
list_test_acc = []
list_test_loss_backdoor = []
list_test_acc_backdoor = []

functional.set_step_mode(model, 'm')
functional.set_backend(model, 'cupy', instance=neuron.LIFNode)
model = model.to(device)
optimizer = optim.Adam(model.parameters(), lr=lr)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, epochs)
criterion = nn.MSELoss()

print(f'\n[!] Training the model for {epochs} epochs')
print(f'\n[!] Trainset size is {len(train_data_loader.dataset)},'
        f'Testset size is {len(test_data_loader.dataset)}')

for epoch in range(epochs):

    train_loss, train_acc = train(
        model, train_data_loader, optimizer, criterion, device, scaler, scheduler)

    test_loss_clean, test_acc_clean = evaluate(
        model, test_data_loader, criterion, device)


    list_train_loss.append(train_loss)
    list_train_acc.append(train_acc)
    list_test_loss.append(test_loss_clean)
    list_test_acc.append(test_acc_clean)


    print(f'\n[!] Epoch {epoch + 1}/{epochs} '
            f'Train loss: {train_loss:.4f} '
            f'Train acc: {train_acc:.4f} '
            f'Test acc: {test_acc_clean:.4f} ')



[!] Training the model for 1 epochs

[!] Trainset size is 60000,Testset size is 10000


100%|██████████| 938/938 [08:31<00:00,  1.83it/s]
  return F.mse_loss(input, target, reduction=self.reduction)
  0%|          | 0/157 [00:08<?, ?it/s]


RuntimeError: The size of tensor a (10) must match the size of tensor b (64) at non-singleton dimension 1