# Adversarial Training

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import datasets, transforms
from torchvision.models import resnet18
from torch.utils.data import DataLoader
from tqdm.notebook import tqdm_notebook
import matplotlib.pyplot as plt
import random
import numpy as np
from google.colab import drive
# Mount Google Drive; the training loop writes its checkpoint under /content/drive.
drive.mount('/content/drive')

# Seed every random number generator used by the notebook with one shared value.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Training-time augmentation: random 32x32 crop (after 4-pixel padding) and
# random horizontal flip, followed by conversion to a [0, 1] tensor.
transform = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
])

# Evaluation must be deterministic: the test split only gets converted to a
# tensor, without random cropping or flipping.
test_transform = transforms.Compose([
    transforms.ToTensor(),
])

# Normalisation is kept out of the dataset transforms on purpose: the attack
# perturbs and clamps images in [0, 1] pixel space and normalises afterwards.
normalizer = transforms.Normalize((0.49139968, 0.48215827 ,0.44653124), (0.24703233, 0.24348505, 0.26158768))

train_set = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
test_set = datasets.CIFAR10(root='./data', train=False, download=True, transform=test_transform)

train_loader = DataLoader(train_set, batch_size=128, shuffle=True)
test_loader = DataLoader(test_set, batch_size=128, shuffle=False)
# Separate loader over the same test split, used for the adversarial evaluation.
test_adv_loader = DataLoader(test_set, batch_size=128, shuffle=False)

Files already downloaded and verified
Files already downloaded and verified


In [None]:
# ResNet-18 built without requesting pretrained weights.
model = resnet18()
# Swap the stem for a 3x3, stride-1 convolution so 32x32 inputs are not downsampled here.
model.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
# Stride-1 pooling (kernel 3, padding 1) keeps the spatial resolution unchanged.
model.maxpool = nn.MaxPool2d(kernel_size=3, stride=1, padding=1)
# 10-way classification head.
model.fc = nn.Linear(512, 10)

# Adam with weight decay; `criterion` is read as a global by the train/test functions.
optimizer = optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)
criterion = nn.CrossEntropyLoss()

# Cosine learning-rate schedule with T_max=100 scheduler steps.
lr_sched = optim.lr_scheduler.CosineAnnealingLR(optimizer, 100)

# Use the GPU when one is available; the trailing expression also displays the model.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=1, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

In [None]:
def pqd_attacks(model, X, y, epsilon=8/255, alpha=0.01, num_iter=7, restarts=1, normalizer=normalizer):
    """Craft L-infinity bounded PGD-style perturbations that maximise the loss.

    For every restart a perturbation is drawn uniformly from
    [-epsilon, epsilon] and refined with `num_iter` signed-gradient ascent
    steps of size `alpha`, clamping back into the epsilon ball after each
    step. For each sample, the perturbation with the highest cross-entropy
    loss seen over all steps and restarts is kept.

    Args:
        model: Classifier that receives normalised images.
        X: Batch of un-normalised images; X + delta is clamped to [0, 1].
        y: Class-index labels for X.
        epsilon: Radius of the L-infinity ball.
        alpha: Step size of each signed-gradient step.
        num_iter: Number of gradient steps per restart.
        restarts: Number of random restarts.
        normalizer: Callable applied to the clamped image before the model.

    Returns:
        Tensor shaped like X holding the per-sample worst-case perturbation.
        It is detached from autograd, so callers can use it as plain data.
    """
    loss_fn = nn.CrossEntropyLoss(reduction="none")

    def per_sample_loss(perturbation):
        # Perturb in pixel space, clamp to a valid image, then normalise.
        return loss_fn(model(normalizer(torch.clamp(X + perturbation, 0, 1))), y)

    # y holds integer labels, so the loss tracker needs an explicit float dtype.
    max_loss = torch.zeros_like(y, dtype=X.dtype, device=X.device)
    max_delta = torch.zeros_like(X)

    for _ in range(restarts):
        delta = torch.empty_like(X).uniform_(-epsilon, epsilon).requires_grad_(True)

        for _ in range(num_iter):
            loss = per_sample_loss(delta)
            # Differentiate w.r.t. delta only, so that no gradients are
            # accumulated into the model parameters by the attack.
            grad = torch.autograd.grad(loss.mean(), delta)[0]

            # Track the best perturbation with detached tensors so the
            # result never carries an autograd graph.
            loss = loss.detach()
            improved = loss >= max_loss
            max_delta[improved] = delta.detach()[improved]
            max_loss = torch.max(max_loss, loss)

            delta = (delta.detach() + alpha * grad.sign()).clamp(-epsilon, epsilon).requires_grad_(True)

        # Score the perturbation produced by the final step as well.
        with torch.no_grad():
            loss = per_sample_loss(delta)
            improved = loss >= max_loss
            max_delta[improved] = delta.detach()[improved]
            max_loss = torch.max(max_loss, loss)

    return max_delta

In [None]:
def train_adv(model, device, train_loader, optimizer, epoch, epsilon=8/255, alpha=0.01, num_iter=7, restarts=1, normalizer=normalizer):
    """Run one epoch of adversarial training.

    Every batch is replaced by its adversarially perturbed version (from
    `pqd_attacks`) and the model is optimised on those examples only.

    Args:
        model: Network to train; put into train mode by this function.
        device: Device the batches are moved to.
        train_loader: Iterable yielding (images, labels) batches.
        optimizer: Optimizer updating the model parameters.
        epoch: Epoch number, used only in the progress message.
        epsilon, alpha, num_iter, restarts: Attack settings forwarded to
            `pqd_attacks`.
        normalizer: Callable applied to the clamped adversarial images; it
            is also forwarded to the attack so both use the same one.

    Returns:
        Mean of the per-batch training losses over the epoch.

    Raises:
        ValueError: If `train_loader` yields no batches.
    """
    model.train()
    running_loss = 0.0
    num_batches = 0
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)

        # Detach so the training backward pass does not flow into the attack graph.
        delta = pqd_attacks(model, data, target, epsilon, alpha, num_iter, restarts, normalizer=normalizer).detach()
        adv_data = normalizer(torch.clamp(data + delta, 0, 1))

        # Clear gradients only now: the attack itself may have populated them.
        optimizer.zero_grad()

        adv_output = model(adv_data)

        loss = criterion(adv_output, target)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        num_batches += 1

        if batch_idx % 100 == 0:
            print(f'Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {running_loss/(batch_idx+1):.6f}')

    if num_batches == 0:
        raise ValueError("train_loader yielded no batches")

    return running_loss / num_batches

In [None]:
def test(model, device, test_loader, normalizer=normalizer):
    """Evaluate the model on clean (unperturbed) data.

    Loss and accuracy are averaged per sample, so a smaller final batch
    does not get the same weight as a full one.

    Args:
        model: Network to evaluate; put into eval mode by this function.
        device: Device the batches are moved to.
        test_loader: Iterable yielding (images, labels) batches.
        normalizer: Callable applied to the images before the model.

    Returns:
        Accuracy on the loader, as a percentage.

    Raises:
        ValueError: If `test_loader` yields no samples.
    """
    model.eval()
    loss_sum = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(normalizer(data))
            batch_size = target.size(0)
            # The notebook's criterion is nn.CrossEntropyLoss() (batch mean),
            # so scale by the batch size to get a per-sample sum.
            loss_sum += criterion(output, target).item() * batch_size
            correct += (output.argmax(dim=1) == target).sum().item()
            total += batch_size

    if total == 0:
        raise ValueError("test_loader yielded no samples")

    test_loss = loss_sum / total
    test_accuracy = 100. * correct / total
    print(f'\nTest set: Average loss: {test_loss:.4f}, Accuracy:{test_accuracy:.0f}%\n')
    return test_accuracy

In [None]:
def test_adv(model, device, test_adv_loader, normalizer=normalizer, epsilon = 8/255, alpha = 0.01, num_iter = 7, restarts = 1):
    """Evaluate the model on adversarially perturbed data.

    Each batch is attacked with `pqd_attacks` and the model is scored on the
    resulting examples. Loss and accuracy are averaged per sample, so a
    smaller final batch does not get the same weight as a full one.

    Args:
        model: Network to evaluate; put into eval mode by this function.
        device: Device the batches are moved to.
        test_adv_loader: Iterable yielding (images, labels) batches.
        normalizer: Callable applied to the clamped adversarial images; it
            is also forwarded to the attack so both use the same one.
        epsilon, alpha, num_iter, restarts: Attack settings forwarded to
            `pqd_attacks`.

    Returns:
        Accuracy on the adversarial examples, as a percentage.

    Raises:
        ValueError: If `test_adv_loader` yields no samples.
    """
    model.eval()
    loss_sum = 0.0
    correct = 0
    total = 0
    for data, target in test_adv_loader:
        data, target = data.to(device), target.to(device)

        # The attack needs gradients, so it runs outside torch.no_grad().
        delta = pqd_attacks(model, data, target, epsilon, alpha, num_iter, restarts, normalizer=normalizer).detach()

        with torch.no_grad():
            adv_data = normalizer(torch.clamp(data + delta, 0, 1))
            adv_output = model(adv_data)
            batch_size = target.size(0)
            # The notebook's criterion is nn.CrossEntropyLoss() (batch mean),
            # so scale by the batch size to get a per-sample sum.
            loss_sum += criterion(adv_output, target).item() * batch_size
            correct += (adv_output.argmax(dim=1) == target).sum().item()
            total += batch_size

    if total == 0:
        raise ValueError("test_adv_loader yielded no samples")

    test_loss = loss_sum / total
    test_adv_accuracy = 100. * correct / total
    print(f'\nTest set: Average loss: {test_loss:.4f}, Accuracy:{test_adv_accuracy:.0f}%\n')
    return test_adv_accuracy

In [None]:
# The cosine schedule is built with T_max=100, so train for 100 epochs
# (the previous range(1, 100) stopped after 99).
NUM_EPOCHS = 100
# Single checkpoint file, overwritten after every epoch.
PATH = "/content/drive/MyDrive/adversarial_model_upgraded.pth"

for epoch in tqdm_notebook(range(1, NUM_EPOCHS + 1)):
    running_loss = train_adv(model, device, train_loader, optimizer, epoch)
    test_acc = test(model, device, test_loader)
    test_acc_adv = test_adv(model, device, test_adv_loader)

    # Advance the schedule before saving so the checkpoint already holds the
    # learning rate of the next epoch and training can resume directly.
    lr_sched.step()

    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'scheduler_state_dict': lr_sched.state_dict(),
        'loss': running_loss,
    }, PATH)

    # The printed figure is clean accuracy minus adversarial accuracy,
    # i.e. the accuracy drop in percentage points.
    print(f'\nAttack success rate:{test_acc - test_acc_adv:.2f}%\n')

  0%|          | 0/99 [00:00<?, ?it/s]



KeyboardInterrupt: 