In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.image as img
import torchvision
import torch
import torch.nn as nn
import torch.optim as optim
import signal
import IPython

In [2]:
# load the standard mnist dataset
mnist_train = torchvision.datasets.MNIST(
    root = 'pytorch-data/',  # where to put the files
    download = True,         # if files aren't here, download them
    train = True,            # whether to import the test or the train subset
    # PyTorch uses PyTorch tensors internally, not numpy arrays, so convert them.
    transform = torchvision.transforms.ToTensor()
)
mnist_batched = torch.utils.data.DataLoader(mnist_train, batch_size=100)

In [3]:
mnist_test = torchvision.datasets.MNIST(
    root = 'pytorch-data/',  # where to put the files
    download = True,         # if files aren't here, download them
    train = False,            # whether to import the test or the train subset
    # PyTorch uses PyTorch tensors internally, not numpy arrays, so convert them.
    transform = torchvision.transforms.ToTensor()
)

In [4]:
#interruptable class can be useful for model training
class Interruptable():
    class Breakout(Exception):
        pass
    def __init__(self):
        self.interrupted = False
        self.orig_handler = None
    def __enter__(self):
        self.orig_handler = signal.getsignal(signal.SIGINT)
        signal.signal(signal.SIGINT, self.handle)
        return self.check
    def __exit__(self, exc_type, exc_val, exc_tb):
        signal.signal(signal.SIGINT, self.orig_handler)
        if exc_type == Interruptable.Breakout:
            print(' stopped')
            return True
        return False
    def handle(self, signal, frame):
        if self.interrupted:
            self.orig_handler(signal, frame)
        print('Interrupting ...', end='')
        self.interrupted = True
    def check(self):
        if self.interrupted:
            raise Interruptable.Breakout
            
def enumerate_cycle(g, shuffle=True):
    epoch = 0
    while True:
        if shuffle:
            for i,j in enumerate(np.random.permutation(len(g))):
                yield (epoch,i), g[j]
        else:
            for i,x in enumerate(g):
                yield (epoch,i), x
        epoch = epoch + 1

In [5]:
# based on the classification model used in lecture
class mnist_nn_model(nn.Module):
    def __init__(self):
        # just need to define it the other way, and then can just return the outputs from any given layer in a similar way to classify
        super().__init__()
        self.conv1 = nn.Conv2d(1,32,3,1)
        self.conv2 = nn.Conv2d(32,64,3,1)
        self.pool = nn.MaxPool2d(2)
        self.drop1 = nn.Dropout2d(.25)
        self.flat = nn.Flatten(1)
        self.lin1 = nn.Linear(9216,128)
        self.drop2 = nn.Dropout2d(.25)
        self.lin2 = nn.Linear(128,16)
        self.lin3 = nn.Linear(16,10)
    
    def forward(self,x,y):
        # log likelihood for a batch of data
        return - nn.functional.cross_entropy(self.f(x), y, reduction='none')
    
    def last_layer(self, x):
        # computing in parts so it is easy to pull out different internal pieces
        pooled = self.pool(nn.functional.relu(self.conv2(nn.functional.relu(self.conv1(x)))))
        lin1 = self.lin1(self.flat(self.drop1(nn.functional.relu(pooled))))
        lin2 = self.lin2(self.drop2(nn.functional.relu(lin1)))
        return lin2

    def f(self, x):
        return self.lin3(nn.functional.relu(self.last_layer(x)))
    def classify(self,x):
        # class probabilities for a single data point
        q = self.f(torch.as_tensor(x)[None,...])[0]
        return nn.functional.softmax(q, dim=0)

In [6]:
model = mnist_nn_model()
optimizer = optim.Adam(model.parameters())
model.train(mode = True)

mnist_nn_model(
  (conv1): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1))
  (conv2): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (drop1): Dropout2d(p=0.25, inplace=False)
  (flat): Flatten(start_dim=1, end_dim=-1)
  (lin1): Linear(in_features=9216, out_features=128, bias=True)
  (drop2): Dropout2d(p=0.25, inplace=False)
  (lin2): Linear(in_features=128, out_features=16, bias=True)
  (lin3): Linear(in_features=16, out_features=10, bias=True)
)

In [None]:
# train the model
iter_mnist = enumerate_cycle(mnist_batched, shuffle=False)
with Interruptable() as check_interrupted:
    for (epoch,batch_num),(imgs,lbls) in iter_mnist:
        check_interrupted()
        optimizer.zero_grad()
        loglik = model(imgs, lbls)
        e = - torch.mean(loglik)
        e.backward()
        optimizer.step()

        if batch_num % 25 == 0:
            IPython.display.clear_output(wait=True)
            print(f'epoch={epoch} batch={batch_num}/{len(mnist_batched)} loglik={-e.item()}')
    epoch += 1

In [7]:
# model can be saved and loaded
model.load_state_dict(torch.load('cnn_classifier.pt'))
model.train(mode=False)

mnist_nn_model(
  (conv1): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1))
  (conv2): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (drop1): Dropout2d(p=0.25, inplace=False)
  (flat): Flatten(start_dim=1, end_dim=-1)
  (lin1): Linear(in_features=9216, out_features=128, bias=True)
  (drop2): Dropout2d(p=0.25, inplace=False)
  (lin2): Linear(in_features=128, out_features=16, bias=True)
  (lin3): Linear(in_features=16, out_features=10, bias=True)
)

Now that we have a trained CNN classifier, we need to create datasets of features and labels for all the different inputs we want to test. For now, we are only going to be working with a binary Gaussian Process classifier, so every dataset will be limited to the images that are labeled as 0 or 1.

In [8]:
# save features for the original MNIST dataset
X_bin = torch.stack([img for img,lbl in mnist_train if lbl==0 or lbl ==1])
features = model.last_layer(X_bin)
Y_bin = [lbl for img,lbl in mnist_train if lbl == 0 or lbl == 1]
pd.DataFrame(features.detach().numpy()).to_csv('X_orig_binary.csv')
pd.DataFrame(Y_bin).to_csv('Y_orig_binary.csv')

In [9]:
# save features for the test dataset
X_bin_test = torch.stack([img for img,lbl in mnist_test if lbl == 0 or lbl == 1])
features = model.last_layer(X_bin_test)
Y_bin_test = [lbl for img, lbl in mnist_test if lbl == 0 or lbl == 1]
pd.DataFrame(features.detach().numpy()).to_csv('X_test_binary.csv')
pd.DataFrame(Y_bin_test).to_csv('Y_test_binary.csv')

Now, we load several noisy datasets and save the features of these datasets to a csv as well to test the Gaussian Process Classifier.

In [10]:
import requests, io
import scipy.io

In [11]:
# load the MNIST dataset with additive white gaussian noise
r = requests.get("https://www.cl.cam.ac.uk/teaching/2122/DataSci/data/nmnist/mnist-with-awgn.mat")

with io.BytesIO(r.content) as f:
    data = scipy.io.loadmat(f)
wn_train_x = data['train_x'].reshape(60000,1,28,28)
wn_train_y = data['train_y']
decoded = np.argmax(wn_train_y, axis=1)
wn_y_bin = np.array([lbl for lbl in decoded if lbl == 0 or lbl == 1])
wn_x_bin = torch.stack([torch.Tensor(img)/255 for i,img in enumerate(wn_train_x) if decoded[i]==0 or decoded[i]==1])
features = model.last_layer(wn_x_bin)
pd.DataFrame(features.detach().numpy()).to_csv('X_white_noise_binary.csv')
pd.DataFrame(wn_y_bin).to_csv('Y_white_noise_binary.csv')

In [12]:
# load the dataset with motion blur

r = requests.get("https://www.cl.cam.ac.uk/teaching/2122/DataSci/data/nmnist/mnist-with-motion-blur.mat")

with io.BytesIO(r.content) as f:
    data = scipy.io.loadmat(f)
    mb_train_x = data['train_x'].reshape(60000,1,28,28)
    mb_train_y = data['train_y']
    decoded = np.argmax(mb_train_y, axis=1)
    mb_y_bin = np.array([lbl for lbl in decoded if lbl == 0 or lbl == 1])
    mb_x_bin = torch.stack([torch.Tensor(img)/255 for i,img in enumerate(mb_train_x) if decoded[i]==0 or decoded[i]==1])
features = model.last_layer(mb_x_bin)
pd.DataFrame(features.detach().numpy()).to_csv('X_motion_blur_binary.csv')
pd.DataFrame(mb_y_bin).to_csv('Y_motion_blur_binary.csv')

In [13]:
# load the dataset with reduced contrast and added white gaussian noise
r = requests.get("https://www.cl.cam.ac.uk/teaching/2122/DataSci/data/nmnist/mnist-with-reduced-contrast-and-awgn.mat")

with io.BytesIO(r.content) as f:
    data = scipy.io.loadmat(f)
    rc_train_x = data['train_x'].reshape(60000,1,28,28)
    rc_train_y = data['train_y']
    decoded = np.argmax(rc_train_y, axis=1)
    rc_y_bin = np.array([lbl for lbl in decoded if lbl == 0 or lbl == 1])
    rc_x_bin = torch.stack([torch.Tensor(img)/255 for i,img in enumerate(rc_train_x) if decoded[i]==0 or decoded[i]==1])
features = model.last_layer(rc_x_bin)
pd.DataFrame(features.detach().numpy()).to_csv('X_reduced_contrast_binary.csv')
pd.DataFrame(rc_y_bin).to_csv('Y_reduced_contrast_binary.csv')

For the final binary datasets we will test on, we create new data by using adversarial attacks. These attacks take an input image and distort it in a way that attempts to make the CNN classifier produce outputs that are as different as possible from the original outputs. This technique often makes adversarial examples successful at 'confusing' neural network classifiers, so it will be interesting to see if a Gaussian Process classifier can recognize these differences.

In [11]:
from cleverhans.torch.attacks.projected_gradient_descent import projected_gradient_descent
from cleverhans.torch.attacks.fast_gradient_method import fast_gradient_method

We'll also generate adversarial examples with varying amounts of noise to demonstrate how well our Gaussian Process classifier can handle different levels of adversarial examples.

In [12]:
X_fgm_10 = fast_gradient_method(model.f, X_bin,.1,np.inf)
features = model.last_layer(X_fgm_10)
pd.DataFrame(features.detach().numpy()).to_csv('X_fgm_10_binary.csv')
pd.DataFrame(Y_bin).to_csv('Y_fgm_10_binary.csv')

In [13]:
X_fgm_25 = fast_gradient_method(model.f, X_bin[:5000],.25,np.inf)
features = model.last_layer(X_fgm_25)
pd.DataFrame(features.detach().numpy()).to_csv('X_fgm_25_binary.csv')
pd.DataFrame(Y_bin[:5000]).to_csv('Y_fgm_25_binary.csv')

In [14]:
X_fgm_50 = fast_gradient_method(model.f, X_bin[:5000],.5,np.inf)
features = model.last_layer(X_fgm_50)
pd.DataFrame(features.detach().numpy()).to_csv('X_fgm_50_binary.csv')
pd.DataFrame(Y_bin[:5000]).to_csv('Y_fgm_50_binary.csv')

In [15]:
X_fgm_100 = fast_gradient_method(model.f, X_bin[:5000],1,np.inf)
features = model.last_layer(X_fgm_100)
pd.DataFrame(features.detach().numpy()).to_csv('X_fgm_100_binary.csv')
pd.DataFrame(Y_bin[:5000]).to_csv('Y_fgm_100_binary.csv')

With the projected gradient descent method, it can take a long time to generate adversarial examples. To limit processing time, we'll only generate a few thousand examples for every noise level.

In [16]:
X_pgd_10 = projected_gradient_descent(model.f, X_bin[:1500], .1,.01, 40, np.inf)
features = model.last_layer(X_pgd_10)
pd.DataFrame(features.detach().numpy()).to_csv('X_pgd_10_binary.csv')
pd.DataFrame(Y_bin[:1500]).to_csv('Y_pgd_10_binary.csv')

In [17]:
X_pgd_25 = projected_gradient_descent(model.f, X_bin[:1500], .25,.01, 40, np.inf)
features = model.last_layer(X_pgd_25)
pd.DataFrame(features.detach().numpy()).to_csv('X_pgd_25_binary.csv')
pd.DataFrame(Y_bin[:1500]).to_csv('Y_pgd_25_binary.csv')

In [18]:
X_pgd_50 = projected_gradient_descent(model.f, X_bin[:1500], .5,.01, 40, np.inf)
features = model.last_layer(X_pgd_50)
pd.DataFrame(features.detach().numpy()).to_csv('X_pgd_50_binary.csv')
pd.DataFrame(Y_bin[:1500]).to_csv('Y_pgd_50_binary.csv')

In [19]:
X_pgd_100 = projected_gradient_descent(model.f, X_bin[:1500], 1,.01, 40, np.inf)
features = model.last_layer(X_pgd_100)
pd.DataFrame(features.detach().numpy()).to_csv('X_pgd_100_binary.csv')
pd.DataFrame(Y_bin[:1500]).to_csv('Y_pgd_100_binary.csv')

In [20]:
X_fgm_10.shape

torch.Size([12665, 1, 28, 28])

In [21]:
torch.save(X_fgm_10,'X_fgm_10_images.pt')
torch.save(X_fgm_25,'X_fgm_25_images.pt')
torch.save(X_fgm_50,'X_fgm_50_images.pt')
torch.save(X_fgm_100,'X_fgm_100_images.pt')
torch.save(X_pgd_10,'X_pgd_10_images.pt')
torch.save(X_pgd_25,'X_pgd_25_images.pt')
torch.save(X_pgd_50,'X_pgd_50_images.pt')
torch.save(X_pgd_100,'X_pgd_100_images.pt')