<a href="https://colab.research.google.com/github/GiuliaLanzillotta/exercises/blob/master/Adversarial_attacks_on_MNIST.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Adversarial attacks on MNIST

Today we're going to do 2 things: 
- Traine a naive CNN on MNIST
- Attack it with different techniques

Let's start!



In [None]:
# uncomment if not already installed  
#!pip install tensorboardX

In [2]:
# Imports + constants
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import time
from torchvision import datasets, transforms

use_cuda = False
device = torch.device("cuda" if use_cuda else "cpu")
batch_size = 64

import matplotlib.pyplot as plt

# Reproducibility 
# Notice that complete reproducibility is not guaranteed anyway 
# (for example due to the non perfect associativity of floating point addition)
# Look here for more : https://pytorch.org/docs/stable/notes/randomness.html
np.random.seed(42)
torch.manual_seed(42)

<torch._C.Generator at 0x7f2d7d8685e8>

## MNIST classifier

### The architecture

In [3]:
class Net(nn.Module):
  """2 layers feed-forward classifier for MNIST images"""
  def __init__(self):
        super(Net, self).__init__()
        self.fc = nn.Linear(28*28, 200)
        self.fc2 = nn.Linear(200,10)

  def forward(self, x):
      x = x.view((-1, 28*28))
      x = F.relu(self.fc(x))
      x = self.fc2(x)
      return x

In [5]:
class ConvNet(nn.Module):
  """Pretty basic CNN classifier for MNIST images."""
  def __init__(self):
      # We'll use 6 convolutional layers with decreasing convolution window 
      # and increasing number of channels 
      # + ReLU after each layer 
      # + batch normalization
      # + dropout and 2 fully connected layers as a classification head 
      super(ConvNet, self).__init__()
      self.conv1 = nn.Conv2d(1, 32, kernel_size=(5, 5))
      self.bn1 = nn.BatchNorm2d(32)
      self.conv2 = nn.Conv2d(32, 32, kernel_size=(5, 5))
      self.bn2 = nn.BatchNorm2d(32)
      self.conv3 = nn.Conv2d(32, 64, kernel_size=(3, 3))
      self.bn3 = nn.BatchNorm2d(64)
      self.conv4 = nn.Conv2d(64, 64, kernel_size=(3, 3))
      self.bn4 = nn.BatchNorm2d(64)
      self.conv5 = nn.Conv2d(64, 128, kernel_size=(3, 3))
      self.bn5 = nn.BatchNorm2d(128)
      self.conv6 = nn.Conv2d(128, 128, kernel_size=(1, 1))
      self.bn6 = nn.BatchNorm2d(128)
      self.conv2_drop = nn.Dropout2d(p=0.2)
      self.fc1 = nn.Linear(128, 100)
      self.fc2 = nn.Linear(100, 10)

  def forward(self, x):
      ##  CONVOLUTIONAL LAYERS 
      x = F.relu(self.conv1(x))
      x = self.bn1(x)
      x = F.relu(self.conv2(x))
      x = self.conv2_drop(F.max_pool2d(self.bn2(x), 2))
      x = F.relu(self.conv3(x))
      x = self.bn3(x)
      x = F.relu(self.conv4(x))
      x = self.bn4(x)
      x = F.max_pool2d(x, 2)
      x = self.conv2_drop(x)
      x = F.relu(self.conv5(x))
      x = self.bn5(x)
      x = F.relu(self.conv6(x))
      x = self.bn6(x)
      ## CLASSIFICATION HEAD 
      size = x.size()[1] * x.size()[2] * x.size()[3]
      # flattening 
      x = x.view(-1, size)
      x = F.relu(self.fc1(x))
      x = self.fc2(x)
      return x


### The data 

In [None]:
# Here do 2 things: 
# 1. Download the MNIST dataset (already divided into train and test)
# 2. normalize the input s.t. we have a certain mean and sd (note that the provided
# mean and sd are the empirical ones from the data)
train_dataset = datasets.MNIST('mnist_data/', train=True, download=True, 
                               transform=transforms.Compose([transforms.ToTensor(), 
                                                             transforms.Normalize((0.1307,), (0.3081,))]))
test_dataset = datasets.MNIST('mnist_data/', train=False, download=True, 
                              transform=transforms.Compose([transforms.ToTensor(), 
                                                            transforms.Normalize((0.1307,), (0.3081,))]))

In [12]:
# Use a DataLoader to avoid iterating through the data yourself
# Notice the batch_size=64 that we defined above 
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

### Training 

In [20]:
# (This has any effect only if the model is not already there)
model = ConvNet().to(device)

#This has any effect only on certain modules
# (e.g. Dropout, BatchNorm) which behave differently 
# in train and test mode.
model.train()

ConvNet(
  (conv1): Conv2d(1, 32, kernel_size=(5, 5), stride=(1, 1))
  (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv2): Conv2d(32, 32, kernel_size=(5, 5), stride=(1, 1))
  (bn2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv3): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1))
  (bn3): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv4): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1))
  (bn4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1))
  (bn5): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv6): Conv2d(128, 128, kernel_size=(1, 1), stride=(1, 1))
  (bn6): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv2_drop): Dropout2d(p=0.2, inplace=False)
  (fc1): Linear(in_features=128, out_fe

In [21]:
learning_rate = 0.0001
num_epochs = 5
# Optimizers:
""" SGD vs Adam """
#opt = optim.SGD(params=model.parameters(), lr=learning_rate)
opt = optim.Adam(params=model.parameters(), lr=learning_rate)
# Loss: 
ce_loss = torch.nn.CrossEntropyLoss()

In [22]:
tot_steps = 0
for epoch in range(1,num_epochs+1):
  print("-------------- Epoch "+str(epoch)+" -------------")
  t1 = time.time()
  for batch_idx, (x_batch, y_batch) in enumerate(train_loader):
    x_batch, y_batch = x_batch.to(device), y_batch.to(device)
    tot_steps += 1
    opt.zero_grad()
    out = model(x_batch)
    batch_loss = ce_loss(out, y_batch)

    # show accuracy every 100 steps
    if batch_idx % 100 == 0:
      pred = torch.max(out, dim=1)[1] # predictions
      acc = pred.eq(y_batch).sum().item() / float(batch_size) # accuracy
      print("Batch "+str(batch_idx)+": "+ str(acc))

    batch_loss.backward()
    opt.step() 
  t2 = time.time()
  print("Time = %.2lf seconds"%(t2-t1))

-------------- Epoch 1 -------------
Batch 0: 0.15625
Batch 100: 0.703125
Batch 200: 0.953125
Batch 300: 0.921875
Batch 400: 0.921875
Batch 500: 0.96875
Batch 600: 0.96875
Batch 700: 0.96875
Batch 800: 0.96875
Batch 900: 1.0
Time = 172.04 seconds
-------------- Epoch 2 -------------
Batch 0: 1.0
Batch 100: 0.953125
Batch 200: 0.953125
Batch 300: 0.984375
Batch 400: 0.984375
Batch 500: 0.96875
Batch 600: 0.984375
Batch 700: 0.984375
Batch 800: 0.96875
Batch 900: 1.0
Time = 172.28 seconds
-------------- Epoch 3 -------------
Batch 0: 0.984375
Batch 100: 1.0
Batch 200: 0.984375
Batch 300: 0.96875
Batch 400: 1.0
Batch 500: 1.0
Batch 600: 0.96875
Batch 700: 0.984375
Batch 800: 0.96875
Batch 900: 1.0
Time = 172.63 seconds
-------------- Epoch 4 -------------
Batch 0: 1.0
Batch 100: 0.984375
Batch 200: 1.0
Batch 300: 0.984375
Batch 400: 1.0
Batch 500: 0.984375
Batch 600: 0.984375
Batch 700: 1.0
Batch 800: 0.984375
Batch 900: 0.96875
Time = 172.82 seconds
-------------- Epoch 5 -------------
B

In [23]:
# Evaluate on the test set 
tot_test, tot_acc = 0.0, 0.0
for batch_idx, (x_batch, y_batch) in enumerate(test_loader):
    x_batch, y_batch = x_batch.to(device), y_batch.to(device)
    out = model(x_batch)
    pred = torch.max(out, dim=1)[1]
    acc = pred.eq(y_batch).sum().item()
    tot_acc += acc
    tot_test += x_batch.size()[0]
acc = tot_acc/tot_test
print('Accuracy %.5lf'%(acc))

Accuracy 0.98870


Here I'll save the result of some trials:

    1) Net + Adam x 5 epochs = 0.9575 test accuracy / time x epoch around 11 s
    2) ConvNet + Adam x 5 epochs = 0.98870 test accuracy/ time x epoch around 170 s

Notice that this result are obtained by training on CPU

## Attacks

In [31]:
# We first load the test dataset again. 
# Notice that this time we don't want to normalize the input right away 
# (as we want to be able to search for adversarial examples in the original 
# image domain)
test_dataset = datasets.MNIST('mnist_data/', train=False, download=True, 
                              transform=transforms.Compose([transforms.ToTensor()]))
# In order to be able for our trained model to function with this un-normalized
# input we need to insert a normalization layer first 
class Normalize(nn.Module):
    def forward(self, x):
        return (x - 0.1307)/0.3081
model = nn.Sequential(Normalize(), model)
# and here we also create a version of the model that outputs the class probabilities
model_to_prob = nn.Sequential(model, nn.Softmax())
# we put the neural net into evaluation mode (this disables features like dropout)
model.eval()
model_to_prob.eval()

Sequential(
  (0): Sequential(
    (0): Normalize()
    (1): ConvNet(
      (conv1): Conv2d(1, 32, kernel_size=(5, 5), stride=(1, 1))
      (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(32, 32, kernel_size=(5, 5), stride=(1, 1))
      (bn2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1))
      (bn3): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv4): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1))
      (bn4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1))
      (bn5): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv6): Conv2d(128, 128, kernel_size=(1, 1), stride=(1, 1))
      (bn6): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, trac

#### The attacks 
We're now going to implement 4 different attacks (actually 2, each one of them in its targeted and untargeted version). 

1) **FGSM** : fast gradient sign method. One shot algorithm, meaning that the output is obtained in only one step. 

2) **PGD** : projected gradient descent (method) - an iterative evolution of the previous one


In [77]:
def fgsm_targeted(model, x, target, eps):
    # The idea here is to minimise the model's loss 
    # with respect to the target label 

    # tell pytorch to track the gradients wrt input x
    x.requires_grad = True 
    # loss here 
    loss = ce_loss(model(x), torch.tensor([target], dtype=torch.long))
    # gradient of the loss 
    loss.backward() 
    # get adversarial example 
    x_adv = x - eps*torch.sign(x.grad)
    return x_adv

def fgsm_untargeted(model, x, label, eps):
    # The idea here is to maximise the model's loss wrt 
    # the original label ('label')

    # tell pytorch to track the gradients wrt input x
    x.requires_grad = True 
    # loss here 
    loss = ce_loss(model(x), torch.tensor([label], dtype=torch.long))
    # gradient of the loss 
    loss.backward() 
    # get adversarial example 
    x_adv = x + eps*torch.sign(x.grad)

    return x_adv

def pgd_targeted(model, x, target, k, eps, eps_step):
    # The idea here is to search for adversarial example 
    # inside a ball around the original example x 
    # As projection is easier in this case, we use an L-infinity 
    # ball of radius eps 
    # Moreover we restrict the outputs to the [0,1]^n box 
    x.requires_grad = True 
    # random initialization 
    eta = torch.rand(size=x.size())*2*eps - eps
    x = x + eta
    # iteration 
    for steps in range(1,k+1):
      loss = ce_loss(model(x), torch.tensor([target], dtype=torch.long))
      loss.backward() 
      # projecting the step
      delta = eps_step*torch.sign(x.grad)
      delta = torch.clamp(delta, min=-1*eps, max=eps)  
      # taking the step    
      x = x - delta
      # projecting the output to the [0,1] box
      x = torch.clamp(x, min=0, max=1)
      # check if we have an adversarial example 
      out = model(x)
      pred = torch.max(out, dim=1)
      if pred==target: return x

    return x

def pgd_untargeted(model, x, label, k, eps, eps_step):
  # Same as above with the only difference that we're now 
  # maximising the loss wrt the correct label 
  x.requires_grad = True 
  # random initialization 
  eta = torch.rand(size=x.size())*2*eps - eps
  x = x + eta
  # iteration 
  for steps in range(1,k+1):
    loss = ce_loss(model(x), torch.tensor([label], dtype=torch.long))
    loss.backward() 
    # projecting the step
    delta = eps_step*torch.sign(x.grad)
    delta = torch.clamp(delta, min=-1*eps, max=eps)  
    # taking the step    
    x = x + delta
    # projecting the output to the [0,1] box
    x = torch.clamp(x, min=0, max=1)
    # check if we have an adversarial example 
    out = model(x)
    pred = torch.max(out, dim=1)
    if pred!=label: return x

  return x

In [78]:
# define a show function that displays the original image together with the 
# adversarial example and the model predictions
def show(original, adv, model_to_prob):
    p0 = model_to_prob(original).detach().numpy()
    p1 = model_to_prob(adv).detach().numpy()
    f, axarr = plt.subplots(1,2)
    axarr[0].imshow(original.detach().numpy().reshape(28, 28), cmap='gray')
    axarr[0].set_title("Original, class: " + str(p0.argmax()))
    axarr[1].imshow(adv.detach().numpy().reshape(28, 28), cmap='gray')
    axarr[1].set_title("Original, class: " + str(p1.argmax()))
    print("Class\t\tOrig\tAdv")
    for i in range(10):
        print("Class {}:\t{:.2f}\t{:.2f}".format(i, float(p0[:, i]), float(p1[:, i])))

In [79]:
# try out our attacks
original = torch.unsqueeze(test_dataset[0][0], dim=0)
adv = pgd_untargeted(model, original, 7, 10, 0.08, 0.05)
show(original, adv, model_to_prob)

TypeError: ignored

In [68]:
original.shape

torch.Size([1, 1, 28, 28])

In [70]:
x = original
eps = 0.05
opt.zero_grad()
x.requires_grad = True 
# random initialization 
eta = torch.rand(size=x.size())*2*eps - eps
x = x + eta
# iteration 


In [71]:
label = 7
loss = ce_loss(model(x), torch.tensor([label], dtype=torch.long))
loss.backward() 

eps_step = 0.008


In [73]:
x.retain_grad

<bound method Tensor.retain_grad of tensor([[[[ 4.4875e-02,  9.4602e-03, -1.6144e-02, -3.2048e-02,  2.4450e-02,
            3.7579e-02,  4.6515e-02, -4.3480e-02, -1.4027e-02,  2.4049e-02,
           -1.4525e-02, -2.5734e-03, -9.7045e-03,  3.2777e-02, -9.0880e-03,
           -3.0407e-02,  4.0199e-02,  1.9781e-02,  1.6227e-03, -4.7843e-02,
            3.3347e-02,  2.7913e-02,  1.1315e-02,  3.6405e-02, -1.5163e-02,
            2.3544e-02, -4.4526e-02,  4.3570e-02],
          [-2.5711e-02,  4.0584e-02,  3.3374e-02,  3.4601e-03, -4.2974e-02,
           -3.5979e-02, -3.8622e-02, -7.1656e-03, -3.2187e-02,  3.4314e-02,
            4.9540e-02, -1.6837e-03, -4.4790e-02, -2.4792e-02, -2.2903e-02,
           -4.1168e-02, -6.6185e-03,  3.4845e-03,  4.4103e-02, -1.4742e-02,
           -1.6126e-02,  3.1369e-02, -3.5286e-02, -3.9850e-02, -4.2428e-02,
           -4.9056e-02,  2.1846e-03, -2.2378e-02],
          [-2.0936e-02,  2.6329e-02,  4.5960e-02, -2.5477e-02, -1.7220e-03,
            3.2488e-02,  2

In [46]:

# projecting the step
delta = eps_step*torch.sign(x.grad)
delta = torch.clamp(delta, min=-1*eps, max=eps)  
# taking the step    
x = x + delta
# projecting the output to the [0,1] box
x = torch.clamp(x, min=0, max=1)
# check if we have an adversarial example 
out = model(x)
pred = torch.max(out, dim=1)
if pred!=label: return x

  This is separate from the ipykernel package so we can avoid doing imports until


TypeError: ignored