In [1]:
import requests
import torch
import torch.nn as nn
import os
from torchvision import models
import torch.optim as optim
from torch.utils.data import Dataset
from typing import Tuple
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, random_split
import torch.nn.functional as F
import copy
import matplotlib as plt
import numpy as np

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
transform = transforms.Compose([
    transforms.Grayscale(num_output_channels=3),
    transforms.ToTensor(),
])

In [4]:
class TaskDataset(Dataset):
    def __init__(self, images, labels):
        self.images = [transform(img) for img in images]
        self.labels = labels

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        return self.images[idx], self.labels[idx]

In [5]:
public_dataset = torch.load("/content/drive/MyDrive/tml-assignment/Train.pt")
dataset = TaskDataset(public_dataset.imgs, public_dataset.labels)

In [6]:
dataset_size = len(dataset)
test_size = int(0.1 * dataset_size)
train_size = dataset_size - test_size

In [23]:
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

In [8]:
print(f"Train dataset size: {len(train_dataset)}")
print(f"Test dataset size: {len(test_dataset)}")

Train dataset size: 90000
Test dataset size: 10000


In [24]:
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)

In [10]:
model_name = 'resnet50'

In [11]:
# Model
model = models.resnet50(pretrained=False)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 10)



In [12]:
if torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cuda")

model.to(device)

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

In [13]:
checkpoint = torch.load("/content/drive/MyDrive/tml-assignment/out/models/resnet50_pgd.pt", map_location=device)
model.load_state_dict(checkpoint)

<All keys matched successfully>

In [14]:
def lp_loss(x, p=2):
    return torch.norm(x.view(x.size(0), -1), p=p, dim=1).mean()

def PGD(net, x, y, alpha, epsilon, iter, p, random_start=True):
    delta = torch.zeros_like(x)
    if random_start:
        delta = torch.rand_like(x) * 2 * epsilon - epsilon
        delta = torch.clamp(delta, -epsilon, epsilon)

    delta.requires_grad = True

    for t in range(iter):
        if delta.grad is not None:
            delta.grad.zero_()

        outputs = net(x + delta)
        ce_loss = torch.nn.functional.cross_entropy(outputs, y)
        lp_loss_value = lp_loss(delta, p)
        loss = ce_loss + lp_loss_value
        loss.backward(retain_graph=True)

        if delta.grad is None:
            raise ValueError("Delta gradient is not populated.")

        with torch.no_grad():
            delta = (delta + alpha * delta.grad.sign()).clamp(-epsilon, epsilon)

        delta.requires_grad_()

    pert = delta.detach()
    return pert, outputs, ce_loss.item(), lp_loss_value.item()

In [28]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=8e-5, weight_decay=5e-5, momentum=0.9)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.7)

In [16]:
def train_pgd(net, alpha, epsilon, iter, p=2):
    net.train()
    train_loss = 0
    correct = 0
    total = 0
    for _, (images, labels) in enumerate(train_loader):
        images, labels = images.to(device), labels.to(device)
        x_adv,_,_,_ = PGD(net,images,labels,alpha,epsilon,iter,p)
        optimizer.zero_grad()
        outputs = net(x_adv)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()
    return train_loss/len(train_loader)

In [17]:
def test(net):
    global acc
    net.eval()
    test_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_idx, (images, labels) in enumerate(test_loader):
            images, labels = images.to(device), labels.to(device)
            outputs = net(images)
            loss = criterion(outputs, labels)

            test_loss += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()
    acc = 100 * correct / total
    return test_loss/len(test_loader)

In [18]:
import time

In [29]:
train_losses_pgd = []
test_losses_pgd = []
accuracy_list = []
epochs = 10
alpha = 0.01
epsilon = 0.1
iter = 5
patience = 10
best_loss = float('inf')
epochs_no_improve = 0
lp_list = [1,2,3,4,float('inf')]

In [None]:
for epoch in range(epochs):
    start_time = time.time()
    train_loss = train_pgd(model, alpha, epsilon, iter, p=2)
    test_loss = test(model)

    train_losses_pgd.append(train_loss)
    test_losses_pgd.append(test_loss)
    accuracy_list.append(acc)

    scheduler.step()

    end_time = time.time()
    epoch_time = end_time - start_time
    print(f'Time taken for epoch {epoch+1}: {epoch_time:.2f} seconds')
    print(f'Epoch {epoch+1}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}, Accuracy: {acc:.2f}%')


Time taken for epoch 1: 258.83 seconds
Epoch 1, Train Loss: 1.6454, Test Loss: 2.3768, Accuracy: 21.06%
Time taken for epoch 2: 258.64 seconds
Epoch 2, Train Loss: 1.6558, Test Loss: 2.2222, Accuracy: 22.86%
Time taken for epoch 3: 258.59 seconds
Epoch 3, Train Loss: 1.6540, Test Loss: 2.3349, Accuracy: 20.67%


In [None]:
print('Accuracy of the network on the test images: %d %%' % (acc))

Accuracy of the network on the test images: 75 %


In [None]:
import matplotlib.pyplot as plt
import numpy as np


In [None]:
print(len(train_losses_pgd))

9


In [None]:
torch.save(model.state_dict(), f'out/models/{model_name}_pgd_lp_incomplete.pt')

In [None]:
allowed_models = {
    "resnet18": models.resnet18,
    "resnet34": models.resnet34,
    "resnet50": models.resnet50,
}
with open("out/models/resnet50_pgd.pt", "rb") as f:
    try:
        model: torch.nn.Module = allowed_models["resnet50"](weights=None)
        model.fc = torch.nn.Linear(model.fc.weight.shape[1], 10)
    except Exception as e:
        raise Exception(
            f"Invalid model class, {e=}, only {allowed_models.keys()} are allowed",
        )
    try:
        state_dict = torch.load(f, map_location=torch.device("cpu"))
        model.load_state_dict(state_dict, strict=True)
        model.eval()
        out = model(torch.randn(1, 3, 32, 32))
    except Exception as e:
        raise Exception(f"Invalid model, {e=}")

    assert out.shape == (1, 10), "Invalid output shape"


In [None]:
response = requests.post("http://34.71.138.79:9090/robustness", files={"file": open("out/models/resnet50_pgd_incomplete.pt", "rb")}, headers={"token": "40034445", "model-name":model_name})
print(response.json())

{'clean_accuracy': 0.599, 'fgsm_accuracy': 0.15333333333333332, 'pgd_accuracy': 0.022}
