In [2]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import datasets, transforms

from tqdm import tqdm
from matplotlib import pyplot as plt

from networks.rnn import CNNLSTM

# Select the compute device: CUDA when PyTorch can see a GPU, CPU otherwise
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Device: {device}")

  from .autonotebook import tqdm as notebook_tqdm


Device: cpu


In [3]:
from PIL import Image
class CustomCIFAR(datasets.CIFAR10):
    """CIFAR-10 dataset whose image transform is also handed the label."""

    def __getitem__(self, index: int):
        """Fetch one sample.

        Args:
            index (int): position of the sample in the dataset.

        Returns:
            tuple: ``(image, target)`` where ``target`` is the index of the
            target class.
        """
        label = self.targets[index]
        picture = Image.fromarray(self.data[index])

        if self.transform is not None:
            # Differs from the torchvision implementation: the transform is
            # called with the (image, label) pair rather than the image alone.
            picture = self.transform((picture, label))

        if self.target_transform is not None:
            label = self.target_transform(label)

        return picture, label

In [5]:
import torch
from networks.cnn import CNN
import numpy as np

# Use the GPU when PyTorch reports one, otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Fast gradient sign method
def fgsm(image, epsilon, data_grad):
    """Apply one Fast Gradient Sign Method step to ``image``.

    Every element of ``image`` is shifted by ``epsilon`` in the direction of
    the sign of ``data_grad`` (the loss gradient w.r.t. the input), and the
    result is clamped to the range [0, 1].

    Returns:
        The perturbed image tensor.
    """
    step = data_grad.sign() * epsilon
    return torch.clamp(image + step, min=0, max=1)


def simple_test(test_loader, criterion, model):
    """Evaluate ``model`` on every batch produced by ``test_loader``.

    Args:
        test_loader: iterable yielding ``(inputs, labels)`` batches.
        criterion: loss callable invoked as ``criterion(outputs, labels)``.
            It is assumed to return the mean loss over the batch (the default
            reduction of ``CrossEntropyLoss``) -- TODO confirm if another
            reduction is ever passed in.
        model: network to evaluate; it is moved to ``device`` and switched to
            eval mode.

    Returns:
        tuple: ``(loss, accuracy)`` as Python floats, averaged over samples so
        that a shorter final batch is not over-weighted. Both are NaN when the
        loader yields no samples.
    """
    model.to(device)
    model.eval()

    total_loss = 0.0
    total_correct = 0.0
    total_samples = 0
    for inputs, labels in test_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Gradients are disabled around the forward pass only. Iterating the
        # loader stays outside the no_grad block because a dataset transform
        # such as FGSMTransform calls backward() while producing each sample.
        with torch.no_grad():
            outputs = model(inputs)
            loss = criterion(outputs, labels)

        batch_size = labels.size(0)
        total_loss += loss.item() * batch_size
        total_correct += (outputs.argmax(dim=1) == labels).sum().item()
        total_samples += batch_size

    if total_samples == 0:
        loss = accuracy = float("nan")
    else:
        loss = total_loss / total_samples
        accuracy = total_correct / total_samples

    print(f"Loss: {loss:.4f}, Accuracy: {accuracy:.4f}")
    return loss, accuracy


def test_pre_post_fgsm(
    model, test_dataset, test_loader, epsilon, criterion
):
    """Run ``model`` on each batch before and after an FGSM attack.

    Args:
        model: network under attack. It is moved to ``device``; its train/eval
            mode is left exactly as the caller set it.
        test_dataset: dataset behind ``test_loader``; only its length is used,
            as the denominator of the accuracy.
        test_loader: iterable yielding ``(images, labels)`` batches.
        epsilon: FGSM step size passed to ``fgsm``.
        criterion: loss callable invoked as ``criterion(output, labels)``.

    Returns:
        tuple: ``(final_acc, adv_examples)``. ``final_acc`` is the fraction of
        samples still classified correctly after the attack. ``adv_examples``
        is a list of ``(pre-attack prediction, post-attack prediction,
        ground-truth label, perturbed image as ndarray)`` tuples.
    """
    # Same placement as simple_test, so inputs and weights share a device.
    model.to(device)

    correct = 0
    adv_examples = []
    for images, labels in test_loader:
        # Send the data and label to the device
        images, labels = images.to(device), labels.to(device)

        # The attack needs the gradient of the loss w.r.t. the input pixels
        images.requires_grad = True

        # Forward pass on the clean images
        output = model(images)
        init_pred = torch.max(output, dim=1)[1]
        loss = criterion(output, labels)

        model.zero_grad()
        loss.backward()

        # Gradient of the loss with respect to the input batch
        data_grad = images.grad.data

        # Build the adversarial batch; detached so the re-classification
        # below does not extend the autograd graph
        perturbed_images = fgsm(images, epsilon, data_grad).detach()

        # Re-classify the perturbed images (no gradients needed here)
        with torch.no_grad():
            output = model(perturbed_images)
        final_pred = torch.max(output, dim=1)[1]

        correct_idx = final_pred == labels
        # Kept as a float sum so the printed count keeps its original format
        correct += correct_idx.to(torch.float32).sum().item()

        # Samples that were right before the attack but are wrong after it
        incorrect_idx = (final_pred != labels) & (init_pred == labels)

        # Save some examples for later visualization. The size check runs
        # once per batch, so a single batch can push the list past five.
        if len(adv_examples) < 5:
            for initial, final, p, y in zip(
                init_pred[incorrect_idx],
                final_pred[incorrect_idx],
                perturbed_images[incorrect_idx],
                labels[incorrect_idx],
            ):
                adv_ex = p.squeeze().detach().cpu().numpy()
                adv_examples.append((initial.item(), final.item(), y.item(), adv_ex))

            # Special case for epsilon == 0: nothing can flip, so record the
            # correctly classified samples instead
            if epsilon == 0:
                for initial, final, p in zip(
                    init_pred[correct_idx],
                    final_pred[correct_idx],
                    perturbed_images[correct_idx],
                ):
                    # Store the single image belonging to this prediction,
                    # not the whole batch
                    adv_ex = p.squeeze().detach().cpu().numpy()
                    adv_examples.append(
                        (initial.item(), final.item(), final.item(), adv_ex)
                    )

    data_len = len(test_dataset)
    # Calculate final accuracy for this epsilon
    final_acc = correct / float(data_len)
    print(
        "Epsilon: {}\tTest Accuracy = {} / {} = {}".format(
            epsilon, correct, data_len, final_acc
        )
    )
    # Return the accuracy and the collected adversarial examples
    return final_acc, adv_examples


In [6]:
# The only preprocessing step is the conversion of each image to a tensor
transform = transforms.Compose([transforms.ToTensor()])

# CIFAR-10 training split stored under ./data (download requested)
train_dataset = datasets.CIFAR10(
    root='./data',
    train=True,
    transform=transform,
    download=True,
)

# Loader that serves one shuffled sample per step
train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=1,
    shuffle=True,
)

# Human-readable class names
classes = (
    'plane', 'car', 'bird', 'cat', 'deer',
    'dog', 'frog', 'horse', 'ship', 'truck',
)

Files already downloaded and verified


In [7]:
# Fresh CNN-LSTM network, placed on the selected device
model = CNNLSTM()
model = model.to(device)

# Cross-entropy objective, optimised with Adam at a learning rate of 1e-3
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)

# Number of passes over the training data
EPOCHS = 10

In [6]:
# Train the batch_size=1 model
# Nothing inside the loop switches the model to eval mode, so training mode
# is enabled once up front instead of on every step.
model.train()

# The context manager closes the progress bar even if training is interrupted
with tqdm(total=len(train_loader) * EPOCHS, desc="Training") as progress:
    for epoch in range(EPOCHS):
        for images, labels in train_loader:
            images = images.to(device)
            labels = labels.to(device)

            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            progress.update(1)

        # Reports the loss of the last batch of the epoch; the total comes
        # from EPOCHS rather than a hard-coded 10
        progress.write(f'Epoch [{epoch+1}/{EPOCHS}], Loss: {loss.item():.4f}')

# saving model
torch.save(model.state_dict(), "models/cnn-lstm-batchsize1.pth")

Training:  10%|█         | 50017/500000 [09:04<1:17:05, 97.27it/s] 

Epoch [1/10], Loss: 2.7461


Training:  17%|█▋        | 84618/500000 [14:57<1:10:05, 98.77it/s] 

In [8]:
BASE_MODEL_PATH = "./models/cnn-lstm.pth"
base_model = CNNLSTM().to(device)
# map_location remaps the stored tensors onto the current device, so a
# checkpoint written on a GPU machine still loads on a CPU-only one
base_model.load_state_dict(torch.load(BASE_MODEL_PATH, map_location=device))
EPOCHS = 10
# NOTE(review): 0.2 breaks the 0.005 spacing of the other values -- confirm
# it is intended and not a typo for 0.02
EPSILONS = [0.005, 0.01, 0.015, 0.2]

In [9]:
# CIFAR-10 test split, preprocessed with the notebook-level `transform`
test_dataset = datasets.CIFAR10(
    root='./data',
    train=False,
    transform=transform,
)
test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=64,
    shuffle=True,
)

# Human-readable class names
classes = (
    'plane', 'car', 'bird', 'cat', 'deer',
    'dog', 'frog', 'horse', 'ship', 'truck',
)

# Result containers filled in by later cells
accuracies = dict()
losses = dict()

In [10]:
class FGSMTransform:
    """Perform a fast gradient sign attack on a single image.

    The attack gradients come from a CNNLSTM whose weights are loaded from
    ``model_path``. Instances are callable on an ``(image, label)`` pair and
    return only the (possibly perturbed) image.

    NOTE(review): the attack model is never switched to eval mode, so it runs
    in whatever mode ``CNNLSTM()`` starts in -- confirm this is intended.

    Args:
        epsilon: step size of the attack.
        prob: probability of attacking a given image; otherwise the image is
            returned untouched.
        model_path: checkpoint holding the attack model's state dict.
    """

    def __init__(self, epsilon=0.005, prob=1, model_path="./models/cnn-lstm-batchsize1.pth"):
        self.epsilon = epsilon
        self.prob = prob  # probability of applying fgsm on an image
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = CNNLSTM().to(self.device)
        # map_location lets a checkpoint saved on another device load here
        self.model.load_state_dict(torch.load(model_path, map_location=self.device))
        self.criterion = torch.nn.CrossEntropyLoss()

    def __call__(self, sample):
        """Return the image of ``sample``, attacked with probability ``prob``.

        Args:
            sample: ``(image, label)`` pair; the image is an unbatched tensor
                and the label a scalar class index (tensor or plain number).

        Returns:
            The input image itself when the attack is skipped; otherwise a
            detached CPU tensor with the same shape as the input image, with
            values clamped to [0, 1].
        """
        x, labels = sample
        if np.random.rand() >= self.prob:
            return x

        # A single image is processed, so add a leading batch dimension.
        # clone/detach yields a fresh leaf, leaving the caller's tensor alone.
        x = x.unsqueeze(0).to(self.device).clone().detach().requires_grad_(True)
        # The label must sit on the same device as the model output
        labels = torch.as_tensor(labels).unsqueeze(0).to(self.device)

        # Forward and backward pass to get the gradient w.r.t. the input
        output = self.model(x)
        loss = self.criterion(output, labels)
        self.model.zero_grad()
        loss.backward()

        # FGSM step, same computation as fgsm()
        sign_data_grad = x.grad.sign()
        perturbed_images = x.detach() + self.epsilon * sign_data_grad
        perturbed_images = torch.clamp(perturbed_images, 0, 1)

        # Drop only the batch dimension added above (a bare squeeze() would
        # also drop a size-1 channel axis). Return a graph-free CPU tensor so
        # attacked and skipped samples can be collated into the same batch.
        return perturbed_images.squeeze(0).detach().cpu()
    
class ToTensor:
    """Convert an ``(image, label)`` sample to tensors.

    The image goes through the same conversion as ``transforms.ToTensor()``;
    unlike that transform, the label is carried along and returned too.
    """

    def __call__(self, sample):
        """Return ``(image_tensor, label_tensor)`` for ``sample``.

        The label is converted to int64 explicitly: going through
        ``np.array(label)`` yields a platform-dependent integer width, and
        ``CrossEntropyLoss`` expects int64 class indices.
        """
        x, label = sample
        return (
            transforms.functional.to_tensor(x),
            torch.as_tensor(label, dtype=torch.long),
        )

In [11]:
def compare_test(base_path, model_cls=None):
    """Evaluate one saved model under FGSM attacks of every strength in EPSILONS.

    Args:
        base_path: path of the checkpoint (state dict) to evaluate.
        model_cls: zero-argument callable that builds the network the
            checkpoint belongs to. Defaults to ``CNN``; pass another class
            when the checkpoint was saved from a different architecture.

    Returns:
        tuple: ``(accuracies, losses)``, two lists with one entry per value
        in ``EPSILONS``, in the same order.
    """
    if model_cls is None:
        model_cls = CNN

    base_model = model_cls().to(device)
    # map_location lets a checkpoint saved on another device load here
    base_model.load_state_dict(torch.load(base_path, map_location=device))

    criterion = torch.nn.CrossEntropyLoss()

    accuracies = []
    losses = []
    for e in EPSILONS:
        # Each test image is converted to a tensor and then attacked with
        # strength e before it reaches the model
        transform_fgsm = transforms.Compose([
                ToTensor(),
                FGSMTransform(epsilon=e)
            ])
        fgsm_test = CustomCIFAR(root='./data', train=False, transform=transform_fgsm)
        fgsm_loader = torch.utils.data.DataLoader(dataset=fgsm_test, batch_size=64)
        loss, accuracy = simple_test(
            test_loader=fgsm_loader, criterion=criterion, model=base_model
        )
        accuracies.append(accuracy)
        losses.append(loss)
    print("\tAccuracies:", accuracies)
    print("\tLosses:", losses)
    return accuracies, losses

In [12]:
# The clean training split does not depend on epsilon, so it is built once
# instead of once per loop iteration
transform = transforms.Compose([transforms.ToTensor()])
train_dataset1 = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)

for epsilon in EPSILONS:
    fgsm_transform = transforms.Compose([ToTensor(), FGSMTransform(epsilon=epsilon)])

    # Augmented training set: every clean image plus an FGSM-attacked copy
    train_dataset2 = CustomCIFAR(root='./data', train=True, transform=fgsm_transform)
    augmented_dataset = torch.utils.data.ConcatDataset([train_dataset1, train_dataset2])
    # Create data loaders
    train_loader = torch.utils.data.DataLoader(dataset=augmented_dataset, batch_size=64, shuffle=True)

    # A fresh model is trained from scratch for each epsilon
    model = CNNLSTM().to(device)
    # Define the loss function and optimizer
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    print("Training model with epsilon: ", epsilon)

    # Nothing in the loop leaves training mode, so it is set once per model
    model.train()
    for epoch in range(EPOCHS):
        for images, labels in train_loader:
            images = images.to(device)
            labels = labels.to(device)

            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    # Saving model; the file name carries the digits after "0." of epsilon
    name = str(epsilon)[2:]
    checkpoint_path = "./models/cnn-lstm_fgsm" + name + ".pth"
    torch.save(model.state_dict(), checkpoint_path)

    # Testing model
    # NOTE(review): this checkpoint holds CNNLSTM weights -- make sure
    # compare_test loads it into a matching architecture.
    accuracies[epsilon], losses[epsilon] = compare_test(checkpoint_path)

Files already downloaded and verified
Training model with epsilon:  0.005


KeyboardInterrupt: 

In [None]:
# Baseline curve: the checkpoint at ./models/cnn.pth, stored under key 0
accuracies[0], losses[0] = compare_test("./models/cnn.pth")

fig, ax = plt.subplots()
ax.plot(EPSILONS, accuracies[0], "-", label="Base model")

# One curve per adversarially trained model
for epsilon in EPSILONS:
    curve_label = "Trained on ep=" + str(epsilon)
    ax.plot(EPSILONS, accuracies[epsilon], "-", label=curve_label)

ax.set_title("Epsilon vs Accuracy ")
ax.set_xlabel("Epsilon")
ax.set_ylabel("Accuracy")
ax.legend()
plt.show()