In [1]:
import timm
import time 
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from torchvision import datasets 
from torch.utils.data import DataLoader
# from medmnist import INFO
import numpy as np
import faiss
import copy
from tqdm import tqdm

from torch.nn.functional import softmax, cosine_similarity
from collections import Counter
import matplotlib.pyplot as plt
import torchvision.transforms.functional as TF
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import os 

import warnings
warnings.filterwarnings("ignore")

In [6]:
device = torch.device("cuda:2" if torch.cuda.is_available() else "cpu")

print("Using device:", device)

Using device: cuda:2


In [7]:
import os
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from PIL import Image

class CustomImageListDataset(torch.utils.data.Dataset):
    def __init__(self, file_list, class_to_idx, transform=None):
        with open(file_list, "r") as f:
            self.samples = [line.strip() for line in f]
        self.transform = transform
        self.class_to_idx = class_to_idx

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        img_path = self.samples[idx]
        class_folder = os.path.basename(os.path.dirname(img_path))
        label = self.class_to_idx.get(class_folder, -1)
        img = Image.open(img_path).convert("RGB")
        if self.transform:
            img = self.transform(img)
        return img, label


# ---------------- Create a combined class mapping ----------------
root_dir = "dataset/imagenet_tests"
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# Collect class mappings from all 10 partitions
combined_class_to_idx = {}
for i in range(1, 11):
    test_dir = os.path.join(root_dir, f"test{i}")
    dataset = datasets.ImageFolder(test_dir, transform=transform)
    combined_class_to_idx.update(dataset.class_to_idx)

print(f"âœ… Combined class mapping built: {len(combined_class_to_idx)} total classes")

# ---------------- Load your 1000-image subset ----------------
subset_file = "results/hard_cases_missed_by_mobilenet.txt"
hard_dataset = CustomImageListDataset(subset_file, class_to_idx=combined_class_to_idx, transform=transform)
hard_loader = DataLoader(hard_dataset, batch_size=1, shuffle=False)

print(f"âœ… Loaded {len(hard_dataset)} hard samples")

âœ… Combined class mapping built: 1000 total classes
âœ… Loaded 1000 hard samples


In [8]:
import os
from torchvision import datasets, transforms
import torch

print(f"Step 1: Loading dataset with resize transform...")

transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

val_data_dir = 'dataset/imagenet_validation' 

val_dataset = datasets.ImageFolder(os.path.join(val_data_dir), transform=transform) 


print(f"Validation samples: {len(val_dataset)}")

Step 1: Loading dataset with resize transform...
Validation samples: 30000


In [9]:
def get_models(dataset, model_name, key): 
    if dataset == 'imagenet':
        # save_root_path = r"checkpoint/tinyimagenet"
        model = timm.create_model(model_name, pretrained=True, num_classes=1000).to(device)
        model.eval()
        if 'inc' in key or 'vit' in key or 'bit' in key:
            return torch.nn.Sequential(transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)), model)
        else:
            return torch.nn.Sequential(transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)), model)

### Ensemble Attack 

In [10]:
import torch
import torch.nn.functional as F
from typing import List

def ensemble_mi_fgsm(
    models: List[torch.nn.Module],
    x: torch.Tensor,
    y: torch.Tensor,
    eps: float = 8/255,
    alpha: float = 2/255,
    iters: int = 10,
    decay: float = 1.0,
    clip_min: float = 0.0,
    clip_max: float = 1.0,
    loss_fn=None,
    device: str = None,
):
    if device is None:
        device = x.device

    if not isinstance(models, (list, tuple)):
        models = [models]

    for m in models:
        m.to(device).eval()

    if loss_fn is None:
        loss_fn = torch.nn.CrossEntropyLoss(reduction='mean')

    x_orig = x.clone().detach().to(device).float()
    x_adv = x_orig.clone().detach()
    momentum = torch.zeros_like(x_adv).to(device)
    y = y.to(device)

    for _ in range(iters):
        x_adv.requires_grad_(True)

        # ----- Liu et al. (2017): sum/average logits before loss -----
        sum_logits = None
        for m in models:
            out = m(x_adv)
            if isinstance(out, (tuple, list)):
                out = out[0]
            sum_logits = out if sum_logits is None else sum_logits + out
        avg_logits = sum_logits / len(models)
        total_loss = loss_fn(avg_logits, y)
        # --------------------------------------------------------------

        grad = torch.autograd.grad(total_loss, x_adv, retain_graph=False, create_graph=False)[0]
        grad = grad / (torch.norm(grad, p=1) + 1e-8)

        # momentum update (MI-FGSM)
        momentum = decay * momentum + grad
        step = alpha * momentum.sign()

        x_adv = x_adv.detach() + step.detach()
        delta = torch.clamp(x_adv - x_orig, min=-eps, max=eps)
        x_adv = torch.clamp(x_orig + delta, min=clip_min, max=clip_max).detach()

    return x_adv


In [11]:
def PGD_SMER(surrogate_models, images, labels, args, num_iter=10):
    eps = args.eps / 255.0
    alpha = args.alpha / 255.0  # step size
    beta = alpha                # inner step
    image_min = clip_by_tensor(images - eps, 0.0, 1.0)
    image_max = clip_by_tensor(images + eps, 0.0, 1.0)

    m = len(surrogate_models)
    m_smer = m * 4

    weight_selection = Weight_Selection(m).to(images.device)
    optimizer = torch.optim.SGD(weight_selection.parameters(), lr=2e-2, weight_decay=2e-3)

    for t in range(num_iter):
        if images.grad is not None:
            images.grad.zero_()

        images = Variable(images, requires_grad=True)
        x_before = images.clone()

        noise_inner_all = torch.zeros([m_smer, *images.shape]).to(images.device)
        grad_inner = torch.zeros_like(images)

        # build random permutation groups
        options = []
        for i in range(int(m_smer / m)):
            perm = list(range(m))
            np.random.shuffle(perm)
            options.append(perm)
        options = np.reshape(options, -1)

        x_inner = images.detach()

        # SMER multi-model inner optimization
        for j in range(m_smer):
            option = options[j]
            grad_single = surrogate_models[option]

            x_inner.requires_grad = True
            out_logits = grad_single(x_inner)

            if isinstance(out_logits, list):
                out = weight_selection(out_logits[0], option)
                aux_out = weight_selection(out_logits[1], option)
                loss = F.cross_entropy(out, labels) + F.cross_entropy(aux_out, labels)
            else:
                out = weight_selection(out_logits, option)
                loss = F.cross_entropy(out, labels)

            # compute single-model gradient
            noise_single = torch.autograd.grad(loss, x_inner)[0]

            # SMER group-level gradient
            group_logits = 0
            group_aux_logits = 0

            for m_step, model_s in enumerate(surrogate_models):
                logit_s = model_s(x_inner)

                if isinstance(logit_s, list):
                    logits = weight_selection(logit_s[0], m_step)
                    aux_logits = weight_selection(logit_s[1], m_step)
                    group_aux_logits += aux_logits / m
                else:
                    logits = weight_selection(logit_s, m_step)

                group_logits += logits / m

            loss_group = F.cross_entropy(group_logits, labels)
            if isinstance(logit_s, list):
                loss_group += F.cross_entropy(group_aux_logits, labels)

            # SMER outer optimization
            outer_loss = -torch.log(loss_group)
            x_inner.requires_grad = False

            outer_loss.backward()
            optimizer.step()
            optimizer.zero_grad()

            # normalize gradient
            noise_single = noise_single / (torch.mean(torch.abs(noise_single), dim=[1,2,3], keepdims=True) + 1e-8)
            grad_inner = grad_inner + noise_single

            # PGD inner update (REAL gradient, not sign)
            x_inner = x_inner + beta * grad_inner
            x_inner = clip_by_tensor(x_inner, image_min, image_max)

            noise_inner_all[j] = grad_inner.clone()

        # final gradient used for PGD update
        final_grad = noise_inner_all[-1]
        final_grad = final_grad / (torch.mean(torch.abs(final_grad), dim=[1,2,3], keepdims=True) + 1e-8)

        # PGD update: add raw gradient
        images = x_before + alpha * final_grad

        # project back to Lâˆž ball
        images = clip_by_tensor(images, image_min, image_max)

    return images


In [12]:
import numpy as np
import torch
import torch.nn.functional as F
from torch.autograd import Variable
import torch.nn as nn

def clip_by_tensor(t, t_min, t_max):
    """
    clip_by_tensor
    :param t: tensor
    :param t_min: min
    :param t_max: max
    :return: cliped tensor
    """
    result = (t >= t_min).float() * t + (t < t_min).float() * t_min
    result = (result <= t_max).float() * result + (result > t_max).float() * t_max
    return result

class Weight_Selection(nn.Module):
    def __init__(self, weight_len) -> None:
        super(Weight_Selection,self).__init__()
        self.weight = nn.parameter.Parameter(torch.ones([weight_len]))

    def forward(self, x, index):
        x = self.weight[index] * x
        return x

def MI_FGSM_SMER(surrogate_models,images, labels, args,num_iter = 10):
    eps = args.eps/255.0
    alpha = args.alpha/255.0
    beta = alpha
    momentum = args.momentum
    image_min = clip_by_tensor(images - eps, 0.0, 1.0)
    image_max = clip_by_tensor(images + eps, 0.0, 1.0)
    m = len(surrogate_models) 
    m_smer = m*4
    weight_selection = Weight_Selection(m).to(images.device)
    optimizer = torch.optim.SGD(weight_selection.parameters(),lr=2e-2,weight_decay=2e-3)
    noise = 0
    grad = 0
    for i in range(num_iter):
        if images.grad is not None:
            images.grad.zero_()
        images = Variable(images, requires_grad = True)
        x_inner = images.detach()
        x_before = images.clone()
        noise_inner_all = torch.zeros([m_smer, *images.shape]).to(images.device)
        grad_inner = torch.zeros_like(images)
        options = []
        for i in range(int(m_smer / m)):
            options_single=[j for j in range(m)]
            np.random.shuffle(options_single)
            options.append(options_single)
        options = np.reshape(options,-1)
        for j in range(m_smer):
            option = options[j]
            grad_single = surrogate_models[option]
            x_inner.requires_grad = True
            out_logits = grad_single(x_inner)
            if type(out_logits) is list:
                out = weight_selection(out_logits[0],option)
                aux_out = weight_selection(out_logits[1],option)
            else:
                out = weight_selection(out_logits,option)
            loss = F.cross_entropy(out, labels)
            if type(out_logits) is list:
                loss = loss + F.cross_entropy(aux_out, labels)
            noise_im_inner = torch.autograd.grad(loss,x_inner)[0]
            group_logits = 0
            group_aux_logits = 0
            for m_step, model_s in enumerate(surrogate_models):
                out_logits = model_s(x_inner)
                if type(out_logits) is list:
                    logits = weight_selection(out_logits[0],m_step)
                    aux_logits = weight_selection(out_logits[1],m_step)
                else:
                    logits = weight_selection(out_logits,m_step)
                group_logits = group_logits + logits / m
                if type(out_logits) is list:
                    group_aux_logits = group_aux_logits + aux_logits / m
            loss = F.cross_entropy(group_logits,labels)
            if type(out_logits) is list:
                loss = loss + F.cross_entropy(group_aux_logits,labels)
            outer_loss = -torch.log(loss)
            x_inner.requires_grad = False
            outer_loss.backward()
            optimizer.step()
            optimizer.zero_grad()
            noise_inner = noise_im_inner
            noise_inner = noise_inner / torch.mean(torch.abs(noise_inner), dim=[1, 2, 3], keepdims=True)
            grad_inner = grad_inner + noise_inner
            x_inner = x_inner + beta * torch.sign(grad_inner)
            x_inner = clip_by_tensor(x_inner, image_min, image_max)
            noise_inner_all[j] = grad_inner.clone()
        noise =noise_inner_all[-1].clone() 
        noise = noise / torch.mean(torch.abs(noise), dim=[1, 2, 3], keepdims=True)
        grad = noise + momentum * grad
        images = x_before +  alpha * torch.sign(grad)
        images = clip_by_tensor(images, image_min, image_max)
    return images

def I_FGSM_SMER(surrogate_models,images, labels, args,num_iter = 10):
    eps = args.eps/255.0
    alpha = args.alpha/255.0
    beta = alpha
    image_min = clip_by_tensor(images - eps, 0.0, 1.0)
    image_max = clip_by_tensor(images + eps, 0.0, 1.0)
    m = len(surrogate_models) 
    m_smer = m*4
    weight_selection = Weight_Selection(m).to(images.device)
    optimizer = torch.optim.SGD(weight_selection.parameters(),lr=2e-2,weight_decay=2e-3)
    grad = 0
    for _ in range(num_iter):
        if images.grad is not None:
            images.grad.zero_()
        images = Variable(images, requires_grad = True)
        x_inner = images.detach()
        x_before = images.clone()
        noise_inner_all = torch.zeros([m_smer, *images.shape]).to(images.device)
        grad_inner = torch.zeros_like(images)
        options = []
        for _ in range(int(m_smer / m)):
            options_single=[j for j in range(m)]
            np.random.shuffle(options_single)
            options.append(options_single)
        options = np.reshape(options,-1)
        for j in range(m_smer):
            option = options[j]
            grad_single = surrogate_models[option]
            x_inner.requires_grad = True
            out_logits = grad_single(x_inner)
            if type(out_logits) is list:
                out = weight_selection(out_logits[0],option)
                aux_out = weight_selection(out_logits[1],option)
            else:
                out = weight_selection(out_logits,option)
            loss = F.cross_entropy(out, labels)
            if type(out_logits) is list:
                loss = loss + F.cross_entropy(aux_out, labels)
            noise_im_inner = torch.autograd.grad(loss,x_inner)[0]
            group_logits = 0
            group_aux_logits = 0
            for m_step, model_s in enumerate(surrogate_models):
                out_logits = model_s(x_inner)
                if type(out_logits) is list:
                    logits = weight_selection(out_logits[0],m_step)
                    aux_logits = weight_selection(out_logits[1],m_step)
                else:
                    logits = weight_selection(out_logits,m_step)
                group_logits = group_logits + logits / m
                if type(out_logits) is list:
                    group_aux_logits = group_aux_logits + aux_logits / m
            loss = F.cross_entropy(group_logits,labels)
            if type(out_logits) is list:
                loss = loss + F.cross_entropy(group_aux_logits,labels)
            outer_loss = -torch.log(loss)
            x_inner.requires_grad = False
            outer_loss.backward()
            optimizer.step()
            optimizer.zero_grad()
            noise_inner = noise_im_inner
            noise_inner = noise_inner / torch.mean(torch.abs(noise_inner), dim=[1, 2, 3], keepdims=True)
            grad_inner = grad_inner + noise_inner
            x_inner = x_inner + beta * torch.sign(grad_inner)
            x_inner = clip_by_tensor(x_inner, image_min, image_max)
            noise_inner_all[j] = grad_inner.clone()
        grad =noise_inner_all[-1].clone()
        images = x_before +  alpha * torch.sign(grad)
        images = clip_by_tensor(images, image_min, image_max)
    return images

### DEAA 

#### Helper (DEAA)

In [13]:
class NormalizedModel(nn.Module):
    def __init__(self, model, mean, std):
        super().__init__()
        self.model = model
        self.register_buffer('mean', torch.tensor(mean).view(1,3,1,1))
        self.register_buffer('std', torch.tensor(std).view(1,3,1,1))

    def forward(self, x):
        x = (x - self.mean) / self.std
        return self.model(x)

def get_last_linear_layer(model):
    """
    Find the last Linear layer in timm models or wrapped models (NormalizedModel).
    """
    # unwrap NormalizedModel if needed
    if isinstance(model, NormalizedModel):
        model = model.model

    # Common attribute names for classifier heads
    candidate_attrs = ['head', 'heads', 'fc', 'classifier', 'mlp_head']

    for attr in candidate_attrs:
        if hasattr(model, attr):
            layer = getattr(model, attr)
            # If it's a Linear layer
            if isinstance(layer, nn.Linear):
                return layer
            # If it's a Sequential or Module, search inside
            if isinstance(layer, nn.Module):
                last_linear = None
                for m in reversed(list(layer.modules())):
                    if isinstance(m, nn.Linear):
                        last_linear = m
                        break
                if last_linear is not None:
                    return last_linear

    # Fallback: scan all modules
    last_linear = None
    for m in model.modules():
        if isinstance(m, nn.Linear):
            last_linear = m
    if last_linear is not None:
        return last_linear

    raise RuntimeError(f"No Linear layer found in model {model.__class__.__name__}")



def get_features_before_last_linear(model, x):
    """
    Extract features before the final classifier, works for CNNs and ViTs.
    """
    # unwrap NormalizedModel if present
    if isinstance(model, NormalizedModel):
        model = model.model

    # Common classifier attributes
    candidate_attrs = ['head', 'heads', 'fc', 'classifier', 'mlp_head']
    classifier = None
    for attr in candidate_attrs:
        if hasattr(model, attr):
            classifier = getattr(model, attr)
            break

    features = {}

    def hook(module, input, output):
        features['feat'] = input[0].detach()

    if classifier is not None:
        handle = classifier.register_forward_hook(hook)
    else:
        # fallback: attach hook to last module
        last_module = list(model.modules())[-1]
        handle = last_module.register_forward_hook(hook)

    model.eval()
    with torch.no_grad():
        _ = model(x)
    handle.remove()

    if 'feat' not in features:
        raise RuntimeError(f"Failed to capture features from model {model.__class__.__name__}")

    return features['feat']

#### DEAA Main 

In [14]:
import torch
import torch.nn as nn
import torchvision.models as models
from torch.utils.data import DataLoader
import numpy as np
import faiss
from tqdm import tqdm
import torch.nn.functional as F


def softmax(x, dim=0):
    return F.softmax(x, dim=dim)


def cosine_similarity(x, y, dim=1, eps=1e-8):
    return F.cosine_similarity(x, y, dim=dim, eps=eps)


# ðŸ‘€ Dummy visualization (replace with your function)
def visualize_test_and_roc(test_img, roc_imgs, local_labels):
    print("Visualization placeholder: Test image + RoC samples")
    print(f"RoC labels: {local_labels}")


class VisionDES_2: 
    def __init__(self, dsel_dataset, pool): 
        self.dsel_dataset = dsel_dataset
        self.dsel_loader = DataLoader(dsel_dataset, batch_size=32, shuffle=False) 
        self.dino_model = timm.create_model('vit_base_patch16_224.dino', pretrained=True).to(device)
        self.dino_model.eval()  
        self.pool = pool 

        self.suspected_model_votes = [] 
        
        
    def dino_embedder(self, images):
        if images.shape[1] == 1:
            images = images.repeat(1, 3, 1, 1)
        return self.dino_model.forward_features(images)


    def fit(self): 
        dsel_embeddings = []
        dsel_labels = []
    
        with torch.no_grad():
            for imgs, labels in tqdm(self.dsel_loader):
                imgs = imgs.to(device)
                embs = self.dino_embedder(imgs).cpu()  
                dsel_embeddings.append(embs)
                dsel_labels.append(labels)
    
        # Keep as tensor
        dsel_embeddings_tensor = torch.cat(dsel_embeddings).detach().cpu()  
        cls_tensor = dsel_embeddings_tensor[:, 0, :]  
    
        # Convert to NumPy
        cls_embeddings = np.ascontiguousarray(cls_tensor.numpy(), dtype='float32')
        self.dsel_embeddings = cls_embeddings
        self.dsel_labels = torch.cat(dsel_labels).numpy()
    
        # Build FAISS index
        embedding_dim = cls_embeddings.shape[1]
        self.index = faiss.IndexFlatL2(embedding_dim)
        self.index.add(cls_embeddings)

    
    def get_top_n_competent_models(self, test_img, k=7, top_n=3, use_sim=False, sim_threshold=0, alpha=0.6):
        # Step 1: Get DINO CLS embedding for the test image
        img_for_dino = test_img.unsqueeze(0).to(device)

        with torch.no_grad():
            test_emb = self.dino_model.forward_features(img_for_dino).cpu().numpy().astype('float32')
            test_emb = test_emb[:, 0, :]  # CLS token only
    
        # Step 2: Find k nearest neighbors in FAISS
        distances, neighbors = self.index.search(test_emb, k)
        neighbor_idxs = neighbors[0]
        local_labels = np.array(self.dsel_labels[neighbor_idxs]).flatten()
    
        # Step 3: Get RoC images
        with torch.no_grad():
            roc_imgs = torch.stack([self.dsel_dataset[idx][0] for idx in neighbor_idxs]).to(device)
    
        # Step 4: Evaluate classifiers
        competences, feature_similarities, correct_counts, grad_vectors  = [], [], [], []
    
        for clf in self.pool:
            clf.eval()
            with torch.no_grad():
                outputs = clf(roc_imgs)
                preds = outputs.argmax(dim=1).cpu().numpy()
                correct = (preds == local_labels).sum()
                competences.append(correct / k)
                correct_counts.append(correct)
    
                # Feature similarity
                test_feat = get_features_before_last_linear(clf, test_img.unsqueeze(0).to(device))
                roc_feats = get_features_before_last_linear(clf, roc_imgs)
                mean_feat = roc_feats.mean(dim=0, keepdim=True)
                sim = cosine_similarity(test_feat.flatten().unsqueeze(0), mean_feat.flatten().unsqueeze(0))
                feature_similarities.append(sim.item())

            test_img_req = test_img.clone().detach().unsqueeze(0).to(device)
            test_img_req.requires_grad_(True)

            out = clf(test_img_req)
            pseudo_label = out.argmax(dim=1)  # keep grad path alive
            loss = F.cross_entropy(out, pseudo_label)
            grad = torch.autograd.grad(loss, test_img_req)[0]
            grad_vec = grad.flatten()  # flatten for cosine similarity
            grad_vectors.append(grad_vec.cpu())


        # Step 5: Compute gradient-based diversity
        grads_tensor = torch.stack(grad_vectors).to(device) 
        # L2-normalize per-model gradient vector
        grads_norm = F.normalize(grads_tensor, p=2, dim=1, eps=1e-8)  # (K, D)
        
        # compute cosine similarity matrix
        cos_sim = grads_norm @ grads_norm.t()  # (K, K)  (equivalent to pairwise cosine) 

        # zero out diagonal (self-sim = 1)
        K = cos_sim.size(0)
        cos_sim.fill_diagonal_(0.0)
        
        # average similarity over other models only
        mean_sim_other = cos_sim.sum(dim=1) / float(K - 1)  # (K,)
        
        # diversity score: lower similarity -> higher diversity
        diversity_scores = (mean_sim_other).cpu().numpy()
        diversity_scores = (diversity_scores - diversity_scores.min()) / (diversity_scores.max() - diversity_scores.min() + 1e-8)

        # print("Diversity", diversity_scores)
        # print("Competences", competences) 

        final_scores = [alpha * c + (1 - alpha) * d for c, d in zip(competences, diversity_scores)] 
    
        # Step 7: Select top_n models
        top_indices = np.argsort(final_scores)[::-1][:top_n]
        # print("final_scores", final_scores)
        # print("Top_indices", top_indices)
        top_models = [self.pool[i] for i in top_indices]
    
        return top_models

### Setup 

In [15]:
ens_models = [
    get_models("imagenet", "resnet18", "resnet18"), 
    get_models("imagenet", "inception_v3", "inc_v3"), 
    get_models("imagenet", "deit_tiny_patch16_224", "deit_t"),
    get_models("imagenet", "vit_tiny_patch16_224", "vit_t"), 
    get_models("imagenet", "efficientnet_b0", "efficientnet_b0"), 
    get_models("imagenet", "xcit_tiny_12_p8_224", "swin_t"), 
]  

In [16]:
des_model = VisionDES_2(val_dataset, ens_models)
des_model.fit()

100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 938/938 [07:34<00:00,  2.07it/s]


In [17]:
saved_index = des_model.index
saved_dsel_embeddings = des_model.dsel_embeddings 
saved_dsel_labels = des_model.dsel_labels

# des_model.index = saved_index 
# des_model.dsel_embeddings = saved_dsel_embeddings 
# des_model.dsel_labels = saved_dsel_labels

In [18]:
import argparse

def get_args():
    parser = argparse.ArgumentParser(description='SMER')

    parser.add_argument('--dataset', type=str, default='imagenet_compatible')
    parser.add_argument('--batch-size', type=int, default=50)
    parser.add_argument('--image-size', type=int, default=224)
    parser.add_argument('--num_worker', type=int, default=4)
    parser.add_argument('--attack_method', type=str, default='MI_FGSM_SMER')
    parser.add_argument('--image-dir', type=str)
    parser.add_argument('--image-info', type=str, default='')
    parser.add_argument('--gpu-id', type=int, default=0)

    # attack params
    parser.add_argument('--eps', type=float, default=8.0)
    parser.add_argument('--alpha', type=float, default=2)
    parser.add_argument('--iters', type=int, default=10)
    parser.add_argument('--momentum', type=float, default=1.0)
    parser.add_argument('--beta', type=float, default=10)

    # FIX for Jupyter
    args, unknown = parser.parse_known_args()
    return args

# Correct call
args = get_args()

In [20]:
from torchmetrics.functional.image import structural_similarity_index_measure as ssim
import torch

adv_list = []
orig_list = []
labels_list = []
noise_rates = []
pixel_diffs = []


def ensure_batch(x):
    return x if x.dim() == 4 else x.unsqueeze(0)

def to_unit_range(x):
    x = ensure_batch(x).float()
    if x.max().item() > 1.5:
        x = x / 255.0
    return torch.clamp(x, 0.0, 1.0)

torch.cuda.empty_cache()

for img, label in tqdm(hard_loader, desc="Generating MI-FGSM adversarials (GPU)"):

    img = img.to(device)
    label = label.to(device)

    # ---- Generate ADV ----
    with torch.enable_grad():
        selected_models = des_model.get_top_n_competent_models(
                img[0],                    
                k=7,
                top_n=4,
                use_sim=False,
                sim_threshold=0,
                alpha=0.5
            )
        adv_img = PGD_SMER(
            selected_models,
            img,
            label,
            args,
            num_iter=10
        )

    # ---- Move everything to CPU IMMEDIATELY ----
    img_cpu = img.squeeze(0).cpu()
    adv_cpu = adv_img.squeeze(0).cpu()
    label_cpu = label.squeeze(0).cpu()

    adv_list.append(adv_cpu)
    orig_list.append(img_cpu)
    labels_list.append(label_cpu)

    # ---- Now compute SSIM on CPU (safe) ----
    img_for_ssim = to_unit_range(img_cpu)
    adv_for_ssim = to_unit_range(adv_cpu)

    ssim_val = ssim(adv_for_ssim, img_for_ssim)

    noise_rates.append(1.0 - float(ssim_val))
    pixel_diffs.append((adv_for_ssim - img_for_ssim).abs().mean().item())

    # ---- FREE GPU memory ----
    del img, label, adv_img
    torch.cuda.empty_cache()

# ---------- Final stacking (CPU only) ----------
adv_all = torch.stack(adv_list)
orig_all = torch.stack(orig_list)
labels_all = torch.stack(labels_list)

noise_rates = torch.tensor(noise_rates)
pixel_diffs = torch.tensor(pixel_diffs)

print(f"Generated {adv_all.shape[0]} adversarial samples")
print(f"Noise (1 - SSIM): mean={noise_rates.mean():.6f}, std={noise_rates.std():.6f}")
print(f"Pixel diff mean:   {pixel_diffs.mean():.6f}")


Generating MI-FGSM adversarials (GPU): 100%|â–ˆ| 1000/1000 [8:53:36<00:00, 32.02s/


Generated 1000 adversarial samples
Noise (1 - SSIM): mean=0.134501, std=0.056224
Pixel diff mean:   0.019760


### Test on Target Models 

In [23]:
import torch
from torch.utils.data import TensorDataset, DataLoader
from tqdm import tqdm

batch_size = 32  # tune this for your GPU
dataset = TensorDataset(adv_all, labels_all)
loader = DataLoader(dataset, batch_size=batch_size, shuffle=False,
                    num_workers=4, pin_memory=True)

In [28]:
target_models = [
    # get_models("imagenet", "resnet152", "resnet152"),
    # get_models("imagenet", "wide_resnet101_2", "wrn101_2"),     
    # get_models("imagenet", "regnety_320", "regnety_320"),
    # get_models("imagenet", "vgg19", "vgg19"),
    # get_models("imagenet", "vit_base_patch16_224", "vit_b"),
    # get_models("imagenet", "deit_base_patch16_224", "deit_b"),
    get_models("imagenet", "swin_base_patch4_window7_224", "swin_b"), 
    get_models("imagenet", "mixer_b16_224", "vit_t"), 
    get_models("imagenet", "convmixer_768_32", "vit_t")
] 

In [29]:
with torch.no_grad():
    for t_model in target_models:
        name = getattr(t_model, "name", t_model.__class__.__name__)
        t_model.eval()
        t_model.to(device)

        fooled = 0
        total = 0

        for imgs_cpu, labels_cpu in tqdm(loader, desc=f"ASR {name}"):
            # Move to device here
            imgs = imgs_cpu.to(device, non_blocking=True)
            labels = labels_cpu.to(device, non_blocking=True)

            outputs = t_model(imgs)
            if isinstance(outputs, (tuple, list)):
                outputs = outputs[0]
            preds = outputs.argmax(dim=1)

            fooled += (preds != labels).sum().item()
            total += labels.size(0)

            # free cache per batch (helps on tight GPUs)
            if device.type == "cuda":
                torch.cuda.empty_cache()

        asr = 100.0 * fooled / total if total > 0 else 0.0
        print(f"{name}: ASR = {asr:.2f}%  ({fooled}/{total} fooled)")

ASR Sequential: 100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 32/32 [00:05<00:00,  5.69it/s]


Sequential: ASR = 60.70%  (607/1000 fooled)


ASR Sequential: 100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 32/32 [00:03<00:00,  8.30it/s]


Sequential: ASR = 82.00%  (820/1000 fooled)


ASR Sequential: 100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 32/32 [00:08<00:00,  3.59it/s]

Sequential: ASR = 82.90%  (829/1000 fooled)





In [86]:
adv_all.shape

torch.Size([1000, 3, 224, 224])

In [None]:
import os
from torchvision.utils import save_image

# Base results folder
base_folder = "results"
# Create new folder with a unique name, e.g., timestamp
import time
timestamp = time.strftime("%Y%m%d_%H%M%S")
save_folder = os.path.join(base_folder, f"adv_images_{timestamp}")
os.makedirs(save_folder, exist_ok=True)

# Save images
for idx, adv_img in enumerate(adv_all):
    # adv_img is in [C,H,W], in 0-1 range (torch.float)
    save_path = os.path.join(save_folder, f"adv_{idx:04d}.png")
    save_image(adv_img, save_path)

print(f"Saved {adv_all.shape[0]} adversarial images to {save_folder}")