# Adversarial Training Example

In [8]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import Compose, Resize, ToTensor 
from torchvision.utils import save_image
import matplotlib.pyplot as plt

In [20]:
# download training and test data
train_data = datasets.MNIST(
    root="data",
    train=True,
    download=True,
    transform=Compose([Resize(32), ToTensor()])
)
test_data = datasets.MNIST(
    root="data",
    train=False,
    download=True,
    transform=Compose([Resize(32), ToTensor()])
)

batch_size = 64

train_dl = DataLoader(train_data, batch_size=batch_size)
test_dl = DataLoader(test_data, batch_size=batch_size)

In [3]:
# model
class VGG11(nn.Module):
    def __init__(self):
        super().__init__()
        # Convolution layers
        self.conv_layers = nn.Sequential(
            # Layer 1
            nn.Conv2d(1, 64, 3, 1, 1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            # Layer 2
            nn.Conv2d(64, 128, 3, 1, 1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            # Layer 3
            nn.Conv2d(128, 256, 3, 1, 1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            # Layer 4
            nn.Conv2d(256, 256, 3, 1, 1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            # Layer 5
            nn.Conv2d(256, 512, 3, 1, 1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            # Layer 6
            nn.Conv2d(512, 512, 3, 1, 1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            # Layer 7
            nn.Conv2d(512, 512, 3, 1, 1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            # Layer 8
            nn.Conv2d(512, 512, 3, 1, 1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )
        # Flattener
        self.flatten = nn.Flatten()
        # Fully connected layers
        self.fc_layers = nn.Sequential(
            # Layer 9
            nn.Linear(512, 4096),
            nn.ReLU(),
            nn.Dropout(0.5),
            # Layer 10
            nn.Linear(4096, 4096),
            nn.ReLU(),
            nn.Dropout(0.5),
            # Layer 11; final
            nn.Linear(4096, 10)
        )

    def forward(self, x):
        x = self.conv_layers(x)
        x = self.flatten(x)
        x = self.fc_layers(x)
        return x

# https://pytorch.org/tutorials/beginner/basics/quickstart_tutorial.html
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)

def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if batch % 100 == 0:
            loss, current = loss.item(), (batch + 1) * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    return test_loss, correct

In [5]:
model = VGG11().to(device)
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

epochs = 5
train_loss = [0] * epochs
test_loss = [0] * epochs
train_acc = [0] * epochs
test_acc = [0] * epochs

for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_dl, model, loss_fn, optimizer)
    train_loss[t], train_acc[t] = test(train_dl, model, loss_fn)
    test_loss[t], test_acc[t] = test(test_dl, model, loss_fn)
print("Done!")

torch.save(model.state_dict(), 'q3/model_weights.pth')

Epoch 1
-------------------------------
loss: 2.368035  [   64/60000]
loss: 2.004352  [ 6464/60000]
loss: 1.680520  [12864/60000]
loss: 1.294444  [19264/60000]
loss: 0.868130  [25664/60000]
loss: 0.655955  [32064/60000]
loss: 0.364761  [38464/60000]
loss: 0.336109  [44864/60000]
loss: 0.334184  [51264/60000]
loss: 0.303655  [57664/60000]
Test Error: 
 Accuracy: 96.7%, Avg loss: 0.156593 

Test Error: 
 Accuracy: 97.0%, Avg loss: 0.146812 

Epoch 2
-------------------------------
loss: 0.246662  [   64/60000]
loss: 0.207480  [ 6464/60000]
loss: 0.177055  [12864/60000]
loss: 0.172872  [19264/60000]
loss: 0.107401  [25664/60000]
loss: 0.166205  [32064/60000]
loss: 0.067316  [38464/60000]
loss: 0.132390  [44864/60000]
loss: 0.147848  [51264/60000]
loss: 0.157896  [57664/60000]
Test Error: 
 Accuracy: 98.4%, Avg loss: 0.065584 

Test Error: 
 Accuracy: 98.2%, Avg loss: 0.064515 

Epoch 3
-------------------------------
loss: 0.086865  [   64/60000]
loss: 0.132496  [ 6464/60000]
loss: 0.0916

3b

In [15]:
def fgsm(image, epsilon, grad):
    sign_grad = grad.sign()
    perturbed_image = image + epsilon * sign_grad
    return perturbed_image

def adversarial_test(dataloader, model, loss_fn, adv_gen, epsilon):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    adv_samples = []
    for X, y in dataloader:
        X, y = X.to(device), y.to(device)
        X.requires_grad = True
        init_pred = model(X)
        attack_loss = loss_fn(init_pred, y)
        model.zero_grad()
        attack_loss.backward()
        grad = X.grad.data
        adversarial_X = adv_gen(X, epsilon, grad)
        if len(adv_samples) < 5:
            adv_samples.append(adversarial_X)
        pred = model(adversarial_X)
        test_loss += loss_fn(pred, y).item()
        correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    return adv_samples

In [16]:
for epsilon in (0.1, 0.2, 0.5):
    adv_samples = adversarial_test(test_dl, model, loss_fn, fgsm, epsilon)
    for i in range(len(adv_samples)):
        x = adv_samples[i]
        save_image(x.view(batch_size, 1, 32, 32), f'figs/q3p2-fgsm-{epsilon}-{i+1}.png')

Test Error: 
 Accuracy: 64.6%, Avg loss: 1.158134 

Test Error: 
 Accuracy: 10.8%, Avg loss: 5.048278 

Test Error: 
 Accuracy: 9.7%, Avg loss: 11.755159 



In [30]:
def adversarial_train(dataloader, model, loss_fn, optimizer, adv_gen, epsilon):
    size = len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)
        X.requires_grad = True

        optimizer.zero_grad()
        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        loss.backward()
        grad = X.grad.data
        optimizer.step()
        
        # generate adversarial
        adv_X = adv_gen(X, epsilon, grad)
        adv_pred = model(adv_X)
        adv_loss = loss_fn(adv_pred, y)

        adv_loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, adv_loss, current = loss.item(), adv_loss.item(), (batch + 1) * len(X)
            print(f"loss: {loss:>7f}, adversarial: {adv_loss:>7f}  [{current:>5d}/{size:>5d}]")

In [32]:
model = VGG11().to(device)
epsilon = 0.2
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
epochs = 3
for epoch in range(epochs):
    print(f"Epoch {epoch+1}\n-------------------------------")
    adversarial_train(train_dl, model, loss_fn, optimizer, fgsm, epsilon)
for epsilon in (0.1, 0.2, 0.5):
    adversarial_test(test_dl, model, loss_fn, fgsm, epsilon)

Epoch 1
-------------------------------
loss: 2.346556, adversarial: 2.329785  [   64/60000]
loss: 1.356106, adversarial: 1.661337  [ 6464/60000]
loss: 0.570784, adversarial: 1.007124  [12864/60000]
loss: 0.349652, adversarial: 0.825833  [19264/60000]
loss: 0.184833, adversarial: 0.687427  [25664/60000]
loss: 0.177587, adversarial: 0.810080  [32064/60000]
loss: 0.105211, adversarial: 0.585238  [38464/60000]
loss: 0.124611, adversarial: 0.754988  [44864/60000]
loss: 0.135895, adversarial: 0.650268  [51264/60000]
loss: 0.149023, adversarial: 0.695961  [57664/60000]
Epoch 2
-------------------------------
loss: 0.097780, adversarial: 0.413085  [   64/60000]
loss: 0.107108, adversarial: 0.415530  [ 6464/60000]
loss: 0.077144, adversarial: 0.405170  [12864/60000]
loss: 0.073841, adversarial: 0.469090  [19264/60000]
loss: 0.054380, adversarial: 0.393210  [25664/60000]
loss: 0.062997, adversarial: 0.607207  [32064/60000]
loss: 0.056725, adversarial: 0.246300  [38464/60000]
loss: 0.067643, adv

3d

In [33]:
MAX_IT = 20
def pgd(image, epsilon, grad):
    perturbed_image = image
    for t in range(1, MAX_IT):
        step = 1/t
        perturbed_image = perturbed_image + step * grad
        perturbed_image /= torch.max(perturbed_image) * epsilon
    return perturbed_image

In [34]:
model = VGG11().to(device)
epsilon = 0.2
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
for epoch in range(epochs):
    print(f"Epoch {epoch+1}\n-------------------------------")
    adversarial_train(train_dl, model, loss_fn, optimizer, pgd, epsilon)
for epsilon in (0.1, 0.2, 0.5):
    adversarial_test(test_dl, model, loss_fn, fgsm, epsilon)
    adversarial_test(test_dl, model, loss_fn, pgd, epsilon)

Epoch 1
-------------------------------


RuntimeError: one of the variables needed for gradient computation has been modified by an inplace operation: [torch.cuda.FloatTensor [64, 1, 32, 32]], which is output 0 of DivBackward0, is at version 1; expected version 0 instead. Hint: enable anomaly detection to find the operation that failed to compute its gradient, with torch.autograd.set_detect_anomaly(True).