In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import torchvision.models as tvm
from torch.utils.data import DataLoader, Subset
from tqdm import tqdm
import numpy as np
import random

# Fix every RNG the notebook touches so that runs are repeatable.
seed = 42
torch.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)

# Prefer the GPU when PyTorch can see one; the CUDA generators are seeded
# explicitly only in that case.
if torch.cuda.is_available():
    device = "cuda"
    torch.cuda.manual_seed_all(seed)
else:
    device = "cpu"
print("Device:", device)

Device: cuda


In [2]:
# Dataset root handed to get_imagenet_val_loader (and from there to
# torchvision.datasets.ImageNet).
# NOTE(review): absolute, machine-specific path — adjust when running elsewhere.
IMAGENET_ROOT = "/mnt/SafeAILab/datasets/imagenet"

def get_imagenet_val_loader(root, batch_size=64, subset_size=1000, subset_seed=0):
    """Build a DataLoader over (a subset of) the ImageNet validation split.

    Images are resized to 256, center-cropped to 224, converted to tensors and
    normalized with the ImageNet channel statistics, so the batches produced
    are NOT in the [0, 1] range.

    Args:
        root: Directory passed as ``root`` to ``torchvision.datasets.ImageNet``.
        batch_size: Number of samples per batch.
        subset_size: Number of validation samples to keep. ``None``, or a value
            that is not smaller than the split, selects the full split.
        subset_seed: Seed of the private generator used to draw the subset, so
            the same samples are selected on every call without touching the
            global RNG state.

    Returns:
        A non-shuffling ``DataLoader`` yielding ``(images, labels)`` batches.

    Raises:
        ValueError: If ``subset_size`` is negative.
    """
    transform = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225]),
    ])

    full_val = torchvision.datasets.ImageNet(
        root=root,
        split="val",
        transform=transform
    )

    if subset_size is not None and subset_size < 0:
        raise ValueError(f"subset_size must be non-negative, got {subset_size}")

    if subset_size is not None and subset_size < len(full_val):
        # torchvision's ImageNet is an ImageFolder-style dataset whose samples
        # are listed class by class, so taking the first `subset_size` indices
        # would only cover the first few classes. Draw a seeded random subset
        # instead so the evaluation set spans the whole label space.
        picker = torch.Generator().manual_seed(subset_seed)
        indices = torch.randperm(len(full_val), generator=picker)[:subset_size].tolist()
        val_dataset = Subset(full_val, indices)
        print(f"Using subset of ImageNet val: {subset_size} samples")
    else:
        val_dataset = full_val
        print(f"Using full ImageNet val: {len(full_val)} samples")

    val_loader = DataLoader(
        val_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=0
    )
    return val_loader


# Evaluation loader used by the experiments below: a 1000-sample validation
# subset served in batches of 64.
val_loader = get_imagenet_val_loader(IMAGENET_ROOT, batch_size=64, subset_size=1000)

Using subset of ImageNet val: 1000 samples


In [3]:
def get_imagenet_models(device):
    """Instantiate the pretrained torchvision classifiers used in this notebook.

    Every network is created with its ``IMAGENET1K_V1`` weights, then moved to
    ``device`` and put in eval mode; one "Loaded ..." line is printed per model.

    Args:
        device: Target device handed to ``Module.to``.

    Returns:
        Dict mapping the model name to its module, in construction order.
    """
    # (name, constructor, weights enum) for each architecture.
    specs = (
        ("alexnet", tvm.alexnet, tvm.AlexNet_Weights),
        ("resnet18", tvm.resnet18, tvm.ResNet18_Weights),
        ("vgg16_bn", tvm.vgg16_bn, tvm.VGG16_BN_Weights),
        ("densenet121", tvm.densenet121, tvm.DenseNet121_Weights),
        ("mobilenet_v2", tvm.mobilenet_v2, tvm.MobileNet_V2_Weights),
    )

    # Build everything first, exactly as before, then place the models.
    models_dict = {
        arch: build(weights=weight_enum.IMAGENET1K_V1)
        for arch, build, weight_enum in specs
    }

    for arch in models_dict:
        net = models_dict[arch]
        net.to(device)
        net.eval()
        print(f"Loaded {arch} on {device}")

    return models_dict

# Load every pretrained classifier once, in eval mode, on the selected device.
models_dict = get_imagenet_models(device)

Loaded alexnet on cuda
Loaded resnet18 on cuda
Loaded vgg16_bn on cuda
Loaded densenet121 on cuda
Loaded mobilenet_v2 on cuda


In [None]:
def pgd_attack_vanilla(model, x, y=None, epsilon=2/255, num_steps=10, step_size=1e-2, random_start=True, device="cuda"):
    """L-infinity PGD: signed-gradient ascent on the cross-entropy loss.

    The perturbation is kept inside ``[-epsilon, epsilon]`` and every candidate
    image is clamped to ``[0, 1]``, so ``x`` is expected to already live in
    that range.

    Args:
        model: Classifier to attack; it is switched to eval mode.
        x: Input batch, moved to ``device``.
        y: Labels to move away from. When ``None`` the model's own predictions
            on ``x`` are used.
        epsilon: Radius of the L-infinity ball.
        num_steps: Number of ascent steps.
        step_size: Step taken along the gradient sign at each iteration.
        random_start: Start from a uniform point in the ball instead of zero.
        device: Device on which the attack runs.

    Returns:
        Detached adversarial batch with the same shape as ``x``.
    """
    model.eval()
    inputs = x.to(device)

    # Without labels, attack whatever the model currently predicts.
    if y is None:
        with torch.no_grad():
            y = model(inputs).argmax(dim=1)
    targets = y.to(device)

    if random_start:
        perturbation = torch.empty_like(inputs).uniform_(-epsilon, epsilon)
    else:
        perturbation = torch.zeros_like(inputs)
    perturbation = perturbation.to(device)

    for _ in range(num_steps):
        # Fresh leaf each round so only this step's graph is differentiated.
        perturbation = perturbation.detach().requires_grad_(True)

        candidate = torch.clamp(inputs + perturbation, 0, 1)
        loss = nn.functional.cross_entropy(model(candidate), targets)
        (grad,) = torch.autograd.grad(loss, perturbation)

        # Ascent step followed by projection back onto the epsilon ball.
        perturbation = torch.clamp(
            perturbation.detach() + step_size * grad.sign(), -epsilon, epsilon
        )

    return torch.clamp(inputs + perturbation, 0, 1).detach()

In [None]:
def pgd_attack_with_optimizer(model, x, y=None, epsilon=2/255, num_steps=10, optimizer_name="sgd", step_size=1e-2, random_start=True, device="cuda"):
    """L-infinity PGD where the perturbation is updated by a torch optimizer.

    The optimizer minimizes the negated cross-entropy, i.e. maximizes the
    classification loss. After each step the perturbation is projected back
    onto ``[-epsilon, epsilon]``, and every candidate image is clamped to
    ``[0, 1]``, so ``x`` is expected to already live in that range.

    Args:
        model: Classifier to attack; it is switched to eval mode.
        x: Input batch, moved to ``device``.
        y: Labels to move away from. When ``None`` the model's own predictions
            on ``x`` are used.
        epsilon: Radius of the L-infinity ball.
        num_steps: Number of optimizer steps.
        optimizer_name: One of ``sgd``, ``momentum``, ``adam``, ``adamw``,
            ``adamax``, ``rmsprop``, ``adagrad`` (case-insensitive).
        step_size: Learning rate given to the optimizer.
        random_start: Start from a uniform point in the ball instead of zero.
        device: Device on which the attack runs.

    Returns:
        Detached adversarial batch with the same shape as ``x``.

    Raises:
        ValueError: If ``optimizer_name`` is not one of the supported names.
    """
    optimizer_factories = {
        "sgd": lambda params: optim.SGD(params, lr=step_size),
        "momentum": lambda params: optim.SGD(params, lr=step_size, momentum=0.9),
        "adam": lambda params: optim.Adam(params, lr=step_size),
        "adamw": lambda params: optim.AdamW(params, lr=step_size),
        "adamax": lambda params: optim.Adamax(params, lr=step_size),
        "rmsprop": lambda params: optim.RMSprop(params, lr=step_size, alpha=0.99),
        "adagrad": lambda params: optim.Adagrad(params, lr=step_size),
    }
    # Reject bad names before spending a forward pass on the model.
    opt_key = optimizer_name.lower()
    if opt_key not in optimizer_factories:
        raise ValueError(f"Unknown optimizer: {optimizer_name}")

    model.eval()
    x = x.to(device)

    if y is None:
        with torch.no_grad():
            logits = model(x)
            y = logits.argmax(dim=1)
    y = y.to(device)

    loss_fn = nn.CrossEntropyLoss()

    if random_start:
        delta = torch.empty_like(x).uniform_(-epsilon, epsilon)
    else:
        delta = torch.zeros_like(x)
    delta = delta.to(device)
    delta.requires_grad_(True)

    optimizer = optimizer_factories[opt_key]([delta])

    for _ in range(num_steps):
        x_adv = torch.clamp(x + delta, 0, 1)
        attack_loss = -loss_fn(model(x_adv), y)

        # Differentiate with respect to delta only. A plain backward() would
        # also accumulate gradients into every model parameter on every step
        # (never cleared), wasting memory and mutating the model.
        (grad,) = torch.autograd.grad(attack_loss, delta)
        delta.grad = grad
        optimizer.step()

        # Project back onto the epsilon ball.
        with torch.no_grad():
            delta.clamp_(-epsilon, epsilon)

    with torch.no_grad():
        x_adv = torch.clamp(x + delta, 0, 1)

    return x_adv.detach()

In [25]:
class _PixelSpaceModel(nn.Module):
    """Feed [0, 1] images to a classifier that expects normalized input."""

    def __init__(self, model, mean, std):
        super().__init__()
        self.model = model
        self.register_buffer("mean", mean)
        self.register_buffer("std", std)

    def forward(self, x):
        return self.model((x - self.mean) / self.std)


def eval_attack_strength(
    model, dataloader, optimizer_name, step_size,
    epsilon, num_steps, device="cuda", max_samples=None,
    input_mean=(0.485, 0.456, 0.406), input_std=(0.229, 0.224, 0.225),
):
    """Measure clean and adversarial accuracy of ``model`` under a PGD attack.

    The attack functions clamp images to ``[0, 1]`` and take ``epsilon`` in
    that scale, whereas ``get_imagenet_val_loader`` yields channel-normalized
    tensors. The batches are therefore mapped back to pixel space with
    ``input_mean`` / ``input_std`` and the attack is run against a wrapper
    that re-applies the normalization before calling ``model``.

    Args:
        model: Classifier under attack (expects normalized input unless
            ``input_mean`` / ``input_std`` are ``None``).
        dataloader: Iterable of ``(images, labels)`` batches.
        optimizer_name: ``"vanilla"`` selects ``pgd_attack_vanilla``; any other
            value is forwarded to ``pgd_attack_with_optimizer``.
        step_size: Step size / learning rate of the attack.
        epsilon: L-infinity budget, in ``[0, 1]`` pixel units.
        num_steps: Number of attack iterations.
        device: Device on which evaluation runs.
        max_samples: Optional cap on the number of evaluated samples; the last
            batch is truncated to respect it.
        input_mean: Per-channel mean the dataloader normalized with. Defaults
            to the values used by ``get_imagenet_val_loader``.
        input_std: Per-channel std the dataloader normalized with.
            Pass ``None`` for ``input_mean`` or ``input_std`` when the
            dataloader already yields ``[0, 1]`` images.

    Returns:
        Dict with ``clean_acc``, ``robust_acc``, ``ASR`` (``1 - robust_acc``,
        so clean misclassifications count as successes), the mean
        cross-entropy ``clean_loss`` / ``adv_loss``, and ``n`` (samples used).

    Raises:
        ValueError: If no sample was evaluated (empty dataloader or
            ``max_samples <= 0``).
    """
    model.eval()

    if input_mean is not None and input_std is not None:
        mean = torch.as_tensor(input_mean, dtype=torch.float32, device=device).view(1, -1, 1, 1)
        std = torch.as_tensor(input_std, dtype=torch.float32, device=device).view(1, -1, 1, 1)
        attack_model = _PixelSpaceModel(model, mean, std).eval()
    else:
        mean = std = None
        attack_model = model

    n = 0
    ce_loss_clean_sum = 0.0
    ce_loss_adv_sum = 0.0
    clean_correct = 0
    adv_correct = 0

    loss_fn = nn.CrossEntropyLoss(reduction="sum")

    loop = tqdm(dataloader, desc=f"{optimizer_name.upper()} attack", leave=False)

    for images, labels in loop:
        if max_samples is not None and n >= max_samples:
            break

        images = images.to(device)
        labels = labels.to(device)

        # Trim the final batch so that exactly `max_samples` are evaluated.
        if max_samples is not None:
            remaining = max_samples - n
            images = images[:remaining]
            labels = labels[:remaining]
        n += images.size(0)

        if mean is not None:
            # Undo the loader's normalization; the clamp only removes
            # floating-point spill outside [0, 1].
            images = (images * std + mean).clamp_(0, 1)

        # clean
        with torch.no_grad():
            logits_clean = attack_model(images)
            ce_loss_clean_sum += loss_fn(logits_clean, labels).item()
            clean_correct += (logits_clean.argmax(dim=1) == labels).sum().item()

        # adversarial
        if optimizer_name == "vanilla":
            x_adv = pgd_attack_vanilla(
                model=attack_model,
                x=images,
                y=labels,
                epsilon=epsilon,
                num_steps=num_steps,
                step_size=step_size,
                device=device,
            )
        else:
            x_adv = pgd_attack_with_optimizer(
                model=attack_model,
                x=images,
                y=labels,
                epsilon=epsilon,
                num_steps=num_steps,
                optimizer_name=optimizer_name,
                step_size=step_size,
                device=device,
            )

        with torch.no_grad():
            logits_adv = attack_model(x_adv)
            ce_loss_adv_sum += loss_fn(logits_adv, labels).item()
            adv_correct += (logits_adv.argmax(dim=1) == labels).sum().item()

        loop.set_postfix({
            "clean_acc": clean_correct / n,
            "robust_acc": adv_correct / n
        })

    if n == 0:
        raise ValueError("No samples were evaluated (empty dataloader or max_samples <= 0).")

    clean_acc = clean_correct / n
    robust_acc = adv_correct / n
    asr = 1.0 - robust_acc

    return {
        "clean_acc": clean_acc,
        "robust_acc": robust_acc,
        "ASR": asr,
        "clean_loss": ce_loss_clean_sum / n,
        "adv_loss": ce_loss_adv_sum / n,
        "n": n,
    }

In [9]:
def log_to_file(filepath, text):
    """Append ``text`` followed by a newline to the file at ``filepath``.

    The file is opened in append mode, so it is created when missing and
    existing content is preserved.
    """
    with open(filepath, mode="a") as sink:
        sink.write("".join((text, "\n")))

In [30]:
# Ablation driver: for every (epsilon, steps, step factor) setting, attack the
# selected models with each optimizer and append one result line per run to a
# per-setting log file.
max_samples = 1000

# L-infinity budgets 1/255 ... 8/255.
eps_list = [k / 255 for k in range(1, 9)]
# Use e.g. [5, 10, 15, 20, 25, 30] here for a step-count sweep.
steps_list = [10]
step_factor_list = [1.0]

optimizers = ["vanilla", "sgd", "momentum", "adam", "adamw", "adamax", "rmsprop", "adagrad"]

# Only these entries of models_dict are evaluated.
target_models = ["densenet121"]

for eps in eps_list:
    for T in steps_list:
        for factor in step_factor_list:

            # Per-step size chosen so that T steps can span `factor * eps`.
            step_size = factor * eps / T

            log_file = (
                f"results_imagenet_attack_ablation_many_eps_"
                f"eps{eps:.5f}_steps{T}_factor{factor}.txt"
            )
            # Truncate any log left over from a previous run of this setting.
            open(log_file, "w").close()
            log_to_file(log_file, "=== ImageNet PGD-Optimizer Attack Ablation Results ===")

            log_to_file(
                log_file,
                f"\n=============================="
                f"\n[SETTING] eps={eps:.5f}, steps={T}, factor={factor}, step_size={step_size:.6f}"
                f"\n=============================="
            )

            for model_name, model in models_dict.items():
                if model_name not in target_models:
                    continue
                log_to_file(log_file, f"\n----- Model: {model_name} -----")

                for opt_name in optimizers:
                    # The attacks draw their random starts from the global
                    # torch RNG. Re-seed before every run so each optimizer
                    # sees the same starts and no result depends on which
                    # runs happened before it.
                    torch.manual_seed(seed)

                    stats = eval_attack_strength(
                        model=model,
                        dataloader=val_loader,
                        optimizer_name=opt_name,
                        step_size=step_size,
                        epsilon=eps,
                        num_steps=T,
                        device=device,
                        max_samples=max_samples,
                    )

                    line = (
                        f"[{opt_name}] "
                        f"clean_acc={stats['clean_acc']:.4f}, "
                        f"robust_acc={stats['robust_acc']:.4f}, "
                        f"ASR={stats['ASR']:.4f}, "
                        f"samples={stats['n']}"
                    )
                    log_to_file(log_file, line)

                                                                                                    