# Baseline

This part aims to answer the following question: Does the model perform well in the absence of attacks?
The results obtained here serve as a baseline for future comparisons after the adversarial attacks.

#### Classes ID and meaning:

0- Speed limit (20km/h)

1- Speed limit (30km/h)

2- Speed limit (50km/h)

3- Speed limit (60km/h)

4- Speed limit (70km/h)

5- Speed limit (80km/h)

6- End of speed limit (80km/h)

7- Speed limit (100km/h)

8- Speed limit (120km/h)

9- No passing

10- No passing for vehicles over 3.5 metric tons

11- Right-of-way at intersection

12- Priority road

13- Yield

14- Stop

15- No vehicle

16- Vehicles over 3.5 tons prohibited

17- No entry

18- General caution

19- Dangerous curve left

20- Dangerous curve right

21- Double curve

22- Bumpy road

23- Slippery road

24- Road narrows on the right

25- Road work

26- Traffic signals

27- Pedestrians

28- Children crossing

29- Bicycles crossing

30- Beware of ice/snow

31- Wild animals crossing

32- End of all speed and passing limits

33- Turn right ahead

34- Turn left ahead

35- Ahead only

36- Go straight or right

37- Go straight or left

38- Keep right

39- Keep left

40- Roundabout mandatory

41- End of no passing

42- End no passing vehicles over 3.5 tons

In [None]:
import os

import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models
import matplotlib.pyplot as plt
import numpy as np
import random
from collections import defaultdict, Counter

DATA_ROOT = "./data_gtsrb"                      
CKPT_PATH = "./checkpoints/resnet101_gtsrb_best.pt"
NUM_CLASSES = 43
IMG_SIZE = 224                                
BATCH_SIZE = 48
NUM_WORKERS = 2

device = "cuda" if torch.cuda.is_available() else "cpu"
print("Device:", device)

In [None]:
imagenet_mean = (0.485, 0.456, 0.406)
imagenet_std  = (0.229, 0.224, 0.225)

test_tfms = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(imagenet_mean, imagenet_std),
])

test_set = datasets.GTSRB(root=DATA_ROOT, split="test", download=False, transform=test_tfms)
test_loader = DataLoader(
    test_set,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS,
    pin_memory=True
)

print("Test size:", len(test_set))

In [None]:
# Rebuild same architecture
weights = models.ResNet101_Weights.IMAGENET1K_V2
model = models.resnet101(weights=weights)
model.fc = nn.Linear(model.fc.in_features, NUM_CLASSES)

ckpt = torch.load(CKPT_PATH, map_location=device)

# Robust: checkpoint might be {"model_state": ...} or directly a state_dict
state_dict = ckpt["model_state"] if isinstance(ckpt, dict) and "model_state" in ckpt else ckpt
model.load_state_dict(state_dict)

model.to(device).eval()
print("Checkpoint loaded")

In [None]:
confmat = torch.zeros(NUM_CLASSES, NUM_CLASSES, dtype=torch.long)
correct = 0
total = 0

with torch.no_grad():
    for x, y in test_loader:
        x = x.to(device, non_blocking=True)
        y = y.to(device, non_blocking=True)

        logits = model(x)
        preds = logits.argmax(dim=1)

        correct += (preds == y).sum().item()
        total += y.size(0)

        # update confusion matrix
        for t, p in zip(y.view(-1), preds.view(-1)):
            confmat[t.long(), p.long()] += 1

acc = correct / total
print(f"Overall Test Accuracy: {acc*100:.2f}% ({correct}/{total})")


# Adversarial attacks

This part consists of two types of adversarial patch attacks:
(A) we overlay the images with a non-optimized sticker to analyze its impact on the model’s performance; and
(B) we optimize the sticker in order to further degrade the model’s performance.

## Optimized sticker (A)

In [None]:
# ---------------------------
# Universal targeted patch attack for GTSRB
# (paste into your "Add attack code here" cell)
# ---------------------------
import os
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
import torch
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets
from torch.utils.data import Subset
from tqdm.auto import tqdm
from torchvision.transforms.functional import gaussian_blur


# 1) Patch application (in *normalized* space)
def _normalized_minmax(mean, std, device):
    """
    Given mean/std used for Normalize on [0,1] inputs, compute the min/max
    possible values in normalized space when original pixels are in [0,1].
    """
    mean = torch.tensor(mean, device=device).view(1, 3, 1, 1)
    std  = torch.tensor(std, device=device).view(1, 3, 1, 1)
    x_min = (0.0 - mean) / std
    x_max = (1.0 - mean) / std
    return x_min, x_max

def apply_noise_patch_batch(
    x_norm: torch.Tensor,
    patch: torch.Tensor,
    patch_size: int = 32,
    random_loc: bool = True,
):
    """
    Overlay a (3,P,P) patch onto a batch of normalized images (N,3,H,W).
    Returns patched images (still normalized).

    x_norm: (N,3,H,W) already normalized.
    patch : (3,P,P) in normalized space.
    """
    assert x_norm.dim() == 4 and x_norm.size(1) == 3
    assert patch.dim() == 3 and patch.size(0) == 3
    N, C, H, W = x_norm.shape
    P = patch_size
    assert patch.size(1) == P and patch.size(2) == P
    assert P <= H and P <= W

    x = x_norm.clone()

    if random_loc:
        xs = torch.randint(0, H - P + 1, (N,), device=x.device)
        ys = torch.randint(0, W - P + 1, (N,), device=x.device)
    else:
        xs = torch.full((N,), (H - P) // 2, device=x.device, dtype=torch.long)
        ys = torch.full((N,), (W - P) // 2, device=x.device, dtype=torch.long)

    for i in range(N):
        x[i, :, xs[i]:xs[i] + P, ys[i]:ys[i] + P] = patch

    # keep values within feasible normalized bounds for [0,1] images
    x_min, x_max = _normalized_minmax(imagenet_mean, imagenet_std, x.device)
    x = torch.max(torch.min(x, x_max), x_min)
    return x

def init_patch_from_blurred_target(
    train_loader,
    device,
    target_class: int,
    patch_size: int,
    imagenet_mean,
    imagenet_std,
    random_loc_for_crop: bool = False,
    blur_kernel: int | None = None,
    blur_sigma: float = 2.0,
):
    """
    Initializes a (3,P,P) patch in *normalized* space by:
    - finding one sample of target_class in train_loader
    - denormalizing to [0,1]
    - taking a P×P crop (center or random)
    - applying Gaussian blur
    - renormalizing back to ImageNet space
    """
    mean = torch.tensor(imagenet_mean, device=device).view(3, 1, 1)
    std  = torch.tensor(imagenet_std,  device=device).view(3, 1, 1)

    # Choose a reasonable odd blur kernel if not provided
    if blur_kernel is None:
        # about P/4, but at least 3, must be odd
        k = max(3, patch_size // 4)
        blur_kernel = k if (k % 2 == 1) else (k + 1)
    else:
        if blur_kernel % 2 == 0:
            blur_kernel += 1

    for x, y in train_loader:
        # y is on CPU; x is CPU unless you move it
        mask = (y == target_class)
        if mask.any():
            x = x.to(device, non_blocking=True)
            img_norm = x[mask][0]  # (3,H,W), normalized

            # denorm to [0,1]
            img01 = (img_norm * std + mean).clamp(0, 1)

            _, H, W = img01.shape
            P = patch_size

            if random_loc_for_crop:
                top  = torch.randint(0, H - P + 1, (1,), device=device).item()
                left = torch.randint(0, W - P + 1, (1,), device=device).item()
            else:
                top  = (H - P) // 2
                left = (W - P) // 2

            patch01 = img01[:, top:top+P, left:left+P]  # (3,P,P) in [0,1]

            # blur in pixel space
            patch01_blur = gaussian_blur(
                patch01, kernel_size=[blur_kernel, blur_kernel], sigma=[blur_sigma, blur_sigma]
            )

            # renorm back to normalized space
            patch_norm = (patch01_blur - mean) / std  # (3,P,P)
            return patch_norm

    # fallback if no target sample found
    return None


def cw_margin_loss(logits: torch.Tensor, target: torch.Tensor, kappa: float = 0.0):
    """
    Targeted CW-style loss: maximize (z_t - max_{i!=t} z_i).
    We *minimize* max(max_other - z_t + kappa, 0).
    """
    # logits: (N, C), target: (N,)
    N, C = logits.shape
    z_t = logits.gather(1, target.view(-1, 1)).squeeze(1)  # (N,)

    # max logit among non-target classes
    mask = F.one_hot(target, num_classes=C).bool()
    z_other = logits.masked_fill(mask, float("-inf")).max(dim=1).values  # (N,)

    # targeted margin objective (hinge)
    loss = torch.clamp(z_other - z_t + kappa, min=0.0).mean()
    return loss

def neg_target_logit_loss(logits: torch.Tensor, target: torch.Tensor):
    """
    Minimize negative target logit => maximize target logit.
    """
    z_t = logits.gather(1, target.view(-1, 1)).squeeze(1)
    return (-z_t).mean()

def kappa_linear(ep: int, kappa_start=0.0, kappa_end=10.0, ramp_start=5, ramp_end=30):
    if ep <= ramp_start:
        return kappa_start
    if ep >= ramp_end:
        return kappa_end
    t = (ep - ramp_start) / (ramp_end - ramp_start)
    return kappa_start + t * (kappa_end - kappa_start)

def optimize_targeted_patch(
    model,
    train_loader,
    device,
    target_class: int,
    patch_size: int = 32,
    epochs: int = 5,
    lr: float = 0.05,
    random_loc: bool = True,
    steps_per_epoch: int | None = None,
    tv_weight: float = 0,
    init_from_blurred_target: bool = True,
    blur_kernel: int | None = None,
    blur_sigma: float = 2.0,
):
    model.eval()

    P = patch_size

    # Feasible normalized range for [0,1] pixels
    x_min, x_max = _normalized_minmax(imagenet_mean, imagenet_std, device)

    # --- NEW: initialize patch from blurred target-class crop ---
    patch_init = None
    if init_from_blurred_target:
        patch_init = init_patch_from_blurred_target(
            train_loader=train_loader,
            device=device,
            target_class=target_class,
            patch_size=P,
            imagenet_mean=imagenet_mean,
            imagenet_std=imagenet_std,
            random_loc_for_crop=False,   # set True if you want random crop
            blur_kernel=blur_kernel,
            blur_sigma=blur_sigma,
        )

    if patch_init is None:
        # fallback: small random noise
        patch_init = 0.01 * torch.randn(3, P, P, device=device)

    patch = patch_init.clone().detach().requires_grad_(True)
    opt = torch.optim.Adam([patch], lr=lr)

    def total_variation(p):
        tv_h = (p[:, 1:, :] - p[:, :-1, :]).abs().mean()
        tv_w = (p[:, :, 1:] - p[:, :, :-1]).abs().mean()
        return tv_h + tv_w

    for ep in range(epochs):
        success = 0
        total = 0
        running_loss = 0.0
        steps = 0
        kappa = kappa_linear(ep, kappa_start=0.0, kappa_end=50.0, ramp_start=1, ramp_end=40)

        # Determine how many iterations the bar should run
        if steps_per_epoch is None:
            total_steps = len(train_loader)
        else:
            total_steps = min(steps_per_epoch, len(train_loader))

        pbar = tqdm(
            enumerate(train_loader),
            total=total_steps,
            desc=f"Epoch {ep+1}/{epochs}",
            leave=True
        )

        for step, (x, y) in pbar:
            if steps_per_epoch is not None and step >= steps_per_epoch:
                break

            x = x.to(device, non_blocking=True)

            x_patched = apply_noise_patch_batch(
                x_norm=x,
                patch=patch,
                patch_size=P,
                random_loc=random_loc
            )

            logits = model(x_patched)
            target = torch.full((x.size(0),), target_class, device=device, dtype=torch.long)

            # Define different loss types
            loss_type = "cw"  # "cw" | "logit" | "ce"

            if loss_type == "cw":
                loss = cw_margin_loss(logits, target, kappa=kappa)
            elif loss_type == "logit":
                loss = neg_target_logit_loss(logits, target)
            else:
                loss = F.cross_entropy(logits, target)
            
            if tv_weight > 0:
                loss = loss + tv_weight * total_variation(patch)

            opt.zero_grad(set_to_none=True)
            loss.backward()
            opt.step()

            # Clamp patch to feasible normalized range
            patch.data = torch.max(torch.min(patch.data, x_max[0]), x_min[0])

            # Stats
            with torch.no_grad():
                preds = logits.argmax(dim=1)
                batch_success = (preds == target).sum().item()
                success += batch_success
                total += target.numel()

            steps += 1
            running_loss += loss.item()

            pbar.set_postfix({
                "loss": f"{running_loss/steps:.4f}",
                "succ%": f"{100.0*success/max(total,1):.2f}",
            })

        print(f"Epoch {ep+1}/{epochs} | targeted success: {100.0*success/max(total,1):.2f}% | avg loss: {running_loss/max(steps,1):.4f}")

    return patch.detach().cpu()


# 3) Build a train loader (GTSRB has a train split)
# --- Debug: small subset of train set ---
DEBUG_TRAIN_N = 11968  # try 64 / 128 / 256

train_set_full = datasets.GTSRB(root=DATA_ROOT, split="train", download=True, transform=test_tfms)

rng = np.random.default_rng(0)  # deterministic
subset_idx = rng.choice(len(train_set_full), size=min(DEBUG_TRAIN_N, len(train_set_full)), replace=False)

train_set = Subset(train_set_full, subset_idx.tolist())

train_loader = DataLoader(
    train_set,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=NUM_WORKERS,
    pin_memory=True
)

print("Train size (debug):", len(train_set))


# 4) Run the attack
# Choose a target class (GTSRB labels 0..42).
# Common example: 14 = "Stop"
TARGET_CLASS = 9

P = 64  # patch size used later by your notebook
patch = optimize_targeted_patch(
    model=model,
    train_loader=train_loader,
    device=device,
    target_class=TARGET_CLASS,
    patch_size=64,
    epochs=50,
    lr=0.03,
    random_loc=True,
    steps_per_epoch=50,
    init_from_blurred_target=True,
    blur_kernel=3,      # optional
    blur_sigma=2.0      # optional
)

print("Patch learned:", patch.shape, "target =", TARGET_CLASS)

In [None]:
torch.save(
    {
        "patch": patch.detach().cpu(),
        "patch_size": P,
        "imagenet_mean": imagenet_mean,
        "imagenet_std": imagenet_std,
    },
    "optimized_patch.pt"
)


In [None]:
def eval_accuracy_with_patch(model, loader, device, patch, P=32, random_loc=True):
    model.eval()
    correct = 0
    total = 0
    patch = patch.to(device)

    with torch.no_grad():
        for x, y in loader:
            x = x.to(device, non_blocking=True)
            y = y.to(device, non_blocking=True)

            x_patched = apply_noise_patch_batch(
                x_norm=x,
                patch_size=P,
                patch=patch,
                random_loc=random_loc
            )

            logits = model(x_patched)
            preds = logits.argmax(dim=1)

            correct += (preds == y).sum().item()
            total += y.numel()

    return correct / total

patched_acc = eval_accuracy_with_patch(model, test_loader, device, patch, P=64, random_loc=True)
print("Patched test acc (random loc):", patched_acc)

In [None]:
ID_TO_NAME = [
 "Speed limit (20km/h)", "Speed limit (30km/h)", "Speed limit (50km/h)", "Speed limit (60km/h)",
 "Speed limit (70km/h)", "Speed limit (80km/h)", "End of speed limit (80km/h)", "Speed limit (100km/h)",
 "Speed limit (120km/h)", "No passing", "No passing for vehicles over 3.5 metric tons",
 "Right-of-way at intersection", "Priority road", "Yield", "Stop", "No vehicles",
 "Vehicles over 3.5 tons prohibited", "No entry", "General caution", "Dangerous curve left",
 "Dangerous curve right", "Double curve", "Bumpy road", "Slippery road", "Road narrows on the right",
 "Road work", "Traffic signals", "Pedestrians", "Children crossing", "Bicycles crossing",
 "Beware of ice/snow", "Wild animals crossing", "End of all speed and passing limits",
 "Turn right ahead", "Turn left ahead", "Ahead only", "Go straight or right",
 "Go straight or left", "Keep right", "Keep left", "Roundabout mandatory",
 "End of no passing", "End no passing vehicles over 3.5 tons",
]

def fmt_label(idx: int, names=None):
    if names is not None and idx < len(names) and names[idx] is not None:
        return f"{idx} - {names[idx]}"
    if idx < len(ID_TO_NAME):
        return f"{idx} - {ID_TO_NAME[idx]}"
    return str(idx)

IMAGENET_MEAN = (0.485, 0.456, 0.406)
IMAGENET_STD  = (0.229, 0.224, 0.225)

_mean = torch.tensor(IMAGENET_MEAN).view(1, 3, 1, 1)
_std  = torch.tensor(IMAGENET_STD).view(1, 3, 1, 1)

def denorm(x):
    # x: normalized tensor Bx3xHxW -> returns in [roughly 0..1]
    return x * _std.to(x.device) + _mean.to(x.device)

def renorm(x01):
    # x01: Bx3xHxW in [0..1] -> returns normalized tensor
    return (x01 - _mean.to(x01.device)) / _std.to(x01.device)

@torch.no_grad()
def collect_patch_effect_miscls(model, loader, device, patch, P=64, max_images=12):
    """
    Coleta casos em que:
    - sem patch: acerta
    - com patch: erra
    """
    model.eval()
    cases = []

    for x, y in loader:
        x = x.to(device)
        y = y.to(device)

        logits_clean = model(x)
        pred_clean = logits_clean.argmax(dim=1)

        x_patched = apply_noise_patch_batch(x_norm=x, patch_size=P, patch=patch, random_loc=True)
        logits_patch = model(x_patched)
        probs_patch = torch.softmax(logits_patch, dim=1)
        pred_patch = probs_patch.argmax(dim=1)
        conf_patch = probs_patch.max(dim=1).values

        mask = (pred_clean == y) & (pred_patch != y)
        if mask.any():
            x_clean_cpu  = x[mask].detach().cpu()
            x_patch_cpu  = x_patched[mask].detach().cpu()
            y_cpu        = y[mask].detach().cpu()
            pred_p_cpu   = pred_patch[mask].detach().cpu()
            conf_p_cpu   = conf_patch[mask].detach().cpu()

            for i in range(x_clean_cpu.size(0)):
                cases.append((
                    x_clean_cpu[i], x_patch_cpu[i],
                    int(y_cpu[i]), int(pred_p_cpu[i]), float(conf_p_cpu[i])
                ))
                if len(cases) >= max_images:
                    return cases

    return cases


def show_patch_comparison(cases, class_names=None):
    if len(cases) == 0:
        print("No cases where the patch worsens the situation (correct → incorrect) were found")
        return

    rows = len(cases)
    plt.figure(figsize=(10, 4*rows))

    for i, (x_clean_norm, x_patch_norm, y_true, y_pred_patch, conf_patch) in enumerate(cases):
        # clean
        clean01 = denorm(x_clean_norm.unsqueeze(0)).clamp(0,1)[0]
        clean_img = clean01.permute(1,2,0).numpy()

        # patched
        patch01 = denorm(x_patch_norm.unsqueeze(0)).clamp(0,1)[0]
        patch_img = patch01.permute(1,2,0).numpy()

        true_name = fmt_label(y_true, class_names)
        pred_name = fmt_label(y_pred_patch, class_names)

        ax1 = plt.subplot(rows, 2, 2*i + 1)
        ax1.imshow(clean_img)
        ax1.set_title(f"CLEAN (true: {true_name})")
        ax1.axis("off")

        ax2 = plt.subplot(rows, 2, 2*i + 2)
        ax2.imshow(patch_img)
        ax2.set_title(f"PATCHED (pred: {pred_name}, conf={conf_patch:.2f})")
        ax2.axis("off")

    plt.tight_layout()
    plt.show()


class_names = getattr(test_loader.dataset, "classes", None)
cases = collect_patch_effect_miscls(model, test_loader, device, patch, P=64, max_images=10)
show_patch_comparison(cases, class_names=class_names)

In [None]:
@torch.no_grad()
def compute_confusion_matrix(model, loader, device, num_classes: int,
                             apply_patch: bool = False, patch=None, P: int = 32):
    model.eval()
    cm = torch.zeros((num_classes, num_classes), dtype=torch.int64)

    for x, y in loader:
        x = x.to(device, non_blocking=True)
        y = y.to(device, non_blocking=True)

        if apply_patch:
            assert patch is not None
            x = apply_noise_patch_batch(x_norm=x, patch_size=P, patch=patch, random_loc=True)

        logits = model(x)
        pred = logits.argmax(dim=1)

        idx = y * num_classes + pred
        cm += torch.bincount(idx, minlength=num_classes * num_classes).view(num_classes, num_classes).cpu()

    return cm


def plot_confusion_matrix(cm: torch.Tensor, class_names=None, normalize: bool = True,
                          title: str = "Confusion Matrix", figsize=(12, 10),
                          show_every: int = 1):
    cm = cm.clone().float()

    if normalize:
        row_sums = cm.sum(dim=1, keepdim=True).clamp(min=1.0)
        cm = cm / row_sums

    plt.figure(figsize=figsize)
    plt.imshow(cm.numpy(), interpolation="nearest")  # sem seaborn
    plt.title(title)
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.colorbar()

    n = cm.shape[0]
    ticks = list(range(0, n, show_every))
    plt.xticks(ticks, ticks, rotation=90)
    plt.yticks(ticks, ticks)

    plt.tight_layout()
    plt.show()


def top_confusions(cm: torch.Tensor, k: int = 15, class_names=None):
    cm = cm.clone()
    n = cm.shape[0]
    cm.fill_diagonal_(0)  # zera acertos
    flat = cm.view(-1)

    vals, idxs = torch.topk(flat, k=min(k, flat.numel()))
    results = []

    for v, idx in zip(vals.tolist(), idxs.tolist()):
        if v == 0:
            continue
        true = idx // n
        pred = idx % n

        def fmt(i):
            if class_names and i < len(class_names):
                return f"{i} - {class_names[i]}"
            if ID_TO_NAME and i < len(ID_TO_NAME):
                return f"{i} - {ID_TO_NAME[i]}"
            return str(i)

        results.append((v, fmt(true), fmt(pred)))

    print("\nTop confusões (erros mais frequentes):")
    for v, t, p in results:
        print(f"{v:6d}  true: {t:<40}  pred: {p}")

In [None]:
num_classes = 43
class_names = getattr(test_loader.dataset, "classes", None)

cm_clean = compute_confusion_matrix(model, test_loader, device, num_classes, apply_patch=False)
plot_confusion_matrix(cm_clean, class_names=class_names, normalize=True,
                      title="GTSRB - Confusion Matrix (CLEAN, normalized)", show_every=1)
top_confusions(cm_clean, k=15, class_names=class_names)


In [None]:
cm_patch = compute_confusion_matrix(model, test_loader, device, num_classes,
                                    apply_patch=True, patch=patch, P=64)
plot_confusion_matrix(cm_patch, class_names=class_names, normalize=True,
                      title="GTSRB - Confusion Matrix (PATCHED, normalized)", show_every=1)
top_confusions(cm_patch, k=15, class_names=class_names)


In [None]:
cm_diff = (cm_patch - cm_clean).clamp(min=0)
plot_confusion_matrix(cm_diff, normalize=False,
                      title="GTSRB - Confusion Matrix DIFF (PATCHED - CLEAN)", show_every=1)
top_confusions(cm_diff, k=15, class_names=class_names)
