# MNIST: Training and Testing on a Clean Dataset & Adversarial Detection

## Imports and MNIST loading

In [1]:
# Imports all the module paths
import sys

import numpy as np
import torch
import torch.nn as nn
from torch.autograd import Variable
from tqdm.notebook import tnrange, tqdm

sys.path.append("../../")

# Loads the rest of the modules

# File containing all the required training methods
import defences.mnist as defences

# For testing
import utils.clean_test as clean_test

# Contains the data loadders
import utils.dataloaders as dataloaders

# For printing outcomes
# import utils.printing as printing

# Example printing, but I removed it to simplify results
# for epsilon in epsilons:
#     printing.print_attack(
#         model,
#         testSetLoader,
#         "FGSM",
#         attacks["FGSM"],
#         epsilon=epsilon,
#     )

Notebook will use PyTorch Device: CUDA
Notebook will use PyTorch Device: CUDA
Notebook will use PyTorch Device: CUDA
Notebook will use PyTorch Device: CUDA


In [2]:
# Define the `device` PyTorch will be running on, please hope it is CUDA
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Notebook will use PyTorch Device: " + device.upper())

Notebook will use PyTorch Device: CUDA


## Load the dataset

In [3]:
DATA_ROOT = "../../datasets/"

trainSetLoader, _, testSetLoader = dataloaders.get_MNIST_data_loaders(
    DATA_ROOT,
    trainSetSize=50000,
    validationSetSize=0,
    batchSize=128,
)

## Attacks and Their Results

In [4]:
# A possible attacks array (for nice printing):
# Some attacks use a helper library
import torchattacks

import attacks.fgsm as fgsm
import attacks.ifgsm as ifgsm
import attacks.pgd as pgd
import utils.attacking as attacking

loss_function = nn.CrossEntropyLoss()
attacks = {}

attacks["FGSM"] = fgsm.fgsm_attack
attacks["I-FGSM"] = ifgsm.ifgsm_attack
attacks["PGD"] = pgd.pgd_attack

## Load two models (standard and FGSM trained)

In [5]:
standard_model = torch.load("../../models_data/MNIST/mnist_standard")
standard_model.eval()

pgd_model = torch.load("../../models_data/MNIST/mnist_pgd")
pgd_model.eval()

LeNet5(
  (conv1): Conv2d(1, 32, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
  (max_pool_1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(32, 64, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
  (max_pool_2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=3136, out_features=1024, bias=True)
  (fc2): Linear(in_features=1024, out_features=10, bias=True)
)

In [6]:
# Evaluate the two models
clean_test.test_trained_model(standard_model, testSetLoader)
clean_test.test_trained_model(pgd_model, testSetLoader)

Testing the model...


Testing Progress:   0%|          | 0/79 [00:00<?, ?it/s]

... done! Accuracy: 99.14%
Testing the model...


Testing Progress:   0%|          | 0/79 [00:00<?, ?it/s]

... done! Accuracy: 99.27%


In [7]:
SAVE_LOAD_ROOT = "../../models_data/MNIST"

model = defences.standard_training(
    trainSetLoader,
    load_if_available=True,
    load_path=SAVE_LOAD_ROOT + "/mnist_standard_with_feature_list",
)

Found already trained model...
... loaded!


In [8]:
# Test the model
clean_test.test_trained_model(model, testSetLoader)

Testing the model...


Testing Progress:   0%|          | 0/79 [00:00<?, ?it/s]

... done! Accuracy: 99.23%


In [9]:
# Save the model
torch.save(model, SAVE_LOAD_ROOT + "/mnist_standard_with_feature_list")

In [10]:
# A possible attacks array (for nice printing):
# Some attacks use a helper library
import torchattacks

import attacks.fgsm as fgsm
import attacks.ifgsm as ifgsm
import attacks.pgd as pgd
import utils.attacking as attacking

attacks = {}

attacks["FGSM"] = fgsm.fgsm_attack
attacks["I-FGSM"] = ifgsm.ifgsm_attack
attacks["PGD"] = pgd.pgd_attack

## Classification score approach for detecting adversarial example in deep neural network
https://link.springer.com/article/10.1007/s11042-020-09167-z

## Standard Model

In [11]:
# Here you decide on the threshold for the clean dataset
threshold = 0.1

# Rejected
accepted = 0
rejected = 0
correct = 0

# Use a pretty progress bar to show updates
for j, (images, labels) in enumerate(
    tqdm(testSetLoader, desc="Testing Progress", leave=False)
):
    # Cast to proper tensor
    images, labels = images.to(device), labels.to(device)

    # Predict
    logits = standard_model(images)

    # The highest class represents the chosen class (input, k, dimension)
    _, preds = torch.topk(logits, 2, 1)

    # Check each image and see if it is adversarial
    for index in range(len(images)):
        max_index = preds[index][0]
        sec_index = preds[index][1]

        diff = logits[index][max_index] - logits[index][sec_index]

        if diff < threshold:
            rejected += 1
        else:
            accepted += 1
            correct += max_index == labels[index]


print(
    "... done! Rejected {}, Accepted {}, Accuracy: {}%".format(
        rejected, accepted, float(correct) * 100 / accepted
    )
)

Testing Progress:   0%|          | 0/79 [00:00<?, ?it/s]

... done! Rejected 1, Accepted 9999, Accuracy: 99.14991499149914%


In [12]:
# Here you decide on the threshold for the clean dataset
threshold = 0.4

# Rejected
accepted = 0
rejected = 0
correct = 0

# Use a pretty progress bar to show updates
for j, (images, labels) in enumerate(
    tqdm(testSetLoader, desc="Testing Progress", leave=False)
):
    # Cast to proper tensor
    images, labels = images.to(device), labels.to(device)

    # Perturb the images using the attack
    perturbed_images = fgsm.fgsm_attack(
        images,
        labels,
        standard_model,
        loss_function,
        epsilon=0.35,
        alpha=None,
        scale=True,
        iterations=None,
    )

    # Predict
    logits = standard_model(perturbed_images)

    # The highest class represents the chosen class (input, k, dimension)
    _, preds = torch.topk(logits, 2, 1)

    # Check each image and see if it is adversarial
    for index in range(len(images)):
        max_index = preds[index][0]
        sec_index = preds[index][1]

        diff = logits[index][max_index] - logits[index][sec_index]

        if diff < threshold:
            rejected += 1
        else:
            accepted += 1
            correct += max_index == labels[index]

print(
    "... done! Rejected {}, Accepted {}, Accuracy: {}%".format(
        rejected, accepted, float(correct) * 100 / accepted
    )
)

Testing Progress:   0%|          | 0/79 [00:00<?, ?it/s]

... done! Rejected 791, Accepted 9209, Accuracy: 4.104680204148116%


## PGD Model

In [13]:
# Here you decide on the threshold for the clean dataset
threshold = 0.1

# Rejected
accepted = 0
rejected = 0
correct = 0

# Use a pretty progress bar to show updates
for j, (images, labels) in enumerate(
    tqdm(testSetLoader, desc="Testing Progress", leave=False)
):
    # Cast to proper tensor
    images, labels = images.to(device), labels.to(device)

    # Predict
    logits = pgd_model(images)

    # The highest class represents the chosen class (input, k, dimension)
    _, preds = torch.topk(logits, 2, 1)

    # Check each image and see if it is adversarial
    for index in range(len(images)):
        max_index = preds[index][0]
        sec_index = preds[index][1]

        diff = logits[index][max_index] - logits[index][sec_index]

        if diff < threshold:
            rejected += 1
        else:
            accepted += 1
            correct += max_index == labels[index]

print(
    "... done! Rejected {}, Accepted {}, Accuracy: {}%".format(
        rejected, accepted, float(correct) * 100 / accepted
    )
)

Testing Progress:   0%|          | 0/79 [00:00<?, ?it/s]

... done! Rejected 6, Accepted 9994, Accuracy: 99.28957374424655%


In [19]:
# Here you decide on the threshold for the clean dataset
threshold = 0.25

# Rejected
accepted = 0
rejected = 0
correct = 0

# Use a pretty progress bar to show updates
for j, (images, labels) in enumerate(
    tqdm(testSetLoader, desc="Testing Progress", leave=False)
):
    # Cast to proper tensor
    images, labels = images.to(device), labels.to(device)

    # Perturb the images using the attack
    perturbed_images = fgsm.fgsm_attack(
        images,
        labels,
        pgd_model,
        loss_function,
        epsilon=0.35,
        alpha=None,
        scale=True,
        iterations=None,
    )

    # Predict
    logits = pgd_model(perturbed_images)

    # The highest class represents the chosen class (input, k, dimension)
    _, preds = torch.topk(logits, 2, 1)

    # Check each image and see if it is adversarial
    for index in range(len(images)):
        max_index = preds[index][0]
        sec_index = preds[index][1]

        diff = logits[index][max_index] - logits[index][sec_index]

        if diff < threshold:
            rejected += 1
        else:
            accepted += 1
            correct += max_index == labels[index]

print(
    "... done! Rejected {}, Accepted {}, Accuracy: {}%".format(
        rejected, accepted, float(correct) * 100 / accepted
    )
)

Testing Progress:   0%|          | 0/79 [00:00<?, ?it/s]

... done! Rejected 835, Accepted 9165, Accuracy: 30.98745226404801%


## Maximum Mean Discrepancy Test is Aware of Adversarial Attacks
https://arxiv.org/abs/2010.11415

In [15]:
# Not implemented due to breaking changes

In [16]:
# Firstly extract semantic features from trained model
# Note: we just need the penultimate layer (so not complete pain)
# Train a model that also returns penultimate layer

## A Simple Unified Framework for Detecting Out-of-Distribution Samples and Adversarial Attacks
https://arxiv.org/abs/1807.03888

## Characterizing Adversarial Subspaces Using Local Intrinsic Dimensionality
https://arxiv.org/abs/1801.02613

## PCA Detection

In [17]:
from sklearn.decomposition import PCA

# Copy the MNIST data and then fit using PCA
# First convert to numpy arrays (and make it float)
numpyTrainingData = trainSetLoader.dataset.data.numpy().astype("float32")
# Note you also need to reshape the input data for your sanity
reshapedNumpyTrainingData = numpyTrainingData.reshape((len(numpyTrainingData), 28 * 28))

# Then perform PCA on training data to get principal components
# Note it should reflect dimension of image, i.e. 28 * 28
pca = PCA(n_components=28 * 28).fit(reshapedNumpyTrainingData)

In [42]:
# Now on clean data check if there are any adversarial samples
numpyTestData = testSetLoader.dataset.data.numpy().astype("float32")
reshapedNumpyTestData = numpyTestData.reshape((len(numpyTestData), 28 * 28))

# Original predictions on data
testTensor = torch.from_numpy(
    np.reshape(numpyTestData, (len(numpyTestData), 1, 28, 28))
).to(device)
logits = standard_model(testTensor).detach().cpu().numpy()
predictions_base = np.array([np.argmax(logits[i]) for i in range(len(numpyTestData))])

# Transform clean data along principal components
transformedTestData = pca.transform(reshapedNumpyTestData)

# Decides how many of the least significant coefficients (of components) to perturb
num_components = 200

# How many trials to run
num_trials = 25

# Track results
result = np.zeros(len(numpyTestData), dtype=int)

# Actual attempts
for trial in range(num_trials):
    random_noise = np.random.standard_normal(size=num_components)

    # Copy the data
    transformedTestDataNoisy = np.copy(transformedTestData)

    # Update the components with the right data
    for index in range(len(numpyTestData)):
        transformedTestDataNoisy[index][(28 * 28 - num_components) :] += (
            10 * random_noise
        )

    # Now calculate the inverse using PCA and the noise
    inverseTestDataNoisy = pca.inverse_transform(transformedTestDataNoisy)

    # Reshape into image
    testDataNoisy = np.reshape(inverseTestDataNoisy, (len(numpyTestData), 1, 28, 28))

    # Predict
    testTensor = torch.from_numpy(
        np.reshape(testDataNoisy, (len(testDataNoisy), 1, 28, 28))
    ).to(device)
    logits = logits = standard_model(testTensor).detach().cpu().numpy()
    predictions_modified = np.array(
        [np.argmax(logits[i]) for i in range(len(testDataNoisy))]
    )

    check = np.not_equal(predictions_modified, predictions_base)
    result = np.logical_or(check, result)

# Printing
print(np.sum(result))

4


In [43]:
# Now do the same on adversarial data check if there are any adversarial samples
# Use a pretty progress bar to show updates
data = []

for j, (images, labels) in enumerate(
    tqdm(testSetLoader, desc="Testing Progress", leave=False)
):
    # Cast to proper tensor
    images, labels = images.to(device), labels.to(device)

    # Perturb the images using the attack
    perturbed_images = fgsm.fgsm_attack(
        images,
        labels,
        pgd_model,
        loss_function,
        epsilon=0.75,
        alpha=None,
        scale=True,
        iterations=None,
    )

    for perturbed_image in perturbed_images:
        data.append(perturbed_image.detach().cpu().numpy())

data = np.asarray(data)
numpyTestData = data.astype("float32")
reshapedNumpyTestData = numpyTestData.reshape((len(numpyTestData), 28 * 28))

# Original predictions on data
testTensor = torch.from_numpy(
    np.reshape(numpyTestData, (len(numpyTestData), 1, 28, 28))
).to(device)
logits = standard_model(testTensor).detach().cpu().numpy()
predictions_base = np.array([np.argmax(logits[i]) for i in range(len(numpyTestData))])

# Transform clean data along principal components
transformedTestData = pca.transform(reshapedNumpyTestData)

# Decides how many of the least significant coefficients (of components) to perturb
num_components = 200

# How many trials to run
num_trials = 25

# Track results
result = np.zeros(len(numpyTestData), dtype=int)

# Actual attempts
for trial in range(num_trials):
    random_noise = np.random.standard_normal(size=num_components)

    # Copy the data
    transformedTestDataNoisy = np.copy(transformedTestData)

    # Update the components with the right data
    for index in range(len(numpyTestData)):
        transformedTestDataNoisy[index][(28 * 28 - num_components) :] += (
            10 * random_noise
        )

    # Now calculate the inverse using PCA and the noise
    inverseTestDataNoisy = pca.inverse_transform(transformedTestDataNoisy)

    # Reshape into image
    testDataNoisy = np.reshape(inverseTestDataNoisy, (len(numpyTestData), 1, 28, 28))

    # Predict
    testTensor = torch.from_numpy(
        np.reshape(testDataNoisy, (len(testDataNoisy), 1, 28, 28))
    ).to(device)
    logits = logits = standard_model(testTensor).detach().cpu().numpy()
    predictions_modified = np.array(
        [np.argmax(logits[i]) for i in range(len(testDataNoisy))]
    )

    check = np.not_equal(predictions_modified, predictions_base)
    result = np.logical_or(check, result)

# Printing
print(np.sum(result))

Testing Progress:   0%|          | 0/79 [00:00<?, ?it/s]

9995


## Applying PCA on a trained PGD model

In [44]:
from sklearn.decomposition import PCA

# Copy the MNIST data and then fit using PCA
# First convert to numpy arrays (and make it float)
numpyTrainingData = trainSetLoader.dataset.data.numpy().astype("float32")
# Note you also need to reshape the input data for your sanity
reshapedNumpyTrainingData = numpyTrainingData.reshape((len(numpyTrainingData), 28 * 28))

# Then perform PCA on training data to get principal components
# Note it should reflect dimension of image, i.e. 28 * 28
pca = PCA(n_components=28 * 28).fit(reshapedNumpyTrainingData)

In [45]:
# Now on clean data check if there are any adversarial samples
numpyTestData = testSetLoader.dataset.data.numpy().astype("float32")
reshapedNumpyTestData = numpyTestData.reshape((len(numpyTestData), 28 * 28))

# Original predictions on data
testTensor = torch.from_numpy(
    np.reshape(numpyTestData, (len(numpyTestData), 1, 28, 28))
).to(device)
logits = pgd_model(testTensor).detach().cpu().numpy()
predictions_base = np.array([np.argmax(logits[i]) for i in range(len(numpyTestData))])

# Transform clean data along principal components
transformedTestData = pca.transform(reshapedNumpyTestData)

# Decides how many of the least significant coefficients (of components) to perturb
# 100 seems to be the magic number, so very interesting it actually does impact
num_components = 200

# How many trials to run
num_trials = 25

# Track results
result = np.zeros(len(numpyTestData), dtype=int)

# Actual attempts
for trial in range(num_trials):
    random_noise = np.random.standard_normal(size=num_components)

    # Copy the data
    transformedTestDataNoisy = np.copy(transformedTestData)

    # Update the components with the right data
    for index in range(len(numpyTestData)):
        transformedTestDataNoisy[index][(28 * 28 - num_components) :] += (
            10 * random_noise
        )

    # Now calculate the inverse using PCA and the noise
    inverseTestDataNoisy = pca.inverse_transform(transformedTestDataNoisy)

    # Reshape into image
    testDataNoisy = np.reshape(inverseTestDataNoisy, (len(numpyTestData), 1, 28, 28))

    # Predict
    testTensor = torch.from_numpy(
        np.reshape(testDataNoisy, (len(testDataNoisy), 1, 28, 28))
    ).to(device)
    logits = logits = standard_model(testTensor).detach().cpu().numpy()
    predictions_modified = np.array(
        [np.argmax(logits[i]) for i in range(len(testDataNoisy))]
    )

    check = np.not_equal(predictions_modified, predictions_base)
    result = np.logical_or(check, result)

# Printing
print(np.sum(result))

96


In [46]:
# Now do the same on adversarial data check if there are any adversarial samples
# Use a pretty progress bar to show updates
data = []

for j, (images, labels) in enumerate(
    tqdm(testSetLoader, desc="Testing Progress", leave=False)
):
    # Cast to proper tensor
    images, labels = images.to(device), labels.to(device)

    # Perturb the images using the attack
    perturbed_images = fgsm.fgsm_attack(
        images,
        labels,
        pgd_model,
        loss_function,
        epsilon=0.75,
        alpha=None,
        scale=True,
        iterations=None,
    )

    for perturbed_image in perturbed_images:
        data.append(perturbed_image.detach().cpu().numpy())

data = np.asarray(data)
numpyTestData = data.astype("float32")
reshapedNumpyTestData = numpyTestData.reshape((len(numpyTestData), 28 * 28))

# Original predictions on data
testTensor = torch.from_numpy(
    np.reshape(numpyTestData, (len(numpyTestData), 1, 28, 28))
).to(device)
logits = standard_model(testTensor).detach().cpu().numpy()
predictions_base = np.array([np.argmax(logits[i]) for i in range(len(numpyTestData))])

# Transform clean data along principal components
transformedTestData = pca.transform(reshapedNumpyTestData)

# Decides how many of the least significant coefficients (of components) to perturb
num_components = 100

# How many trials to run
num_trials = 25

# Track results
result = np.zeros(len(numpyTestData), dtype=int)

# Actual attempts
for trial in range(num_trials):
    random_noise = np.random.standard_normal(size=num_components)

    # Copy the data
    transformedTestDataNoisy = np.copy(transformedTestData)

    # Update the components with the right data
    for index in range(len(numpyTestData)):
        transformedTestDataNoisy[index][(28 * 28 - num_components) :] += (
            10 * random_noise
        )

    # Now calculate the inverse using PCA and the noise
    inverseTestDataNoisy = pca.inverse_transform(transformedTestDataNoisy)

    # Reshape into image
    testDataNoisy = np.reshape(inverseTestDataNoisy, (len(numpyTestData), 1, 28, 28))

    # Predict
    testTensor = torch.from_numpy(
        np.reshape(testDataNoisy, (len(testDataNoisy), 1, 28, 28))
    ).to(device)
    logits = logits = pgd_model(testTensor).detach().cpu().numpy()
    predictions_modified = np.array(
        [np.argmax(logits[i]) for i in range(len(testDataNoisy))]
    )

    check = np.not_equal(predictions_modified, predictions_base)
    result = np.logical_or(check, result)

# Printing
print(np.sum(result))

Testing Progress:   0%|          | 0/79 [00:00<?, ?it/s]

9981


In [47]:
# Now do the same on adversarial data check if there are any adversarial samples
# Use a pretty progress bar to show updates
data = []

for j, (images, labels) in enumerate(
    tqdm(testSetLoader, desc="Testing Progress", leave=False)
):
    # Cast to proper tensor
    images, labels = images.to(device), labels.to(device)

    # Perturb the images using the attack
    perturbed_images = pgd.pgd_attack(
        images,
        labels,
        pgd_model,
        loss_function,
        epsilon=0.75,
        alpha=(2 / 255),
        scale=True,
        iterations=20,
    )

    for perturbed_image in perturbed_images:
        data.append(perturbed_image.detach().cpu().numpy())

data = np.asarray(data)
numpyTestData = data.astype("float32")
reshapedNumpyTestData = numpyTestData.reshape((len(numpyTestData), 28 * 28))

# Original predictions on data
testTensor = torch.from_numpy(
    np.reshape(numpyTestData, (len(numpyTestData), 1, 28, 28))
).to(device)
logits = standard_model(testTensor).detach().cpu().numpy()
predictions_base = np.array([np.argmax(logits[i]) for i in range(len(numpyTestData))])

# Transform clean data along principal components
transformedTestData = pca.transform(reshapedNumpyTestData)

# Decides how many of the least significant coefficients (of components) to perturb
num_components = 200

# How many trials to run
num_trials = 25

# Track results
result = np.zeros(len(numpyTestData), dtype=int)

# Actual attempts
for trial in range(num_trials):
    random_noise = np.random.standard_normal(size=num_components)

    # Copy the data
    transformedTestDataNoisy = np.copy(transformedTestData)

    # Update the components with the right data
    for index in range(len(numpyTestData)):
        transformedTestDataNoisy[index][(28 * 28 - num_components) :] += (
            10 * random_noise
        )

    # Now calculate the inverse using PCA and the noise
    inverseTestDataNoisy = pca.inverse_transform(transformedTestDataNoisy)

    # Reshape into image
    testDataNoisy = np.reshape(inverseTestDataNoisy, (len(numpyTestData), 1, 28, 28))

    # Predict
    testTensor = torch.from_numpy(
        np.reshape(testDataNoisy, (len(testDataNoisy), 1, 28, 28))
    ).to(device)
    logits = logits = pgd_model(testTensor).detach().cpu().numpy()
    predictions_modified = np.array(
        [np.argmax(logits[i]) for i in range(len(testDataNoisy))]
    )

    check = np.not_equal(predictions_modified, predictions_base)
    result = np.logical_or(check, result)

# Printing
print(np.sum(result))

Testing Progress:   0%|          | 0/79 [00:00<?, ?it/s]

10000
