# Adversarial Training

As we saw before, classifiers can be tricked by an adversary (i.e. the bad guy) using attacks such as FGSM. We would like our classifier to be robust against these kinds of attacks because misclassifying a `stop sign` as a `speed limit 100 miles` would not be ideal.

Here comes **Adversarial Training**, where we train our model with our original data *along with generated adversarial examples*. If our model sees more adversarial examples, it can handle them better.


## Imports

In [None]:
# Pytorch - Machine Learning Library
import torch
import torch.nn.functional as F
from torch import nn, utils, optim
import torchvision
import torchvision.transforms as transforms
from torchvision import datasets

import tensorflow as tf # the other machine learning library

import requests # this lets us make http requests, so we can use this to download things from the internet
import numpy as np
import matplotlib.pyplot as plt

# libraries to help us process the images
from io import BytesIO
from PIL import Image
from tqdm.notebook import tqdm


### Setting device

In [None]:
# Prefer the first CUDA GPU when one is visible; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')

print(f'Running on {device}')

Running on cuda:0


# Data processing

In [None]:
# CIFAR-10 train and test splits, downloaded into ./cifar10 on first use.
# Both splits are converted from PIL images to tensors by ToTensor().
train_dataset = datasets.CIFAR10(
    root='./cifar10',
    train=True,
    transform=transforms.ToTensor(),
    download=True,
)
test_dataset = datasets.CIFAR10(
    root='./cifar10',
    train=False,
    transform=transforms.ToTensor(),
    download=True,
)

In [None]:
# Mini-batch size shared by every DataLoader in this notebook.
batch_size = 128

# Both loaders reshuffle their dataset on every pass.
train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
)
test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=batch_size,
    shuffle=True,
)

In [None]:
# Show one training image: pull a single batch and display its first element.

train_iter = iter(train_loader)
batch_images, batch_labels = next(train_iter)

image = batch_images[0]
label = batch_labels[0]

# Reorder the axes (0, 1, 2) -> (1, 2, 0) so the channel axis comes last for imshow.
plt.imshow(image.permute(1, 2, 0))
plt.show()

In [None]:
# Same device selection as at the top of the notebook: first GPU if present, else CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
print(device)

# Building Model

Remember that CNNs have a typical architecture that involves CONV -> Maxpool -> ... -> FC -> ... -> Output

https://pytorch.org/docs/stable/index.html

In [None]:
class SillyBoiModel(nn.Module):
    """Small five-convolution CNN classifier for 32x32 images (e.g. CIFAR-10).

    Every stage is Conv -> ReLU -> (MaxPool) -> BatchNorm. The last three
    stages each halve the spatial size with a 2x2 max-pool, so a 32x32 input
    reaches the classifier head as a 128 x 4 x 4 feature map; ``fc1`` is sized
    for exactly that, so other input resolutions will fail at ``fc1``.

    Args:
        input_dim: number of channels of the input images (3 for RGB).
        output_dim: number of classes, i.e. the size of the returned logit
            vector.

    Layer attribute names are part of the saved ``state_dict`` keys and are
    therefore kept stable.
    """

    def __init__(self, input_dim, output_dim):
        super().__init__()
        # The constructor arguments used to be ignored (3 and 10 were
        # hard-coded); they now drive the first and last layers.
        self.ConvLayer1 = nn.Conv2d(input_dim, 32, kernel_size=3, stride=1, padding=1)
        self.ConvLayer2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.ConvLayer3 = nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1)
        self.ConvLayer4 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.ConvLayer5 = nn.Conv2d(128, 128, kernel_size=3, padding=1)

        self.batchnorm1 = nn.BatchNorm2d(32)
        self.batchnorm2 = nn.BatchNorm2d(64)
        self.batchnorm3 = nn.BatchNorm2d(64)
        self.batchnorm4 = nn.BatchNorm2d(128)
        self.batchnorm5 = nn.BatchNorm2d(128)

        # One stateless pooling layer is shared by the three pooled stages.
        self.pool = nn.MaxPool2d(2, stride=2)

        self.dropout = nn.Dropout(0.2)
        # 4 * 4 * 128 = flattened size after three 2x2 pools of a 32x32 input.
        self.fc1 = nn.Linear(4 * 4 * 128, 500)
        self.fc2 = nn.Linear(500, output_dim)

    def forward(self, x):
        """Return unnormalised class scores (logits), one row per input image."""
        # Stages 1-2: full resolution, no pooling.
        x = self.batchnorm1(F.relu(self.ConvLayer1(x)))
        x = self.batchnorm2(F.relu(self.ConvLayer2(x)))

        # Stages 3-5: each one halves height and width before normalising.
        x = self.batchnorm3(self.pool(F.relu(self.ConvLayer3(x))))
        x = self.batchnorm4(self.pool(F.relu(self.ConvLayer4(x))))
        x = self.batchnorm5(self.pool(F.relu(self.ConvLayer5(x))))

        # Flatten everything except the batch dimension for the linear head.
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))

        x = self.dropout(x)
        return self.fc2(x)

In [None]:
# Fresh network on the selected device, plus the loss and optimizer used to train it.
MikeTheModel = SillyBoiModel(input_dim=3, output_dim=10)
MikeTheModel = MikeTheModel.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(MikeTheModel.parameters(), lr=0.003)

## Training

In [None]:
def train_one_epoch(model, train_loader, optimizer, criterion, device):
    """Train ``model`` for one full pass over ``train_loader``.

    Args:
        model: the ``nn.Module`` to optimise; it is switched to training mode.
        train_loader: iterable yielding ``(inputs, labels)`` batches.
        optimizer: optimizer holding the parameters of ``model``.
        criterion: loss callable, invoked as ``criterion(outputs, labels)``.
        device: device each batch is moved to before the forward pass.

    Prints the loss of the final batch rounded to three decimals (or a short
    notice if the loader produced no batches). Returns ``None``.
    """
    # Training mode first: enables dropout and batch-statistics BatchNorm.
    model.train()

    last_loss = None
    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()

        # forward + backward + optimize
        # Use the `model` argument (not a notebook global) so that whichever
        # model the caller passes is the one that is actually trained.
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Keep the tensor; .item() is only called once, after the loop.
        last_loss = loss

    if last_loss is None:
        print('End of epoch: train_loader yielded no batches')
        return

    print('End of epoch loss:', round(last_loss.item(), 3))

## Testing

In [None]:
def test(model, test_loader, device):
  model.eval()
  correct = 0
  total = 0
  classes = ['plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
  class_correct = list(0. for i in range(10))
  class_total = list(0. for i in range(10))
  # since not training, don't need to calculate gradients
  with torch.no_grad():
    # print("TEST BP 1")
    for batch in test_loader:
      # print("TEST BP 1.5")
      inputs, labels = batch
      inputs, labels = inputs.to(device), labels.to(device)
      # print("TEST BP 2")
      outputs = MikeTheModel(inputs)
      _, predicted = torch.max(outputs, 1)
      # print("TEST BP 3")
      total += labels.size(0)
      correct += (predicted == labels).sum().item()
      c = (predicted == labels).squeeze()
      for i in range(4):
        label = labels[i]
        class_correct[label] += c[i].item()
        class_total[label] += 1
      # print("TEST BP 4")
  print('Accuracy of the network on the 10000 val images: %d %%' % (100 * correct / total))

  for i in range(10):
    print('Accuracy of %5s: %2d %%' % (classes[i], 100 * class_correct[i] / class_total[i]))

# Running the train-test loop

In [None]:
# NUM_EPOCHS = 15

# for epoch in range(NUM_EPOCHS):
#     print("Epoch: ", epoch + 1)
#     train_one_epoch(MikeTheModel, train_loader, optimizer, criterion, device)
#     test(MikeTheModel, test_loader, device)

## Saving the weights

In [None]:
# TODO: save the weights of your model (5 min) hint: look at the documentation or slides :))

# Persist only the parameters/buffers (the state_dict), not the whole module object.
torch.save(obj=MikeTheModel.state_dict(), f="model.pth")

## Loading the weights

In [None]:
# power outage!
# you've lost all your weights.
# or have you?
# TODO: reload the weights you just (hopefully) saved (5 min)

# NOTE: the reference solution below is wrapped in a string literal, so none of
# it is executed. It rebuilds the architecture, loads the saved state_dict from
# "model.pth", moves the model to `device` and switches it to eval mode.
"""
MikealaTheModel = SillyBoiModel(3, 10)
MikealaTheModel.load_state_dict(torch.load("model.pth"))
MikealaTheModel.to(device)
MikealaTheModel.eval()
"""

# Adversarial Training

In [None]:
# A second, freshly initialised network on the selected device.
# NOTE(review): the cells visible below keep training MikeTheModel; confirm whether this one is meant to be used.
MichelleTheModel = SillyBoiModel(input_dim=3, output_dim=10).to(device)

In [None]:
# with benson's info

"""
Make FGSMTransorm Class
Use this transform class to
"""

class FGSMTransform:
    """Turn an ``(image, label)`` sample into an FGSM adversarial image.

    The Fast Gradient Sign Method moves every pixel by ``epsilon`` in the
    direction that increases the classification loss of ``network``:
    ``x_adv = clamp(x + epsilon * sign(d loss / d x), 0, 1)``.

    Args:
        epsilon: size of the per-pixel perturbation.
        network: model whose loss gradient drives the attack. When omitted, a
            freshly initialised ``SillyBoiModel(3, 10)`` is created, as before.
            Such a network is untrained, so pass the trained model to attack
            the classifier that is actually being used.
    """

    def __init__(self, epsilon, network=None):
        self.epsilon = epsilon
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        if network is None:
            network = SillyBoiModel(3, 10)
        self.network = network.to(self.device)
        # The network returns raw logits, so the loss must be cross-entropy
        # (log-softmax + NLL); plain nll_loss on logits gives a wrong gradient.
        self.criterion = torch.nn.CrossEntropyLoss()

    def __call__(self, sample):
        """Return the adversarial version of ``sample``'s image.

        Args:
            sample: ``(image, label)`` where ``image`` is a float tensor with
                values in [0, 1] and no batch dimension, and ``label`` is the
                class index as an int or a 0-d tensor.

        Returns:
            A detached tensor with the same shape and device as ``image``,
            clamped to [0, 1].
        """
        image, label = sample
        source_device = image.device

        # Work on a detached copy so the caller's tensor is never modified or
        # flagged as requiring gradients.
        adv_input = image.detach().clone().to(self.device).requires_grad_(True)
        # Shape (1,) long target to match the (1, num_classes) logits below.
        target = torch.as_tensor(label, dtype=torch.long, device=self.device).view(1)

        # Attack in eval mode so dropout / BatchNorm batch statistics do not
        # add noise to the gradient; restore the previous mode afterwards in
        # case the network is shared with a training loop.
        was_training = self.network.training
        self.network.eval()
        try:
            logits = self.network(adv_input.unsqueeze(dim=0))
            loss = self.criterion(logits, target)
            image_gradients = torch.autograd.grad(loss, adv_input)[0].sign()
        finally:
            self.network.train(was_training)

        # fast gradient sign attack
        image_attacked = (adv_input + self.epsilon * image_gradients).clamp(0, 1)

        # Detach so downstream code does not keep this autograd graph alive,
        # and hand the result back on the device the input came from.
        return image_attacked.detach().to(source_device)

class CustomCIFAR(datasets.CIFAR10):  # straight from Benson's code
    """CIFAR-10 variant whose ``transform`` is called with ``(image, target)``.

    The only difference from the stock torchvision ``__getitem__`` is that the
    target is handed to ``transform`` together with the PIL image, so
    label-aware transforms can be applied on the fly.
    """

    def __getitem__(self, index: int):
        """Return ``(image, target)`` for ``index``.

        Args:
            index (int): Index

        Returns:
            tuple: (image, target) where target is index of the target class.
        """
        raw_pixels = self.data[index]
        target = self.targets[index]
        img = Image.fromarray(raw_pixels)

        # CHANGED from the torchvision implementation: the transform receives
        # the (image, target) pair instead of the image alone.
        if self.transform is not None:
            img = self.transform((img, target))

        if self.target_transform is not None:
            target = self.target_transform(target)

        return img, target


In [None]:
# lifted straight from Benson's code
class ToTensor:
    """Label-aware counterpart of ``transforms.ToTensor()``.

    Takes an ``(image, label)`` sample and returns both parts as tensors, so
    the label travels through a transform pipeline together with the image.
    """

    def __call__(self, sample):
        picture, target = sample
        picture_tensor = transforms.functional.to_tensor(picture)
        # Go through a NumPy array so a plain Python int label is accepted.
        target_tensor = torch.from_numpy(np.array(target))
        return (picture_tensor, target_tensor)

In [None]:
class CustomCIFAR2(datasets.CIFAR10):
    """CIFAR-10 whose images can be pre-processed once, at construction time.

    When ``processing`` is given, every image of the split is converted with
    the label-aware ``ToTensor`` and passed to ``processing`` as an
    ``(image_tensor, label_tensor)`` pair. The results replace ``self.data``
    as one NumPy array, and ``__getitem__`` then returns those pre-computed
    arrays directly (``transform`` / ``target_transform`` are not applied on
    that path). The whole split is processed eagerly, so construction cost
    grows with the dataset size.

    When ``processing`` is omitted the class behaves exactly like
    ``datasets.CIFAR10``.

    Args:
        root, train, transform, target_transform, download: forwarded
            unchanged to ``datasets.CIFAR10``.
        processing: optional callable mapping ``(image_tensor, label_tensor)``
            to a tensor, e.g. an ``FGSMTransform`` instance.
    """

    def __init__(
        self,
        root: str,
        train: bool = True,
        transform=None,
        target_transform=None,
        download: bool = False,
        processing=None,
    ) -> None:
        super().__init__(root, train, transform, target_transform, download)

        # Remember which representation self.data holds so __getitem__ can
        # pick the matching access path.
        self._preprocessed = bool(processing)
        if self._preprocessed:
            to_tensor_f = ToTensor()
            # Results are moved to the CPU and stored as NumPy arrays so the
            # dataset does not hold on to device memory or autograd graphs.
            self.data = np.array([
                processing(to_tensor_f((datapt, target))).detach().cpu().numpy()
                for datapt, target in zip(self.data, self.targets)
            ])

    def __getitem__(self, index: int):
        """Return ``(image, target)`` for ``index``.

        Args:
            index (int): Index

        Returns:
            tuple: (image, target) where target is index of the target class.
        """
        if not self._preprocessed:
            # Without pre-processing self.data is still the raw uint8 image
            # array, so use the stock pipeline instead of silently skipping
            # transform / target_transform.
            return super().__getitem__(index)
        return self.data[index], self.targets[index]

In [None]:
# DUMP OF STUFF

"""
train_dataset = torchvision.datasets.CIFAR10(root='./cifar10', transform=torchvision.transforms.ToTensor(), download=True)
test_dataset = torchvision.datasets.CIFAR10(root='./cifar10', train=False, transform=torchvision.transforms.ToTensor(), download=True)

batch_size = 128;

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=True)
"""

# Adversarial training driver: for each epsilon, build FGSM-perturbed versions
# of the train and test splits, then run a few train/test epochs on MikeTheModel
# (reusing the optimizer and criterion created for it earlier).
MikeTheModel.train()

# NOTE(review): 0.2 breaks the 0.005 step pattern of the other values —
# possibly 0.02 was intended; confirm.
EPSILONS = [0.005, 0.01, 0.015, 0.2]

for epsilon in EPSILONS: # make EPSILONS -- ALSO A WIP
  print("TRAINGING WITH EPSILON: ", epsilon)

  # Label-aware pipeline: PIL image -> (tensor, label) -> adversarial tensor.
  # NOTE(review): each FGSMTransform(epsilon) here builds its own network
  # instead of attacking MikeTheModel — confirm that is intended.
  fgsm_transform = transforms.Compose([ToTensor(),FGSMTransform(epsilon)])

  # Training images are perturbed lazily, each time a sample is fetched.
  train_dataset2 = CustomCIFAR(root='./cifar10', train=True, transform=fgsm_transform, download=True)
  # NOTE(review): the clean + adversarial ConcatDataset is built but never
  # loaded; the loader below reads only the adversarial train_dataset2.
  perturbed_train_dataset = torch.utils.data.ConcatDataset([train_dataset, train_dataset2])
  perturbed_train_loader = torch.utils.data.DataLoader(dataset=train_dataset2, batch_size=batch_size, shuffle=True)
  # Test images are perturbed eagerly, inside the CustomCIFAR2 constructor.
  test_dataset2 = CustomCIFAR2(root='./cifar10', train=False, processing=FGSMTransform(epsilon), download=True)
  # NOTE(review): same as above — perturbed_test_dataset is unused and the
  # loader wraps only test_dataset2.
  perturbed_test_dataset = torch.utils.data.ConcatDataset([test_dataset, test_dataset2])
  perturbed_test_loader = torch.utils.data.DataLoader(dataset=test_dataset2, batch_size=batch_size, shuffle=True)
  PERTURBED_NUM_EPOCHS = 5

  for epoch in range(PERTURBED_NUM_EPOCHS):
    print("Epoch: ", epoch + 1)
    train_one_epoch(MikeTheModel, perturbed_train_loader, optimizer, criterion, device)
    test(MikeTheModel, perturbed_test_loader, device) # need to make a perturbed_test_loader


### DEBUG

In [None]:
# test(MikeTheModel, perturbed_test_loader, device)
# train_one_epoch(MikeTheModel, perturbed_train_loader, optimizer, criterion, device)

In [None]:
# Debug: inspect whatever test_dataset2 is currently bound to (its repr and
# length), then rebuild the loader on top of it.
print(test_dataset2)
print(len(test_dataset2))
perturbed_test_loader = torch.utils.data.DataLoader(dataset=test_dataset2, batch_size=batch_size, shuffle=True)


In [None]:
# Debug: print only the first sample of each concatenated dataset.
for data in perturbed_test_dataset:
  print(data)
  break
for data in perturbed_train_dataset:
  print(data)
  break

In [None]:
# Debug: print the requires_grad flag of every parameter of MikeTheModel.
for param in MikeTheModel.parameters():
    print(param.requires_grad)

In [None]:
# Debug: print the first sample of the clean test and train datasets.
for data in test_dataset:
  print(data)
  break
for data in train_dataset:
  print(data)
  break
# Rebind test_dataset2 to the lazily transformed CustomCIFAR variant.
# NOTE: perturbed_test_loader was created from the previous test_dataset2
# object, so the loader loops below still read that earlier dataset.
test_dataset2 = CustomCIFAR(root='./cifar10', train=False, transform=fgsm_transform, download=True)
# Check whether the transformed image is still attached to an autograd graph.
for data in test_dataset2:
  print(data[0].requires_grad)
  print(data[0].grad_fn)
  break
# Print the first batch from the loader, with and without gradient tracking.
for data in perturbed_test_loader:
  print(data)
  break
with torch.no_grad():
  for data in perturbed_test_loader:
    print(data)
    break

In [None]:
# Debug: print the classes of the perturbed and the clean test datasets.
print(type(test_dataset2))
print(type(test_dataset))