# About this assignment
In this assignment, you will implement two adversarial attacks against ResNet18 (FGSM and PGD), as well as two defenses against adversarial attacks (adversarial training and SAP). There are three goals for this assignment:
1. Learning about and evaluating basic adversarial attacks and defenses in a simple setting.
2. Learning to use the PyTorch Lightning framework to simplify and modularize your code.
3. Learning to use PyTorch to adjust/manipulate the *architecture* of a pretrained model.

# Imports

If you're running this notebook in Colab, you'll want to uncomment and run the following line.

If you're running this notebook locally or on a Grace cluster, you can separately install any packages you use. 

Note: for this assignment, if your local machine is not GPU-compatible, you will probably want to use Colab or a Grace cluster.

In [None]:
# !pip install lightning

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

%matplotlib inline

import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import lightning as L
from torchmetrics import Accuracy
from lightning.pytorch.callbacks import ModelCheckpoint, LearningRateMonitor, EarlyStopping

# Config
Just run the next code block, but double check the one after that.

In [None]:
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
print(device)

CLASSES = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
NUM_CLASSES = len(CLASSES)

In [None]:
# If you run into memory issues, you can reduce the batch size
BATCH_SIZE = 128

# Change these to the relative paths you'd like to use
# for the CIFAR-10 data and model checkpoints
DATA_PATH = 'data/'
CHECKPOINT_PATH = 'models/checkpoints/'

# The different models we'll be fine-tuning
SAVE_NAMES = [
    'baseline',
    'adv_train',    # Adversarial training a la Madry et al.
    'SAP_conv', # Full SAP post-convolution a la Dhillon et al.
]
SAVE_NAMES = {
    name: os.path.join(CHECKPOINT_PATH, name) for name in SAVE_NAMES
}

# Results dictionary
We set up storage for experiment results here. Just run the following block.

In [None]:
models = {name: None for name in SAVE_NAMES.keys()}
attacks = {
    'id': None,
    'fgsm': None,
    'pgd': None,
}

results_dic = {
    'model': [],
    'attack': [],
    'top_k': [],
    'accuracy': [],
}
results_trainer = L.Trainer(accelerator='auto', devices=1)

# Data processing
You can just run these three blocks of code. They import the CIFAR10 data from Torchvision and split them into train/validation/test sets.

We also take a sample for later visualization purposes.
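Because the loaders return normalized images, a sample has to be mapped back to the [0, 1] pixel range before it can be displayed. The following is a minimal sketch of that step, assuming the same channel statistics as `means` and `stds`; the helper name `denormalize` and the constants `MEANS` and `STDS` are hypothetical and not part of the assignment code.

```python
import torch

# CIFAR-10 channel statistics, copied from the notebook's `means` and `stds`.
# Reshaped so they broadcast over a (B, 3, H, W) batch.
MEANS = torch.tensor([0.49139968, 0.48215827, 0.44653124]).view(1, 3, 1, 1)
STDS = torch.tensor([0.24703233, 0.24348505, 0.26158768]).view(1, 3, 1, 1)

def denormalize(imgs):
    """Map normalized (B, 3, H, W) images back to pixel values in [0, 1]."""
    mean = MEANS.to(imgs.device)
    std = STDS.to(imgs.device)
    # Undo (x - mean) / std, then clamp away tiny floating-point overshoot
    return (imgs * std + mean).clamp(0.0, 1.0)

# Round trip on random "pixels" to check that the inverse is correct
pixels = torch.rand(4, 3, 32, 32)
normalized = (pixels - MEANS) / STDS
restored = denormalize(normalized)
print(torch.allclose(restored, pixels, atol=1e-5))
```

To plot one image with matplotlib, something like `plt.imshow(denormalize(sample_images)[0].permute(1, 2, 0).cpu().numpy())` should work, since `imshow` expects the channel axis last.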

In [None]:
# Pretrained normalization based on https://discuss.pytorch.org/t/how-to-preprocess-input-for-pre-trained-networks/683
means, stds = [0.49139968, 0.48215827, 0.44653124], [0.24703233, 0.24348505, 0.26158768]
means, stds = np.array(means), np.array(stds)

In [None]:
import torchvision.transforms.v2 as transforms

def get_cifar_loaders(batch_size):
    # Transformations applied to images before passing them to the model
    transform = transforms.Compose(
        [
            # transforms.Resize(256),
            # transforms.CenterCrop(224),
            transforms.ToImage(), # Converts to tensor
            transforms.ToDtype(torch.float32, scale=True),
            transforms.Normalize(mean=means, std=stds)
        ])

    trainset = torchvision.datasets.CIFAR10(root=DATA_PATH, train=True,
                                            download=True, transform=transform)
    # The train set is of size 50000
    trainset, valset = torch.utils.data.random_split(trainset, [40000, 10000])
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size,
                                            shuffle=True, num_workers=2)
    valloader = torch.utils.data.DataLoader(valset, batch_size=batch_size,
                                            shuffle=False, num_workers=2)

    testset = torchvision.datasets.CIFAR10(root=DATA_PATH, train=False,
                                        download=True, transform=transform)
    testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size,
                                            shuffle=False, num_workers=2)
    
    return trainloader, valloader, testloader

In [None]:
trainloader, valloader, testloader = get_cifar_loaders(BATCH_SIZE)
sample_images, sample_labels = next(iter(trainloader))
sample_images, sample_labels = sample_images.to(device), sample_labels.to(device)

# Base Resnet Class
Here we've implemented a ResNet18 model in the PyTorch Lightning framework. Here is [Lightning's documentation](https://lightning.ai/docs/pytorch/stable/).

The main methods to look at are ```__init__``` and ```training_step```. If you'd like to use Lightning on your own project, the other methods may be a useful reference, but as always we defer to the documentation.

In [None]:
class LResnet(L.LightningModule):
    def __init__(self):
        super().__init__()
        # Set loss module
        self.loss_module = nn.CrossEntropyLoss()
        # Example input for visualizing the graph in Tensorboard
        # CIFAR-10 images are 32x32
        self.example_input_array = torch.zeros((1, 3, 32, 32), dtype=torch.float32)
        self.num_target_classes = 10
        # Accuracy metric for training logs and testing evaluation
        self.accuracy = Accuracy(task="multiclass", num_classes=self.num_target_classes, top_k=1)
        # Adversarial generation method for training
        self.adv_train_method = None

        # Load pretrained model weights
        self.model = torchvision.models.resnet18(
            weights=torchvision.models.ResNet18_Weights.IMAGENET1K_V1
        )
        # Change final layer from 1000 (ImageNet) classes to 10 (CIFAR-10) classes
        self.model.fc = nn.Linear(self.model.fc.in_features, self.num_target_classes)

    def forward(self, imgs):
        return self.model(imgs)

    def configure_optimizers(self):
        optimizer = optim.AdamW(self.parameters(), lr=1e-5, weight_decay=0.1)
        return [optimizer] # Lightning enables multi-optimizer training, e.g. for GANs
    
    def training_step(self, batch, batch_idx):
        imgs, labels = batch
        if self.adv_train_method is not None:
            opt = self.optimizers()
            opt.zero_grad()
            # Change the images to adversarial examples
            imgs = self.adv_train_method(self.model, imgs, labels)
            # adv_train_method sets the model to eval
            self.model.train()
            # Reset accumulated gradients from adversarial generation
            opt.zero_grad()
        # Once we have the correct training images,
        # we can use the usual Lightning forward pass
        outputs = self.model(imgs)
        loss = self.loss_module(outputs, labels)
        acc = self.accuracy(outputs, labels)
        # Log accuracy and loss per-batch for Tensorboard
        self.log('train_acc', acc, on_step=False, on_epoch=True)
        self.log('train_loss', loss, prog_bar=True)
        return loss
    
    def validation_step(self, batch, batch_idx):
        imgs, labels = batch
        outputs = self.model(imgs)
        loss = self.loss_module(outputs, labels)
        self.log('val_loss', loss)
        # No return value needed: backward() is not called during validation
    
    def test_step(self, batch, batch_idx):
        imgs, labels = batch
        outputs = self.model(imgs)
        acc = self.accuracy(outputs, labels)
        self.log("test_acc", acc, prog_bar=True)
        # No return value needed: backward() is not called during testing

## Example training code
Run the following code block. It is an example of how to code a training loop with Lightning. If you change hyperparameters for your experiments later, you will need to comment at the end on the changes you've made.

In [None]:
save_key = 'baseline'
baseline_model = LResnet()
baseline_trainer = L.Trainer(
    default_root_dir = SAVE_NAMES[save_key], # Where to save the model
    accelerator='auto',
    devices=1,
    max_epochs=30,
    callbacks=[
        ModelCheckpoint( # Save the best model by validation loss
            dirpath=SAVE_NAMES[save_key],
            monitor='val_loss',
            save_top_k=1,
            mode='min',
            save_weights_only=True,
            every_n_epochs=1,
        ),
        EarlyStopping( # Stop training early if val_loss doesn't improve
            monitor='val_loss', 
            patience=3, 
            verbose=True, 
            mode='min',
        ),
        LearningRateMonitor('epoch') # Log learning rate each epoch
    ],
)

# These two lines are optional, but they make the Tensorboard logs look nicer
baseline_trainer.logger._log_graph = True  # If True, we plot the computation graph in tensorboard
baseline_trainer.logger._default_hp_metric = None  # Optional logging argument that we don't need

# This is all you need to train the model
baseline_trainer.fit(baseline_model, trainloader, valloader)
# Load best checkpoint after training
baseline_model = LResnet.load_from_checkpoint(
    baseline_trainer.checkpoint_callback.best_model_path
).to(device)

# Store the model in the dictionary
models[save_key] = baseline_model

# Adversarial attacks
Implement the FGSM and PGD attacks. These are white-box evasion attacks, and they were covered in class. Make sure that the final outputs are detached!

Once you finish this part and the previous one, you can head to the Experiments section to test your attacks on the baseline (pretrained) model.
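Both attacks rely on the gradient of the loss with respect to the *input*, not the weights, and both must return tensors that are detached from the autograd graph. The sketch below illustrates only those two mechanics on a toy linear model. It is not an implementation of either attack; `toy_model`, the tensor shapes, and the step size `0.01` are assumptions made for illustration.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
toy_model = nn.Linear(4, 3)   # hypothetical stand-in for the ResNet
toy_model.eval()
x = torch.randn(2, 4)         # two fake "images" with 4 features each
y = torch.tensor([0, 2])      # fake labels

# Work on a copy that tracks gradients, leaving the original batch untouched
x_in = x.clone().detach().requires_grad_(True)
loss = nn.CrossEntropyLoss()(toy_model(x_in), y)

# Gradient with respect to the input only; model parameters get no .grad
(input_grad,) = torch.autograd.grad(loss, x_in)

perturbed = x_in + 0.01 * input_grad   # still attached to the graph
result = perturbed.detach()            # safe to return or feed to training

print(input_grad.shape, perturbed.requires_grad, result.requires_grad)
```

Using `torch.autograd.grad` instead of `loss.backward()` is one possible design choice: it avoids accumulating gradients in the model's parameters while the adversarial examples are being generated.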

In [None]:
# Used as a baseline
def id(model, imgs, labels):
    return imgs.detach()

def fgsm(model, imgs, labels):
    r"""
    Args:
        model (nn.Module): Model to attack, e.g. self.model in the LResnet definition.
        imgs (Tensor): Tensor of images. Size (BATCH_SIZE, C, H, W). Normalized according to means, stds.
        labels (Tensor): Tensor of labels. Size (BATCH_SIZE,). Each element is an integer in [0, NUM_CLASSES).
    Returns:
        adv_imgs (Tensor): Adversarial images. Same dimensions and normalization as imgs. Detached.
            Each adversarial image in the batch is L_infinity distance at most eps away from the original image.
            Images generated by the Fast Gradient Sign Method (FGSM).
    """
    eps = 8/255 # Maximum perturbation
    model.eval()
    loss_to_use = nn.CrossEntropyLoss()
    # YOUR CODE HERE
    pass

def pgd(model, imgs, labels):
    r"""
    Args:
        model (nn.Module): Model to attack, e.g. self.model in the LResnet definition.
        imgs (Tensor): Tensor of images. Size (BATCH_SIZE, C, H, W). Normalized according to means, stds.
        labels (Tensor): Tensor of labels. Size (BATCH_SIZE,). Each element is an integer in [0, NUM_CLASSES).
    Returns:
        adv_imgs (Tensor): Adversarial images. Same dimensions and normalization as imgs. Detached.
            Each adversarial image in the batch is L_infinity distance at most eps away from the original image.
            Images generated by the Projected Gradient Descent (PGD) method.
    """
    iters = 20 # Number of steps in PGD
    eps = 8/255 # Maximum perturbation
    alpha = 2/255 # Step size
    model.eval()
    loss_to_use = nn.CrossEntropyLoss()
    # YOUR CODE HERE
    pass

attacks['id'] = id
attacks['fgsm'] = fgsm
attacks['pgd'] = pgd

Refer to the slides for Lecture 08 on Adversarial Evasion, page 26, on FGSM. Compare and contrast the two variants shown. Under what circumstances would you choose one method over the other?

ANSWER:

# Adversarial Defenses

## Adversarial Training
Implement the training loop for an adversarially trained model using PGD as the adversarial example generation method.

Your code should look very similar to the baseline example above. Be sure to save your model in the right place and to store your model in the ```models``` dictionary. You can adjust ```max_epochs``` (although early stopping should handle the cases you'd want to) or any other hyperparameters if you'd like. You will need to comment at the end on any changes you've made.

In [None]:
save_key = 'adv_train'
# YOUR CODE HERE


# Store the model in the dictionary
models[save_key] = ...

## SAP
### Function implementation
Implement a function that applies [Stochastic Activation Pruning](https://arxiv.org/pdf/1803.01442.pdf) (SAP) to a Tensor. Also read the description from the [Obfuscated Gradients](https://arxiv.org/pdf/1802.00420.pdf) paper (SAP is described in Section 5.3.1).

Roughly, the algorithm keeps each activation from the previous layer (or, generally, Module) with probability proportional to its absolute value, making this choice independently for each activation, and rescales the kept activations so that the average total activation is not changed.

That is:
1. Let the activation being passed in (from a single image, i.e. assuming batch size 1) be $act$.
2. Let $p$ be the same shape as the feature $act$, with values proportional to $|act|$ (absolute value applied element-wise) and sum 1.
3. Let $N$ be the number of entries in the feature. Draw $N$ times *with replacement* from the entries with probability mass function $p$. Set the selected entries to 1 and the remaining entries to 0 in a Tensor $m$ of the same shape as $p$ (and therefore $act$).
4. Apply the mask to get $act \circ m$ (element-wise multiplication). Divide each entry by the probability of keeping that entry (i.e. having corresponding 1 in $m$). Return the result.

Now, the above method runs very slowly. Here's another approach that the authors of Obfuscated Gradients actually use instead:  
- Essentially, if we leave each entry with the same probability of being selected as in the original SAP method, but assume we choose whether or not to keep each entry independently (instead of drawing with replacement from all the entries many times), we get a much faster filter. Specifically, once we get $p$ and $N$, the probability of keeping entry $j$ is approximately $q_j:=1-e^{-Np_j}$. Consider it an exercise to prove that this is the case :)
- For the reason from the "Bonus" part at the end of this assignment, the authors of Obfuscated Gradients use probability $1-e^{-2Np_j}.$ Do this as well.
- Normalization is easier because $q_j$ records precisely the probability of keeping each entry.
- The time-save is mostly in vectorization.

You may use either approach, although the latter is *much* faster.
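As a sanity check on the keep probability quoted above, the draw-with-replacement procedure can be simulated directly and compared against $1-e^{-Np_j}$. This is a pure-Python sketch on a tiny made-up feature, not an implementation of `sap`; the activation values and the helper name `empirical_keep_rate` are assumptions.

```python
import math
import random

def empirical_keep_rate(p, trials, seed=0):
    """Estimate how often each entry is picked at least once in len(p) draws."""
    rng = random.Random(seed)
    n = len(p)
    kept = [0] * n
    for _ in range(trials):
        # One SAP-style round: n draws with replacement according to p
        drawn = set(rng.choices(range(n), weights=p, k=n))
        for j in drawn:
            kept[j] += 1
    return [count / trials for count in kept]

# Hypothetical activations for one tiny feature (N = 8 entries)
act = [0.1, -0.5, 2.0, 0.0, -1.4, 0.3, 0.7, -3.0]
total = sum(abs(a) for a in act)
p = [abs(a) / total for a in act]   # proportional to |act|, sums to 1
n = len(p)

empirical = empirical_keep_rate(p, trials=20000)
approx = [1.0 - math.exp(-n * p_j) for p_j in p]

for p_j, emp, apx in zip(p, empirical, approx):
    print(f"p_j={p_j:.4f}  simulated={emp:.3f}  1-exp(-N*p_j)={apx:.3f}")
```

With only 8 entries the two columns can differ by a few hundredths; the agreement should tighten as $N$ grows, which is the regime of real feature maps.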

Read the above papers for more details. You may also find [Erratum](https://arxiv.org/abs/2010.00071) interesting.

In [None]:
def sap(act):
    r"""
    Args:
        act (Tensor): Tensor of activations of shape (K, C, H, W), where K is the batch size.
        The values of C, H, W depend on the layer.
    Returns:
        Tensor of the same shape as act, masked and rescaled according to the SAP method.
    """
    # YOUR CODE HERE
    pass

Closely study Algorithm 1 from the SAP paper. Line 8 draws from a categorical distribution with probabilities $p^i$. Explain what this means: if a neuron has $p_j = 0.3$, what happens over $r^i$ samples? Explain your answer intuitively. Why do you think we need line 13 in Algorithm 1?

ANSWER:

### Adjusted Model
The change you need to make to apply the defense to a ResNet model is simple: replace each Conv2d module with a very similar module that applies SAP immediately after convolution. Run the next block and complete the one after that.
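Replacing modules inside a nested model is a general PyTorch pattern: walk the direct children, swap the ones that match, and recurse into the rest. The sketch below shows that pattern on a toy network by swapping `nn.ReLU` for `nn.GELU`. The function name and the toy model are hypothetical, and this is deliberately not the Conv2d replacement itself.

```python
import torch
import torch.nn as nn

def swap_relu_for_gelu(module):
    """Recursively replace every nn.ReLU child of `module` with nn.GELU, in place."""
    # list(...) takes a snapshot so we can reassign children while looping
    for name, child in list(module.named_children()):
        if isinstance(child, nn.ReLU):
            setattr(module, name, nn.GELU())
        else:
            swap_relu_for_gelu(child)

# Toy nested model: one ReLU at the top level, one inside a sub-block
toy = nn.Sequential(
    nn.Linear(8, 8),
    nn.ReLU(),
    nn.Sequential(nn.Linear(8, 8), nn.ReLU()),
)
swap_relu_for_gelu(toy)
print(toy)
```

Activation modules have no parameters, so a fresh instance is enough here. A replacement layer that does have parameters would also need the original layer's configuration and weights carried over, which this sketch does not show.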

In [None]:
class SAP_Conv2d(nn.Conv2d):
    def __init__(
            self,
            in_channels,
            out_channels,
            kernel_size,
            stride=1,
            padding=0,
            groups=1,
            bias=True,
            dilation=1,
    ):
        super().__init__(in_channels, out_channels, kernel_size, stride,
                         padding, dilation, groups, bias)
        
    # This is the important part
    def _conv_forward(self, input, weight, bias):
        act = super()._conv_forward(input, weight, bias)
        masked_act = sap(act)
        return masked_act

In [None]:
# Transforms LResnet to use SAP_Conv2d instead of nn.Conv2d
def to_sap_conv(model):
    r"""
    Args:
        model (LResnet): Model to modify.
    Returns:
        None. The model is modified in place.
        EVERY nn.Conv2d layer is replaced with SAP_Conv2d.
    """
    # YOUR CODE HERE
    # Consider using recursion on `model.model.modules()` -> iterates through its submodules
    pass

### Training
Train an LResnet defended by SAP.

Your code should look very similar to the baseline example above. Be sure to save your model in the right place and to store your model in the ```models``` dictionary. You can adjust ```max_epochs``` (although early stopping should handle the cases you'd want to) or any other hyperparameters if you'd like. You will need to comment at the end on any changes you've made.

In [None]:
save_key = 'SAP_conv'
# YOUR CODE HERE


# Store the model in the dictionary
models[save_key] = ...

# Evaluation

## Methods
These two functions help us modularize the experiments we run. Complete ```eval_attack``` to compute the accuracy of each model (baseline, adversarially trained, SAP) on (possibly adversarial) images. We take every batch in ```loader```, apply ```attack_method``` to the batch, and check the accuracy of ```model``` in predicting the class of each adversarial image. Return a float between 0 and 1.

```top_k``` describes how we determine accuracy. For example, ```top_k=2``` means if the model predicts the correct class within its two highest-scoring classes, it's counted as correct.
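To make the `top_k` definition concrete, here is a small sketch that computes top-k accuracy on hand-written logits. The helper name `topk_accuracy` and the example values are assumptions for illustration; it is one possible way to count hits, not the required implementation.

```python
import torch

def topk_accuracy(logits, labels, k):
    """Fraction of rows whose true label is among the k highest-scoring classes."""
    topk_idx = logits.topk(k, dim=1).indices             # shape (batch, k)
    hits = (topk_idx == labels.unsqueeze(1)).any(dim=1)  # shape (batch,)
    return hits.float().mean().item()

# Three fake samples over three classes
logits = torch.tensor([[2.0, 1.0, 0.1],
                       [0.2, 0.3, 3.0],
                       [1.5, 2.5, 0.5]])
labels = torch.tensor([0, 1, 0])

top1 = topk_accuracy(logits, labels, k=1)
top2 = topk_accuracy(logits, labels, k=2)
print(top1, top2)
```

Only the first sample has its true label as the single highest score, while every sample has it within the top two, so accuracy rises from one third at `k=1` to 1.0 at `k=2`.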

Complete the next code block and just run the one after that.

In [None]:
def eval_attack(model, attack_method, loader, top_k, max_batches=0):
    r"""
    Args:
        model (LResnet): Model to attack.
        attack_method (function): Adversarial generation method. One of id, fgsm, pgd.
        loader (DataLoader): Data loader for the dataset to evaluate on.
        top_k (int): The number of top predictions to check for correctness.
        max_batches (int): Maximum number of batches to evaluate. If 0, evaluate on the entire dataloader.
    Returns:
        float: Accuracy of the model on the (adversarially perturbed) dataset.
    """
    # YOUR CODE HERE
    pass

In [None]:
def run_experiment(model, attack, top_k=1, max_batches=0):
    # If we're re-running an experiment, remove the old results
    for i in range(len(results_dic['model'])):
        if results_dic['model'][i] == model and results_dic['attack'][i] == attack and results_dic['top_k'][i] == top_k:
            results_dic['model'].pop(i)
            results_dic['attack'].pop(i)
            results_dic['top_k'].pop(i)
            results_dic['accuracy'].pop(i)
            break
    # Run the experiment
    acc = eval_attack(
        models[model], 
        attacks[attack], 
        testloader, 
        top_k=top_k, 
        max_batches=max_batches
    )
    # Store the results
    results_dic['model'].append(model)
    results_dic['attack'].append(attack)
    results_dic['top_k'].append(top_k)
    results_dic['accuracy'].append(acc)

## Experiments

### Baseline
The following code runs experiments with all three attacks (including the baseline identity) on the baseline model. Feel free to adjust the parameters or code how you'd like. You will need to comment later on any adjustments you've made.

In [None]:
for attack_method in ['id', 'fgsm', 'pgd']:
    print(f"Running experiment baseline with attack {attack_method}...")
    mb = 0
    # I've found 100 batches about matches the time of the other attacks' experiments
    if attack_method == 'pgd':
        mb = 100
    run_experiment('baseline', attack_method, max_batches=mb)

### Adversarially trained
Run the same experiments on the adversarially trained model. You should be able to use very similar code.

In [None]:
# YOUR CODE HERE


### SAP
Run the same experiments on the model defended by SAP. You should be able to use very similar code.

In [None]:
# YOUR CODE HERE


Play around with the `top_k` value in `run_experiment` (which is passed down to `eval_attack`). What is the largest `top_k`, from your results, such that the baseline performs at a similar level to the SAP-defended model evaluated with `top_k=1`?

Note that this value of `top_k` offers the baseline as much flexibility as possible, but it is also an important indication of how incorrect the baseline becomes when attacked. It also indicates how much more robust your SAP-defended model is than the baseline, if it is able to do well with just `top_k=1`.

ANSWER:

### Display results
We've already stored the results in a dictionary. Let's put them in a Pandas DataFrame to make them nicer to look at. Export your results to a CSV to save them.

It might take some manual work, but if you run any training loop more than once you should probably keep track, e.g. in a spreadsheet or in file names, of which one is which. In particular, always ensure you will know which model is the most recently trained: even better, ensure you'll still know in a month or more.

In [None]:
df_results = pd.DataFrame(results_dic)
df_results

In [None]:
# YOUR CODE HERE

### Tensorboard
Use the cell below to open a Tensorboard session, and check out the train accuracy/loss and validation loss over the training period. Take screenshots or export images of some salient graphs. Briefly describe what you notice. See [the documentation](https://colab.research.google.com/github/tensorflow/tensorboard/blob/master/docs/tensorboard_in_notebooks.ipynb) to find how to use Tensorboard with Google Colab.

In [None]:
%load_ext tensorboard
%tensorboard --logdir models/checkpoints/

(YOUR ANSWER HERE)

### Final question
Comment on your results and any adjustments you've made to the experiments. What did you expect? What met or differed from your expectations? How would you compare the attacks? How would you compare the defenses (in raw performance? in performance against adversarial examples? in training time?)?

(YOUR ANSWER HERE)

### Bonus
Technically, because SAP is stochastic, the authors average the outputs of 100 runs. Try implementing this. How does the model's regular performance change? How does its performance against adversarial attacks change?