<a href="https://colab.research.google.com/github/AloniRegev/Defense-Against-Adversarial-Examples-in-NN/blob/main/DL_project_adversary_attacks.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [7]:
import torch
import torchvision
import torchvision.datasets as datasets
from torch import optim
from torchvision import transforms
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
import torch.utils.data as data
from torchsummary import summary

# for adversary
import copy
from PIL import Image


def load_dataset():
    """Build the CIFAR-10 train, validation and test data loaders.

    The official training set is split 80% / 20% into a training and a
    validation subset.  Training samples are augmented (random crop with
    padding and random horizontal flip); validation and test samples are only
    converted to tensors and normalized, so evaluation is deterministic.

    Reads the module-level ``device`` to decide how many loader workers to use.

    Returns:
        tuple: ``(train_loader, val_loader, test_loader)``, each with batch
        size 64.  Only the training loader shuffles.
    """
    # Per-channel statistics of the training images scaled to [0, 1]
    # (trainset.data / 255): mean [0.491 0.482 0.446], std [0.247 0.243 0.261].
    cifar_mean = (0.4914, 0.4822, 0.4465)
    cifar_std = (0.247, 0.243, 0.261)

    transform_train = transforms.Compose([
        # augmentation
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        # regular normalization
        transforms.ToTensor(),
        transforms.Normalize(cifar_mean, cifar_std),
    ])

    # Same normalization as the training set, but without augmentation.
    transform_test = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(cifar_mean, cifar_std),
    ])

    trainset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_train)
    print(f' trainset: {trainset}')

    # Second view of the same training images without augmentation.  The
    # validation subset is taken from this view so that validation loss and
    # accuracy are not perturbed by random crops/flips.
    trainset_eval = datasets.CIFAR10(root='./data', train=True, download=False, transform=transform_test)

    cifar_testset = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_test)

    # 80% / 20% split.  The validation part is "everything that is left", so
    # the two parts always add up to the full dataset even when the size is not
    # a multiple of 5.
    n_total = len(trainset)
    n_train = int(n_total * 0.8)
    shuffled_indices = torch.randperm(n_total).tolist()
    cifar_trainset = data.Subset(trainset, shuffled_indices[:n_train])
    cifar_valset = data.Subset(trainset_eval, shuffled_indices[n_train:])

    print('train set len', len(cifar_trainset))
    print('validation set len', len(cifar_valset))
    print('test set len', len(cifar_testset))

    # Worker processes are only used when running on a GPU.
    number_workers = 2 if device.type == 'cuda' else 0

    train_loader = data.DataLoader(cifar_trainset, shuffle=True, batch_size=64, num_workers=number_workers)
    val_loader = data.DataLoader(cifar_valset, shuffle=False, batch_size=64, num_workers=number_workers)
    test_loader = data.DataLoader(cifar_testset, shuffle=False, batch_size=64, num_workers=number_workers)

    return train_loader, val_loader, test_loader


# model3: with dropout, with batch normalization, without fully connected layers

#model3
class CNN_model3(nn.Module): #TODO: fix so I get correct dimensions of output
    """Fully convolutional 10-class classifier (no ``nn.Linear`` layers).

    ``feature_extractor`` is three conv blocks, each ending in a 2x2 max-pool,
    so the spatial size is halved three times (e.g. 32x32 -> 4x4) while the
    channel count grows to 256.  ``classifier`` starts with a 4x4 convolution
    that collapses a 4x4 feature map to 1x1 and then uses 1x1 convolutions to
    reduce the channels to 10 class scores.
    """

    def __init__(self):
        super().__init__()

        def conv3x3(c_in, c_out):
            # 3x3 convolution that keeps the spatial size (padding=1)
            return nn.Conv2d(in_channels=c_in, out_channels=c_out, kernel_size=3, padding=1)

        def conv1x1(c_in, c_out):
            # per-pixel channel mixing
            return nn.Conv2d(in_channels=c_in, out_channels=c_out, kernel_size=1)

        def halve():
            return nn.MaxPool2d(kernel_size=2, stride=2)

        extractor_layers = [
            # Conv Layer block 1
            conv3x3(3, 32),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            conv3x3(32, 64),
            nn.ReLU(inplace=True),
            halve(),

            # Conv Layer block 2
            conv3x3(64, 128),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            conv3x3(128, 128),
            nn.ReLU(inplace=True),
            halve(),
            nn.Dropout2d(p=0.05),

            # Conv Layer block 3
            conv3x3(128, 256),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            conv3x3(256, 256),
            nn.ReLU(inplace=True),
            halve(),
        ]
        self.feature_extractor = nn.Sequential(*extractor_layers)

        head_layers = [
            nn.Dropout(p=0.1),
            nn.Conv2d(in_channels=256, out_channels=512, kernel_size=4),
            nn.ReLU(inplace=True),
            conv1x1(512, 256),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.1),
            conv1x1(256, 64),
            nn.ReLU(inplace=True),
            conv1x1(64, 10),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return class scores of shape ``(batch, 10)`` for an image batch ``x``."""
        # convolutional feature maps
        feature_maps = self.feature_extractor(x)

        # convolutional head instead of fully connected layers
        score_maps = self.classifier(feature_maps)

        # Drop the trailing spatial dimensions; this only succeeds when the
        # head output is 1x1 spatially.
        batch_size = score_maps.size(dim=0)
        n_scores = score_maps.size(dim=1)
        return score_maps.reshape(batch_size, n_scores)



# Train
def train_data(model, epochs, learning_rate, loss_function, train_loader, valid_loader, patience=4):
    """Train ``model`` with Adam and early stopping on the validation loss.

    After every epoch the model is evaluated with ``validation_data``.  When
    the validation loss improves on the best value seen so far, the weights
    are written to ``./checkpoint``; after ``patience`` consecutive epochs
    without improvement training stops.  The best checkpoint is loaded back
    into the model before returning.

    Relies on the module-level names ``device``, ``plot_graph`` and
    ``validation_data``.  Note that ``validation_data`` computes its loss with
    the module-level ``loss_fn``, not with ``loss_function``.

    Args:
        model: network to train; expected to already live on ``device``.
        epochs: maximum number of passes over ``train_loader``.
        learning_rate: Adam learning rate.
        loss_function: callable ``(outputs, labels) -> scalar loss tensor``.
        train_loader: iterable of ``(inputs, labels)`` training batches.
        valid_loader: loader passed to ``validation_data`` after each epoch.
        patience: number of consecutive non-improving epochs tolerated.

    Returns:
        The trained model (the same object that was passed in).
    """
    loss_arr = []  # loss of every training batch of the whole run (for plotting)
    avg_train_loss_arr, avg_val_loss_arr = [], []
    train_acc_arr, val_acc_arr = [], []

    # Early stopping state
    best_loss = float('inf')
    trigger_times = 0
    checkpoint_saved = False
    PATH = './checkpoint'

    optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=0.00012)

    # Peek at one batch to show what the loader yields.  The builtin next() is
    # used because loader iterators do not expose a .next() method in current
    # PyTorch releases.
    images, labels = next(iter(train_loader))
    print(type(images))
    print(images.shape)
    print(labels.shape)

    last_batch_index = len(train_loader) - 1

    for epoch in range(epochs):
        model.train()  # enable dropout / batch-norm statistics updates

        # Per-epoch counters: accuracy and mean loss describe this epoch only.
        epoch_losses = []
        total, correct = 0, 0

        for i, (inputs, labels) in enumerate(train_loader):
            inputs = inputs.to(device)
            labels = labels.to(device)

            # Zero the gradients
            optimizer.zero_grad()

            # Forward and backward propagation
            outputs = model(inputs)
            loss = loss_function(outputs, labels)
            loss.backward()
            optimizer.step()

            batch_loss = loss.item()
            loss_arr.append(batch_loss)
            epoch_losses.append(batch_loss)

            # Show progress every 100 batches and on the last batch
            if i % 100 == 0 or i == last_batch_index:
                print("Iteration: {0} | Loss: {1} | index {2} ".format(epoch, batch_loss, i))

            total += inputs.shape[0]
            predictions = torch.argmax(outputs.detach(), dim=1)
            correct += (predictions == labels).sum().item()

        train_acc = correct / total if total else 0.0
        print('Accuracy: in train', train_acc)
        train_acc_arr.append(train_acc)

        plot_graph(loss_arr, "generic network training loss")
        avg_train_loss_arr.append(float(np.mean(epoch_losses)) if epoch_losses else float('nan'))

        # Early stopping
        current_loss, avg_val_loss, val_acc = validation_data(model, valid_loader)
        print('The Current Loss by validation data:', current_loss)
        # float() accepts both plain numbers and 0-d tensors / numpy scalars
        avg_val_loss_arr.append(float(avg_val_loss))
        val_acc_arr.append(float(val_acc))

        if current_loss < best_loss:
            # New best model: remember it and reset the patience counter.
            best_loss = current_loss
            trigger_times = 0
            torch.save(model.state_dict(), PATH)
            checkpoint_saved = True
        else:
            trigger_times += 1
            if trigger_times >= patience:
                print('Early stopping!\nStart to test process.')
                break  # exit loop, print data

    # Restore the best weights; skipped when no checkpoint was written
    # (e.g. epochs == 0) so a missing or stale file is never loaded.
    if checkpoint_saved:
        model.load_state_dict(torch.load(PATH))
    plot_graph(loss_arr, "generic network training loss")

    plt.plot(avg_train_loss_arr, label='train loss')
    # Plot another line on the same chart/graph
    plt.plot(avg_val_loss_arr, label='val loss')
    plt.title("avg train loss vs avg validation loss")
    plt.legend()
    plt.xlabel('epochs')
    plt.ylabel('loss')
    plt.show()

    plt.plot(train_acc_arr, label='train accuracy')
    # Plot another line on the same chart/graph
    plt.plot(val_acc_arr, label='validation accuracy')
    plt.title("avg train acc vs avg validation acc")
    plt.legend()
    plt.xlabel('epochs')
    plt.ylabel('accuracy')
    plt.show()

    return model


def plot_graph(list, title):
    """Draw the values in ``list`` as a line chart titled ``title`` and show it."""
    axes = plt.gca()
    axes.plot(list)
    axes.set_title(title)
    plt.show()


def validation_data(model, valid_loader):
    """Evaluate ``model`` on ``valid_loader`` without tracking gradients.

    Uses the module-level ``loss_fn`` for the loss and the module-level
    ``device`` to decide whether batches are moved to the GPU.  Prints the
    accuracy.

    Returns:
        tuple: ``(sum of batch losses / number of batches,
        np.mean of the batch losses, accuracy)``.
    """
    model.eval()
    seen = 0
    hits = 0
    running_loss = 0.0
    batch_losses = []

    # no gradients are needed for evaluation
    with torch.no_grad():
        for batch, targets in valid_loader:
            # move tensors to GPU if CUDA is in use
            if device == torch.device('cuda'):
                batch = batch.cuda()
                targets = targets.cuda()

            # forward pass and batch loss
            scores = model(batch)
            batch_loss = loss_fn(scores, targets).item()
            batch_losses.append(batch_loss)
            running_loss += batch_loss

            # accuracy bookkeeping
            seen += batch.shape[0]
            matched = torch.argmax(scores.data, dim=1) == targets
            hits += torch.sum(matched).type(torch.float32)

    acc = hits / seen
    print('Accuracy: in validation', acc)

    mean_batch_loss = running_loss / len(valid_loader)
    return mean_batch_loss, np.mean(batch_losses), acc


def test_data(model, test_loader):
    """Evaluate ``model`` on ``test_loader``, plot the batch losses and print the accuracy.

    Uses the module-level ``loss_fn``, ``device`` and ``plot_graph``.
    Returns nothing.
    """
    model.eval()
    seen, hits = 0, 0
    batch_losses = []

    # no gradients are needed for evaluation
    with torch.no_grad():
        for batch, targets in test_loader:
            # move tensors to GPU if CUDA is in use
            if device == torch.device('cuda'):
                batch, targets = batch.cuda(), targets.cuda()

            # forward pass and batch loss
            scores = model(batch)
            batch_losses.append(loss_fn(scores, targets).item())

            # accuracy bookkeeping
            seen += batch.shape[0]
            matched = torch.argmax(scores.data, dim=1) == targets
            hits += torch.sum(matched).type(torch.float32)

    plot_graph(batch_losses, "generic network test loss")

    print('Accuracy: in test', (hits / seen))


def deepfool(image, net, num_classes=10, overshoot=0.02, max_iter=10):
    """Compute a small adversarial perturbation for one image (DeepFool).

    Starting from ``image``, the input is repeatedly pushed across the nearest
    linearized decision boundary among the ``num_classes`` top-scoring classes
    until the predicted class changes or ``max_iter`` steps were taken.

    The computation runs on the device of the network's parameters.  The
    network's train/eval mode is not changed here; callers normally switch to
    ``net.eval()`` first so dropout / batch-norm do not disturb the gradients.

    Args:
        image: tensor holding a single image, either ``(C, H, W)`` or already
            batched as ``(1, C, H, W)``.
        net: ``torch.nn.Module`` mapping a batch to scores of shape
            ``(1, n_outputs)``.
        num_classes: number of top-scoring classes considered as targets
            (clamped to the number of network outputs).
        overshoot: extra factor applied to the perturbation so the boundary is
            actually crossed.
        max_iter: maximum number of iterations.

    Returns:
        tuple ``(r_tot, loop_i, label, k_i, pert_image)``:
        ``r_tot`` is the total perturbation as a numpy array shaped like the
        batched image, ``loop_i`` the number of iterations performed,
        ``label`` the originally predicted class, ``k_i`` the class predicted
        for the perturbed image, and ``pert_image`` the perturbed image as a
        tensor on the device of the input.

    Raises:
        ValueError: if ``image`` contains more than one sample.
    """
    image = torch.as_tensor(image)
    input_device = image.device

    # Work on the device the network lives on instead of a global setting.
    first_param = next(iter(net.parameters()), None)
    run_device = input_device if first_param is None else first_param.device

    image = image.detach().to(run_device)
    if image.dim() == 3:
        image = image.unsqueeze(0)  # add the batch dimension
    if image.size(0) != 1:
        raise ValueError("deepfool expects a single image, got a batch of %d" % image.size(0))

    print('in deepfool before first forward')
    # Class ranking of the clean image; no graph is needed for this pass, which
    # also keeps the scores convertible to plain Python values.
    with torch.no_grad():
        clean_scores = net(image).flatten()
    print('after first')

    num_classes = min(num_classes, clean_scores.numel())
    ranked = torch.argsort(clean_scores, descending=True)[:num_classes].tolist()
    label = ranked[0]

    r_tot = torch.zeros_like(image)
    pert_image = image.clone()
    loop_i = 0

    x = pert_image.detach().clone().requires_grad_(True)
    fs = net(x)
    print('after second forward')
    k_i = label

    while k_i == label and loop_i < max_iter:
        pert = float('inf')
        w = None

        # torch.autograd.grad returns a fresh gradient per call, so gradients
        # of different classes never accumulate into each other.
        grad_orig = torch.autograd.grad(fs[0, label], x, retain_graph=True)[0]

        for k in ranked[1:]:
            cur_grad = torch.autograd.grad(fs[0, k], x, retain_graph=True)[0]

            # direction and distance to the linearized boundary of class k
            w_k = cur_grad - grad_orig
            f_k = (fs[0, k] - fs[0, label]).item()
            w_k_norm = w_k.norm().item()
            if w_k_norm == 0.0:
                continue  # identical gradients: no usable direction

            pert_k = abs(f_k) / w_k_norm

            # keep the closest boundary
            if pert_k < pert:
                pert = pert_k
                w = w_k

        if w is None:
            break  # no direction found; further iterations cannot progress

        # compute r_i and r_tot
        # Added 1e-4 for numerical stability
        r_i = (pert + 1e-4) * w / w.norm()
        r_tot = r_tot + r_i

        pert_image = image + (1 + overshoot) * r_tot

        x = pert_image.detach().clone().requires_grad_(True)
        fs = net(x)
        k_i = int(fs.detach().flatten().argmax())

        loop_i += 1

    r_tot = ((1 + overshoot) * r_tot).detach().cpu().numpy()

    return r_tot, loop_i, label, k_i, pert_image.detach().to(input_device)


def deepfool(test_images, net, num_classes=10, overshoot=0.02, max_iter=10):
    """Compute a small adversarial perturbation for one image (DeepFool).

    This definition has the same name as an earlier ``deepfool`` in this file
    and therefore replaces it when the module is executed.  Despite the plural
    parameter name, a single image is attacked per call.

    Starting from the input, the image is repeatedly pushed across the nearest
    linearized decision boundary among the ``num_classes`` top-scoring classes
    until the predicted class changes or ``max_iter`` steps were taken.

    The computation runs on the device of the network's parameters.  The
    network's train/eval mode is not changed here; callers normally switch to
    ``net.eval()`` first.

    Args:
        test_images: tensor holding a single image, either ``(C, H, W)`` or
            already batched as ``(1, C, H, W)``.
        net: ``torch.nn.Module`` mapping a batch to scores of shape
            ``(1, n_outputs)``.
        num_classes: number of top-scoring classes considered as targets
            (clamped to the number of network outputs).
        overshoot: extra factor applied to the perturbation so the boundary is
            actually crossed.
        max_iter: maximum number of iterations.

    Returns:
        tuple ``(r_tot, loop_i, label, k_i, pert_image)``:
        ``r_tot`` is the total perturbation as a numpy array shaped like the
        batched image, ``loop_i`` the number of iterations performed,
        ``label`` the originally predicted class, ``k_i`` the class predicted
        for the perturbed image, and ``pert_image`` the perturbed image as a
        tensor on the device of the input.

    Raises:
        ValueError: if ``test_images`` contains more than one sample.
    """
    source = torch.as_tensor(test_images)
    input_device = source.device

    # Work on the device the network lives on instead of a global setting.
    first_param = next(iter(net.parameters()), None)
    run_device = input_device if first_param is None else first_param.device

    source = source.detach().to(run_device)
    if source.dim() == 3:
        source = source.unsqueeze(0)  # add the batch dimension
    if source.size(0) != 1:
        raise ValueError("deepfool expects a single image, got a batch of %d" % source.size(0))

    print('in deepfool before first forward')
    # Class ranking of the clean image; computed without a graph so the scores
    # can be turned into plain Python values.
    with torch.no_grad():
        clean_scores = net(source).flatten()
    print('after first')

    num_classes = min(num_classes, clean_scores.numel())
    candidates = torch.argsort(clean_scores, descending=True)[:num_classes].tolist()
    label = candidates[0]

    r_tot = torch.zeros_like(source)
    pert_image = source.clone()
    loop_i = 0

    x = pert_image.detach().clone().requires_grad_(True)
    fs = net(x)
    print('after second forward')
    k_i = label

    while k_i == label and loop_i < max_iter:
        best_dist = float('inf')
        best_dir = None

        # Each autograd.grad call yields an independent gradient, so nothing
        # accumulates between classes.
        grad_orig = torch.autograd.grad(fs[0, label], x, retain_graph=True)[0]

        for k in candidates[1:]:
            grad_k = torch.autograd.grad(fs[0, k], x, retain_graph=True)[0]

            # direction and distance to the linearized boundary of class k
            w_k = grad_k - grad_orig
            f_k = (fs[0, k] - fs[0, label]).item()
            w_k_norm = w_k.norm().item()
            if w_k_norm == 0.0:
                continue  # identical gradients: no usable direction

            dist_k = abs(f_k) / w_k_norm

            # keep the closest boundary
            if dist_k < best_dist:
                best_dist = dist_k
                best_dir = w_k

        if best_dir is None:
            break  # no direction found; further iterations cannot progress

        # compute r_i and r_tot
        # Added 1e-4 for numerical stability
        r_i = (best_dist + 1e-4) * best_dir / best_dir.norm()
        r_tot = r_tot + r_i

        pert_image = source + (1 + overshoot) * r_tot

        x = pert_image.detach().clone().requires_grad_(True)
        fs = net(x)
        k_i = int(fs.detach().flatten().argmax())

        loop_i += 1

    r_tot = ((1 + overshoot) * r_tot).detach().cpu().numpy()

    return r_tot, loop_i, label, k_i, pert_image.detach().to(input_device)


def calling_deepfool(net):
    """Run ``deepfool`` on a sample image and display the result.

    Loads ``./DeepFool/test_im1.jpg``, attacks it with up to 50 DeepFool
    iterations, prints the original and perturbed label names read from
    ``./DeepFool/synset_words.txt`` and shows three figures: the perturbed
    image, the original image and the perturbation itself.

    Args:
        net: network to attack; it is switched to evaluation mode.
    """
    # Switch to evaluation mode
    net.eval()

    im_orig = Image.open('./DeepFool/test_im1.jpg')

    # NOTE(review): the image is only converted to a tensor here, whereas
    # load_dataset in this file also normalizes its inputs - confirm that the
    # network passed in expects un-normalized images of this size.
    im = transforms.Compose([
        transforms.ToTensor(),
    ])(im_orig)

    # Batched copy of the image that tracks gradients.
    img = im.unsqueeze(0).clone().requires_grad_(True)

    r, loop_i, label_orig, label_pert, pert_image = deepfool(img, net, max_iter=50)

    # Read the label names and close the file right away.
    with open('./DeepFool/synset_words.txt', 'r') as labels_file:
        labels = labels_file.read().split('\n')

    # int() replaces np.int, which newer NumPy releases no longer provide.
    str_label_orig = labels[int(label_orig)].split(',')[0]
    str_label_pert = labels[int(label_pert)].split(',')[0]

    print("Original label = ", str_label_orig)
    print("Perturbed label = ", str_label_pert)

    # .cpu() so the conversion to numpy also works for GPU tensors
    pert_image_numpy = pert_image.detach().cpu().squeeze().numpy()
    print(pert_image_numpy.shape)
    plt.figure()
    plt.imshow(pert_image_numpy.transpose(1, 2, 0))  # channels-last for imshow
    plt.title(str_label_pert)
    plt.show()

    plt.figure()
    plt.imshow(im_orig)
    plt.title(str_label_orig)
    plt.show()

    r_new = r.squeeze()

    plt.figure()
    plt.imshow(r_new.transpose(1, 2, 0))
    plt.show()




if __name__ == '__main__':
    # `device` and `loss_fn` are assigned at module level here because the
    # functions above (load_dataset, train_data, validation_data, test_data)
    # read them as globals.  A `global` statement is not needed at this level.
    train_on_gpu = torch.cuda.is_available()
    if train_on_gpu:
        device = torch.device('cuda')
        print("CUDA available. Training on GPU")
    else:
        device = torch.device('cpu')
        print("CUDA is not available. Training on CPU")

    train_loader, val_loader, test_loader = load_dataset()
    # NOTE: the two values below are not read by the code visible in this file
    # (the loaders use a literal batch size of 64 and train_data is called
    # with 100 epochs); they are kept in case other cells rely on them.
    batch_size = 64
    max_epochs = 80  # number of steps between evaluations
    loss_fn = nn.CrossEntropyLoss()

    model3 = CNN_model3().to(device)  # with dropout, batch norm, without FC layers
    summary(model3, input_size=(3, 32, 32))
    # The stray `torch.save(model.state_dict(), PATH)` that was fused onto this
    # line was a syntax error and referenced undefined names; train_data
    # already writes the checkpoint itself.
    print(model3)
    model3 = train_data(model3, 100, 0.0001, loss_fn, train_loader, val_loader)

    test_data(model3, test_loader)
    print('\n\nFinished Training model3\n\n')

    #calling_deepfool(model3)










CUDA available. Training on GPU
Files already downloaded and verified
 trainset: Dataset CIFAR10
    Number of datapoints: 50000
    Root location: ./data
    Split: Train
    StandardTransform
Transform: Compose(
               RandomCrop(size=(32, 32), padding=4)
               RandomHorizontalFlip(p=0.5)
               ToTensor()
               Normalize(mean=(0.4914, 0.4822, 0.4465), std=(0.247, 0.243, 0.261))
           )
Files already downloaded and verified
train set len 40000
validation set len 10000
test set len 10000


Finished Training model3


in deepfool before first forward




RuntimeError: ignored