In [1]:
import numpy as np
import cv2
import matplotlib.pyplot as plt
import os
import argparse
import pickle as pkl
import time
from copy import deepcopy
import torch.optim.lr_scheduler as lr_scheduler
from tqdm import tqdm

In [2]:
import os
import pandas as pd
from torchvision.io import read_image
import torch
from torch.utils.data import Dataset
from torchvision import datasets
from torchvision.transforms import ToTensor
import matplotlib.pyplot as plt
from random import shuffle
import torch.nn.functional as F
from torchvision.transforms import v2
from torch import nn



### Load model

In [3]:
class ResidualBlock(nn.Module):
    """Two-convolution residual block with bounded (Hardtanh) activations.

    Args:
        in_channels: Number of channels of the incoming feature map.
        out_channels: Number of channels produced by both convolutions.
        stride: Accepted for signature compatibility with ``ResNet._make_layer``
            but not used by this block; spatial reduction is done by the
            ``MaxPool2d`` appended to ``conv1`` when ``downsample`` is given.
        downsample: Optional module applied to the block input to produce the
            skip connection. When it is given, ``conv1`` uses ReLU followed by
            a 2x2 max-pool; otherwise ``conv1`` uses Hardtanh and keeps the
            spatial size.
        end_maxpool: If true, the activation applied after the residual sum is
            ReLU instead of Hardtanh. Despite the name, no pooling is performed
            inside this block.
    """

    def __init__(self, in_channels, out_channels, stride=1, downsample=None, end_maxpool = False):
        super(ResidualBlock, self).__init__()
        if downsample is not None:
            # Down-sampling variant: ReLU + 2x2 max-pool halves height and width.
            self.conv1 = nn.Sequential(
                            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding='same'),
                            nn.BatchNorm2d(out_channels),
                            nn.ReLU(inplace=False),
                            nn.MaxPool2d(kernel_size=2, stride=2)
                            )
        else:
            # Shape-preserving variant: activation clipped to [-1, 1].
            self.conv1 = nn.Sequential(
                            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding='same'),
                            nn.BatchNorm2d(out_channels),
                            nn.Hardtanh(min_val=-1.0, max_val=1.0, inplace=False)
                            )
        self.conv2 = nn.Sequential(
                        nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1),
                        nn.BatchNorm2d(out_channels),
                        nn.Hardtanh(min_val=-1.0, max_val=1.0, inplace=False))
        self.downsample = downsample
        # Kept as a registered sub-module (it shows up in the module repr);
        # forward() uses the functional activations below instead.
        self.relu = nn.Hardtanh(min_val=-1.0, max_val=1.0, inplace=False)
        self.out_channels = out_channels
        self.end_maxpool = end_maxpool

    def forward(self, x):
        """Return ``activation(conv2(conv1(x)) + skip(x))``."""
        residual = x
        out = self.conv1(x)
        out = self.conv2(out)
        # Explicit None check: module truthiness depends on __len__ for
        # container modules, which is not what is meant here.
        if self.downsample is not None:
            residual = self.downsample(x)
        out = out + residual
        if self.end_maxpool:
            out = F.relu(out, inplace=False)
        else:
            out = F.hardtanh(out, inplace=False, min_val=-1.0, max_val=1.0)
        return out

class ResNet(nn.Module):
    """ResNet-style classifier assembled from ``block`` modules.

    The stem is a 7x7 convolution + BatchNorm + ReLU followed by a 3x3/stride-2
    max-pool. Four stages (64, 128, 256, 512 channels) follow, then a 7x7
    max-pool and two fully connected layers.

    Args:
        block: Block class, called as
            ``block(in_channels, out_channels, stride, downsample)`` and
            optionally with ``end_maxpool=True``.
        layers: Sequence of four integers, the number of blocks per stage.
        num_classes: Size of the final output vector.
        in_channels: Number of channels of the input image.

    Note:
        ``fc`` expects 2048 input features (512 channels x 2 x 2). With
        ``ResidualBlock`` this corresponds to 128x128 inputs.
    """

    def __init__(self, block, layers, num_classes = 2, in_channels = 5):
        super(ResNet, self).__init__()
        self.inplanes = 64
        self.conv1 = nn.Sequential(
                        nn.Conv2d(in_channels, 64, kernel_size = 7, stride = 1, padding = 3),
                        nn.BatchNorm2d(64),
                        nn.ReLU(inplace=False))
        self.maxpool = nn.MaxPool2d(kernel_size = 3, stride = 2, padding = 1)
        self.layer0 = self._make_layer(block, 64, layers[0], stride = 1)
        self.layer1 = self._make_layer(block, 128, layers[1], stride = 2)
        self.layer2 = self._make_layer(block, 256, layers[2], stride = 2)
        self.layer3 = self._make_layer(block, 512, layers[3], stride = 2, end_maxpool = True)
        # Named ``avgpool`` for historical reasons; it is a max-pool.
        self.avgpool = nn.MaxPool2d(7, stride=1)
        self.fc = nn.Linear(2048, 128)
        self.fc2 = nn.Linear(128, num_classes)
        # Registered but not applied in forward().
        self.dropout = nn.Dropout(0.1)

    def _make_layer(self, block, planes, blocks, stride=1, end_maxpool = False):
        """Build one stage of ``blocks`` consecutive ``block`` modules.

        Args:
            block: Block class to instantiate.
            planes: Output channel count of every block in the stage.
            blocks: Number of blocks in the stage.
            stride: A value other than 1 (or a channel change) gives the first
                block a skip path of 1x1 conv + BatchNorm + ReLU + 2x2 max-pool.
            end_maxpool: If true, the last block of the stage is created with
                ``end_maxpool=True``.

        Returns:
            ``nn.Sequential`` holding the blocks in order.
        """
        downsample = None
        if stride != 1 or self.inplanes != planes:
            downsample = nn.Sequential(
                nn.Conv2d(self.inplanes, planes, kernel_size=1, stride=1, padding='same'),
                nn.BatchNorm2d(planes),
                nn.ReLU(inplace=False),
                nn.MaxPool2d(kernel_size=2, stride=2)
            )

        # In a single-block stage the first block is also the last one, so it
        # must carry the end_maxpool flag. The keyword is only passed when
        # needed so block classes without that parameter keep working.
        first_kwargs = {'end_maxpool': True} if (end_maxpool and blocks == 1) else {}
        layers = [block(self.inplanes, planes, stride, downsample, **first_kwargs)]
        self.inplanes = planes
        for i in range(1, blocks):
            if i == blocks-1 and end_maxpool:
                layers.append(block(self.inplanes, planes, end_maxpool = True))
            else:
                layers.append(block(self.inplanes, planes))

        return nn.Sequential(*layers)

    def forward(self, x):
        """Map an image batch ``(N, in_channels, H, W)`` to ``(N, num_classes)`` scores."""
        x = self.conv1(x)
        x = self.maxpool(x)
        x = self.layer0(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        x = F.hardtanh(x)
        x = self.fc2(x)
        return x

In [4]:
# Build the Hardtanh ResNet and graft the conv / batch-norm modules of a
# torchvision ResNet-34 into it, so both networks share the same module objects.
# The grafted torchvision convolutions are bias-free (see the printed repr).
model_resnet = ResNet(ResidualBlock, [3, 4, 6, 3], in_channels = 12)
model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet34', pretrained=True)

size_layer = [3, 4, 6, 3]
layers = [(model_resnet.layer0, model.layer1), (model_resnet.layer1, model.layer2), (model_resnet.layer2, model.layer3), (model_resnet.layer3, model.layer4)]
for (target_stage, source_stage), n_blocks in zip(layers, size_layer):
    for i in range(n_blocks):
        target_block, source_block = target_stage[i], source_stage[i]
        # The custom blocks down-sample with max-pooling, so every grafted
        # convolution is forced to stride 1. This mutates the torchvision
        # module too, because the object is shared.
        target_block.conv1[0] = source_block.conv1
        target_block.conv1[0].stride = 1
        target_block.conv2[0] = source_block.conv2
        target_block.conv2[0].stride = 1

        target_block.conv1[1] = source_block.bn1
        target_block.conv2[1] = source_block.bn2

# Skip-connection 1x1 conv + batch-norm of the first block in every stage that
# has a downsample path (all stages except the first).
for target_stage, source_stage in layers[1:]:
    target_skip = target_stage[0].downsample
    source_skip = source_stage[0].downsample
    target_skip[0] = source_skip[0]
    target_skip[0].stride = 1
    target_skip[1] = source_skip[1]

model_resnet.to("cpu")

# NOTE: torch.load unpickles the file, so only load checkpoints from a trusted
# source. map_location keeps the load working on CPU-only machines even when
# the checkpoint was written from GPU tensors.
state_dict = torch.load(
    "best_resnet_nCars_Hardtanh_ReLUmaxpool_EST_FC2__128x128_pretrained_aug_correct.pt",
    map_location="cpu",
)
model_resnet.load_state_dict(state_dict)
model_resnet.eval()
print(model_resnet)

Using cache found in C:\Users\nikos/.cache\torch\hub\pytorch_vision_v0.10.0


ResNet(
  (conv1): Sequential(
    (0): Conv2d(12, 64, kernel_size=(7, 7), stride=(1, 1), padding=(3, 3))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer0): Sequential(
    (0): ResidualBlock(
      (conv1): Sequential(
        (0): Conv2d(64, 64, kernel_size=(3, 3), stride=1, padding=(1, 1), bias=False)
        (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): Hardtanh(min_val=-1.0, max_val=1.0)
      )
      (conv2): Sequential(
        (0): Conv2d(64, 64, kernel_size=(3, 3), stride=1, padding=(1, 1), bias=False)
        (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): Hardtanh(min_val=-1.0, max_val=1.0)
      )
      (relu): Hardtanh(min_val=-1.0, max_val=1.0)
    )
    (1): ResidualBlock(
      (conv1): Sequential(
        (0): Con

In [5]:
from SNN.Check import fuse_conv_and_bn
from SNN.Check import SpikingConv2D
from SNN.Check import MaxMinPool2D
from SNN.Check import LayerSNN_all
from SNN.Check import SpikingDense_positive_tanH, SpikingDense_all_all

In [6]:

class ResNet_ttfs(nn.Module):
    """Spiking counterpart of a trained ``ResNet`` built from ``SNN.Check`` layers.

    Every stage of the dense model is wrapped in the matching spiking layer
    (``SpikingConv2D``, ``MaxMinPool2D``, ``LayerSNN_all``, ``SpikingDense_*``).
    Each layer's ``set_params`` returns ``(tmin, tmax, max_vect, scalar)``, which
    is fed into the next layer so the parameters are chained through the network.

    Attributes set for callers:
        tmin, tmax: values returned by the last layer's ``set_params``.
        scalar: scalar returned by the last layer's ``set_params``.
    """

    def __init__(self, model : ResNet):
        """Build the spiking layers from ``model`` (which is put in eval mode on CPU)."""
        super(ResNet_ttfs, self).__init__()
        model.eval()
        # Passed to the first conv and to both dense layers; all entries are zero here.
        robustness_params={
            'noise':0.0,
            'time_bits':0,
            'weight_bits': 0,
            'latency_quantiles':0.0
        }
        model.to('cpu')
        # Stem: conv and batch-norm are fused and the fused weights/biases are
        # handed to the spiking convolution.
        conv_fused = fuse_conv_and_bn(model.conv1[0], model.conv1[1], device = 'cpu')
        self.conv_first = SpikingConv2D(64, "temp1", device = 'cpu', padding=(3,3), stride=1, kernel_size=(7,7),robustness_params=robustness_params, kernels=conv_fused.weight.data, biases= conv_fused.bias.data)
        # One entry per input channel; the length 12 presumably has to match the
        # model's in_channels - TODO confirm.
        max_vect = torch.tensor([1]*12)
        tmin, tmax, max_vect, scalar = self.conv_first.set_params(0,1,max_vect)
        # Despite the attribute name, this stores the *tmax* returned by the stem.
        self.tmin_post_pool = tmax
        # Arguments presumably mirror the dense max-pool (kernel 3, stride 2,
        # padding 1) - verify against MaxMinPool2D's signature.
        self.pool = MaxMinPool2D(3, tmax.data,2,1).to("cpu")
        self.layer0SNN = LayerSNN_all(model.layer0, 64, 64, 3,device = 'cpu')
        # NOTE(review): tmax_prev is assigned but never read in this class.
        tmax_prev = tmax
        # max_vect is extended with zeros, matching the constant channels that
        # forward() concatenates to the pooled stem output.
        tmin, tmax, max_vect, scalar = self.layer0SNN.set_params(tmin, tmax, torch.concat((max_vect, torch.zeros(max_vect.shape))), in_scalar=scalar)
        self.layer1SNN = LayerSNN_all(model.layer1, 64, 128, 4,device = 'cpu')
        tmin, tmax, max_vect, scalar = self.layer1SNN.set_params(tmin, tmax, max_vect, in_scalar=scalar)
        self.layer2SNN = LayerSNN_all(model.layer2, 128, 256, 6,device = 'cpu')
        tmin, tmax, max_vect, scalar = self.layer2SNN.set_params(tmin, tmax, max_vect, in_scalar=scalar)
        self.layer3SNN = LayerSNN_all(model.layer3, 256, 512, 3,device = 'cpu',end_maxpool=True)
        tmin, tmax, max_vect, scalar = self.layer3SNN.set_params(tmin, tmax, max_vect, in_scalar=scalar)
        self.pool2 = MaxMinPool2D(7, tmax.data,1,0).to("cpu")
        # Repeat the first 512 per-channel entries over a 2x2 grid and flatten,
        # giving a 2048-long vector that matches the first dense layer's input.
        temp = torch.ones((512,2,2))
        max_vect = ((temp.T)*max_vect[:512]).T
        max_vect = max_vect.view(-1)
        self.layer_fc = SpikingDense_positive_tanH(128,2048, '',model.fc.weight, model.fc.bias,robustness_params=robustness_params)
        t_min, t_max, max_vect_temp, scalar = self.layer_fc.set_params(tmin, tmax, max_vect, in_scalar=scalar)
        self.layer_fc2 = SpikingDense_all_all(2,128, '',model.fc2.weight, model.fc2.bias,robustness_params=robustness_params)
        t_min, t_max, max_vect_temp, scalar = self.layer_fc2.set_params(t_min, t_max, max_vect_temp, in_scalar=scalar)
        self.tmin, self.tmax = t_min, t_max
        self.scalar = scalar
    
    def forward(self, tj):
        """Run the spiking network on ``tj`` and return the last layer's output.

        The notebook feeds ``1 - image`` as ``tj`` and decodes the result with
        ``(self.tmax - output) * self.scalar``.
        """
        x = self.conv_first(tj)
        x = self.pool(x)
        # print(x.shape)
        # Append a same-shaped tensor filled with the stored stem tmax along
        # dim 1, doubling the size of that dimension.
        x = torch.concat((x, torch.ones(x.shape) * self.tmin_post_pool),dim=1)
        x = self.layer0SNN(x)
        x = self.layer1SNN(x)
        x = self.layer2SNN(x)
        x = self.layer3SNN(x)
        # print(x[1].max(),x[1].min())
        # NOTE(review): x[:512] slices dim 0, not dim 1. If dim 0 is the batch
        # axis this is a no-op for batches up to 512 and truncates larger ones;
        # __init__ slices max_vect[:512] per channel, so x[:, :512] may have
        # been intended - confirm against LayerSNN_all's output layout.
        x = self.pool2(x[:512])
        x = x.view(x.size(0), -1)
        x = self.layer_fc(x)
        x = self.layer_fc2(x)
        return x




In [7]:
# Wrap the trained dense network in its spiking counterpart; all layer
# parameters are derived inside ResNet_ttfs.__init__.
model_ttfs = ResNet_ttfs(model_resnet)

  self.t_max = torch.tensor(t_min + self.B_n*max_V, dtype=torch.float64, requires_grad=False)
  self.t_max = torch.tensor(max(t_min + self.B_n*max_V, minimal_t_max), dtype=torch.float64, requires_grad=False)
  self.t_min = torch.tensor(t_min, dtype=torch.float64, requires_grad=False)
  self.t_min = torch.tensor(t_min, dtype=torch.float64, requires_grad=False)
  self.t_min = torch.tensor(t_min, dtype=torch.float64, requires_grad=False)
  self.t_max = torch.tensor(t_min + self.B_n*max_V, dtype=torch.float64, requires_grad=False)
  self.t_min = torch.tensor(t_min, dtype=torch.float64, requires_grad=False)
  self.t_max = torch.tensor(max(t_min + self.B_n*max_V, minimal_t_max), dtype=torch.float64, requires_grad=False)
  self.t_min_prev = torch.tensor(t_min_prev, dtype=torch.float64, requires_grad=False)
  self.t_min_prev = torch.tensor(t_min_prev, dtype=torch.float64, requires_grad=False)
  self.t_min = torch.tensor(t_min, dtype=torch.float64, requires_grad=False)
  self.t_max = torch.tens

In [8]:
# Sanity check on a random input: the spiking network is fed 1 - image, and its
# output is decoded with (tmax - output) * scalar for comparison with the dense
# model's scores in the next cell.
temp = torch.rand((1,12,128,128))
temp_ttfs = 1 - temp
# Run a single forward pass through __call__ and without autograd bookkeeping;
# this is inference only, and the result is reused for both displays.
with torch.no_grad():
    spike_times_random = model_ttfs(temp_ttfs)
print(spike_times_random)
(model_ttfs.tmax - spike_times_random)*model_ttfs.scalar

tensor([[5.3583, 5.3643, 5.3643, 5.3583]], grad_fn=<WhereBackward0>)


tensor([[1.3687, 0.0000, 0.0000, 1.3687]], grad_fn=<MulBackward0>)

In [9]:
# Dense reference scores for the same random input, to compare with the decoded
# spiking output of the previous cell.
model_resnet(temp)

tensor([[ 1.3686, -1.3686]], grad_fn=<AddmmBackward0>)

In [10]:
# Augmentation pipeline handed to NCarsImageDataset, which applies it only
# while the dataset's stage is 0.
transforms = v2.Compose([
    # v2.RandomResizedCrop(size=(224, 224), antialias=True),
    v2.RandomHorizontalFlip(p=0.5),  # mirror left/right for half of the samples
    v2.RandomPerspective(0.4),       # random perspective warp, distortion scale 0.4
    v2.ToDtype(torch.float32)        # cast the result to float32
])

class NCarsImageDataset(Dataset):
    """Dataset backed by two ``.npy`` files sharing a common path prefix.

    Args:
        img_dir_file: Path prefix; ``<prefix>_x.npy`` holds the images and
            ``<prefix>_y.npy`` the integer class labels.
        transform: Optional callable applied to the image tensor while the
            dataset is in stage 0.
        target_transform: Stored for API compatibility; currently not applied.

    The dataset starts in stage 0 (transform enabled). Call ``set_stage`` with
    any other value to return the images untransformed.
    """

    def __init__(self, img_dir_file, transform=None, target_transform=None):
        self.images = np.load(img_dir_file + '_x.npy')
        self.labels = np.load(img_dir_file + '_y.npy')
        self.transform = transform
        self.target_transform = target_transform
        # Give the stage a defined initial value so indexing works even when
        # set_stage() has not been called yet.
        self.stage = 0

    def __len__(self):
        """Return the number of samples (one label per sample)."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(image_tensor, one_hot_label_tensor)`` for sample ``idx``."""
        image = torch.tensor(self.images[idx])
        # Two-class one-hot encoding of the integer label.
        label_one_hot = np.zeros((2,))
        label_one_hot[self.labels[idx]] = 1
        # Apply the transform only in stage 0, and only when one was supplied.
        if self.stage == 0 and self.transform is not None:
            image = self.transform(image)
        return image, torch.tensor(label_one_hot)

    def set_stage(self, stage):
        """Select the stage: 0 applies ``transform``, anything else skips it."""
        self.stage = stage

In [11]:
from torch.utils.data import DataLoader
# training_data = NCarsImageDataset("./Datasety/nCars_train_EST_exp_", transform=transforms)
# Load the test split from <prefix>_x.npy / <prefix>_y.npy.
test_data = NCarsImageDataset("./Datasety/nCars_test_EST_exp_", transform=transforms)
# Stage 1 makes __getitem__ return the images without applying `transforms`.
test_data.set_stage(1)

In [12]:
# Same comparison as for the random input, now on one real test sample.
temp_data, temp_label = test_data[10]
temp_ttfs = (1 - temp_data).unsqueeze(0)  # add the batch dimension
print(temp_data.shape)
# Run a single forward pass through __call__ and without autograd bookkeeping;
# this is inference only, and the result is reused for both displays.
with torch.no_grad():
    spike_times_sample = model_ttfs(temp_ttfs)
print(spike_times_sample)
(model_ttfs.tmax - spike_times_sample)*model_ttfs.scalar

torch.Size([12, 128, 128])
tensor([[5.3643, 5.3593, 5.3593, 5.3643]], grad_fn=<WhereBackward0>)


tensor([[0.0000, 1.1571, 1.1571, 0.0000]], grad_fn=<MulBackward0>)

In [13]:
# Dense reference scores for the same test sample, to compare with the decoded
# spiking output of the previous cell.
model_resnet(temp_data.unsqueeze(0))

tensor([[-1.1563,  1.1563]], grad_fn=<AddmmBackward0>)