# Set up the working environment
We use the DeepRobust package (https://github.com/DSE-MSU/DeepRobust) to perform the adversarial attacks.


In [None]:
######################################################################
# Setup python environment and change the current working directory
######################################################################
# The lines starting with "!" are shell commands and the lines starting
# with "%" are IPython magics, so this cell only runs inside a notebook.
!pip install torch torchvision
!pip install deeprobust # check https://github.com/DSE-MSU/DeepRobust, provides easy
# tools for attacking and training
!pip install matplotlib
# Create the project directory and make it the working directory; later
# cells use relative paths such as './' and './trained_models/'.
%mkdir -p /content/csc413/project/
%cd /content/csc413/project

# Access our human-label data sets


In [None]:
from google.colab import drive
drive.mount('/content/gdrive')
# If access is denied, sign in with the shared project Google account.
# SECURITY: the account name and password that used to be written here were
# removed. Never store credentials in the notebook; ask the project
# maintainers for access and rotate the password that was exposed.

Mounted at /content/gdrive


In [None]:
import os
import torch
import torchvision
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import json
from torch.utils.data import Dataset, DataLoader
from  torchvision.transforms.functional import InterpolationMode

#LOW RESOLUTION MNIST DATASET
#################################################################################
#This file creates the dataset object that pytorch uses to create batches.
#Use this Modified_MNIST instead of datasets.MNIST when you want to train on the LOW resolution MNIST


class Modified_MNIST(Dataset):
    """Low-resolution MNIST paired with the project's human-provided labels.

    In training mode every image is down-sampled to ``resize`` pixels,
    up-sampled back to 28 pixels with nearest-neighbour interpolation (which
    keeps the blocky look), contrast-boosted and converted to a tensor. Only
    the samples listed in ``correct_indices.json`` are exposed, and their
    labels come from ``train_labels.json`` instead of the MNIST targets.

    In test mode the plain MNIST test split is exposed unchanged.

    Parameters
    ----------
    train :
        ``True`` for the relabelled low-resolution training set, otherwise
        the raw MNIST test split.
    resize :
        side length used for the intermediate down-sampling (default 10).
    data_dir :
        directory holding the MNIST files, ``train_labels.json`` and
        ``correct_indices.json``.

    Notes
    -----
    The files are addressed through ``data_dir`` directly. Earlier versions
    switched the process working directory with ``%cd`` (IPython-only syntax)
    and did not restore it when loading failed; the working directory is no
    longer touched.
    """

    def __init__(self, train=True, resize=10,
                 data_dir='/content/gdrive/My Drive/datav2/'):
        self.training = train

        if train is True:
            transform = torchvision.transforms.Compose([
                torchvision.transforms.Resize(
                    resize, interpolation=InterpolationMode.BILINEAR),
                # Important to use InterpolationMode.NEAREST, to preserve blockiness
                torchvision.transforms.Resize(
                    28, interpolation=InterpolationMode.NEAREST),
                torchvision.transforms.Lambda(
                    lambda img: torchvision.transforms.functional.adjust_contrast(img, 4)),
                torchvision.transforms.ToTensor(),
            ])
        else:
            transform = torchvision.transforms.Compose(
                [torchvision.transforms.ToTensor()])

        self.dataset = torchvision.datasets.MNIST(
            data_dir, train=train, download=False, transform=transform)

        # Both JSON files are loaded regardless of ``train`` (as before), so
        # they must exist in ``data_dir`` even for the test split.
        with open(os.path.join(data_dir, 'train_labels.json'), 'r') as fp:
            self.labels_dict = json.load(fp)

        with open(os.path.join(data_dir, 'correct_indices.json'), 'r') as fp:
            self.correct_indices = json.load(fp)

    def __len__(self):
        if self.training is True:
            # Number of correctly labelled samples
            return len(self.correct_indices)
        return len(self.dataset)

    def __getitem__(self, idx):
        if self.training is True:
            # actual_index is the position of the sample in the raw MNIST dataset
            actual_index = self.correct_indices[idx]
            img = self.dataset[actual_index][0]
            return (img, int(self.labels_dict[str(actual_index)]["label"]))

        # Fetch the sample once: every lookup re-runs the transform pipeline.
        img, original_label = self.dataset[idx]
        return (img, original_label)


class Limited_MNIST(Dataset):
    """Original MNIST restricted to the indices in ``correct_indices.json``.

    Images and labels are the unmodified MNIST ones. In training mode only
    the samples whose index is listed in ``correct_indices.json`` are
    exposed; in test mode the whole MNIST test split is exposed.

    Parameters
    ----------
    train :
        ``True`` for the restricted training split, otherwise the MNIST test
        split.
    data_dir :
        directory holding the MNIST files and ``correct_indices.json``
        (default ``'./'``, i.e. the current working directory, as before).
    """

    def __init__(self, train=True, data_dir='./'):
        self.training = train
        transform = torchvision.transforms.Compose(
            [torchvision.transforms.ToTensor()])

        self.dataset = torchvision.datasets.MNIST(
            data_dir, train=train, download=False, transform=transform)
        with open(os.path.join(data_dir, 'correct_indices.json'), 'r') as fp:
            self.correct_indices = json.load(fp)

    def __len__(self):
        if self.training is True:
            # Number of correctly labelled samples
            return len(self.correct_indices)
        return len(self.dataset)

    def __getitem__(self, idx):
        if self.training is True:
            # Map the position in the restricted set to the raw MNIST index
            idx = self.correct_indices[idx]

        # One lookup returns both the original MNIST image and the original
        # MNIST label; indexing twice would run the transform twice.
        img, original_label = self.dataset[idx]
        return (img, original_label)

# Networks used
The same versions of the CNN and LeNet are used for both datasets; only the number of input channels and the other necessary dimensions are changed to make training possible. Only MNIST is analysed due to time and resource constraints.

In [None]:
# CNN for MNIST
from torch.nn import Module
from torch import nn

class CNN_For_MNIST(nn.Module):
    """Two-convolution, two-linear-layer CNN for MNIST-sized inputs.

    Each 5x5 convolution keeps the spatial size (padding 2) and is followed
    by a ReLU and a 2x2 max-pool, so the feature map entering ``fc1`` has
    ``int(H/4) * int(W/4) * out_channel2`` values.

    Parameters
    ----------
    in_channel1 :
        number of input channels (default 1)
    out_channel1 :
        output channels of the first convolution (default 32)
    out_channel2 :
        output channels of the second convolution (default 64)
    H :
        input height (default 28)
    W :
        input width (default 28)
    """

    def __init__(self, in_channel1=1, out_channel1=32, out_channel2=64, H=28, W=28):
        super(CNN_For_MNIST, self).__init__()
        self.H = H
        self.W = W
        self.out_channel2 = out_channel2

        ## define two convolutional layers
        self.conv1 = nn.Conv2d(in_channels=in_channel1,
                               out_channels=out_channel1,
                               kernel_size=5,
                               stride=1,
                               padding=(2, 2))
        self.conv2 = nn.Conv2d(in_channels=out_channel1,
                               out_channels=out_channel2,
                               kernel_size=5,
                               stride=1,
                               padding=(2, 2))

        ## define two linear layers
        self.fc1 = nn.Linear(self._flat_features(), 1024)
        self.fc2 = nn.Linear(1024, 10)

    def _flat_features(self):
        """Return the number of values per sample after the second pooling."""
        return int(self.H / 4) * int(self.W / 4) * self.out_channel2

    def forward(self, x):
        """Return the class scores (logits) for a batch of images."""
        # ``nn.functional`` is used instead of a module-level ``F`` alias so
        # the class works even when the cell importing ``F`` was not run.
        fn = nn.functional
        x = fn.max_pool2d(fn.relu(self.conv1(x)), 2, 2)
        x = fn.max_pool2d(fn.relu(self.conv2(x)), 2, 2)
        x = x.view(-1, self._flat_features())
        x = fn.relu(self.fc1(x))
        return self.fc2(x)

    def get_logits(self, x):
        """Return the same pre-softmax scores as :meth:`forward`."""
        # ``self.forward`` (not ``self(x)``) keeps the previous behaviour of
        # not triggering hooks registered on the network itself.
        return self.forward(x)

In [None]:
# LeNet for MNIST
from torch.nn import Module
from torch import nn

class LeNet_For_MNIST(Module):
    """LeNet-style network for single-channel 28x28 images.

    Two unpadded 5x5 convolutions, each followed by a ReLU and a 2x2
    max-pool, reduce a 28x28 input to 16 maps of 4x4 (256 values), which are
    fed through three linear layers with a ReLU after each of them.

    NOTE(review): ``relu5`` is applied to the output of the last linear
    layer, so the returned scores are never negative. It is kept because it
    defines the behaviour of the saved checkpoints - confirm it is intended.
    """

    # Sub-modules applied, in order, before and after flattening.
    _FEATURE_STAGES = ("conv1", "relu1", "pool1", "conv2", "relu2", "pool2")
    _CLASSIFIER_STAGES = ("fc1", "relu3", "fc2", "relu4", "fc3", "relu5")

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=6, kernel_size=5)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=2)
        self.conv2 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=2)
        self.fc1 = nn.Linear(in_features=256, out_features=120)
        self.relu3 = nn.ReLU()
        self.fc2 = nn.Linear(in_features=120, out_features=84)
        self.relu4 = nn.ReLU()
        self.fc3 = nn.Linear(in_features=84, out_features=10)
        self.relu5 = nn.ReLU()

    def _run_stages(self, x):
        """Apply every stage in order, flattening between the two groups."""
        out = x
        for stage in self._FEATURE_STAGES:
            out = getattr(self, stage)(out)
        out = out.view(out.shape[0], -1)
        for stage in self._CLASSIFIER_STAGES:
            out = getattr(self, stage)(out)
        return out

    def forward(self, x):
        """Return the ten class scores for a batch of images."""
        return self._run_stages(x)

    def get_logits(self, x):
        """Return the same scores as :meth:`forward`."""
        return self._run_stages(x)



In [None]:
# CNN for CIFAR10
class CNN_For_CIFAR(nn.Module):
    """Two-convolution, two-linear-layer CNN for CIFAR10-sized inputs.

    Each 5x5 convolution keeps the spatial size (padding 2) and is followed
    by a ReLU and a 2x2 max-pool, so the feature map entering ``fc1`` has
    ``int(H/4) * int(W/4) * out_channel2`` values.

    Parameters
    ----------
    in_channel1 :
        number of input channels (default 3)
    out_channel1 :
        output channels of the first convolution (default 32)
    out_channel2 :
        output channels of the second convolution (default 64)
    H :
        input height (default 32)
    W :
        input width (default 32)
    """

    def __init__(self, in_channel1=3, out_channel1=32, out_channel2=64, H=32, W=32):
        super(CNN_For_CIFAR, self).__init__()
        self.H = H
        self.W = W
        self.out_channel2 = out_channel2

        ## define two convolutional layers
        self.conv1 = nn.Conv2d(in_channels=in_channel1,
                               out_channels=out_channel1,
                               kernel_size=5,
                               stride=1,
                               padding=(2, 2))
        self.conv2 = nn.Conv2d(in_channels=out_channel1,
                               out_channels=out_channel2,
                               kernel_size=5,
                               stride=1,
                               padding=(2, 2))

        ## define two linear layers
        self.fc1 = nn.Linear(self._flat_features(), 1024)
        self.fc2 = nn.Linear(1024, 10)

    def _flat_features(self):
        """Return the number of values per sample after the second pooling."""
        return int(self.H / 4) * int(self.W / 4) * self.out_channel2

    def forward(self, x):
        """Return the class scores (logits) for a batch of images."""
        # ``nn.functional`` is used instead of a module-level ``F`` alias so
        # the class works even when the cell importing ``F`` was not run.
        fn = nn.functional
        x = fn.max_pool2d(fn.relu(self.conv1(x)), 2, 2)
        x = fn.max_pool2d(fn.relu(self.conv2(x)), 2, 2)
        x = x.view(-1, self._flat_features())
        x = fn.relu(self.fc1(x))
        return self.fc2(x)

    def get_logits(self, x):
        """Return the same pre-softmax scores as :meth:`forward`.

        Provided so this network offers the same ``get_logits`` accessor as
        the MNIST networks in this file.
        """
        return self.forward(x)

In [None]:
# LeNet for CIFAR10
from torch.nn import Module
from torch import nn

class LeNet_For_CIFAR(Module):
    """LeNet-style network for three-channel 32x32 images.

    Two unpadded 5x5 convolutions, each followed by a ReLU and a 2x2
    max-pool, reduce a 32x32 input to 16 maps of 5x5 (400 values), which are
    fed through three linear layers with a ReLU after each of them.

    NOTE(review): ``relu5`` is applied to the output of the last linear
    layer, so the returned scores are never negative. It is kept to leave
    the network's behaviour unchanged - confirm it is intended.
    """

    def __init__(self):
        super(LeNet_For_CIFAR, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(2)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.relu3 = nn.ReLU()
        self.fc2 = nn.Linear(120, 84)
        self.relu4 = nn.ReLU()
        self.fc3 = nn.Linear(84, 10)
        self.relu5 = nn.ReLU()

    def forward(self, x):
        """Return the ten class scores for a batch of images."""
        y = self.pool1(self.relu1(self.conv1(x)))
        y = self.pool2(self.relu2(self.conv2(y)))
        # Flatten everything except the batch dimension
        y = y.view(y.shape[0], -1)
        y = self.relu3(self.fc1(y))
        y = self.relu4(self.fc2(y))
        y = self.relu5(self.fc3(y))
        return y

    def get_logits(self, x):
        """Return the same scores as :meth:`forward`.

        Provided so this network offers the same ``get_logits`` accessor as
        ``LeNet_For_MNIST`` in this file.
        """
        return self.forward(x)

# Training
The trained models are saved in Google Drive and can be loaded directly, so there is no need to re-train the models before running the attacks. It is sufficient to run only the "Helper functions for loading trained model" section.


## Helper functions for training
Do not need to expand, just run this section once

In [None]:
def model_train(model, device, train_loader, optimizer, epoch):
    """Run one training epoch with a cross-entropy loss.

    Parameters
    ----------
    model :
        network to optimise; it is switched to training mode
    device :
        device the batches are moved to (option:'cpu','cuda')
    train_loader :
        loader yielding ``(inputs, labels)`` batches
    optimizer :
        optimizer that updates the network's parameters
    epoch :
        epoch number, only used in the progress message
    """
    model.train()
    for step, batch in enumerate(train_loader):
        inputs, labels = (tensor.to(device) for tensor in batch)

        optimizer.zero_grad()
        loss = F.cross_entropy(model(inputs), labels)
        loss.backward()
        optimizer.step()

        # Report progress on every tenth batch only
        if step % 10 != 0:
            continue
        seen = step * len(inputs)
        total = len(train_loader.dataset)
        percent = 100. * step / len(train_loader)
        print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
            epoch, seen, total, percent, loss.item()))


def model_test(model, device, test_loader):
    """Evaluate the network and print its average loss and accuracy.

    Parameters
    ----------
    model :
        network to evaluate; it is switched to evaluation mode
    device :
        device the batches are moved to (option:'cpu', 'cuda')
    test_loader :
        loader yielding ``(inputs, labels)`` batches
    """
    model.eval()

    loss_sum = 0
    hits = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            scores = model(inputs)
            # Accumulate the summed (not averaged) loss of the batch
            loss_sum += F.cross_entropy(scores, labels, reduction='sum').item()
            # Predicted class = index of the largest score
            predicted = scores.argmax(dim=1, keepdim=True)
            hits += predicted.eq(labels.view_as(predicted)).sum().item()

    n_samples = len(test_loader.dataset)
    loss_sum /= n_samples

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        loss_sum, hits, n_samples, 100. * hits / n_samples))

In [None]:
# We make change to the training in DeepRobust, remove other model and add LeNet
"""
This function helps to train models of different architectures easily.
Select the model architecture and the training data, then train and save the corresponding model.

"""
from __future__ import print_function
import os
import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F #233
import torch.optim as optim
from torchvision import datasets, transforms
import numpy as np
from PIL import Image

def modified_train(model, data, device, maxepoch, data_path = './', save_per_epoch = 10, seed = 100):
    """Train a network from scratch and save checkpoints.

    Checkpoints are written to ``./trained_models/`` as
    ``<data>_<model>_epoch_<epoch>.pt`` every ``save_per_epoch`` epochs and
    after the last epoch.

    Parameters
    ----------
    model :
        model(option:'CNN', 'LeNet')
    data :
        data(option:'MNIST', 'Modified_MNIST', 'Limited_MNIST', 'CIFAR10')
    device :
        device(option:'cpu', 'cuda')
    maxepoch :
        number of training epochs
    data_path :
        data path(default = './')
    save_per_epoch :
        save a checkpoint every this many epochs (default = 10)
    seed :
        seed passed to ``torch.manual_seed``

    Raises
    ------
    ValueError
        if ``model`` is not 'CNN' or 'LeNet'. The check happens before any
        data is loaded, so a typo no longer surfaces as an
        ``UnboundLocalError`` after the dataset has been prepared.
    """
    if model not in ('CNN', 'LeNet'):
        raise ValueError(
            "Unknown model {!r}; expected 'CNN' or 'LeNet'".format(model))

    torch.manual_seed(seed)

    train_loader, test_loader = feed_dataset(data, data_path)

    # Every MNIST variant uses the single-channel 28x28 networks; any other
    # dataset name falls back to the CIFAR networks, as before.
    is_mnist_like = data in ('MNIST', 'Limited_MNIST', 'Modified_MNIST')
    if model == 'CNN':
        train_net = CNN_For_MNIST() if is_mnist_like else CNN_For_CIFAR()
    else:
        train_net = LeNet_For_MNIST() if is_mnist_like else LeNet_For_CIFAR()
    train_net = train_net.to(device)

    optimizer = optim.SGD(train_net.parameters(), lr= 0.1, momentum=0.5)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size = 100, gamma = 0.1)
    save_dir = './trained_models/'
    for epoch in range(1, maxepoch + 1):

        print(epoch)
        model_train(train_net, device, train_loader, optimizer, epoch)
        model_test(train_net, device, test_loader)

        if epoch % save_per_epoch == 0 or epoch == maxepoch:
            if os.path.isdir(save_dir):
                print('Save model.')
            else:
                os.makedirs(save_dir, exist_ok=True)
                print('Make directory and save model.')
            torch.save(train_net.state_dict(),
                       save_dir + data + "_" + model + "_epoch_" + str(epoch) + ".pt")
        scheduler.step()

def feed_dataset(data, data_dict):
    """Build the training and test loaders for the requested dataset.

    Parameters
    ----------
    data :
        data(option:'CIFAR10', 'MNIST', 'Modified_MNIST', 'Limited_MNIST')
    data_dict :
        directory the torchvision datasets are stored in / downloaded to

    Returns
    -------
    (train_loader, test_loader)
        training batches of 128 samples and test batches of 1000 samples,
        both shuffled.

    Raises
    ------
    ValueError
        if ``data`` is not one of the supported names (this used to end in
        an ``UnboundLocalError``).
    """
    supported = ('CIFAR10', 'MNIST', 'Modified_MNIST', 'Limited_MNIST')
    if data not in supported:
        raise ValueError(
            "Unknown dataset {!r}; expected one of {}".format(data, supported))

    if data == 'CIFAR10':
        transform_train = transforms.Compose([
                transforms.RandomCrop(32, padding=5),
                transforms.RandomHorizontalFlip(),
                transforms.ToTensor(),
                #transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
                ])
        transform_val = transforms.Compose([
                transforms.ToTensor(),
                #transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
                ])
        train_set = datasets.CIFAR10(data_dict, train=True, download=True,
                                     transform=transform_train)
        test_set = datasets.CIFAR10(data_dict, train=False, download=True,
                                    transform=transform_val)
    else:
        # All MNIST variants are evaluated on the normalised MNIST test split.
        mnist_transform = transforms.Compose([
                transforms.ToTensor(),
                transforms.Normalize((0.1307,), (0.3081,))])
        if data == 'MNIST':
            train_set = datasets.MNIST(data_dict, train=True, download=True,
                                       transform=mnist_transform)
        elif data == 'Modified_MNIST':
            # NOTE(review): Modified_MNIST / Limited_MNIST apply ToTensor only,
            # while the test split below is additionally normalised - confirm
            # that this train/test preprocessing difference is intended.
            train_set = Modified_MNIST()
        else:
            train_set = Limited_MNIST()
        test_set = datasets.MNIST(data_dict, train=False, download=True,
                                  transform=mnist_transform)

    train_loader = torch.utils.data.DataLoader(
        train_set, batch_size=128, shuffle=True)
    test_loader = torch.utils.data.DataLoader(
        test_set, batch_size=1000, shuffle=True)

    return train_loader, test_loader


def fine_tune(model, data, device, train_net, maxepoch, data_path = './', save_per_epoch = 10, seed = 100):
    """Fine-tune only the last linear layer of an already trained network.

    Every existing parameter of ``train_net`` is frozen, the last linear
    layer (``fc2`` for 'CNN', ``fc3`` otherwise) is replaced by a freshly
    initialised layer of the same size, and only that layer is trained.
    ``train_net`` is modified in place. Checkpoints are written to
    ``./trained_models/`` as ``<data>_<model>_ft_epoch_<epoch>.pt`` every
    ``save_per_epoch`` epochs and after the last epoch.

    Parameters
    ----------
    model :
        model(option:'CNN', 'LeNet'); any value other than 'CNN' is treated
        as 'LeNet'
    data :
        data(option:'MNIST', 'Modified_MNIST', 'Limited_MNIST', 'CIFAR10')
    device :
        device(option:'cpu', 'cuda')
    train_net :
        the pre-trained network to fine-tune
    maxepoch :
        number of training epochs
    data_path :
        data path(default = './')
    save_per_epoch :
        save a checkpoint every this many epochs (default = 10)
    seed :
        seed passed to ``torch.manual_seed``
    """

    torch.manual_seed(seed)

    train_loader, test_loader = feed_dataset(data, data_path)

    for param in train_net.parameters():
        param.requires_grad = False

    # Parameters of newly constructed modules have requires_grad=True by
    # default, so replacing the head makes it the only trainable layer.
    # The new head copies the size of the old one (1024->10 for the CNN,
    # 84->10 for LeNet) instead of hard-coding it.
    head_name = 'fc2' if model == "CNN" else 'fc3'
    old_head = getattr(train_net, head_name)
    setattr(train_net, head_name,
            nn.Linear(old_head.in_features, old_head.out_features))

    train_net = train_net.to(device)

    train_para = [param for param in train_net.parameters() if param.requires_grad]

    optimizer = optim.SGD(train_para, lr= 0.1, momentum=0.5)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size = 100, gamma = 0.1)
    save_dir = './trained_models/'
    for epoch in range(1, maxepoch + 1):

        print(epoch)
        model_train(train_net, device, train_loader, optimizer, epoch)
        model_test(train_net, device, test_loader)

        if epoch % save_per_epoch == 0 or epoch == maxepoch:
            if os.path.isdir(save_dir):
                print('Save model.')
            else:
                os.makedirs(save_dir, exist_ok=True)
                print('Make directory and save model.')
            torch.save(train_net.state_dict(),
                       save_dir + data + "_" + model + "_ft_epoch_" + str(epoch) + ".pt")
        scheduler.step()


## Helper functions for loading trained model

In [None]:
def load_model(network, dataset, cloud = True, epoch = 20):
    """Load a network trained from scratch and return it in eval mode.

    Parameters
    ----------
    network :
        'LeNet' for the LeNet networks; any other value selects the CNN
    dataset :
        dataset name used in the checkpoint file name. 'MNIST',
        'Modified_MNIST' and 'Limited_MNIST' select the MNIST networks,
        anything else the CIFAR networks.
    cloud :
        load from the Google Drive folder when True (default), otherwise
        from the local project folder
    epoch :
        epoch number in the checkpoint file name (default 20)

    Returns
    -------
    The network with the checkpoint weights loaded. It lives on the CPU;
    move it with ``.to(device)`` if needed.
    """
    if cloud:
        location = "/content/gdrive/MyDrive/trained_models/"
    else:
        location = "/content/csc413/project/trained_models/"
    location = location + dataset + "_" + network + "_epoch_" + str(epoch) + ".pt"

    # Same MNIST family as the training helper, so a Limited_MNIST checkpoint
    # is no longer paired with a CIFAR-shaped network.
    is_mnist_like = dataset in ("MNIST", "Modified_MNIST", "Limited_MNIST")
    if network == "LeNet":
        model = LeNet_For_MNIST() if is_mnist_like else LeNet_For_CIFAR()
    else:
        model = CNN_For_MNIST() if is_mnist_like else CNN_For_CIFAR()

    # The freshly built network is on the CPU, so map the checkpoint there;
    # mapping to 'cuda' made loading fail on machines without a GPU.
    model.load_state_dict(torch.load(location, map_location = torch.device('cpu')))
    model.eval()
    return model

In [None]:
def load_model_ft(network, dataset, cloud = True, epoch = 20):
    """Load a fine-tuned network and return it in eval mode.

    Parameters
    ----------
    network :
        'LeNet' for the LeNet networks; any other value selects the CNN
    dataset :
        dataset name used in the checkpoint file name. 'MNIST',
        'Modified_MNIST' and 'Limited_MNIST' select the MNIST networks,
        anything else the CIFAR networks.
    cloud :
        load from the Google Drive folder when True (default), otherwise
        from the local project folder
    epoch :
        epoch number in the checkpoint file name (default 20)

    Returns
    -------
    The network with the ``_ft_`` checkpoint weights loaded. It lives on the
    CPU; move it with ``.to(device)`` if needed.
    """
    if cloud:
        location = "/content/gdrive/MyDrive/trained_models/"
    else:
        location = "/content/csc413/project/trained_models/"
    location = location + dataset + "_" + network + "_ft_epoch_" + str(epoch) + ".pt"

    # Same MNIST family as the training helper, so a Limited_MNIST checkpoint
    # is no longer paired with a CIFAR-shaped network.
    is_mnist_like = dataset in ("MNIST", "Modified_MNIST", "Limited_MNIST")
    if network == "LeNet":
        model = LeNet_For_MNIST() if is_mnist_like else LeNet_For_CIFAR()
    else:
        model = CNN_For_MNIST() if is_mnist_like else CNN_For_CIFAR()

    # The freshly built network is on the CPU, so map the checkpoint there;
    # mapping to 'cuda' made loading fail on machines without a GPU.
    model.load_state_dict(torch.load(location, map_location = torch.device('cpu')))
    model.eval()
    return model

## Interface for training and loading dataset
Only run this if you want to re-train the model


In [None]:
# Train the CNN for 20 epochs on MNIST and on Modified_MNIST, then fine-tune
# an MNIST-trained CNN on Modified_MNIST.
# NOTE(review): load_model defaults to cloud=True, so fine-tuning starts from
# the checkpoint stored on Google Drive, not from the one trained in the first
# line of this cell - confirm this is intended.
modified_train('CNN', 'MNIST', 'cuda', 20)
modified_train('CNN', 'Modified_MNIST', 'cuda', 20)
fine_tune('CNN', 'Modified_MNIST', 'cuda', load_model('CNN', 'MNIST'), 20)

In [None]:
# Train LeNet for 20 epochs on MNIST and on Modified_MNIST, then fine-tune an
# MNIST-trained LeNet on Modified_MNIST.
# NOTE(review): load_model defaults to cloud=True, so fine-tuning starts from
# the checkpoint stored on Google Drive, not from the one trained in the first
# line of this cell - confirm this is intended.
modified_train('LeNet', 'MNIST', 'cuda', 20)
modified_train('LeNet', 'Modified_MNIST', 'cuda', 20)
fine_tune('LeNet', 'Modified_MNIST', 'cuda', load_model('LeNet', 'MNIST'), 20)

In [None]:
# If want to save the new models to drive
# !cp -R '/content/csc413/project/trained_models' '/content/gdrive/My Drive'

# Adversary attacking
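We use the Carlini-Wagner (CW) attack (https://arxiv.org/pdf/1608.04644.pdf) and the Fast Gradient Sign Method (FGSM, https://arxiv.org/pdf/1412.6572.pdf; see also https://arxiv.org/pdf/1706.06083.pdf) as the attack algorithms.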

## Helper functions for attacking



In [None]:
def cw_attack(model, num_test, device="cuda"):
    """Attack MNIST test images with Carlini-Wagner and report the accuracy.

    For each of the first ``num_test`` (shuffled) MNIST test images a random
    target class different from the true label is drawn, a targeted CW
    adversarial example is generated with DeepRobust, and the model's
    prediction on that example is compared with the true label.

    Parameters
    ----------
    model :
        network under attack; it is moved to ``device``
    num_test :
        number of test images to attack
    device :
        device used for the attack (default 'cuda')

    Notes
    -----
    The reported loss and accuracy are averaged over the images actually
    attacked. Previously the loss was divided by the size of the whole test
    set and the accuracy denominator was one too large whenever the loop
    stopped early.
    """
    import logging
    import random

    import torch
    import torch.nn.functional as F
    from torchvision import datasets, transforms

    from deeprobust.image.attack.cw import CarliniWagner
    from deeprobust.image.config import attack_params

    # print log
    logging.basicConfig(level = logging.INFO, format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger(__name__)
    logger.info("Start CW attack")

    batch_size = 1
    classnum = 10
    test_loader = torch.utils.data.DataLoader(
        datasets.MNIST('./', train=False, download=True,
                       transform=transforms.Compose([transforms.ToTensor()])),
        batch_size=batch_size,
        shuffle=True)

    # The adversarial examples are evaluated on ``device``, so the model has
    # to live there as well.
    model.to(device)
    attackmethod = CarliniWagner(model, device=device)

    test_loss = 0.0
    correct = 0
    tested = 0
    for count, (data, target) in enumerate(test_loader):
        if count == num_test:
            break
        print('batch:{}'.format(count))

        data, target = data.to(device), target.to(device)

        # Random target class different from the true one (batch size is 1)
        true_label = int(target.item())
        target_label = random.choice(
            [label for label in range(classnum) if label != true_label])
        adv_example = attackmethod.generate(
            data, target, target_label=target_label, classnum=classnum,
            **attack_params['CW_MNIST'])

        with torch.no_grad():
            output = model(adv_example)
            # sum up batch loss
            test_loss += F.cross_entropy(output, target, reduction='sum').item()
            # index of the largest score
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
        tested += target.size(0)

    # Guard against num_test == 0
    denominator = max(tested, 1)
    print("===== ACCURACY =====")
    print('Attack Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss / denominator, correct, tested,
        100. * correct / denominator))

In [None]:
def fgsm(model, num_test, device="cuda"):
    """Attack MNIST test images with FGSM and report the accuracy.

    For each of the first ``num_test`` (shuffled) MNIST test images an FGSM
    adversarial example is generated with DeepRobust, and the model's
    prediction on that example is compared with the true label.

    Parameters
    ----------
    model :
        network under attack; it is moved to ``device``
    num_test :
        number of test images to attack
    device :
        device used for the attack (default 'cuda')

    Notes
    -----
    The reported loss and accuracy are averaged over the images actually
    attacked. Previously the loss was divided by the size of the whole test
    set and the accuracy denominator was one too large whenever the loop
    stopped early. The random target label that used to be drawn was never
    passed to the attack and has been dropped.
    """
    import logging

    import torch
    import torch.nn.functional as F
    from torchvision import datasets, transforms

    from deeprobust.image.attack.fgsm import FGSM
    from deeprobust.image.config import attack_params

    # print log
    logging.basicConfig(level = logging.INFO, format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger(__name__)
    logger.info("Start FGSM attack")

    batch_size = 1
    test_loader = torch.utils.data.DataLoader(
        datasets.MNIST('./', train=False, download=True,
                       transform=transforms.Compose([transforms.ToTensor()])),
        batch_size=batch_size,
        shuffle=True)

    # The adversarial examples are evaluated on ``device``, so the model has
    # to live there as well.
    model.to(device)
    attackmethod = FGSM(model, device=device)

    test_loss = 0.0
    correct = 0
    tested = 0
    for count, (data, target) in enumerate(test_loader):
        if count == num_test:
            break
        print('batch:{}'.format(count))

        data, target = data.to(device), target.to(device)
        adv_example = attackmethod.generate(
            data, target, **attack_params['FGSM_MNIST'])

        with torch.no_grad():
            output = model(adv_example)
            # sum up batch loss
            test_loss += F.cross_entropy(output, target, reduction='sum').item()
            # index of the largest score
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
        tested += target.size(0)

    # Guard against num_test == 0
    denominator = max(tested, 1)
    print("===== ACCURACY =====")
    print('Attack Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss / denominator, correct, tested,
        100. * correct / denominator))

## Interface for attacking

In [None]:
# Load the CNN trained on original MNIST; uncomment one of the lines below to
# switch to LeNet or to run an attack against the loaded model.
model = load_model('CNN', 'MNIST')
# model = load_model('LeNet', 'MNIST')
# fgsm(model, 1000)
# cw_attack(model, 100)

In [None]:
# Load the CNN trained on Modified_MNIST and run the CW attack on 100 MNIST
# test images.
model = load_model('CNN', 'Modified_MNIST')
# model = load_model('LeNet', 'Modified_MNIST')
# fgsm(model, 1000)
cw_attack(model, 100)

In [None]:
# Load the fine-tuned CNN and run the CW attack on 100 MNIST test images.
model = load_model_ft('CNN', 'Modified_MNIST')
# model = load_model_ft('LeNet', 'Modified_MNIST')
# fgsm(model, 1000)
cw_attack(model, 100)

# Visualisation 
This provides the visualisation for conv layer and fc layer.

## Helper functions for visualisation


In [None]:
def get_activation(name, activation):
    """Return a forward hook that records a layer's output.

    Each time the hook fires, the detached output is stored in the
    ``activation`` mapping under the key ``name``, replacing any previous
    entry.
    """
    def record_output(module, inputs, result):
        activation.update({name: result.detach()})

    return record_output

def view_conv(model, layer='conv2'):
    """Plot the feature maps a convolutional layer produces for one image.

    The image at index 2 of the MNIST copy stored on Google Drive is fed
    through the model and every output channel of ``layer`` is shown in its
    own subplot.

    Parameters
    ----------
    model :
        network to inspect
    layer :
        attribute name of the layer to visualise (default 'conv2'). The hook
        is attached to this layer; previously it was always attached to
        ``conv2`` regardless of this argument.
    """
    import matplotlib.pyplot as plt
    import torch
    from torchvision import datasets, transforms

    activation = {}
    handle = getattr(model, layer).register_forward_hook(
        get_activation(layer, activation))
    try:
        dataset = datasets.MNIST("/content/gdrive/MyDrive/",
                                 transform=transforms.ToTensor(), download=False)
        data, label = dataset[2]
        # Run on whatever device the model currently lives on
        device = next(model.parameters()).device
        with torch.no_grad():
            model(data.unsqueeze(0).to(device))
    finally:
        # Remove the hook so repeated calls do not pile up hooks on the model
        handle.remove()

    act = activation[layer].squeeze().cpu()
    col = int(act.size(0) ** 0.5) + 1
    fig, axarr = plt.subplots(col, col, squeeze=False)
    for idx in range(act.size(0)):
        axis = axarr[idx // col][idx % col]
        axis.axis('off')
        axis.imshow(act[idx].numpy())

    # Drop the unused cells of the square grid
    for idx in range(act.size(0), col * col):
        fig.delaxes(axarr[idx // col][idx % col])


def view_fc(model, layer='fc2'):
    """Plot the weights of a linear layer, one image per output unit.

    Each unit's weight vector is arranged row by row into the smallest
    square that holds it. When the number of inputs is not a perfect square
    (e.g. 120 or 84 for the LeNet layers) the remaining cells are filled
    with NaN and appear blank; previously such layers raised an error.

    Parameters
    ----------
    model :
        network to inspect
    layer :
        attribute name of the linear layer to visualise (default 'fc2')
    """
    import math

    import matplotlib.pyplot as plt
    import torch

    weights = getattr(model, layer).weight.data.detach().cpu()
    n_units, n_inputs = weights.shape

    col = int(n_units ** 0.5) + 1
    # Side of the smallest square with at least n_inputs cells
    side = math.isqrt(n_inputs)
    if side * side < n_inputs:
        side += 1

    fig, axarr = plt.subplots(col, col, squeeze=False)
    for idx in range(n_units):
        tile = torch.full((side * side,), float('nan'), dtype=weights.dtype)
        tile[:n_inputs] = weights[idx]
        axis = axarr[idx // col][idx % col]
        axis.axis('off')
        axis.imshow(tile.view(side, side).numpy())

    # Drop the unused cells of the square grid
    for idx in range(n_units, col * col):
        fig.delaxes(axarr[idx // col][idx % col])
In [None]:
def view_cw(model, data, target, device='cuda'):
    """Show one MNIST image next to its Carlini-Wagner adversarial example.

    Parameters
    ----------
    model :
        network under attack
    data :
        index of the image in the MNIST training split
    target :
        class the attack should push the prediction towards
    device :
        device used for the attack (default 'cuda')

    Notes
    -----
    Image and label are now read from one MNIST object (``'./'``, downloaded
    if missing) instead of loading the dataset twice from two locations, and
    the stray empty figure created before the subplots is gone.
    """
    import logging

    import matplotlib.pyplot as plt
    import torch
    from torchvision import datasets

    from deeprobust.image.attack.cw import CarliniWagner
    from deeprobust.image.config import attack_params

    # print log
    logging.basicConfig(level = logging.INFO, format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger(__name__)
    logger.info("Start test cw attack")

    mnist = datasets.MNIST('./', download = True)

    # (28, 28) uint8 image -> (1, 1, 28, 28) float image in [0, 1]
    xx = mnist.data[data].unsqueeze(0).unsqueeze(0).float() / 255
    xx = xx.to(device)
    X = xx.clone()

    ## Set Target
    # NOTE(review): the true label is handed to the attack as a float scalar,
    # as before - confirm this is what CarliniWagner.generate expects.
    yy = mnist.targets[data].float()

    attack = CarliniWagner(model, device=device)
    AdvExArray = attack.generate(xx, yy, target_label = target, classnum = 10, **attack_params['CW_MNIST'])
    Adv = AdvExArray.clone()

    # test the result
    with torch.no_grad():
        predict0 = model(xx).argmax(dim=1, keepdim=True)
        predict1 = model(Adv).argmax(dim=1, keepdim=True)

    Adv = Adv.detach().cpu()
    X = X.detach().cpu()

    print("========CW========")
    print("Original Prediction")
    print(predict0)
    print("Prediction under CW Attack")
    print(predict1)

    # One row, two columns: original on the left, adversarial on the right
    f, axarr = plt.subplots(1,2)
    axarr[0].imshow((X[0, 0]*255).numpy(),cmap='gray',vmin=0,vmax=255)
    axarr[1].imshow((Adv[0,0]*255).numpy(),cmap='gray',vmin=0,vmax=255)

In [None]:
def view_fgsm(model, data):
  """Run an FGSM attack on one MNIST image and plot the result.

  The sample at index ``data`` of the MNIST copy stored under
  ``/content/gdrive/MyDrive/`` is attacked with DeepRobust's ``FGSM`` using
  the ``FGSM_MNIST`` preset. The model's predictions for the clean and the
  adversarial image are printed, and both images are shown side by side
  (clean on the left, adversarial on the right).

  Args:
    model: classifier under attack; called on a (1, 1, 28, 28) float tensor
      that lives on the GPU.
    data: integer index of the MNIST sample to attack.

  Returns:
    None. Output is printed and drawn with matplotlib.

  Note:
    The device is hard-coded to ``'cuda'`` and the dataset is not downloaded,
    so a GPU runtime and an existing MNIST copy on the mounted Drive are
    required.
  """
  import torch
  import matplotlib.pyplot as plt
  from torchvision import datasets

  from deeprobust.image.attack.fgsm import FGSM
  from deeprobust.image.config import attack_params

  # Read the dataset from Drive once and take both image and label from it.
  mnist = datasets.MNIST('/content/gdrive/MyDrive/', download = False)
  xx = mnist.data[data:data + 1].to('cuda')
  xx = xx.unsqueeze(1).float()/255   # (1, 28, 28) uint8 -> (1, 1, 28, 28) float in [0, 1]
  yy = mnist.targets[data:data + 1].to('cuda')

  # Generate adversarial examples
  F1 = FGSM(model, device = "cuda")
  AdvExArray = F1.generate(xx, yy, **attack_params['FGSM_MNIST'])

  # plain inference, no gradients needed
  with torch.no_grad():
    predict0 = model(xx).argmax(dim=1, keepdim=True)
    predict1 = model(AdvExArray).argmax(dim=1, keepdim=True)

  print("========FGSM========")
  print("FGSM original prediction:")
  print(predict0)

  print("FGSM attack prediction:")
  print(predict1)

  xx = xx.cpu().detach().numpy()
  AdvExArray = AdvExArray.cpu().detach().numpy()

  # one row, two columns: clean image | adversarial image
  f, axarr = plt.subplots(1,2)
  axarr[0].imshow(xx[0,0]*255,cmap='gray',vmin=0,vmax=255)
  axarr[1].imshow(AdvExArray[0,0]*255,cmap='gray',vmin=0,vmax=255)

## Interface for visualising

In [None]:
# Show the CW and FGSM attacks on MNIST samples 0..4.
# NOTE(review): `model` is not assigned in this cell (every load_model line
# below is commented out) -- presumably it is left over from an earlier cell;
# un-comment one of the loaders to pick the model explicitly.
#model = load_model('CNN', 'MNIST')
#model = load_model('LeNet', 'MNIST')
#view_conv(model)

#model = load_model('CNN', 'Modified_MNIST')
# view_conv(model)
# view_fc(model)
for i in range(5):
  # targeted CW attack: try to push sample i towards class 9
  view_cw(model, i, target=9)
  # FGSM attack on the same sample index
  view_fgsm(model, i)

In [None]:
# Load the CNN via load_model_ft (presumably the fine-tuned variant -- confirm
# against load_model_ft) and pass it to the view_conv / view_fc helpers
# defined earlier in the notebook. Swap in the LeNet line to inspect LeNet.
model = load_model_ft('CNN', 'Modified_MNIST')
# model = load_model_ft('LeNet', 'Modified_MNIST')
view_conv(model)
view_fc(model)

# Measure robustness against number of pixels mutated
Compare how models trained on different datasets perform as the number of distorted pixels increases.

## Helper functions for plotting

In [None]:
import numpy as np
from random import random
from IPython.display import clear_output
# test against distortion
def distort(x, num_pixels=1, value=1.0):
    """Overwrite randomly chosen pixels of a single-channel image in place.

    Args:
        x: image indexed as ``x[0][row][col]`` (for example a ``(1, H, W)``
            tensor or an equivalently nested list). It is modified in place.
        num_pixels: number of random draws. Positions are drawn independently,
            so the same pixel may be hit more than once and at most
            ``num_pixels`` distinct pixels change.
        value: value written to every selected pixel.

    Returns:
        The same object ``x``, after modification.
    """
    # Take the image size from the input instead of assuming 28x28 MNIST.
    height = len(x[0])
    width = len(x[0][0]) if height else 0
    if not height or not width:
        # Nothing to distort in an empty image.
        return x

    for _ in range(num_pixels):
        # Row is drawn before column, matching the original draw order.
        row = int(random() * height)
        col = int(random() * width)
        x[0][row][col] = value
    return x

def test_label_predictions(model, device, test_loader):
    model.eval()
    actuals = []
    predictions = []
    with torch.no_grad():
        for data, target in test_loader:
            # data, target = data.to(device), target.to(device)
            output = model(data)
            prediction = output.argmax(dim=1, keepdim=True)
            actuals.extend(target.view_as(prediction))
            predictions.extend(prediction)
    return [i.item() for i in actuals], [i.item() for i in predictions]

def accuracy_score(actuals, predictions):
  """Return the fraction of positions where prediction and label agree.

  Both sequences are converted to NumPy arrays and compared element-wise;
  the mean of the resulting boolean array is the accuracy.
  """
  matches = np.array(actuals) == np.array(predictions)
  return np.mean(matches)

def plot_accuracies(distorted_pixels, accuracies, model_names):
    """Redraw the accuracy-vs-distortion chart with one curve per model.

    The previous cell output is cleared first, so calling this repeatedly
    replaces the chart instead of stacking new ones.

    Args:
        distorted_pixels: per-model lists of x values (distorted pixel counts).
        accuracies: per-model lists of y values, aligned with
            ``distorted_pixels``.
        model_names: legend labels; entry ``k`` labels curve ``k``.
    """
    clear_output()
    plt.figure(figsize=(14, 4))

    for curve_idx, label in enumerate(model_names):
        plt.plot(
            distorted_pixels[curve_idx],
            accuracies[curve_idx],
            marker='o',
            label=label,
        )

    plt.xlabel('Number of distorted pixels')
    plt.ylabel('Accuracy')
    # Fixed y-range so successive redraws stay visually comparable.
    plt.ylim((0.4, 1))
    plt.legend()
    plt.show()

In [None]:
def measure_acc(max_pixels=20, value=5.0):
  """Plot test accuracy of all six models against the number of distorted pixels.

  For each distortion level ``0 .. max_pixels - 1`` the MNIST test split is
  rebuilt with a transform that overwrites that many randomly chosen pixels
  (see ``distort``) before normalisation. Every model is evaluated on it and
  the chart is redrawn, so the curves grow level by level.

  Args:
    max_pixels: number of distortion levels to sweep (default 20, the
      previously hard-coded value).
    value: value written into each distorted pixel before normalisation
      (default 5.0, the previously hard-coded value).

  Returns:
    None. Results are shown through ``plot_accuracies``.
  """
  model_names = ['CNN', "CNN trained on Modified", "CNN fined tuned"
                , 'LeNet', "LeNet trained on Modified", "LeNet fined tuned"]
  distorted_pixels_all = [[] for _ in model_names]
  accuracies_all = [[] for _ in model_names]
  # Order must match model_names.
  models = [load_model('CNN', 'MNIST'),
            load_model('CNN', 'Modified_MNIST'),
            load_model_ft('CNN', 'Modified_MNIST'),
            load_model('LeNet', 'MNIST'),
            load_model('LeNet', 'Modified_MNIST'),
            load_model_ft('LeNet', 'Modified_MNIST')]

  for num_pixels in range(max_pixels):
    # Bind the current level as a default argument so the transform does not
    # depend on the loop variable's later values (late-binding closure).
    distort_transform = transforms.Lambda(
        lambda x, n=num_pixels: distort(x, num_pixels=n, value=value))
    my_test_loader = torch.utils.data.DataLoader(
        datasets.MNIST(
            './',
            train=False,
            download=True,
            transform=transforms.Compose([
              transforms.ToTensor(),
              distort_transform,
              transforms.Normalize((0.1307,), (0.3081,)),
            ])
        ),
        batch_size=1000,
        shuffle=True)

    for model_idx, model in enumerate(models):
      actuals, predictions = test_label_predictions(model, 'cuda', my_test_loader)
      distorted_pixels_all[model_idx].append(num_pixels)
      accuracies_all[model_idx].append(accuracy_score(actuals, predictions))

    # Redraw after every level to give a live-updating chart.
    plot_accuracies(distorted_pixels_all, accuracies_all, model_names)

## Interface for plotting

In [None]:
# Run the full sweep; the chart is redrawn after each distortion level.
measure_acc()