In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import time
from datetime import datetime as dt
from torchvision import datasets, transforms
import pickle
!pip install torchattacks==3.3.0


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting torchattacks
  Downloading torchattacks-3.3.0-py3-none-any.whl (155 kB)
[K     |████████████████████████████████| 155 kB 4.7 MB/s 
[?25hInstalling collected packages: torchattacks
Successfully installed torchattacks-3.3.0


# Konstante za ponovne treninge (da bude uvek isto)

In [None]:
# Fix every RNG this notebook relies on so repeated runs give the same
# initial weights and the same shuffling order.
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)

# Ask cuDNN for deterministic kernels and disable its auto-tuner, which may
# otherwise pick different algorithms from run to run.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

# Konfiguracija modela

In [None]:
# Weight applied to the "1 - features" heads when Net sums head outputs in
# its synergy modes.
OMEGA = 1
# Widths of the two hidden fully connected layers of every head.
HIDDEN1 = 120
HIDDEN2 = 84
# Length of the flattened conv output fed to the heads: 16 channels of 5x5
# (what two 5x5 convs + 2x2 pools leave of a 32x32 input).
CONV_OUT = 16 * 5 * 5
# Size of the final logits vector.
NUM_CLASSES = 10

# SGD learning rate and momentum used by all optimizers below.
LR = 0.001
MOM = 0.9

# Glavna klasa modela

In [None]:
from torchvision.transforms.transforms import ToTensor
class Net(nn.Module):
    """LeNet-style classifier assembled from "normal" and "negative" parts.

    ``net_type`` is matched by *substring*, so a prefixed name such as
    ``'tr_synergy_all'`` selects the same layers as ``'synergy_all'``.
    Recognised tags and the forward pass they select:

    * ``'normal'``      - normal conv stack, head ``fc*_normal`` on the features
    * ``'hybrid_nor'``  - normal conv stack, head ``fc*_normal_n`` on ``1 - features``
    * ``'negative'``    - negative conv stack, head ``fc*_negative_n`` on ``1 - features``
    * ``'hybrid_neg'``  - negative conv stack, head ``fc*_negative`` on the features
    * ``'synergy_nor'`` - sum of both heads of the normal conv stack
    * ``'synergy_neg'`` - sum of both heads of the negative conv stack
    * ``'synergy_all'`` - sum of all four heads

    In the synergy sums the ``1 - features`` heads are scaled by ``OMEGA``.
    The network returns raw logits (no softmax is applied).

    Raises:
        ValueError: if ``net_type`` contains none of the recognised tags.
    """

    _KNOWN_TAGS = ('normal', 'negative', 'hybrid_nor', 'hybrid_neg',
                   'synergy_nor', 'synergy_neg', 'synergy_all')
    _NORMAL_CONV_TAGS = ('normal', 'hybrid_nor', 'synergy_nor', 'synergy_all')
    _NEGATIVE_CONV_TAGS = ('negative', 'hybrid_neg', 'synergy_neg', 'synergy_all')

    def __init__(self, net_type):
        super(Net, self).__init__()

        # Fail early: previously an unknown type built an empty module and
        # only crashed later, inside forward(), on an unbound local.
        if not any(tag in net_type for tag in self._KNOWN_TAGS):
            raise ValueError(
                'Unknown net_type {!r}; expected it to contain one of {}'
                .format(net_type, ', '.join(self._KNOWN_TAGS)))

        self.net_type = net_type

        # NOTE: layers are created in the same order as before so that a
        # seeded run consumes the RNG identically and gets the same weights.
        if self._has(*self._NORMAL_CONV_TAGS):
            self.conv1_normal = nn.Conv2d(3, 6, 5)
            self.pool_normal = nn.MaxPool2d(2, 2)
            self.conv2_normal = nn.Conv2d(6, 16, 5)

        if self._has(*self._NEGATIVE_CONV_TAGS):
            self.conv1_negative = nn.Conv2d(3, 6, 5)
            self.pool_negative = nn.MaxPool2d(2, 2)
            self.conv2_negative = nn.Conv2d(6, 16, 5)

        if self._has('normal', 'synergy_nor', 'synergy_all'):
            self.fc1_normal = nn.Linear(CONV_OUT, HIDDEN1)
            self.fc2_normal = nn.Linear(HIDDEN1, HIDDEN2)
            self.fc3_normal = nn.Linear(HIDDEN2, NUM_CLASSES)

        if self._has('hybrid_nor', 'synergy_nor', 'synergy_all'):
            self.fc1_normal_n = nn.Linear(CONV_OUT, HIDDEN1)
            self.fc2_normal_n = nn.Linear(HIDDEN1, HIDDEN2)
            self.fc3_normal_n = nn.Linear(HIDDEN2, NUM_CLASSES)

        if self._has('hybrid_neg', 'synergy_neg', 'synergy_all'):
            self.fc1_negative = nn.Linear(CONV_OUT, HIDDEN1)
            self.fc2_negative = nn.Linear(HIDDEN1, HIDDEN2)
            self.fc3_negative = nn.Linear(HIDDEN2, NUM_CLASSES)

        if self._has('negative', 'synergy_neg', 'synergy_all'):
            self.fc1_negative_n = nn.Linear(CONV_OUT, HIDDEN1)
            self.fc2_negative_n = nn.Linear(HIDDEN1, HIDDEN2)
            self.fc3_negative_n = nn.Linear(HIDDEN2, NUM_CLASSES)

    def _has(self, *tags):
        """Return True if any of ``tags`` occurs as a substring of ``net_type``."""
        # Computed from net_type on every call (not cached on the instance)
        # so that models un-pickled from older checkpoints keep working.
        return any(tag in self.net_type for tag in tags)

    @staticmethod
    def _features(x, conv1, conv2, pool):
        """Run conv -> relu -> pool twice and flatten to (-1, CONV_OUT)."""
        x = pool(F.relu(conv1(x)))
        x = pool(F.relu(conv2(x)))
        return x.view(-1, CONV_OUT)

    @staticmethod
    def _head(feats, fc1, fc2, fc3):
        """Three-layer fully connected head returning raw logits."""
        return fc3(F.relu(fc2(F.relu(fc1(feats)))))

    def forward(self, x):
        """Compute logits for a batch ``x`` according to ``net_type``."""
        # conv block:
        if self._has(*self._NORMAL_CONV_TAGS):
            x_normal = self._features(x, self.conv1_normal,
                                      self.conv2_normal, self.pool_normal)

        if self._has(*self._NEGATIVE_CONV_TAGS):
            x_negative = self._features(x, self.conv1_negative,
                                        self.conv2_negative, self.pool_negative)

        # fc block -- the order of the checks is significant because the
        # tags are matched by substring.
        if 'synergy_nor' in self.net_type:
            x_normal_normal = self._head(
                x_normal, self.fc1_normal, self.fc2_normal, self.fc3_normal)
            x_normal_negative = self._head(
                1 - x_normal, self.fc1_normal_n, self.fc2_normal_n, self.fc3_normal_n)
            o = x_normal_normal + x_normal_negative * OMEGA

        elif 'synergy_neg' in self.net_type:
            x_negative_normal = self._head(
                x_negative, self.fc1_negative, self.fc2_negative, self.fc3_negative)
            x_negative_negative = self._head(
                1 - x_negative, self.fc1_negative_n, self.fc2_negative_n, self.fc3_negative_n)
            o = x_negative_normal + x_negative_negative * OMEGA

        elif 'synergy_all' in self.net_type:
            x_normal_normal = self._head(
                x_normal, self.fc1_normal, self.fc2_normal, self.fc3_normal)
            x_normal_negative = self._head(
                1 - x_normal, self.fc1_normal_n, self.fc2_normal_n, self.fc3_normal_n)
            x_negative_normal = self._head(
                x_negative, self.fc1_negative, self.fc2_negative, self.fc3_negative)
            x_negative_negative = self._head(
                1 - x_negative, self.fc1_negative_n, self.fc2_negative_n, self.fc3_negative_n)
            o = x_normal_normal + x_normal_negative * OMEGA + x_negative_normal + x_negative_negative * OMEGA

        elif 'negative' in self.net_type:
            o = self._head(1 - x_negative, self.fc1_negative_n,
                           self.fc2_negative_n, self.fc3_negative_n)

        elif 'normal' in self.net_type:
            o = self._head(x_normal, self.fc1_normal,
                           self.fc2_normal, self.fc3_normal)

        elif 'hybrid_neg' in self.net_type:
            o = self._head(x_negative, self.fc1_negative,
                           self.fc2_negative, self.fc3_negative)

        else:
            # Remaining recognised tag: 'hybrid_nor'.
            o = self._head(1 - x_normal, self.fc1_normal_n,
                           self.fc2_normal_n, self.fc3_normal_n)

        return o

# Pomoćne funkcije za trening i testove

In [None]:
def train(model, device, train_loader, optimizer, epoch, loss_fn=F.nll_loss):
    """Run one optimisation pass over ``train_loader``.

    Args:
        model: module to optimise; its ``net_type`` is used in the log line.
        device: device every batch is moved to before the forward pass.
        train_loader: iterable of ``(inputs, labels)`` batches exposing
            ``.dataset`` and ``len()``.
        optimizer: optimizer stepped once per batch.
        epoch: epoch number, used only for logging.
        loss_fn: callable ``loss_fn(output, target)`` returning a scalar.

    A progress line is printed for every 5000th batch (including the first).
    """
    model.train()
    n_batches = len(train_loader)
    n_samples = len(train_loader.dataset)

    for step, (inputs, labels) in enumerate(train_loader):
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        loss = loss_fn(model(inputs), labels)
        loss.backward()
        optimizer.step()

        if step % 5000 != 0:
            continue
        print('[{}] Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'
              .format(model.net_type, epoch, step * len(inputs), n_samples,
                      100. * step / n_batches, loss.item()))


def test(model, device, test_loader, loss_fn=F.nll_loss, dataset_name=None, filter_map=None):
    model.eval()
    test_loss = 0
    correct = 0
    index_store = []
    with torch.no_grad():
        for i, (data, target) in enumerate(test_loader):
            data, target = data.to(device), target.to(device)
            if filter_map and dataset_name in filter_map:
              for batch in data:
                for channel in batch:
                  channel.mul_(filter_map[dataset_name].to(device))
            output = model(data)
            test_loss += loss_fn(output, target, reduction='sum').item()
            pred = output.argmax(dim=1, keepdim=True)
            c = pred.eq(target.view_as(pred)).sum().item()
            correct += c
            index_store.append((i, c))

    test_loss /= len(test_loader.dataset)

    print('[{}] Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)'
          .format(model.net_type, test_loss, correct, len(test_loader.dataset),
                  100. * correct / len(test_loader.dataset)))

    return index_store

# Učitavanje podataka (i "stari" filteri)

In [None]:
# Extra DataLoader options: worker processes and pinned memory only when a
# GPU is present.
if torch.cuda.is_available():
    kwargs = {'num_workers': 2, 'pin_memory': True}
else:
    kwargs = {}

# Legacy 28x28 occlusion masks (1 = keep pixel, 0 = blank it out).
f_v = torch.ones((28, 28))
f_h = torch.ones((28, 28))
f_d = torch.ones((28, 28))
f_t = torch.ones((28, 28))

# vertical filter: left half removed
f_v[:, :14] = 0

# horizontal filter: top half removed
f_h[:14, :] = 0

# diagonal filter: top-right and bottom-left blocks removed
f_d[:14, 15:] = 0
f_d[15:, :14] = 0

# tcut: three 9x9 squares removed
f_t[6:15, 6:15] = 0
f_t[18:27, 11:20] = 0
f_t[8:17, 17:26] = 0


def mnist_loader(train=False):
    """Build a DataLoader over MNIST stored under ``../data``.

    The training split is shuffled and served in batches of 64; the test
    split is served one sample at a time in its stored order.
    """
    preprocessing = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,)),
    ])
    dataset = datasets.MNIST('../data', download=True, train=train,
                             transform=preprocessing)
    samples_per_batch = 64 if train else 1
    return torch.utils.data.DataLoader(
        dataset, batch_size=samples_per_batch, shuffle=train, **kwargs)


def emnist_loader(split='balanced'):
    """Return a loader factory for the given EMNIST ``split``.

    The returned function has the same ``train=False`` signature as the
    other loader helpers: batches of 64 with shuffling for training, single
    unshuffled samples otherwise.
    """
    def _loader(train=False):
        preprocessing = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,)),
        ])
        dataset = datasets.EMNIST('../data', split=split, download=True,
                                  train=train, transform=preprocessing)
        samples_per_batch = 64 if train else 1
        return torch.utils.data.DataLoader(
            dataset, batch_size=samples_per_batch, shuffle=train, **kwargs)

    return _loader

# Convert to tensor, then map each channel from [0, 1] to [-1, 1].
transform_cifar = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])


def cifar10_loader(train=False, batch_size=4):
    """Build a DataLoader over CIFAR-10 using ``transform_cifar``.

    Only the training split is shuffled.
    """
    dataset = datasets.CIFAR10('../data', download=True, train=train,
                               transform=transform_cifar)
    return torch.utils.data.DataLoader(
        dataset, batch_size=batch_size, shuffle=train, **kwargs)

# Per-channel statistics used by both the augmented and the plain pipeline.
_CIFAR10_MEAN = (0.4914, 0.4822, 0.4465)
_CIFAR10_STD = (0.2023, 0.1994, 0.2010)

# Training pipeline: random padded crop and horizontal flip, then normalise.
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(_CIFAR10_MEAN, _CIFAR10_STD),
])

# Evaluation pipeline: normalisation only.
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(_CIFAR10_MEAN, _CIFAR10_STD),
])


def cifar10_loader_resnet(train=False, batch_size=1536):
    """Build a CIFAR-10 DataLoader with the augmented/plain pipelines above.

    ``transform_train`` is used (and the data shuffled) when ``train`` is
    true, ``transform_test`` otherwise.
    """
    pipeline = transform_train if train else transform_test
    dataset = datasets.CIFAR10('../data', download=True, train=train,
                               transform=pipeline)
    return torch.utils.data.DataLoader(
        dataset, batch_size=batch_size, shuffle=train, **kwargs)

# Human-readable names for the ten CIFAR-10 labels, indexed by class id.
cifar10_classes = ['plane', 'car', 'bird', 'cat',
                   'deer', 'dog', 'frog', 'horse', 'ship', 'truck']

# Gde se radi trening (GPU)

In [None]:
# Use the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
_device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
device = torch.device(_device_name)

# Trening modela

In [None]:
# Loader factory used for every experiment below (CIFAR-10, [-1, 1] inputs).
loader = cifar10_loader

train_loader = loader(train=True)
test_loader = loader()

# NOTE: the construction order below determines how the seeded RNG is
# consumed, and therefore each model's initial weights.

# Stand-alone baselines, each with its own conv stack.
model_normal = Net('normal').to(device)
model_negative = Net('negative').to(device)

# Ensembles that are later assembled from already trained layers.
model_synergy_nor = Net('synergy_nor').to(device)
model_synergy_neg = Net('synergy_neg').to(device)
model_synergy_all = Net('synergy_all').to(device)

# Models that reuse a frozen conv stack and train only a new head.
model_hybrid_nor = Net('hybrid_nor').to(device)
model_hybrid_neg = Net('hybrid_neg').to(device)

# Same layout as 'synergy_all' (substring match), but its heads are trained.
model_tr_synergy_all = Net('tr_synergy_all').to(device)

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ../data/cifar-10-python.tar.gz


  0%|          | 0/170498071 [00:00<?, ?it/s]

Extracting ../data/cifar-10-python.tar.gz to ../data
Files already downloaded and verified


# Obični model

In [None]:
# ---- Normal net:

# Optimise only the parameters that are still trainable.
optimizer_normal = optim.SGD(filter(lambda p: p.requires_grad,
                             model_normal.parameters()), lr=LR, momentum=MOM)

# Ten epochs with cross-entropy on the raw logits.
for epoch in range(1, 10 + 1):
    train(model_normal, device, train_loader, optimizer_normal,
          epoch, loss_fn=F.cross_entropy)

# Freeze the whole trained model so that models sharing its layers later
# cannot update them.
for param in model_normal.parameters():
  param.requires_grad = False

# Keep handles to the trained conv layers; later cells plug these same
# objects into the hybrid / synergy models.
conv1_nor = model_normal.conv1_normal
conv2_nor = model_normal.conv2_normal

# Freeze the layers (already frozen by the loop above; kept explicit).
conv1_nor.weight.requires_grad = False
conv2_nor.weight.requires_grad = False
conv1_nor.bias.requires_grad = False
conv2_nor.bias.requires_grad = False




# Snimanje modela

In [None]:
# Persist the whole module object (pickled). The commented line is the
# weights-only (state_dict) alternative.
# torch.save(model_normal.state_dict(), 'model_normal.pt')
torch.save(model_normal, 'model_normal.pt')

# Učitavanje modela

In [None]:
# Reload the pickled module written by the save cell. ``map_location=device``
# remaps the stored tensors onto whichever device this session uses, so a
# checkpoint saved on a GPU also loads on a CPU-only runtime.
# NOTE(review): recent PyTorch releases default ``torch.load`` to
# ``weights_only=True``, which rejects fully pickled modules -- confirm the
# runtime's version before upgrading.
model_normal = torch.load('model_normal.pt', map_location=device)

# Negativni (hibrid) model

In [None]:
# ---- Hybrid net:

# Share the frozen layers: reuse the conv layers trained with the normal net.
model_hybrid_nor.conv1_normal = conv1_nor
model_hybrid_nor.conv2_normal = conv2_nor

# The shared conv layers are frozen, so the filter leaves only the head's
# parameters for the optimizer.
optimizer_hybrid_nor = optim.SGD(filter(lambda p: p.requires_grad,
                             model_hybrid_nor.parameters()), lr=LR, momentum=MOM)

for epoch in range(1, 10 + 1):
    train(model_hybrid_nor, device, train_loader, optimizer_hybrid_nor,
          epoch, loss_fn=F.cross_entropy)

# Freeze the trained head before it is shared with the synergy models.
for param in model_hybrid_nor.parameters():
  param.requires_grad = False




# Snimanje modela

In [None]:
# Persist the whole module object (pickled). The commented line is the
# weights-only (state_dict) alternative.
# torch.save(model_hybrid_nor.state_dict(), 'model_hybrid_nor.pt')
torch.save(model_hybrid_nor, 'model_hybrid_nor.pt')

# Učitavanje modela

In [None]:
# Reload the pickled module written by the save cell. ``map_location=device``
# remaps the stored tensors onto whichever device this session uses, so a
# checkpoint saved on a GPU also loads on a CPU-only runtime.
model_hybrid_nor = torch.load('model_hybrid_nor.pt', map_location=device)

# Skroz negativni model

In [None]:
# ---- Negative net:

# Optimise only the parameters that are still trainable.
optimizer_negative = optim.SGD(filter(lambda p: p.requires_grad,
                             model_negative.parameters()), lr=LR, momentum=MOM)

for epoch in range(1, 10 + 1):
    train(model_negative, device, train_loader, optimizer_negative,
          epoch, loss_fn=F.cross_entropy)

# Freeze the whole trained model so that models sharing its layers later
# cannot update them.
for param in model_negative.parameters():
  param.requires_grad = False

# Keep handles to the trained conv layers for the hybrid / synergy models.
conv1_neg = model_negative.conv1_negative
conv2_neg = model_negative.conv2_negative

# Already frozen by the loop above; kept explicit.
conv1_neg.weight.requires_grad = False
conv2_neg.weight.requires_grad = False
conv1_neg.bias.requires_grad = False
conv2_neg.bias.requires_grad = False



# Snimanje modela

In [None]:
# Persist the whole module object (pickled). The commented line is the
# weights-only (state_dict) alternative.
# torch.save(model_negative.state_dict(), 'model_negative.pt')
torch.save(model_negative, 'model_negative.pt')

# Učitavanje modela

In [None]:
# Reload the pickled module written by the save cell. ``map_location=device``
# remaps the stored tensors onto whichever device this session uses, so a
# checkpoint saved on a GPU also loads on a CPU-only runtime.
model_negative = torch.load('model_negative.pt', map_location=device)

# Skroz negativni (hibrid) model

In [None]:
# ---- Hybrid_neg net:

# Reuse the frozen conv layers trained with the negative net.
model_hybrid_neg.conv1_negative = conv1_neg
model_hybrid_neg.conv2_negative = conv2_neg

# The shared conv layers are frozen, so the filter leaves only the head's
# parameters for the optimizer.
optimizer_hybrid_neg = optim.SGD(filter(lambda p: p.requires_grad,
                             model_hybrid_neg.parameters()), lr=LR, momentum=MOM)

for epoch in range(1, 10 + 1):
    train(model_hybrid_neg, device, train_loader, optimizer_hybrid_neg,
          epoch, loss_fn=F.cross_entropy)

# Freeze the trained head before it is shared with the synergy models.
for param in model_hybrid_neg.parameters():
  param.requires_grad = False




# Snimanje modela

In [None]:
# Persist the whole module object (pickled). The commented line is the
# weights-only (state_dict) alternative.
# torch.save(model_hybrid_neg.state_dict(), 'model_hybrid_neg.pt')
torch.save(model_hybrid_neg, 'model_hybrid_neg.pt')

# Učitavanje modela

In [None]:
# Reload the pickled module written by the save cell. ``map_location=device``
# remaps the stored tensors onto whichever device this session uses, so a
# checkpoint saved on a GPU also loads on a CPU-only runtime.
model_hybrid_neg = torch.load('model_hybrid_neg.pt', map_location=device)

# Sinergije bez treninga

In [None]:
def _share_layers(target, source, layer_names):
    """Make ``target`` hold the very same layer objects as ``source``."""
    for layer_name in layer_names:
        setattr(target, layer_name, getattr(source, layer_name))


# Attribute names of the four fully connected heads defined by Net.
_HEAD_NORMAL = ('fc1_normal', 'fc2_normal', 'fc3_normal')
_HEAD_NORMAL_N = ('fc1_normal_n', 'fc2_normal_n', 'fc3_normal_n')
_HEAD_NEGATIVE = ('fc1_negative', 'fc2_negative', 'fc3_negative')
_HEAD_NEGATIVE_N = ('fc1_negative_n', 'fc2_negative_n', 'fc3_negative_n')

# ---- synergy_nor net (not trained):
# normal conv stack + the heads trained by the normal and hybrid_nor models.
model_synergy_nor.conv1_normal = conv1_nor
model_synergy_nor.conv2_normal = conv2_nor
_share_layers(model_synergy_nor, model_normal, _HEAD_NORMAL)
_share_layers(model_synergy_nor, model_hybrid_nor, _HEAD_NORMAL_N)

# ---- synergy_neg net (not trained):
# negative conv stack + the heads trained by the hybrid_neg and negative models.
model_synergy_neg.conv1_negative = conv1_neg
model_synergy_neg.conv2_negative = conv2_neg
_share_layers(model_synergy_neg, model_hybrid_neg, _HEAD_NEGATIVE)
_share_layers(model_synergy_neg, model_negative, _HEAD_NEGATIVE_N)

# ---- synergy_all net (not trained):
# both conv stacks and all four trained heads.
model_synergy_all.conv1_normal = conv1_nor
model_synergy_all.conv2_normal = conv2_nor
model_synergy_all.conv1_negative = conv1_neg
model_synergy_all.conv2_negative = conv2_neg
_share_layers(model_synergy_all, model_normal, _HEAD_NORMAL)
_share_layers(model_synergy_all, model_hybrid_nor, _HEAD_NORMAL_N)
_share_layers(model_synergy_all, model_hybrid_neg, _HEAD_NEGATIVE)
_share_layers(model_synergy_all, model_negative, _HEAD_NEGATIVE_N)


# Snimanje modela

In [None]:
# Persist the three assembled ensembles as whole pickled module objects. The
# commented lines are the weights-only (state_dict) alternative.
# torch.save(model_synergy_nor.state_dict(), 'model_synergy_nor.pt')
# torch.save(model_synergy_neg.state_dict(), 'model_synergy_neg.pt')
# torch.save(model_synergy_all.state_dict(), 'model_synergy_all.pt')
torch.save(model_synergy_nor, 'model_synergy_nor.pt')
torch.save(model_synergy_neg, 'model_synergy_neg.pt')
torch.save(model_synergy_all, 'model_synergy_all.pt')

# Učitavanje modela

In [None]:
# Reload the pickled ensembles written by the save cell. ``map_location=device``
# remaps the stored tensors onto whichever device this session uses, so
# checkpoints saved on a GPU also load on a CPU-only runtime.
model_synergy_nor = torch.load('model_synergy_nor.pt', map_location=device)
model_synergy_neg = torch.load('model_synergy_neg.pt', map_location=device)
model_synergy_all = torch.load('model_synergy_all.pt', map_location=device)

# Trenirana sinergija

In [None]:
# ---- Trained synergy_all TF:

# Plug in both frozen conv stacks; the four heads keep their fresh
# initialisation and are the only trainable parameters.
model_tr_synergy_all.conv1_normal = conv1_nor
model_tr_synergy_all.conv2_normal = conv2_nor
model_tr_synergy_all.conv1_negative = conv1_neg
model_tr_synergy_all.conv2_negative = conv2_neg

optimizer_tr_synergy_all = optim.SGD(filter(lambda p: p.requires_grad,
                                    model_tr_synergy_all.parameters()),
                                    lr=LR, momentum=MOM)

# Train the summed four-head output end to end for ten epochs.
# NOTE: unlike the earlier training cells, the parameters are not frozen
# afterwards.
for epoch in range(1, 10 + 1):
    train(model_tr_synergy_all, device, train_loader,
          optimizer_tr_synergy_all, epoch, loss_fn=F.cross_entropy)


# Snimanje modela

In [None]:
# Persist the whole module object (pickled). The commented line is the
# weights-only (state_dict) alternative.
# torch.save(model_tr_synergy_all.state_dict(), 'model_tr_synergy_all.pt')
torch.save(model_tr_synergy_all, 'model_tr_synergy_all.pt')

# Učitavanje modela

In [None]:
# Reload the pickled module written by the save cell. ``map_location=device``
# remaps the stored tensors onto whichever device this session uses, so a
# checkpoint saved on a GPU also loads on a CPU-only runtime.
model_tr_synergy_all = torch.load('model_tr_synergy_all.pt', map_location=device)

# Nova sečenja

In [None]:
import random
random.seed(0)

def corner_kernel(factor):
    """Return a 32x32 float64 mask with a triangle cut from the bottom-left corner.

    The cut occupies a ``limit x limit`` block in the bottom-left corner,
    where ``limit = int(factor * 32) * 2``. Inside that block the diagonal
    and everything below it is set to 0; the rest of the mask is 1.

    Args:
        factor: relative size of the cut; must give ``0 <= limit <= 32``.

    Raises:
        ValueError: if ``factor`` yields a block that does not fit the mask
            (previously this surfaced as an opaque NumPy broadcast error).
    """
    kernel = np.ones((32, 32))
    side = kernel.shape[0]
    limit = int(factor * side) * 2
    if not 0 <= limit <= side:
        raise ValueError(
            'factor={!r} gives a {}x{} cut, which does not fit a {}x{} mask'
            .format(factor, limit, limit, side, side))

    # 1 strictly above the diagonal, 0 on and below it.
    triangle = 1 - np.tril(kernel[:limit, :limit])
    kernel[side - limit:, :limit] = triangle
    return torch.tensor(kernel)

def single_square_kernel(start=5, size=10):
    """Return a 32x32 float64 mask with one zeroed square on the diagonal.

    The square spans rows and columns ``start`` to ``start + size`` (end
    exclusive); every other entry is 1.
    """
    mask = np.ones((32, 32))
    window = slice(start, start + size)
    mask[window, window] = 0.
    return torch.tensor(mask)

def single_square_kernel_random(start=5, size=10):
    """Return a 32x32 float64 mask with one zeroed square at a random diagonal spot.

    The top-left corner ``(p, p)`` is drawn with ``random.randint(0, 31 - size)``
    (both ends inclusive), so with this bound the square never reaches the
    last row/column. ``start`` is accepted for signature parity with
    ``single_square_kernel`` but is not used.
    """
    offset = random.randint(0, 31 - size)
    window = slice(offset, offset + size)
    mask = np.ones((32, 32))
    mask[window, window] = 0.
    return torch.tensor(mask)

def multiple_square_kernel(n_cuts=3, start=5, size=10):
    """Return a 32x32 float64 mask with ``n_cuts`` zeroed squares along the diagonal.

    Each square has side ``size // n_cuts``; the first begins at ``start``
    and every following one begins 5 pixels after the previous one ends.
    """
    side = size // n_cuts
    mask = np.ones((32, 32))

    lo = start
    for _ in range(n_cuts):
        window = slice(lo, lo + side)
        mask[window, window] = 0.
        lo += side + 5  # fixed 5-pixel gap between consecutive squares

    return torch.tensor(mask)

def multiple_square_kernel_random(n_cuts = 2, start=0, size=20, sep = 5):
    """Return a 32x32 float64 mask with ``n_cuts`` zeroed squares at random offsets.

    Each square has side ``size // n_cuts``. Row and column positions are
    advanced independently: the first offset is ``start`` plus a random gap
    in ``[1, sep]``, and each following square begins a random gap in
    ``[1, sep]`` after the previous one ends on that axis.
    """
    side = size // n_cuts
    mask = np.ones((32, 32))

    # Draw order (row gap, then column gap) is kept fixed so that a seeded
    # ``random`` produces the same mask.
    row_lo = start + random.randint(1, sep)
    col_lo = start + random.randint(1, sep)

    for _ in range(n_cuts):
        row_hi = row_lo + side
        col_hi = col_lo + side
        mask[row_lo:row_hi, col_lo:col_hi] = 0.

        row_lo = row_hi + random.randint(1, sep)
        col_lo = col_hi + random.randint(1, sep)

    return torch.tensor(mask)

# Testiranje sa svim modelima i svim "novim" sečenjima

In [None]:
from torchvision import datasets, transforms
import random
random.seed(0)

# Every model is evaluated on the clean test set and on each masked variant.
models = [model_normal, model_negative, model_hybrid_nor, model_hybrid_neg,
          model_synergy_nor, model_synergy_neg, model_synergy_all, model_tr_synergy_all]


dataset_names = ['Normal', 'C1', 'C2', 'C3', 'SSK', 'SSKR', 'MSK', 'MSKR']

# All variants read the same unshuffled test split, and test() applies the
# mask to each freshly collated batch, so a single loader can be shared
# instead of loading CIFAR-10 once per variant.
shared_test_loader = loader()
alldatasets = [shared_test_loader for _ in dataset_names]

# Mask per variant; 'Normal' has no entry and is therefore evaluated unmasked.
filters_map = {'C1': corner_kernel(0.1),
               'C2': corner_kernel(0.2), 'C3': corner_kernel(0.3),
               'SSK': single_square_kernel(), 'SSKR': single_square_kernel_random(), 'MSK': multiple_square_kernel(), 'MSKR': multiple_square_kernel_random()}

for dataset, name in zip(alldatasets, dataset_names):
    print('Testing -- ' + name)
    fdir = '.'
    for m in models:
        index_store = test(m, device, dataset, loss_fn=F.cross_entropy, dataset_name=name, filter_map=filters_map)
        # Per-batch correct counts, pickled under a timestamped name.
        file_name = f'{fdir}/{m.net_type}-{name}-{dt.now()}.pth'
        with open(file_name, 'wb') as f:
            pickle.dump(index_store, f)

# Also keep a weights-only snapshot of every evaluated model.
for m in models:
    file_name = f'{m.net_type}-{dt.now()}.pth'
    torch.save(m.state_dict(), file_name)

# Dodatni pomoćni elementi za FGSM

In [None]:
# FGSM is run sample by sample: test_fgsm() calls ``.item()`` on the
# prediction and the label, which requires a batch size of 1.
loader_fgsm = cifar10_loader

test_loader_fgsm = loader_fgsm(batch_size=1)

# FGSM attack code
def fgsm_attack(image, epsilon, data_grad):
    # get element-wise signs for gradient ascent
    sign_data_grad = data_grad.sign()
    perturbed_image = image + epsilon * sign_data_grad
    # clip to [0,1]
    # perturbed_image = torch.clamp(perturbed_image, 0, 1)
    return perturbed_image

def test_fgsm(model, device, test_loader, epsilon):
    model.eval()
    correct = 0
    adv_examples = []

    for data, target in test_loader:

        data, target = data.to(device), target.to(device)
        data.requires_grad = True
        output = model(data)
        init_pred = output.max(1, keepdim=True)[1]

        # breakpoint()
        # If the initial prediction is wrong
        # dont bother attacking, just move on
        if init_pred.item() != target.item():
            continue

        loss = F.nll_loss(output, target)

        model.zero_grad()

        loss.backward()

        data_grad = data.grad.data

        perturbed_data = fgsm_attack(data, epsilon, data_grad)

        output = model(perturbed_data)

        final_pred = output.max(1, keepdim=True)[1]

        if final_pred.item() == target.item():
            correct += 1
            if (epsilon == 0) and (len(adv_examples) < 5):
                adv_ex = perturbed_data.squeeze().detach().cpu().numpy()
                adv_examples.append((init_pred.item(), final_pred.item(), adv_ex))
        else:
            if len(adv_examples) < 5:
                adv_ex = perturbed_data.squeeze().detach().cpu().numpy()
                adv_examples.append((init_pred.item(), final_pred.item(), adv_ex))

    final_acc = correct / float(len(test_loader))
    print(
        "Model: {}\tEpsilon: {}\tTest Accuracy = {} / {} = {}".format(
            model.net_type, epsilon, correct, len(test_loader), final_acc
        )
    )

    return final_acc, adv_examples

Files already downloaded and verified


# Testiranje za FGSM

In [None]:
# Testing:

start_time = time.time()

# Models attacked with FGSM (the two hybrid models are not included).
models_fgsm = [
    model_normal,
    model_negative,
    model_synergy_nor,
    model_synergy_neg,
    model_synergy_all,
    model_tr_synergy_all
]

epsilons = [0, 0.01, 0.02, 0.03, 0.04, 0.05]
# Flat list of accuracies in epsilon-major order: for each epsilon, one
# value per model in the order of ``models_fgsm``.
accuracies = []
# NOTE(review): never filled -- the examples returned as ``ex`` below are
# discarded. Confirm whether they were meant to be collected here.
examples = []

# Run test for each epsilon
for eps in epsilons:
    for model in models_fgsm:
        acc, ex = test_fgsm(model, device, test_loader_fgsm, eps)
        accuracies.append(acc)

print('--- Total time: %s seconds ---' % (time.time() - start_time))


# Black-box PGD

In [None]:
import torchattacks
from torchattacks import PGD
from torch.utils.data import DataLoader, TensorDataset

batch_size = 128

# Un-normalised CIFAR-10 ([0, 1] tensors); the adversarial images are
# generated from these.
train_att = datasets.CIFAR10(root='../data', train=True,
                      download=True, transform=transforms.ToTensor())
test_att = datasets.CIFAR10(root='../data', train=False,
                     download=True, transform=transforms.ToTensor())

train_loader_att = torch.utils.data.DataLoader(train_att,
                                           batch_size=batch_size,
                                           shuffle=True)
test_loader_att = torch.utils.data.DataLoader(test_att,
                                          batch_size=batch_size,
                                          shuffle=False)

# Same normalisation as ``transform_cifar``; applied to the adversarial
# images before they are scored.
transform_att = transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))


models_test = [
    model_normal,
    model_synergy_nor,
    model_synergy_all,
    model_tr_synergy_all
]

epsilons = [0.0, 0.01, 0.03, 0.05]


for epsilon in epsilons:
    print("=" * 30)
    print("epsilon =", epsilon)
    print("=" * 30)

    # Craft adversarial images against one model, then score every model on
    # them (transfer / black-box setting).
    for attacked in models_test:
        print('attacked: ', attacked.net_type)
        # NOTE(review): the attack is fed the raw [0, 1] images from
        # ``test_loader_att`` while the evaluation below normalises them
        # with ``transform_att`` -- confirm whether ``attacked`` should be
        # wrapped with the same normalisation for the attack.
        pgd_attack = PGD(attacked, eps=epsilon, alpha=2/255, steps=40)
        pgd_attack.save(data_loader=test_loader_att, save_path="../data/cifar_pgd.pt",
                        verbose=False, save_type='int')

        adv_dict = torch.load("../data/cifar_pgd.pt")
        adv_images = adv_dict['adv_inputs']
        adv_labels = adv_dict['labels']

        # Stored as integers in [0, 255]: rescale to [0, 1], then normalise.
        adv_images = transform_att(adv_images.float()/255)
        adv_data = TensorDataset(adv_images, adv_labels)
        adv_loader = DataLoader(adv_data, batch_size=batch_size, shuffle=False, **kwargs)

        for model in models_test:
            model.eval()
            correct = 0
            total = 0

            # Pure evaluation: no autograd graph needed.
            with torch.no_grad():
                for images, labels in adv_loader:
                    # Use the session's device instead of a hard-coded
                    # .cuda(), so the cell also runs on CPU-only runtimes.
                    images = images.to(device)
                    labels = labels.to(device)
                    outputs = model(images)
                    _, predicted = torch.max(outputs, 1)
                    total += labels.size(0)
                    correct += (predicted == labels).sum().item()

            print(model.net_type, 'acc: %.2f %%' % (100 * float(correct) / total), end='; ')
        print("\n")
