In [1]:
import numpy as np
import os
import pickle
import numpy as np
from torch.utils.data import Dataset
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
import torchvision.models as models

class Cifar10LT(Dataset):
    """
    Long-tailed (class-imbalanced) view over the CIFAR-10 python batches.

    The raw batches are loaded in full; imbalance is obtained by keeping only a
    subset of sample indices per class, with the per-class quota decaying
    exponentially from class 0 (largest) to the last class (smallest).
    """

    def __init__(self, root, imb_factor=0.1, transform=None, train=True, seed=None):
        """
        Create an imbalanced CIFAR-10LT dataset.
        :param root: Root directory where the data is stored (must contain "cifar-10-batches-py").
        :param imb_factor: Imbalance factor (range: (0, 1]). Smaller values indicate higher imbalance.
        :param transform: Image preprocessing transformations applied to each (H, W, C) array.
        :param train: Boolean indicating whether to load the training or test set.
        :param seed: Optional integer seed for the per-class subsampling. ``None`` (default)
                     keeps the previous behaviour of drawing from NumPy's global RNG.
        :raises ValueError: If ``imb_factor`` is outside (0, 1].
        """
        # Fail fast, before touching the disk, on values the quota formula cannot handle.
        if not 0 < imb_factor <= 1:
            raise ValueError(f"imb_factor must be in (0, 1], got {imb_factor!r}")

        self.transform = transform
        self.train = train
        self.seed = seed
        self.data, self.labels = self._load_data(root, train)
        self.indices = self._generate_imbalance(imb_factor, seed=seed)

    def _load_data(self, root, train):
        """
        Load CIFAR-10 data from the given directory.
        :param root: Root directory containing the CIFAR-10 dataset.
        :param train: Boolean indicating whether to load training or test data.
        :return: Tuple of data (one flattened image per row) and labels (1-D array).
        """
        root = os.path.join(root, "cifar-10-batches-py")
        batches = [f"data_batch_{i}" for i in range(1, 6)] if train else ["test_batch"]

        data, labels = [], []
        for batch in batches:
            with open(os.path.join(root, batch), 'rb') as f:
                # SECURITY NOTE: pickle.load executes arbitrary code embedded in the file.
                # Only point `root` at batches obtained from a trusted source.
                batch_data = pickle.load(f, encoding='bytes')
            data.append(batch_data[b'data'])
            labels += batch_data[b'labels']

        data = np.concatenate(data)
        return data, np.array(labels)

    def _generate_imbalance(self, imb_factor, seed=None):
        """
        Generate an imbalanced dataset based on the given imbalance factor.
        :param imb_factor: Imbalance factor.
        :param seed: Optional seed; when given, a private RandomState is used so the
                     selection is reproducible and the global NumPy RNG is left untouched.
        :return: List of selected indices representing the imbalanced dataset.
        """
        labels = self.labels
        num_classes = len(np.unique(labels))
        if num_classes == 0:
            return []
        max_samples = len(labels) // num_classes

        # With a single class there is nothing to decay over; avoid dividing by zero.
        exponent_denominator = max(num_classes - 1, 1)

        # Calculate the number of samples per class based on the imbalance factor
        num_samples_per_class = [
            int(max_samples * (imb_factor ** (i / exponent_denominator)))
            for i in range(num_classes)
        ]

        # Both np.random (module) and RandomState expose the same in-place `shuffle`.
        rng = np.random if seed is None else np.random.RandomState(seed)

        indices = []
        for class_idx, quota in enumerate(num_samples_per_class):
            # Assumes labels are the integers 0..num_classes-1 (true for CIFAR-10).
            class_indices = np.where(labels == class_idx)[0]
            rng.shuffle(class_indices)
            indices.extend(class_indices[:quota])

        return indices

    def __len__(self):
        """Number of samples kept after the imbalance subsampling."""
        return len(self.indices)

    def __getitem__(self, idx):
        """
        Retrieve a single sample and its label.
        :param idx: Index of the sample to retrieve (position in the subsampled view).
        :return: Tuple of image and label.
        """
        source_idx = self.indices[idx]
        img = self.data[source_idx]
        label = self.labels[source_idx]
        img = np.transpose(np.reshape(img, (3, 32, 32)), (1, 2, 0))  # Convert to (H, W, C) format
        if self.transform:
            img = self.transform(img)
        return img, label
def download_cifar10(root):
    """
    Download the CIFAR-10 dataset.
    :param root: Root directory to store the dataset. Created if it does not exist.
    """
    # exist_ok=True avoids the check-then-create race of `if not exists: makedirs`.
    os.makedirs(root, exist_ok=True)
    torchvision.datasets.CIFAR10(root=root, download=True)

In [2]:
# Directory handed to both the torchvision downloader and Cifar10LT.
# NOTE(review): Cifar10LT._load_data appends "cifar-10-batches-py" to this root,
# so the batches are read from ./cifar-10-batches-py/cifar-10-batches-py —
# presumably where torchvision extracts the archive; confirm on disk.
root = './cifar-10-batches-py'
download_cifar10(root)

Files already downloaded and verified


In [3]:
from torchvision import transforms
from torch.utils.data import DataLoader

# Seed NumPy (drives the per-class subsampling inside Cifar10LT, which uses
# np.random.shuffle) and PyTorch (drives DataLoader shuffling and, presumably,
# the random torchvision transforms) so Restart & Run All reproduces the same split.
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

# Data augmentation
train_transform = transforms.Compose([
    transforms.ToPILImage(),  # Convert to PIL image
    transforms.Resize((224, 224)),  # Resize to 224x224
    transforms.RandomCrop(224, padding=4),  # Random crop with padding
    transforms.RandomHorizontalFlip(),  # Random horizontal flip
    transforms.ToTensor(),  # Convert to tensor
])

test_transform = transforms.Compose([
    transforms.ToPILImage(),  # Convert to PIL image
    transforms.Resize((224, 224)),  # Resize to 224x224
    transforms.ToTensor(),  # Convert to tensor
])

# Create datasets (note: the test split is subsampled with the same imbalance factor)
train_dataset = Cifar10LT(root=root, imb_factor=0.1, transform=train_transform, train=True)
test_dataset = Cifar10LT(root=root, imb_factor=0.1, transform=test_transform, train=False)

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=128, num_workers=2, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=128, num_workers=2, shuffle=False)
dataloaders = {'train': train_loader, 'test': test_loader}


def count_samples_per_class(dataset):
    """Return {class_id: n_samples} for the subsampled view of `dataset`, as plain ints."""
    class_ids, counts = np.unique(dataset.labels[dataset.indices], return_counts=True)
    # Cast to int so the printed dict is not cluttered with np.int64(...) reprs.
    return {int(class_id): int(count) for class_id, count in zip(class_ids, counts)}


# Print dataset information
print(f"Number of samples in the training set: {len(train_dataset)}")
print("Number of samples per class in the training set:", count_samples_per_class(train_dataset))

print(f"Number of samples in the test set: {len(test_dataset)}")
print("Number of samples per class in the test set:", count_samples_per_class(test_dataset))

# Example: pull only the first training batch to check tensor shapes
images, labels = next(iter(train_loader))
print("Training batch image shape:", images.shape)
print("Training batch label shape:", labels.shape)

# Example: pull only the first test batch to check tensor shapes
images, labels = next(iter(test_loader))
print("Test batch image shape:", images.shape)
print("Test batch label shape:", labels.shape)


Number of samples in the training set: 20431
Number of samples per class in the training set: {np.int64(0): np.int64(5000), np.int64(1): np.int64(3871), np.int64(2): np.int64(2997), np.int64(3): np.int64(2320), np.int64(4): np.int64(1796), np.int64(5): np.int64(1391), np.int64(6): np.int64(1077), np.int64(7): np.int64(834), np.int64(8): np.int64(645), np.int64(9): np.int64(500)}
Number of samples in the test set: 4084
Number of samples per class in the test set: {np.int64(0): np.int64(1000), np.int64(1): np.int64(774), np.int64(2): np.int64(599), np.int64(3): np.int64(464), np.int64(4): np.int64(359), np.int64(5): np.int64(278), np.int64(6): np.int64(215), np.int64(7): np.int64(166), np.int64(8): np.int64(129), np.int64(9): np.int64(100)}
Training batch image shape: torch.Size([128, 3, 224, 224])
Training batch label shape: torch.Size([128])
Test batch image shape: torch.Size([128, 3, 224, 224])
Test batch label shape: torch.Size([128])


In [4]:
from torchvision.models import resnet18
import torch
# Use the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ResNet-18 without pretrained weights; its classification head is replaced
# by a 10-way linear layer to match the CIFAR-10 label set.
model = resnet18(weights=None)
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, 10)

# Last expression of the cell: moves the model to `device` and displays its repr.
model.to(device)

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

In [5]:
# SGD with momentum and L2 weight decay over every model parameter.
optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.9, weight_decay=5e-4)
# Multiply the learning rate by 0.1 at epochs 75 and 90 (scheduler.step() is called once per epoch).
# NOTE(review): these milestones fit train_model's default num_epochs=100; the run below
# uses num_epochs=10, so the learning rate stays at 0.1 for the whole run.
scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=[75, 90], gamma=0.1)

In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable

def HARL(model,
         x_natural,
         y,
         num_classes,
         step_size=0.003,
         epsilon=0.031,
         perturb_steps=10,
         alpha=0.1,
         beta=0.1,
         gamma=0.1,
         distance='l_inf'):
    """
    HARL: Hierarchical Adversarial Robustness Loss
    Combines adversarial example generation (PGD) and hierarchical equalization loss into a unified loss function.

    Class statistics are computed only over the classes that actually occur in the
    batch. A class with no samples has no measurable loss; counting it as zero would
    pull the class average down and inflate the variance term, which matters on
    long-tailed data where rare classes are often missing from a batch.

    The model's train/eval mode is restored to whatever the caller had set: PGD runs
    in eval mode, and the loss forward pass runs in the caller's mode.

    :param model: PyTorch model.
    :param x_natural: Clean input samples [batch_size, ...], expected in the [0, 1] range.
    :param y: Ground truth labels [batch_size] (integer class indices).
    :param num_classes: Number of classes in the classification task.
    :param step_size: Step size for PGD attack.
    :param epsilon: Perturbation size for PGD attack.
    :param perturb_steps: Number of steps for PGD attack.
    :param alpha: Weight for balancing the losses across classes.
    :param beta: Weight for balancing the hierarchical equalization.
    :param gamma: Weight for adjusting the focus on rare classes.
    :param distance: Distance metric for the attack ('l_inf' supported).
    :return: Computed HARL value (scalar tensor attached to the autograd graph).
    :raises ValueError: If ``distance`` is not 'l_inf'.
    """
    # Previously an unknown distance silently skipped the attack altogether.
    if distance != 'l_inf':
        raise ValueError(f"Unsupported distance {distance!r}; only 'l_inf' is implemented")

    was_training = model.training
    model.eval()  # freeze BatchNorm/Dropout behaviour while crafting the attack

    # Step 1: Generate adversarial examples using PGD
    x_clean = x_natural.detach()
    x_adv = x_clean + 0.001 * torch.randn_like(x_clean)  # small random start
    for _ in range(perturb_steps):
        x_adv.requires_grad_()
        with torch.enable_grad():
            loss_ce = F.cross_entropy(model(x_adv), y)
        grad = torch.autograd.grad(loss_ce, [x_adv])[0]
        # Signed-gradient ascent step, then projection onto the epsilon ball and valid pixel range.
        x_adv = x_adv.detach() + step_size * torch.sign(grad.detach())
        x_adv = torch.min(torch.max(x_adv, x_clean - epsilon), x_clean + epsilon)
        x_adv = torch.clamp(x_adv, 0.0, 1.0)

    model.train(was_training)  # restore the caller's mode instead of forcing train()

    # The adversarial input is treated as a constant; gradients flow to the weights only.
    x_adv = torch.clamp(x_adv, 0.0, 1.0).detach()

    # Step 2: Compute Hierarchical Equalization Loss
    outputs = model(x_adv)  # Model outputs for adversarial examples
    num_classes_actual = outputs.size(1)
    assert num_classes_actual == num_classes, "Mismatch in number of classes"

    # Per-sample cross-entropy loss
    per_sample_loss = F.cross_entropy(outputs, y, reduction='none')

    # Class-wise mean losses, vectorised: membership is a [batch, classes] indicator matrix.
    membership = F.one_hot(y, num_classes).to(per_sample_loss.dtype)
    class_counts = membership.sum(dim=0)
    class_loss_sums = membership.t() @ per_sample_loss
    present = class_counts > 0
    class_losses = class_loss_sums[present] / class_counts[present]

    # Compute average and normalized class losses
    avg_class_loss = class_losses.mean()
    normalized_class_losses = class_losses / (class_losses.sum() + 1e-10)

    # Loss components
    balanced_loss = alpha * avg_class_loss
    hierarchical_loss = beta * ((class_losses - avg_class_loss) ** 2).mean()
    rare_class_loss = gamma * (normalized_class_losses ** 2).sum()

    # Total loss
    return balanced_loss + hierarchical_loss + rare_class_loss


In [7]:
import torch
from sklearn.metrics import balanced_accuracy_score

def evaluate_model(model, dataloader, device):
    """
    Compute the balanced accuracy of `model` on clean data.

    :param model: PyTorch classifier; it is switched to eval mode and left there.
    :param dataloader: Iterable yielding (inputs, targets) batches.
    :param device: Device on which the forward pass runs.
    :return: Balanced accuracy as returned by sklearn's balanced_accuracy_score.
    """
    model.eval()
    all_targets = []
    all_predictions = []

    with torch.no_grad():
        for inputs, targets in dataloader:
            outputs = model(inputs.to(device))
            predicted = outputs.argmax(dim=1)
            # reshape(-1) instead of squeeze(): squeeze() turns a batch of one sample
            # into a 0-d tensor, which cannot be iterated by list.extend.
            all_targets.extend(targets.reshape(-1).tolist())
            all_predictions.extend(predicted.reshape(-1).tolist())

    return balanced_accuracy_score(all_targets, all_predictions)

In [8]:
import torch
from tqdm import tqdm

def train_model(model, dataloaders, optimizer, scheduler, num_epochs=100, ce_epochs=40,
                num_classes=10, save_path="model_final.pt"):
    """
    Train with plain cross-entropy for the first `ce_epochs` epochs, then with HARL.

    :param model: PyTorch classifier; batches are moved to the device of its parameters.
    :param dataloaders: Dict with 'train' and 'test' DataLoaders.
    :param optimizer: Optimizer updating `model`'s parameters.
    :param scheduler: LR scheduler, stepped once per epoch.
    :param num_epochs: Total number of epochs.
    :param ce_epochs: Number of initial cross-entropy epochs before switching to HARL.
    :param num_classes: Number of classes forwarded to HARL.
    :param save_path: File the final state_dict is written to.
    :return: List with the mean training loss of each epoch.
    """
    # Follow the model's own device rather than a notebook-level global.
    device = next(model.parameters()).device
    train_loss_history = []

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0

        for inputs, targets in tqdm(dataloaders['train'], desc=f'Epoch {epoch + 1}/{num_epochs}', leave=False):
            inputs = inputs.to(device)
            # reshape(-1) keeps a 1-D label vector even for a final batch of one sample
            # (squeeze() would collapse it to a 0-d tensor and break cross_entropy).
            targets = targets.to(device).reshape(-1)
            optimizer.zero_grad()

            if epoch < ce_epochs:
                loss = F.cross_entropy(model(inputs), targets)
            else:
                # HARL does its own forward passes; no clean forward pass is needed here.
                loss = HARL(model, inputs, targets, num_classes=num_classes)

            loss.backward()
            optimizer.step()
            running_loss += loss.item() * inputs.size(0)

        scheduler.step()
        epoch_loss = running_loss / len(dataloaders['train'].dataset)
        train_loss_history.append(epoch_loss)

        # Evaluate the model after each epoch
        balanced_accuracy = evaluate_model(model, dataloaders['test'], device)

        # Output the loss and balanced accuracy
        print(f'Epoch {epoch + 1}/{num_epochs} - Training Loss: {epoch_loss:.4f}, Test Balanced Accuracy: {balanced_accuracy:.4f}')

    torch.save(model.state_dict(), save_path)
    return train_loss_history


In [9]:
# Short demonstration run: 4 cross-entropy warm-up epochs, then 6 HARL epochs.
train_model(
    model,
    dataloaders,
    optimizer,
    scheduler,
    num_epochs=10,
    ce_epochs=4,
)

                                                                                

Epoch 1/10 - Training Loss: 2.2011, Test Balanced Accuracy: 0.1787


                                                                                

Epoch 2/10 - Training Loss: 1.5496, Test Balanced Accuracy: 0.2902


                                                                                

Epoch 3/10 - Training Loss: 1.3424, Test Balanced Accuracy: 0.3805


                                                                                

Epoch 4/10 - Training Loss: 1.2075, Test Balanced Accuracy: 0.3783


                                                                                

Epoch 5/10 - Training Loss: 0.2386, Test Balanced Accuracy: 0.3952


                                                                                

Epoch 6/10 - Training Loss: 0.2170, Test Balanced Accuracy: 0.4001


                                                                                

Epoch 7/10 - Training Loss: 0.2128, Test Balanced Accuracy: 0.4207


                                                                                

Epoch 8/10 - Training Loss: 0.2111, Test Balanced Accuracy: 0.4393


                                                                                

Epoch 9/10 - Training Loss: 0.2097, Test Balanced Accuracy: 0.4359


                                                                                

Epoch 10/10 - Training Loss: 0.2072, Test Balanced Accuracy: 0.4409


In [None]:
import torch
import numpy as np
import csv
from tqdm import tqdm  
from sklearn.metrics import roc_auc_score
from torch.utils.data import DataLoader
from torchattacks import FGSM, PGD, CW, DeepFool, AutoAttack
from collections import defaultdict

# Assuming `test_dataset` and `model` are already defined by the earlier cells
BATCH_SIZE = 32  # batch size used for the adversarial evaluation below
NUM_CLASSES = 10  # Assuming there are 10 classes
# NOTE: rebinds `test_loader` (created earlier with batch_size=128 and num_workers=2);
# the `dataloaders` dict built earlier still references the old loader.
test_loader = DataLoader(dataset=test_dataset, batch_size=BATCH_SIZE, shuffle=False)

def evaluate_model_with_attack_by_class(model, dataloader, attack, num_classes):
    """
    Evaluate the model's performance under adversarial attack for each class.

    Batches are moved to the device of the model's parameters, so this runs on
    CPU-only hosts as well as on GPU.
    
    Parameters:
    - model: The model to be evaluated (switched to eval mode).
    - dataloader: DataLoader providing test data as (images, labels) batches.
    - attack: Callable ``attack(images, labels) -> adversarial images``.
    - num_classes: The number of classes in the dataset.

    Returns:
    - class_accuracies: Dict {class index: accuracy, or None if the class never appeared}.
    - avg_accuracy: Overall average accuracy (NaN if the dataloader yields no samples).
    - balanced_accuracy: Accuracy averaged equally across all classes that appeared
      (NaN if none did).
    """
    model.eval()

    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    class_correct = torch.zeros(num_classes, dtype=torch.long)
    class_total = torch.zeros(num_classes, dtype=torch.long)
    total_correct = 0
    total_samples = 0

    # Iterate over test data with progress bar
    for images, labels in tqdm(dataloader, desc=f'Evaluating {attack}'):
        images = images.to(device)
        labels = labels.to(device).reshape(-1)

        # The attack manages its own gradients; only the scoring pass is graph-free.
        adv_images = attack(images, labels)
        with torch.no_grad():
            predicted = model(adv_images.to(device)).argmax(dim=1)

        hits = (predicted == labels).cpu()
        labels_cpu = labels.cpu()
        total_samples += labels_cpu.numel()
        total_correct += int(hits.sum())

        # Record results per class; labels outside [0, num_classes) count only
        # towards the overall totals, as before.
        in_range = (labels_cpu >= 0) & (labels_cpu < num_classes)
        class_total += torch.bincount(labels_cpu[in_range], minlength=num_classes)
        class_correct += torch.bincount(labels_cpu[in_range & hits], minlength=num_classes)

    # Calculate accuracy for each class
    class_accuracies = {}
    for class_idx in range(num_classes):
        seen = int(class_total[class_idx])
        class_accuracies[class_idx] = int(class_correct[class_idx]) / seen if seen > 0 else None

    # Calculate overall average accuracy
    avg_accuracy = total_correct / total_samples if total_samples > 0 else float('nan')

    # Calculate balanced accuracy (mean per-class accuracy)
    observed = [acc for acc in class_accuracies.values() if acc is not None]
    balanced_accuracy = float(np.mean(observed)) if observed else float('nan')

    return class_accuracies, avg_accuracy, balanced_accuracy


def save_result_to_csv(attack_name, class_accuracies, avg_accuracy, balanced_accuracy, filename='attack_results.csv'):
    """
    Append attack results to a CSV file.

    Every row has exactly two columns (label, value), matching the two-column
    'Attack', 'Accuracy' header written when the file is initialised. A class
    accuracy of None is written as an empty cell.
    
    Parameters:
    - attack_name: Name of the adversarial attack.
    - class_accuracies: Accuracy for each class.
    - avg_accuracy: Overall average accuracy.
    - balanced_accuracy: Accuracy averaged equally across all classes.
    - filename: Name of the CSV file to save the results.
    """
    # Per-class rows first, then the two summary rows.
    rows = [
        [f'{attack_name} - Class {class_idx}', accuracy]
        for class_idx, accuracy in class_accuracies.items()
    ]
    rows.append([f'{attack_name} - Avg Accuracy', avg_accuracy])
    rows.append([f'{attack_name} - Balanced Accuracy', balanced_accuracy])

    with open(filename, mode='a', newline='') as file:
        csv.writer(file).writerows(rows)

    print(f'{attack_name} - Avg Accuracy: {avg_accuracy:.4f}, Balanced Accuracy: {balanced_accuracy:.4f} saved to {filename}')


# Start from a fresh results file that holds only the header row;
# save_result_to_csv appends to it afterwards.
with open('attack_results.csv', mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['Attack', 'Accuracy'])

# Attacks to run, as (display name, torchattacks attack bound to `model`) pairs.
# All L-inf attacks share the 8/255 budget.
attacks = [
    ('FGSM', FGSM(model, eps=8/255)),
    ('PGD-20', PGD(model, eps=8/255, alpha=1/255, steps=20)),
    ('PGD-100', PGD(model, eps=8/255, alpha=1/255, steps=100)),
    ('CW', CW(model, c=10, kappa=0, steps=100, lr=0.01)),
    (
        'AutoAttack',
        AutoAttack(
            model,
            norm='Linf',
            eps=8/255,
            version='standard',
            n_classes=NUM_CLASSES,
            seed=None,
            verbose=False,
        ),
    ),
]

# Run every attack over the test loader and append its metrics to the CSV.
for attack_name, attack in attacks:
    print(f'Evaluating with {attack_name}...')
    class_accuracies, avg_accuracy, balanced_accuracy = evaluate_model_with_attack_by_class(
        model, test_loader, attack, NUM_CLASSES
    )
    save_result_to_csv(
        attack_name,
        class_accuracies,
        avg_accuracy,
        balanced_accuracy,
        filename='attack_results.csv',
    )


Evaluating with FGSM...


Evaluating FGSM(model_name=ResNet, device=cuda:0, attack_mode=default, targeted=


FGSM - Avg Accuracy: 0.2791, Balanced Accuracy: 0.2621 saved to attack_results.csv
Evaluating with PGD-20...


Evaluating PGD(model_name=ResNet, device=cuda:0, attack_mode=default, targeted=F


PGD-20 - Avg Accuracy: 0.2738, Balanced Accuracy: 0.2589 saved to attack_results.csv
Evaluating with PGD-100...


Evaluating PGD(model_name=ResNet, device=cuda:0, attack_mode=default, targeted=F


PGD-100 - Avg Accuracy: 0.2738, Balanced Accuracy: 0.2586 saved to attack_results.csv
Evaluating with CW...


Evaluating CW(model_name=ResNet, device=cuda:0, attack_mode=default, targeted=Fa


CW - Avg Accuracy: 0.4160, Balanced Accuracy: 0.4038 saved to attack_results.csv
Evaluating with AutoAttack...


Evaluating AutoAttack(model_name=ResNet, device=cuda:0, attack_mode=default, tar

AutoAttack - Avg Accuracy: 0.2260, Balanced Accuracy: 0.2011 saved to attack_results.csv



