In [1]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import time

import sys
ROOT = "../../"
sys.path.append(ROOT) # Add root directory to path

from src.utils.perturbations import *
from src.utils.REPResNet import Bottleneck, REPResNet

from art.attacks.evasion import FastGradientMethod
from art.estimators.classification import PyTorchClassifier
from art.defences.preprocessor.preprocessor import PreprocessorPyTorch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, TensorDataset, Dataset

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Step 1: Load the CIFAR-10 dataset
# CIFAR-10 has ten classes; the one-hot width below must agree with the
# `nb_classes=10` / `num_classes=10` used when the classifier is built.
NUM_CLASSES = 10

trainset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transforms.ToTensor())
testset = datasets.CIFAR10(root='./data', train=False, download=True, transform=transforms.ToTensor())

# `.data` is read directly (bypassing the transform), so scaling is done by hand.
x_train = trainset.data / 255.0  # Scale pixel values to [0, 1]
y_train = np.array(trainset.targets)

x_test = testset.data / 255.0  # Scale pixel values to [0, 1]
y_test = np.array(testset.targets)

# Move the channel axis forward: (N, H, W, C) -> (N, C, H, W).
x_train = np.transpose(x_train, (0, 3, 1, 2)).astype(np.float32)
x_test = np.transpose(x_test, (0, 3, 1, 2)).astype(np.float32)

# One-hot encode the integer labels with one column per CIFAR-10 class.
y_train = np.eye(NUM_CLASSES)[y_train].astype(np.float32)
y_test = np.eye(NUM_CLASSES)[y_test].astype(np.float32)

# Valid pixel range after scaling.
min_pixel_value = 0.0
max_pixel_value = 1.0

Files already downloaded and verified
Files already downloaded and verified


In [3]:
class REPPreprocessor(PreprocessorPyTorch):
    """Preprocessor that stacks perturbed copies of a batch along the channel axis.

    Every callable in ``perturbations`` is applied to the incoming batch and the
    results (optionally preceded by the untouched batch) are concatenated on
    ``dim=1``.

    Args:
        perturbations: Sequence of callables, each taking the batch tensor and
            returning a tensor that can be concatenated with the others on dim 1.
        include_original: When True, the unperturbed batch is one of the stacked copies.
        shuffle: When True, the order of the stacked copies is re-drawn at random
            on every ``forward`` call.

    Raises:
        ValueError: If there would be nothing to stack (no perturbations and
            ``include_original`` is False).
    """

    def __init__(self, perturbations, include_original=True, shuffle=False):
        super().__init__()
        self.perturbations = perturbations
        self.include_original = include_original
        self.shuffle = shuffle
        # Number of copies stacked per input.
        self.multiplicity = len(self.perturbations) + int(self.include_original)
        if self.multiplicity == 0:
            # Fail early: torch.cat on an empty list would only error later, inside forward().
            raise ValueError(
                "REPPreprocessor needs at least one perturbation or include_original=True"
            )

    def forward(self, x, y=None):
        """Return ``(stacked_x, y)`` where ``stacked_x`` concatenates all copies on dim 1.

        ``y`` is passed through unchanged.

        Raises:
            ValueError: If ``x`` is not a 4-D tensor.
        """
        x = x.to(self.device)
        if x.dim() != 4:
            raise ValueError(
                f"expected a 4-D (batch, channels, height, width) tensor, got {x.dim()} dimension(s)"
            )

        # Collect the copies: optional original first, then one per perturbation.
        layers = [x] if self.include_original else []
        layers.extend(perturbation(x) for perturbation in self.perturbations)

        if self.shuffle:
            order = torch.randperm(len(layers)).tolist()
            layers = [layers[i] for i in order]

        # Shape is (B, multiplicity * C, H, W), assuming each perturbation preserves its input's shape.
        perturbed_x = torch.cat(layers, dim=1)

        return perturbed_x, y

# Testing Round 1

In [4]:
# Perturbation configurations to evaluate; the training loop builds one model per entry.
# Each entry is a list whose length the training loop uses as the number of stacked
# copies fed to the model; the empty entry is the "original image only" baseline.
# An element is either a single spec ["Name", param, ...] or a list of such specs --
# presumably several perturbations chained on the same copy; confirm against
# create_perturbations in src.utils.perturbations.
perturbation_tests = [
    [],
    [["Gaussian", 0.025]],
    [["Gaussian", 0.05]],
    [[["Gaussian", 0.05], ["ContrastBrightness", 0.9, 0.9]]],
    [[["Gaussian", 0.05], ["RotationFlip", 0.3]]],
    [[["Gaussian", 0.05], ["SaltPepper", 0.05]]],
    [["Laplacian", 0.025]],
    [["Laplacian", 0.05]],
    [[["Laplacian", 0.05], ["ContrastBrightness", 0.9, 0.9]]],
    [[["Laplacian", 0.05], ["RotationFlip", 0.3]]],
    [[["Laplacian", 0.05], ["SaltPepper", 0.05]]],
    [["Lp-norm", 0.5, 1]],
    [[["Lp-norm", 0.5, 1], ["ContrastBrightness", 0.9, 0.9]]],
    [[["Lp-norm", 0.5, 1], ["RotationFlip", 0.3]]],
    [[["Lp-norm", 0.5, 1], ["SaltPepper", 0.05]]],
    [["Lp-norm", 0.5, 2]],
    [["Lp-norm", 0.5, 3]],
    [["Lp-norm", 0.5, 4]],
    [["Lp-norm", 0.5, float('inf')]],
    [["ContrastBrightness", 0.9, 0.9]],
    [["RotationFlip", 0.3]],
    [["SaltPepper", 0.05]],
    [["Gaussian", 0.05], ["Laplacian", 0.05]],
    [[["Gaussian", 0.05], ["Laplacian", 0.05]]],
    [["Gaussian", 0.05], ["Lp-norm", 0.5, 1]],
    [["Laplacian", 0.05], ["Lp-norm", 0.5, 1]],
    [["Lp-norm", 0.5, 1], ["Lp-norm", 0.5, 2]],
    [["Gaussian", 0.05], ["Laplacian", 0.05], ["Lp-norm", 0.5, 1]],
    [["Gaussian", 0.05], [["Laplacian", 0.05], ["SaltPepper", 0.05]]],
    [["Gaussian", 0.05], [["Lp-norm", 0.5, 1], ["ContrastBrightness", 0.9, 0.9]]],
    [["Laplacian", 0.05], [["Lp-norm", 0.5, 1], ["RotationFlip", 0.3]]],
    [[["Gaussian", 0.05], ["SaltPepper", 0.05]], [["Gaussian", 0.05], ["RotationFlip", 0.3]]],
    [[["Laplacian", 0.05], ["SaltPepper", 0.05]], [["Laplacian", 0.05], ["RotationFlip", 0.3]]],
    [[["Lp-norm", 0.5, 1], ["SaltPepper", 0.05]], [["Lp-norm", 0.5, 1], ["RotationFlip", 0.3]]],
    [[["Gaussian", 0.05], ["SaltPepper", 0.05]], [["Gaussian", 0.05], ["RotationFlip", 0.3]], [["Lp-norm", 0.5, 1], ["SaltPepper", 0.05]], [["Lp-norm", 0.5, 1], ["RotationFlip", 0.3]]],
    [[["Gaussian", 0.05], ["SaltPepper", 0.05]], [["Gaussian", 0.05], ["RotationFlip", 0.3]], [["Lp-norm", 0.5, 1], ["SaltPepper", 0.05]], [["Lp-norm", 0.5, 1], ["RotationFlip", 0.3]], [["Laplacian", 0.05], ["SaltPepper", 0.05]], [["Laplacian", 0.05], ["RotationFlip", 0.3]]]
]

In [5]:
# Accumulates one result dict per trained configuration (appended by the training loop).
# Re-running this cell discards any results collected so far.
test_history = []

def display_test_history(test_history):
    """Print a one-line summary for every recorded experiment.

    Args:
        test_history: Sequence of result dicts. Each must provide the keys
            'perturbations', 'include_original', 'shuffle', 'training_time',
            'baseline_accuracy' (a fraction) and 'adversarial_accuracy'
            (a sequence of fractions, one per attack strength).

    Accuracies are printed as percentages. Any number of adversarial
    accuracies is supported; they are joined with '/'.
    """
    for i, test in enumerate(test_history):
        num_p = len(test['perturbations'])
        io = test['include_original']
        shuffle = test['shuffle']
        training_time = test['training_time']
        baseline = test['baseline_accuracy']
        adversarial = test['adversarial_accuracy']
        # Join every recorded value instead of indexing exactly four of them.
        adv_str = "/".join(f"{accuracy*100}" for accuracy in adversarial)
        print(f"Test {i}: Perturbations ({num_p}), Original ({io}), Shuffle ({shuffle}), Training Time ({training_time}s), Baseline ({baseline*100}%), Adversarial ({adv_str}%)")

In [7]:
# Settings shared by every run; hoisted out of the loop because they never change.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# NOTE(review): the canonical ResNet-50 stage layout is [3, 4, 6, 3]; confirm that
# [3, 4, 5, 3] is intentional before reporting results under the "REPResNet-50" name.
block_layers = [3, 4, 5, 3]
num_classes = 10
lr = 0.001
batch_size = 128
super_epochs = 5
epochs_per = 10
# FGSM attack strengths evaluated after training, in reporting order.
epsilons = [0.025, 0.05, 0.075, 0.1]


def _accuracy(classifier, x, y_one_hot):
    """Return the fraction of samples in `x` whose predicted class matches `y_one_hot`."""
    predictions = classifier.predict(x)
    return np.sum(np.argmax(predictions, axis=1) == np.argmax(y_one_hot, axis=1)) / len(y_one_hot)


for perturbation_description in perturbation_tests:
    perturbations = create_perturbations(perturbation_description)
    # The unperturbed image is only fed to the model for the empty (baseline) configuration.
    include_original = perturbation_description == []
    shuffle = True
    multiplicity = len(perturbation_description) + int(include_original)

    # Create REPResNet Model with multiplicity applied
    model = REPResNet(Bottleneck, list(block_layers), num_classes=num_classes, multiplicity=multiplicity).to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=0.0001)

    preprocessing_defense = REPPreprocessor(perturbations, include_original, shuffle)
    classifier = PyTorchClassifier(
        model=model,
        loss=criterion,
        optimizer=optimizer,
        input_shape=(3*multiplicity, 32, 32),
        nb_classes=num_classes,
        # Keep adversarial examples inside the valid pixel range; without this the
        # attack output is not clipped back to [min_pixel_value, max_pixel_value].
        clip_values=(min_pixel_value, max_pixel_value),
        preprocessing_defences=[preprocessing_defense]
    )

    print("Training model with perturbations:")
    print(perturbation_description)
    training_start = time.time()
    # Train in chunks of `epochs_per` epochs so progress can be reported in between.
    for epoch in range(super_epochs):
        classifier.fit(x_train, y_train, batch_size=batch_size, nb_epochs=epochs_per, verbose=False)
        with torch.no_grad():
            train_accuracy = _accuracy(classifier, x_train, y_train)
            test_accuracy = _accuracy(classifier, x_test, y_test)
            print(f"Epoch {(epoch+1)*epochs_per}/{super_epochs*epochs_per} Complete After {int(time.time() - training_start)}s! Train Acc: {train_accuracy}, Test Acc: {test_accuracy}")
    # Note: the measured time includes the per-chunk accuracy evaluations above.
    total_time = time.time() - training_start

    # Step 5: Evaluate the ART classifier on benign test examples
    baseline_accuracy = _accuracy(classifier, x_test, y_test)
    print("Accuracy on benign test examples: {}%".format(baseline_accuracy * 100))

    # Step 6: Generate adversarial test examples and score each attack strength
    adversarial_accuracies = []
    for eps in epsilons:
        attack = FastGradientMethod(estimator=classifier, eps=eps)
        x_test_adv = attack.generate(x=x_test)
        adversarial_accuracy = _accuracy(classifier, x_test_adv, y_test)
        print("Accuracy on adversarial test examples (epsilon = {}): {}%".format(eps, adversarial_accuracy * 100))
        adversarial_accuracies.append(adversarial_accuracy)

    test_history.append({'model': "REPResNet-50",
                        'layers': list(block_layers),
                        'perturbations': perturbation_description,
                        'include_original': include_original,
                        'shuffle': shuffle,
                        'loss': criterion,
                        'lr': lr,
                        'batch_size': batch_size,
                        'epochs': int(super_epochs*epochs_per),
                        'training_time': total_time,
                        'baseline_accuracy': baseline_accuracy,
                        'adversarial_accuracy': adversarial_accuracies})



Training model with perturbations:
[]


In [None]:
# Print a one-line summary for every configuration recorded in test_history so far.
display_test_history(test_history=test_history)

Test 0: Perturbations (2), Original (False), Shuffle (True), Training Time (208.2573003768921s), Baseline (29.160000000000004%), Adversarial (17.02/11.72/9.139999999999999/7.000000000000001%)


In [None]:
import csv

def save_to_csv(data, filename):
    """Write a list of dicts to `filename` as CSV, one row per dict.

    Args:
        data: Sequence of dicts. The header row is the union of all keys in
            first-seen order; a row that lacks a key gets an empty cell.
        filename: Destination path. Missing parent directories are created.

    Returns:
        None. Nothing is written (and nothing is printed) when `data` is empty.
    """
    import os  # local import: only this helper needs it

    if not data:
        return

    # Collect headers from every row, not just the first, so a later row with an
    # extra key does not make DictWriter raise ValueError.
    headers = list(dict.fromkeys(key for item in data for key in item))

    # Create the output directory up front so results are not lost to a missing folder.
    parent = os.path.dirname(filename)
    if parent:
        os.makedirs(parent, exist_ok=True)

    with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=headers)
        writer.writeheader()
        writer.writerows(data)

    print(f"Data saved to {filename} successfully.")

# Persist the collected results under ROOT's notebooks/experiment_log directory.
save_to_csv(test_history, ROOT + "notebooks/experiment_log/cifar_10_resnet_round_1_training_results.csv")

Data saved to ../../notebooks/experiment_log/cifar_100_round_2_training_results.csv successfully.
