# Robust methods for Machine Learning

## Defenses against adversarial attacks

#### Tutorial #3 (Anne Gagneux)

In [None]:
# imports
from PIL import Image
import torch
import numpy as np
import matplotlib.pyplot as plt
import json

import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.autograd import Variable
from torchvision.models import resnet50, ResNet50_Weights
from torchvision import datasets, transforms
from torchvision.transforms import ToTensor

from tqdm import trange


# Pick the compute device once; the training/attack helpers below read `device`.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("GPU is available and being used" if device.type == "cuda"
      else "GPU is not available, using CPU instead")

In [None]:
def display_attack(model, image, image_adv):
    """Plot an image next to its adversarial version with the model's verdicts.

    Args:
        model: classifier returning one row of class scores (logits) per image.
        image: original image tensor holding 28*28 values, in a shape the
            model accepts.
        image_adv: adversarial image tensor, same layout as ``image``.

    Each panel title shows the predicted class and its confidence in percent.
    The networks defined in this notebook return raw logits, so the scores are
    passed through a softmax before being reported as a confidence.
    """
    with torch.no_grad():
        probs_orig = torch.softmax(model(image), dim=-1)
        probs_adv = torch.softmax(model(image_adv), dim=-1)

    # Both panels share the colour scale of the ORIGINAL image so that the
    # perturbation is visually comparable (at least 1, at most 255).
    vmax = max(min(255, float(image.max())), 1)

    fig, axs = plt.subplots(1, 2, figsize=(16, 6))
    panels = (
        (axs[0], image, probs_orig,
         'Prediction orig: %s. Confidence orig: %d'),
        (axs[1], image_adv, probs_adv,
         'Prediction attack: %s. Confidence attack: %d '),
    )
    for ax, picture, probs, title in panels:
        ax.set_title(title % (probs.argmax().item(), probs.max().item() * 100),
                     fontsize=12)
        # .cpu() keeps the helper usable when the tensors live on a GPU.
        ax.imshow(picture.detach().cpu().numpy().reshape((28, 28)),
                  cmap=plt.cm.gray_r, vmin=0, vmax=vmax)
        # Hide the ticks on BOTH panels (plt.axis only affects the last one).
        ax.axis('off')

### A last adversarial attack for the road

#### DeepFool: back to the linear model

Remember the linear model $h(x) = w^T x + b$.
For a given image $x_1$, the orthogonal projection problem is written as:
$$\min_x \Vert x-x_1 \Vert_2 \text{ subject to } w^Tx+ b = 0$$

We had the projection:
\begin{equation}
 \mathbf x^* = \mathbf x_1 - \frac{w^T \mathbf x_1 + b}{\Vert w \Vert^2} w
\end{equation}

What if we no longer have a linear model but a deep neural network $h_\theta$ instead?
We can simply linearize it!

*Recall : Taylor's expansion* 

\begin{align} 
h_\theta(\mathbf x) & \approx h_\theta(\mathbf x_1) + \nabla_{\mathbf x}  h_\theta (\mathbf x_1)^T (\mathbf x - \mathbf x_1) \\ 
&  = \underbrace{\nabla_{\mathbf x}  h_\theta (\mathbf x_1)^T}_{w^T} \mathbf x + \underbrace{(h_\theta(\mathbf x_1)  -\nabla_{\mathbf x}  h_\theta (\mathbf x_1)^T \mathbf x_1)}_{b}
\end{align}

We can write a new "projection"-like perturbed image as:

\begin{align}
 \mathbf x^* & = \mathbf x_1 - \frac{\nabla_{\mathbf x}  h_\theta (\mathbf x_1)^T \mathbf x_1 + (h_\theta(\mathbf x_1)  -\nabla_{\mathbf x}   h_\theta (\mathbf x_1)^T \mathbf x_1)}{\Vert \nabla_{\mathbf x}  h_\theta (\mathbf x_1) \Vert^2} \nabla_{\mathbf x}  h_\theta (\mathbf x_1) \\
 &=  \mathbf x_1 - \frac{ h_\theta(\mathbf x_1)  }{\Vert \nabla_{\mathbf x}  h_\theta (\mathbf x_1) \Vert^2} \nabla_{\mathbf x}  h_\theta (\mathbf x_1)
\end{align}



![](figures/decision-boundary.jpg)



In [None]:
# on Colab
# %matplotlib inline
# img = plt.imread('/content/decision-boundary.jpg')
# plt.imshow(img)
# plt.axis('off')
# plt.show()

In [None]:
def deepfool_binary(model, X, y, overshoot=0.2, max_iters=10):
    """DeepFool attack for a two-class classifier.

    At each step the classifier margin ``h(x) = f_y(x) - f_{1-y}(x)`` is
    linearised around the current point and the point is projected onto the
    linearised decision boundary ``h = 0``:
    ``r = - h(x) / ||grad h(x)||^2 * grad h(x)``.

    Args:
        model: network mapping a ``(1, 28, 28)`` tensor to ``(1, 2)`` scores.
        X: image holding 28*28 values (any shape, any numeric dtype).
        y: true label, 0 or 1 (int or one-element tensor).
        overshoot: extra fraction of the accumulated perturbation applied so
            that the point ends up strictly on the other side of the boundary.
        max_iters: maximum number of linearisation steps.

    Returns:
        Float tensor with the shape of ``X`` containing the adversarial image
        (equal to ``X`` if it was already misclassified or ``max_iters`` is 0).
    """
    label = int(y)
    other = 1 - label
    image = X.detach().float().reshape((1, 28, 28)).clone()
    delta = torch.zeros_like(image)

    current_x = image.clone().requires_grad_(True)
    pred = model(current_x)
    n_iter = 0
    while int(pred.argmax()) == label and n_iter < max_iters:
        n_iter += 1
        margin = pred[0, label] - pred[0, other]
        # autograd.grad leaves the .grad of the model parameters untouched.
        grad, = torch.autograd.grad(margin, current_x)
        grad_sq_norm = float(grad.pow(2).sum())
        if grad_sq_norm == 0.0:
            # Flat region: the linearisation gives no direction to follow.
            break
        delta -= (float(margin) / grad_sq_norm) * grad
        current_x = (image + (1 + overshoot) * delta).requires_grad_(True)
        pred = model(current_x)

    x_adv = image + (1 + overshoot) * delta
    return x_adv.reshape(X.shape)


def fgsm(model, X, y, epsilon):
    """Fast Gradient Sign Method: one signed-gradient step of size ``epsilon``.

    The cross-entropy loss of ``model`` on ``(X, y)`` is differentiated with
    respect to an additive perturbation (initialised at zero) and ``X`` is
    moved by ``epsilon`` along the sign of that gradient.
    """
    perturbation = torch.zeros_like(X, requires_grad=True)
    logits = model(X + perturbation)
    nn.CrossEntropyLoss()(logits, y).backward()
    direction = torch.sign(perturbation.grad.detach())
    return X + epsilon * direction

##### Train a Fully Connected Network on  binary MNIST

In [None]:
# Load MNIST (downloaded into ./data by the training-set constructor).
train_data = datasets.MNIST(root='data', train=True,
                            transform=ToTensor(), download=True)
test_data = datasets.MNIST(root='data', train=False, transform=ToTensor())

# Binary task: only keep the digits 0 and 1, so the labels match the two
# outputs of the fully connected network trained below.
train_idx = (train_data.targets == 0) | (train_data.targets == 1)
train_data.data, train_data.targets = (
    train_data.data[train_idx], train_data.targets[train_idx])

test_idx = (test_data.targets == 0) | (test_data.targets == 1)
test_data.data, test_data.targets = (
    test_data.data[test_idx], test_data.targets[test_idx])

BATCH_SIZE = 32
# Both loaders iterate in dataset order (no shuffling).
train_loader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
class FC(nn.Module):
    """Two-layer fully connected classifier: 784 -> 16 -> 2 (raw logits)."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(784, 16)
        self.fc2 = nn.Linear(16, 2)

    def forward(self, x):
        # Keep the first dimension as the batch and flatten the rest to 784.
        flat = x.flatten(start_dim=1)
        hidden = F.relu(self.fc1(flat))
        return self.fc2(hidden)

In [None]:
def train(model, train_loader):
    """Train ``model`` for 2 epochs with plain SGD (lr=0.0005) and cross-entropy.

    Args:
        model: network returning one row of class scores per input.
        train_loader: iterable of ``(X_batch, y_batch)`` pairs exposing
            ``len()`` and a ``.dataset`` attribute (used for progress output).

    Progress (loss of the current batch and running accuracy of the current
    epoch) is printed every 50 batches. The running accuracy is computed over
    the samples actually seen, so it does not depend on any global batch size
    and stays correct when the last batch is smaller.
    """
    optimizer = torch.optim.SGD(model.parameters(), lr=0.0005)
    error = nn.CrossEntropyLoss()
    EPOCHS = 2
    model.train()
    for epoch in range(EPOCHS):
        correct = 0
        seen = 0
        for batch_idx, (X_batch, y_batch) in enumerate(train_loader):
            X_batch = X_batch.float()
            optimizer.zero_grad()
            output = model(X_batch)
            loss = error(output, y_batch)
            loss.backward()
            optimizer.step()

            predicted = output.detach().argmax(dim=1)
            correct += (predicted == y_batch).sum().item()
            seen += len(y_batch)
            if batch_idx % 50 == 0:
                print('Epoch : {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}\t Accuracy:{:.3f}%'.format(
                    epoch, batch_idx*len(X_batch), len(train_loader.dataset),
                    100.*batch_idx / len(train_loader), loss.item(),
                    100. * correct / seen))


def evaluate(model, test_loader):
    """Print the classification accuracy of ``model`` over ``test_loader``.

    Args:
        model: network returning one row of class scores per input.
        test_loader: iterable of ``(images, labels)`` batches.

    The accuracy is the number of correct predictions divided by the number
    of samples actually seen, so a smaller last batch or a loader built with
    another batch size does not skew the result. The model is put in eval
    mode without gradient tracking for the pass, and its previous
    train/eval mode is restored afterwards.
    """
    was_training = model.training
    model.eval()
    correct = 0
    total = 0
    try:
        with torch.no_grad():
            for test_imgs, test_labels in test_loader:
                output = model(test_imgs.float())
                predicted = output.argmax(dim=1)
                correct += (predicted == test_labels).sum().item()
                total += len(test_labels)
    finally:
        model.train(was_training)
    # An empty loader reports 0% instead of dividing by zero.
    accuracy = 100. * correct / total if total else 0.
    print("Test accuracy:{:.3f}%".format(accuracy))

In [None]:
# Seed the RNG before building the network so its initial weights are
# reproducible, then train the binary classifier.
torch.manual_seed(1)
fc = FC()
train(fc, train_loader)

In [None]:
# Print the test accuracy of the trained binary classifier.
evaluate(fc, test_loader)

In [None]:
# Take the second test image labelled 1 and attack it with binary DeepFool,
# passing 1 as the true label.
# NOTE(review): the image is read from `test_data.data`, i.e. presumably the
# stored pixels without the ToTensor transform applied — confirm the scale.
image1 = (test_data.data[test_data.targets == 1][1])
image_adv = deepfool_binary(fc, image1, 1)

In [None]:
# Show the original and adversarial images side by side, then print the
# sum of absolute per-pixel differences between them (an L1 norm).
display_attack(fc, image1.float().reshape(1, 28, 28),
               image_adv.float().reshape(1, 28, 28))
print("Total variation pixels:", np.sum(np.abs((image_adv-image1).numpy())))

In [None]:
# Same digit as in the DeepFool demo, now attacked with FGSM for comparison.
# The image is selected among the test samples labelled 1, so the true label
# passed to the attack is 1 (no `target` variable exists at this point).
image1 = test_data.data[test_data.targets == 1][1].reshape(
    1, 1, 28, 28).float()
image_adv = fgsm(fc, image1, torch.LongTensor([1]), 15)
display_attack(fc, image1.float().reshape(1, 28, 28),
               image_adv.float().reshape(1, 28, 28))
# Sum of absolute per-pixel differences (an L1 norm of the perturbation).
print("Total variation pixels:", np.sum(np.abs((image_adv-image1).numpy())))

#### Multi-class DeepFool

In [None]:
def deepfool(model, X, y, overshoot=0, max_iter=5):
    """Multi-class DeepFool attack on a single example.

    At each iteration every decision boundary between the true class ``y``
    and another class ``k`` is linearised around the current point:
    ``w_k = grad f_k - grad f_y`` and ``f'_k = f_k - f_y``. The closest
    boundary is the one minimising ``|f'_k| / ||w_k||`` and the point is
    moved onto it with the step ``|f'_k| / ||w_k||^2 * w_k``.

    Args:
        model: network mapping ``X`` to scores of shape ``(1, n_classes)``.
        X: input with a leading batch dimension of size 1.
        y: true label (int, 0-dim tensor or one-element tensor).
        overshoot: extra fraction of the accumulated perturbation applied to
            the returned image.
        max_iter: maximum number of linearisation steps.

    Returns:
        Tensor with the shape of ``X`` containing the adversarial example
        (a copy of ``X`` if it was already misclassified).
    """
    # A tiny margin is added to every step so that the point lands strictly
    # beyond the linearised boundary even when ``overshoot`` is 0.
    step_margin = 1e-4

    label = int(y)
    image = X.detach().clone()
    delta = torch.zeros_like(image)

    current_x = image.clone().requires_grad_(True)
    preds = model(current_x)
    n_classes = preds.shape[-1]
    n_iter = 0
    while int(preds.argmax()) == label and n_iter < max_iter:
        n_iter += 1
        # autograd.grad leaves the .grad of the model parameters untouched.
        grad_true_class, = torch.autograd.grad(
            preds[0, label], current_x, retain_graph=True)

        # The search for the closest boundary restarts at every iteration.
        best_distance = float("inf")
        delta_iter = None
        for k in range(n_classes):
            if k == label:
                continue
            grad_k, = torch.autograd.grad(
                preds[0, k], current_x, retain_graph=True)
            diff_grads = grad_k - grad_true_class
            grad_norm = float(diff_grads.norm())
            if grad_norm == 0.0:
                # Parallel score surfaces: this boundary cannot be reached.
                continue
            diff_preds = abs(float(preds[0, k] - preds[0, label]))
            distance = diff_preds / grad_norm
            if distance < best_distance:
                best_distance = distance
                delta_iter = (distance + step_margin) * diff_grads / grad_norm

        if delta_iter is None:
            break
        delta += delta_iter
        current_x = (image + (1 + overshoot) * delta).requires_grad_(True)
        preds = model(current_x)

    x_adv = image + (1 + overshoot) * delta
    return x_adv.reshape(X.shape)

#### Train a CNN on 10-classes MNIST

In [None]:
# Reload the full 10-class MNIST dataset (the binary cells above filtered
# their own copies in place).
train_data = datasets.MNIST(root='data', train=True,
                            transform=ToTensor(), download=True)
test_data = datasets.MNIST(root='data', train=False, transform=ToTensor())

BATCH_SIZE = 32
# Both loaders reshuffle their samples at every pass.
train_loader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=True)

In [None]:
class CNN(nn.Module):
    """Small convolutional classifier with 10 outputs (raw logits).

    Four 3x3 convolutions (two of them with stride 2) followed by two linear
    layers. The first linear layer expects 7*7*64 features, which is what a
    single-channel 28x28 input produces after the two stride-2 convolutions.
    """

    def __init__(self):
        super().__init__()
        conv_stack = [
            nn.Conv2d(1, 32, 3, padding=1), nn.ReLU(),
            nn.Conv2d(32, 32, 3, padding=1, stride=2), nn.ReLU(),
            nn.Conv2d(32, 64, 3, padding=1), nn.ReLU(),
            nn.Conv2d(64, 64, 3, padding=1, stride=2), nn.ReLU(),
        ]
        self.cnn_layers = nn.Sequential(*conv_stack)
        head = [nn.Linear(7*7*64, 100), nn.ReLU(), nn.Linear(100, 10)]
        self.fc_layers = nn.Sequential(*head)

    def forward(self, x):
        features = self.cnn_layers(x)
        # One flat feature vector per sample for the linear head.
        flat = features.flatten(start_dim=1)
        return self.fc_layers(flat)

In [None]:
def train(model, train_loader, epochs=3):
    """Train ``model`` with plain SGD (lr=0.1) and cross-entropy.

    Args:
        model: network returning one row of class scores per input.
        train_loader: iterable of ``(X_batch, y_batch)`` pairs exposing
            ``len()`` and a ``.dataset`` attribute (used for progress output).
        epochs: number of passes over ``train_loader``.

    Progress (loss of the current batch and running accuracy of the current
    epoch) is printed every 50 batches. The running accuracy is computed over
    the samples actually seen, so it does not depend on any global batch size
    and stays correct when the last batch is smaller.
    """
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
    error = nn.CrossEntropyLoss()
    model.train()
    for epoch in range(epochs):
        correct = 0
        seen = 0
        for batch_idx, (X_batch, y_batch) in enumerate(train_loader):
            X_batch = X_batch.float()
            optimizer.zero_grad()
            output = model(X_batch)
            loss = error(output, y_batch)
            loss.backward()
            optimizer.step()

            predicted = output.detach().argmax(dim=1)
            correct += (predicted == y_batch).sum().item()
            seen += len(y_batch)
            if batch_idx % 50 == 0:
                print('Epoch : {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}\t Accuracy:{:.3f}%'.format(
                    epoch, batch_idx*len(X_batch), len(train_loader.dataset),
                    100.*batch_idx / len(train_loader), loss.item(),
                    100. * correct / seen))

In [None]:
# Seed the RNG before building the network so its initial weights are
# reproducible, then train the CNN with the default number of epochs.
torch.manual_seed(0)
cnn = CNN()
train(cnn, train_loader)

In [None]:
# Print the test accuracy of the trained CNN.
evaluate(cnn, test_loader)

In [None]:
# Attack the first 11 images of one training batch with DeepFool and display
# only the attacks that changed the CNN's prediction.
images, labels = next(iter(train_loader))
for n, (image, target) in enumerate(zip(images, labels)):
    if n > 10:
        break
    single_image = image.reshape((1, 1, 28, 28))
    image_adv = deepfool(cnn, single_image, torch.LongTensor([target]))
    if cnn(image_adv).argmax() == target:
        continue  # the attack failed, nothing to show
    display_attack(cnn, single_image, image_adv.reshape((1, 1, 28, 28)))

In [None]:
# Attack the first 11 images of one training batch with FGSM (epsilon=0.2)
# and display only the attacks that changed the CNN's prediction.
images, labels = next(iter(train_loader))
for n, (image, target) in enumerate(zip(images, labels)):
    if n > 10:
        break
    single_image = image.reshape((1, 1, 28, 28))
    image_adv = fgsm(cnn, single_image, torch.LongTensor([target]), 0.2)
    if cnn(image_adv).argmax() == target:
        continue  # the attack failed, nothing to show
    display_attack(cnn, single_image, image_adv.reshape((1, 1, 28, 28)))

### Adversarial training

Up to now, we have seen how an attacker could create adversarial examples which fool a classifier.
The goal of the attacker can be written as follows:
$$ \max_{\delta \in \Delta} l (h_\theta (\mathbf x + \delta ), y) $$
where $\Delta$ is the set of allowed perturbations, $\mathbf x$ is the original image we want to attack, $\delta$ is the perturbation, $h_\theta$ denotes the network trained with weights $\theta$, $l$ is the loss used for training and $y$ is the ground-truth label.

To defend against these attacks, one way is to include adversarial examples into the training process.

- The usual training optimization problem writes as:
$$\min_\theta \frac{1}{N}\sum_{(\mathbf x,y) \in \mathcal D_N} l (h_\theta ( \mathbf x),y)$$

Which we solve with the following iterations:

$$ \theta_{t+1} = \theta_t - \eta \sum_{(\mathbf x,y) \in \mathcal B} \nabla_\theta l (h_\theta(\mathbf x), y) $$

- The *adversarial* training optimization problem writes as:

$$\min_\theta \frac{1}{N}\sum_{(\mathbf  x,y) \in \mathcal D_N}  \max_{\delta \in \Delta}  l (h_\theta ( \mathbf x + \delta),y)$$

$\max_{\delta}$ means that we want to anticipate the worst-case attack. 

The iterations **we would like to compute**:

$$ \theta_{t+1} = \theta_t - \eta \sum_{(\mathbf  x,y) \in \mathcal B} \nabla_\theta \left[ \max_{\delta \in \Delta} l (h_\theta( \mathbf x + \delta), y) \right]$$

**BUT how ?**

*Danskin's theorem* Under suitable conditions, one has:
$$ \nabla_\theta \left[ \max_{\delta \in \Delta} l (h_\theta(\mathbf x + \delta), y) \right] = \nabla_\theta \left[  l (h_\theta(\mathbf x + \delta^* ), y) \right] \quad  \text{where } \delta^* = \arg\max_{\delta \in \Delta} l (h_\theta(\mathbf x+\delta),y)$$

In other words (under suitable conditions), to evaluate the gradient of the supremum of a class of functions, one should simply evaluate the gradient of the function in the class that actually obtains the maximum (if one exists).
This means that if we want to train a robust network using stochastic gradient descent (SGD), we just need to train it on maximally perturbed images. Sadly, we don't know how to find these images exactly.

$\rightarrow$ The good news: we can try to approximate these "maximally" perturbed images with the ones we know how to compute (FGSM, projected gradient descent, DeepFool, etc.).


In [None]:
# Rebuild the loaders with batches of 100 for the experiments below.
BATCH_SIZE = 100
# Only the training loader is shuffled; the test loader keeps dataset order.
train_loader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
def projection_Linfty(x, epsilon):
    """Project ``x`` onto the L-infinity ball of radius ``epsilon``.

    Every entry is clipped to the interval ``[-epsilon, epsilon]``.
    """
    return torch.clamp(x, min=-epsilon, max=epsilon)


def projection_L2(x, epsilon):
    """Project each image of ``x`` onto the L2 ball of radius ``epsilon``.

    Args:
        x: 4-D tensor; the norm is taken over the last two dimensions,
            separately for every (sample, channel) pair.
        epsilon: radius of the ball (must be non-zero).

    Returns:
        A new leaf tensor with ``requires_grad=True`` on the same device as
        ``x``. Slices whose norm exceeds ``epsilon`` are rescaled to norm
        ``epsilon``; the others are returned unchanged.

    The computation stays in torch on the device of ``x``, so no host round
    trip is needed and the result does not depend on a global device.
    """
    x = x.detach()
    norms = x.flatten(start_dim=2).norm(dim=2)
    # Dividing by max(norm / epsilon, 1) only shrinks slices outside the ball.
    scale = torch.clamp(norms / epsilon, min=1)[:, :, None, None]
    return (x / scale).requires_grad_(True)


def pgd_Linfty(model, X, y, epsilon, n_steps=40, eta=0.01):
    """Projected gradient attack with perturbations bounded in L-infinity norm.

    Starting from a zero perturbation, ``n_steps`` signed-gradient ascent
    steps of size ``eta`` are taken on the cross-entropy loss, each followed
    by a projection onto the ball of radius ``epsilon``.
    """
    loss_fn = nn.CrossEntropyLoss()
    delta = torch.zeros_like(X, requires_grad=True)
    for _ in range(n_steps):
        loss_fn(model(X + delta), y).backward()
        ascent_step = eta * delta.grad.detach().sign()
        delta.data = projection_Linfty(delta.data + ascent_step, epsilon)
        # Gradients accumulate: reset them before the next step.
        delta.grad.zero_()
    return X + delta.detach()


def pgd_L2(model, X, y, epsilon, n_steps=40, eta=0.1):
    """Projected gradient attack with perturbations bounded in L2 norm.

    Starting from a zero perturbation, ``n_steps`` ascent steps of size
    ``eta`` are taken along the L2-normalised gradient of the cross-entropy
    loss, each followed by a projection onto the ball of radius ``epsilon``.

    Args:
        model: network returning one row of class scores per input.
        X: 4-D input batch; gradient norms are taken over the last two
            dimensions.
        y: labels for ``X``.
        epsilon: radius of the L2 ball.
        n_steps: number of ascent steps.
        eta: step size.

    Returns:
        ``X`` plus the final perturbation, detached from the graph.
    """
    criterion = nn.CrossEntropyLoss()
    delta = torch.zeros_like(X, requires_grad=True)
    for _ in range(n_steps):
        loss = criterion(model(X + delta), y)
        loss.backward()
        grad = delta.grad.detach()
        grad_norm = grad.flatten(start_dim=2).norm(dim=2)[:, :, None, None]
        # A sample with an exactly zero gradient would otherwise yield 0/0
        # and turn its whole perturbation into NaN.
        direction = grad / grad_norm.clamp_min(1e-12)

        delta.data = projection_L2(
            delta.data + eta * direction, epsilon).detach()
        # Gradients accumulate: reset them before the next step.
        delta.grad.zero_()
    return X + delta.detach()

In [None]:
def epoch(loader, model, opt=None):
    """Run one pass over ``loader``, training only when ``opt`` is given.

    Returns the pair ``(error rate, mean loss)``, both normalised by
    ``len(loader.dataset)``.
    """
    criterion = nn.CrossEntropyLoss()
    n_errors, loss_sum = 0., 0.
    for X, y in loader:
        X = X.to(device)
        y = y.to(device)
        logits = model(X)
        loss = criterion(logits, y)
        if opt:
            opt.zero_grad()
            loss.backward()
            opt.step()

        predicted = torch.max(logits, dim=1)[1]
        n_errors += (predicted != y).sum().item()
        # The loss is a batch mean: weight it by the batch size.
        loss_sum += loss.item() * X.shape[0]
    n_samples = len(loader.dataset)
    return n_errors / n_samples, loss_sum / n_samples


# Maps the attack name accepted by epoch_adversarial to the function that
# implements it.
dict_of_attacks = {
    "deepfool": deepfool,
    "fgsm": fgsm,
    "pgd_linfty": pgd_Linfty,
    "pgd_l2": pgd_L2,
}


def epoch_adversarial(loader, model, attack, opt=None, **kwargs):
    """Run one pass over ``loader`` on adversarially perturbed inputs.

    Args:
        loader: iterable of ``(X, y)`` batches exposing ``.dataset``.
        model: network to attack (and to train when ``opt`` is given).
        attack: key of ``dict_of_attacks`` selecting the attack.
        opt: optimizer; when given, a step is taken on every adversarial
            batch (adversarial training), otherwise the pass only evaluates.
        **kwargs: forwarded to the attack function (e.g. ``epsilon``).

    Returns:
        The pair ``(error rate, mean loss)`` on the adversarial examples,
        both normalised by ``len(loader.dataset)``.

    Raises:
        KeyError: if ``attack`` is not a key of ``dict_of_attacks``.

    DeepFool does not support batches, so it is run example by example on
    the CPU; the model is moved back to its original device when the pass
    ends, so later GPU attacks keep working.
    """
    attack_fn = dict_of_attacks[attack]
    criterion = nn.CrossEntropyLoss()
    total_loss, total_err = 0., 0.

    per_sample = attack == "deepfool"
    first_param = next(model.parameters(), None)
    home_device = first_param.device if first_param is not None else device
    if per_sample:
        model.cpu()  # nn.Module.cpu() moves the module in place
    try:
        for X, y in loader:
            if per_sample:
                X, y = X.cpu(), y.cpu()
                for image, target in zip(X, y):
                    # unsqueeze adds the batch dimension the attack expects.
                    image_adv = attack_fn(
                        model, image.unsqueeze(0), target, **kwargs)
                    yp = model(image_adv)
                    loss = criterion(yp, target.reshape(1))
                    if opt:
                        opt.zero_grad()
                        loss.backward()
                        opt.step()

                    total_err += (yp.max(dim=1)[1] != target).sum().item()
                    total_loss += loss.item()
            else:
                X, y = X.to(device), y.to(device)
                X_adv = attack_fn(model, X, y, **kwargs)
                yp = model(X_adv)
                loss = criterion(yp, y)
                if opt:
                    opt.zero_grad()
                    loss.backward()
                    opt.step()

                total_err += (yp.max(dim=1)[1] != y).sum().item()
                # The loss is a batch mean: weight it by the batch size.
                total_loss += loss.item() * X.shape[0]
    finally:
        if per_sample:
            model.to(home_device)

    return total_err / len(loader.dataset), total_loss / len(loader.dataset)

Let's train a standard CNN and evaluate it on an attacked dataset.
What do you observe ? Is it robust ?

In [None]:
# Standard (non-robust) training: each epoch reports the training error, the
# clean test error and the test error under an L-infinity PGD attack.
torch.manual_seed(1)
standartCNN = CNN().to(device)

opt = optim.SGD(standartCNN.parameters(), lr=1e-1)
for t in range(2):
    train_err, train_loss = epoch(train_loader, standartCNN, opt)
    test_err, test_loss = epoch(test_loader, standartCNN)
    adv_err, adv_loss = epoch_adversarial(
        test_loader, standartCNN, "pgd_linfty", epsilon=0.1)
    # Learning-rate decay after the fifth epoch; inactive with only 2 epochs.
    if t == 4:
        for group in opt.param_groups:
            group["lr"] = 1e-2
    print("\t".join("{:.6f}".format(err)
                    for err in (train_err, test_err, adv_err)))

Let's train a robust CNN and evaluate it under an attacked dataset.
What do you observe ? Is it robust ?

In [None]:
# Adversarial training: each epoch reports the adversarial training error, the
# clean test error and the test error under an L-infinity PGD attack.
torch.manual_seed(1)
robustCNN = CNN()

if torch.cuda.is_available():
    robustCNN.cuda()


opt = optim.SGD(robustCNN.parameters(), lr=1e-1)
for t in range(2):
    # Train on PGD-perturbed batches (approximation of the inner maximum).
    train_err, train_loss = epoch_adversarial(
        train_loader, robustCNN, "pgd_linfty", opt, epsilon=0.1)
    # Clean test error, then test error under the same attack (no optimizer:
    # evaluation only).
    test_err, test_loss = epoch(test_loader, robustCNN)
    adv_err, adv_loss = epoch_adversarial(
        test_loader, robustCNN, "pgd_linfty", epsilon=0.1)
    # Learning-rate decay after the fifth epoch; inactive with only 2 epochs.
    if t == 4:
        for param_group in opt.param_groups:
            param_group["lr"] = 1e-2
    print(*("{:.6f}".format(i)
          for i in (train_err, test_err, adv_err)), sep="\t")

Now, you can evaluate your robust CNN against various attacks!

In [None]:
# Test error rate of robustCNN under FGSM with epsilon=0.2
# ([0] selects the error rate from the (error, loss) pair).
print("FGSM: ", epoch_adversarial(
    test_loader, robustCNN, "fgsm", epsilon=0.2)[0])

In [None]:
# Same FGSM attack (epsilon=0.2) against standartCNN, for comparison.
print("FGSM: ", epoch_adversarial(
    test_loader, standartCNN, "fgsm", epsilon=0.2)[0])

In [None]:
# Test error rate of robustCNN under L2-projected PGD with epsilon=0.2.
print(r"PGD, projection $\ell_2$: ", epoch_adversarial(
    test_loader, robustCNN, "pgd_l2", epsilon=0.2)[0])

In [None]:
# Same L2-projected PGD attack with a larger radius (epsilon=0.4).
print(r"PGD, projection $\ell_2$: ", epoch_adversarial(
    test_loader, robustCNN, "pgd_l2", epsilon=0.4)[0])

In [None]:
# Test error rate of robustCNN under DeepFool with its default settings
# (epoch_adversarial runs this attack one example at a time).
print(r"DeepFool", epoch_adversarial(
    test_loader, robustCNN, "deepfool")[0])

In [None]:
# Test error rate of robustCNN under L-infinity-projected PGD with
# epsilon=0.4 (the label names the projection actually used by the attack).
print(r"PGD, projection $\ell_\infty$: ", epoch_adversarial(
    test_loader, robustCNN, "pgd_linfty", epsilon=0.4)[0])

Comment on your results.