**DESCRIPTION OF THE IMPLEMENTED METHOD**

This notebook reimplements MaskTune, the method presented in the paper "MaskTune: Mitigating Spurious Correlations by Forcing to Explore" (https://arxiv.org/abs/2210.00055). MaskTune is designed to mitigate the impact of spurious correlations in deep learning models. Spurious correlations are unintended associations between input features and target variables; they can lead a model to make incorrect predictions, especially when the correlations do not hold in new or shifted domains. Existing methods that address this issue often require supervision or annotation of the spurious features, which can be impractical. MaskTune instead masks the features that a model has learned to rely on during training, forcing it to explore and use other, potentially more relevant, features. For every dataset and task in this notebook we follow the same three steps:
1. train a model using empirical risk minimization (ERM);
2. identify the most discriminative input regions with XGradCAM and mask them;
3. fine-tune the model for a single epoch on the modified (masked) dataset.

XGradCAM is a technique used to visualize and interpret the decisions of convolutional neural networks. It highlights the regions of an input image that are most influential in the model's prediction by generating a heatmap over the image. This heatmap shows which parts of the image the model focuses on when making a decision, helping to understand how the model interprets visual features.
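
The weighting behind such a heatmap can be sketched in a few lines of NumPy. This is only an illustration, not the code used in this notebook (which relies on the `XGradCAM` class of `pytorch-grad-cam`): the function name `xgradcam_heatmap` and its arguments are our own assumptions. The sketch assumes that the activations of the target layer, and the gradients of the class score with respect to them, have already been captured. Each channel is weighted by its activation-normalized gradient and only the positive evidence is kept.

```python
import numpy as np

def xgradcam_heatmap(activations, gradients, eps=1e-7):
    """Toy XGradCAM-style heatmap (hypothetical helper, for illustration only).

    activations, gradients: arrays of shape (batch, channels, H, W) holding the
    target-layer feature maps and the gradients of the class score w.r.t. them.
    """
    # per-channel sum of the activations, used to normalize the gradients
    sum_activations = activations.sum(axis=(2, 3), keepdims=True)
    # channel weights: spatial sum of gradient * activation / sum of activations
    weights = (gradients * activations / (sum_activations + eps)).sum(axis=(2, 3))
    # weighted combination of the feature maps, followed by a ReLU
    cam = (weights[:, :, None, None] * activations).sum(axis=1)
    return np.maximum(cam, 0)

# toy input: 2 channels of 3x3 feature maps, all activations and gradients equal to 1
acts = np.ones((1, 2, 3, 3))
grads = np.ones((1, 2, 3, 3))
heatmap = xgradcam_heatmap(acts, grads)
print(heatmap.shape)
```

The sketch returns one heatmap per image at the resolution of the feature maps; to our understanding the library additionally resizes it to the input resolution, which is what the masking step needs.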

The MaskTune technique encourages the model to discover and rely on a broader set of features, which enhances its robustness and generalization ability.
We implemented two tasks: classification with spurious features and selective classification. For the first, we used both the MNIST and CelebA datasets; the second was performed on the CIFAR-10 dataset.

*Classification with spurious features*: this task involves training models on datasets where certain irrelevant or misleading features are unintentionally correlated with the target labels. A model that relies on these features fails to correctly classify new data that does not exhibit the same correlations (for example, associating grass with the label "cow" can cause errors on images of cows without grass). To show the effectiveness of MaskTune on this task, a simple CNN was trained on MNIST to classify digits into two groups, with a small blue square added to some images as the spurious feature, while for CelebA a pre-trained ResNet-50 model was used to classify the images based on a single attribute. On both datasets, we applied MaskTune to mask out the spurious features identified by the models, forcing them to rely on more robust and representative features. Further details on the datasets, preprocessing, architectures and results are provided later in the section "classification with spurious features".
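
To see why such a correlation is harmful, consider a hypothetical classifier that looks only at the spurious feature. The sketch below uses the proportions of the MNIST cells of this notebook (the blue square is added to 99% of the training images of class 0, to 1% of those of class 1, and to every class-1 image of the biased test set); the sample counts and the names `shortcut_rule` and `accuracy` are made up for illustration.

```python
def shortcut_rule(has_square):
    """Hypothetical classifier that looks only at the spurious feature."""
    return 0 if has_square else 1

def accuracy(samples):
    """samples: list of (has_square, label) pairs."""
    correct = sum(shortcut_rule(has_square) == label for has_square, label in samples)
    return correct / len(samples)

# training-like split: the square is on 99% of class 0 and on 1% of class 1
train = ([(True, 0)] * 990 + [(False, 0)] * 10 +
         [(True, 1)] * 10 + [(False, 1)] * 990)

# biased test split: the square is on every class-1 image and on no class-0 image
biased_test = [(False, 0)] * 1000 + [(True, 1)] * 1000

train_acc = accuracy(train)
test_acc = accuracy(biased_test)
print(train_acc, test_acc)
```

The shortcut is almost perfect on the training split and always wrong on the biased test split, which is the failure mode MaskTune is meant to reduce by hiding the square during fine-tuning.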

*Selective classification*: this task focuses on the model's ability to abstain from making predictions when uncertain, thereby reducing the risk of incorrect predictions. This task is particularly relevant in scenarios where incorrect decisions can have significant consequences (e.g. medical tasks). We used a ResNet-32 architecture on the CIFAR-10 dataset and the selective classification was performed using a threshold mechanism: while iterating over the validation set during the training phase, the probabilities corresponding to the selected output class were saved and a threshold was computed to drop the lowest 10% of predictions. This threshold corresponds to the highest value among the lowest 10% of the probabilities. The MaskTune approach was shown to improve the models' ability to selectively classify, by making the results more robust to spurious correlations and leading to more reliable and confident predictions. Further details on the dataset, preprocessing, architecture and results are provided later in the section "selective classification".
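
The threshold rule described above can be sketched as follows. This is a minimal NumPy illustration under our own assumptions: the helper names `coverage_threshold` and `select` are hypothetical, the confidences are toy values, and a prediction is rejected when its confidence does not exceed the threshold.

```python
import numpy as np

def coverage_threshold(confidences, drop_fraction=0.10):
    """Highest confidence among the lowest `drop_fraction` of the values."""
    ordered = np.sort(np.asarray(confidences, dtype=float))
    n_drop = int(len(ordered) * drop_fraction)
    if n_drop == 0:
        return -np.inf  # too few samples to drop any prediction
    return ordered[n_drop - 1]

def select(confidences, threshold):
    """Boolean mask: True where the model predicts, False where it abstains."""
    return np.asarray(confidences, dtype=float) > threshold

# ten toy validation confidences, evenly spaced between 0.1 and 1.0
val_confidences = np.linspace(0.1, 1.0, 10)
threshold = coverage_threshold(val_confidences)
accepted = select(val_confidences, threshold)
print(threshold, int(accepted.sum()))
```

With ten validation confidences, the lowest 10% is a single value, so the threshold is the smallest confidence and nine predictions are kept. At test time the same threshold would be applied to the confidences of unseen samples.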

Overall, the tasks demonstrated that MaskTune can enhance both classification accuracy and the reliability of selective classification, especially in the presence of spurious features.

**INSTALLATIONS**

In [None]:
!pip install torchvision
!pip install timm
!pip install git+https://github.com/jacobgil/pytorch-grad-cam.git

**IMPORT**

In [None]:
import torch
import torchvision
from torchvision import datasets,transforms
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
from google.colab import files
import os
import zipfile
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import torch.nn.functional as F
import pandas as pd
import timm
import random
import pickle
import shutil
from pytorch_grad_cam import XGradCAM
import cv2
import math
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix, roc_auc_score
from torch.cuda.amp import GradScaler, autocast
from torchvision.datasets import CIFAR10
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') #defining the device
print(f'Using device: {device}')

**CLASSES & FUNCTIONS**

Functions:
*   validate: this function evaluates the model on the validation set during training. The model is set to evaluation mode to compute the loss and the accuracy on the validation data. This is useful to monitor the trend of the loss and detect possible overfitting.
*   save_checkpoint: this function saves a checkpoint to the given checkpoint path.
*   load_checkpoint: the complement of save_checkpoint; it loads a checkpoint into the model and the optimizer to restart from the last learned parameters.
*   mask_heatmap_using_threshold: this function has been taken from the paper code. It applies a binary mask to each heatmap according to a threshold computed as the mean of the positive heatmap values plus two times their standard deviation. It returns 0 if the pixel is above the threshold, 1 otherwise.
*   mask_tune: this function first generates the heatmaps using the XGradCAM heatmap generator, then generates the masks with the previously defined function 'mask_heatmap_using_threshold' and masks the original images with an element-wise multiplication. It finally saves the masked images according to their label in the specified path.
*   bias_transform: this function adds a small 4x4 blue square to the top-left corner of the image.
*   get_grad_cam_target_layer: this function is taken from the paper code and is used by XGradCAM to generate the heatmaps from the features of the specified layer.
*   filter_classes: this function keeps only the first 5 classes of the CIFAR10 dataset.
*   validate_selective_classification: this function implements a standard evaluation on the validation set (like the previously defined 'validate' function) but it also returns the probability values corresponding to the true class of each sample.
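
The thresholding and masking rules listed above can be checked on a toy example. The sketch below restates the threshold rule under the hypothetical name `threshold_mask` and applies it to a made-up 4x4 heatmap with a single hot pixel, then masks a constant toy image by element-wise multiplication; the data are assumptions chosen only to make the result easy to verify by hand.

```python
import numpy as np

def threshold_mask(heat_maps):
    """0 where heat > mean + 2 * std, 1 elsewhere.

    Mean and standard deviation are taken over the positive entries of each map.
    """
    positive = np.where(heat_maps > 0, heat_maps, np.nan)
    mean = np.nanmean(positive, axis=(1, 2))[:, None, None]
    std = np.nanstd(positive, axis=(1, 2))[:, None, None]
    return np.where(heat_maps > mean + 2 * std, 0, 1)

# one 4x4 heatmap: uniformly low activation with a single hot pixel
heat = np.full((1, 4, 4), 0.1)
heat[0, 0, 0] = 1.0

mask = threshold_mask(heat)

# a toy 4x4 RGB image of constant value 200, masked by element-wise multiplication
image = np.full((4, 4, 3), 200, dtype=np.uint8)
masked = image * mask[0][:, :, None].astype(np.uint8)
print(mask[0])
print(masked[0, 0], masked[1, 1])
```

Here the mean of the heatmap is about 0.16 and its standard deviation about 0.22, so the threshold is about 0.59: only the hot pixel exceeds it and is zeroed out, while the other 15 pixels are left untouched.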

Classes:
*   CustomDataset: this class is used to retrieve previously transformed and preprocessed data which has been saved on Google Drive with the torch.save function. We defined this class to avoid repeating the same steps from one Colab runtime to another.
*   GrayscaleToRGB: this class is used to transform grayscale images into RGB ones. It is needed because, although MNIST is a grayscale dataset, the addition of the small blue square requires the images to have 3 channels.
*   SimpleCNN: this class implements a CNN with 4 convolutional layers (backbone) and 2 final fully connected layers. The backbone consists of a sequence of convolutional layers, batch normalization, ReLU activations, and max pooling.
*   LambdaLayer: this class wraps an arbitrary function as a module. It is used in BasicBlock to build the shortcut when the stride is different from 1 or the number of input channels is different from the number of output channels: the input is subsampled spatially and zero-padded along the channel dimension.
*   BasicBlock: this class defines the basic block used by the ResNet32 class. It consists of two convolutional layers, each followed by batch normalization, and a shortcut connection (an identity, or a LambdaLayer when downsampling is needed). A ReLU is applied after the first batch normalization and again after the shortcut has been added to the output of the second one.
*   ResNet32: this class defines a ResNet architecture made of residual blocks (BasicBlock). It first applies a convolutional layer, then it defines three layers, each one composed of 5 basic blocks. The first layer keeps the dimensionality unchanged, while the first block of the other two implements a downsampling. After the last residual block the network applies an average pooling, a flattening and a final fully connected layer.
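
The shape arithmetic of the parameter-free shortcut used in BasicBlock can be illustrated with NumPy. The function `shortcut_option_a` below is a hypothetical stand-in for the lambda wrapped by LambdaLayer, written only to show how the tensor shape changes; it is not used elsewhere in the notebook.

```python
import numpy as np

def shortcut_option_a(x, out_channels):
    """NumPy analogue (hypothetical) of the lambda wrapped by LambdaLayer.

    x: array of shape (batch, channels, H, W).
    """
    # keep every second row and column, halving the spatial resolution
    subsampled = x[:, :, ::2, ::2]
    # add out_channels // 4 zero channels before and after the existing ones
    pad = out_channels // 4
    return np.pad(subsampled, ((0, 0), (pad, pad), (0, 0), (0, 0)), mode="constant")

# 16 feature maps of size 32x32, as produced by the first group of blocks on a CIFAR image
x = np.ones((1, 16, 32, 32))
y = shortcut_option_a(x, out_channels=32)
print(y.shape)
```

Going from 16 to 32 channels, 8 zero channels are added on each side, so the shortcut output matches the shape produced by the two convolutions of the block and the two can be summed. The 32 in the name of the network counts the first convolution, the 30 convolutions of the 15 basic blocks and the final fully connected layer.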





In [None]:
#this function evaluates the model on the validation set during training.
#The model is set to evaluation mode to compute the loss and the accuracy on the validation data, then switched back to training mode.
#This is useful to monitor the trend of the loss and detect possible overfitting.
def validate(model, val_loader, criterion):
    model.eval()
    val_loss = 0
    correct = 0
    with torch.no_grad():
        for data, targets in val_loader:
            data, targets = data.to(device), targets.to(device)
            outputs = model(data)
            loss = criterion(outputs, targets)
            val_loss += loss.item() * data.size(0)
            _, predicted = outputs.max(1)
            correct += predicted.eq(targets).sum().item()
    val_loss /= len(val_loader.dataset)
    accuracy = 100. * correct / len(val_loader.dataset)
    print(f'Validation Loss: {val_loss:.4f}, Accuracy: {accuracy:.2f}%')
    model.train()
    return val_loss, accuracy

In [None]:
#this function saves the checkpoint in the given checkpoint path.
def save_checkpoint(model, optimizer, epoch, loss, accuracy, checkpoint_path):
    checkpoint = {
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'epoch': epoch,
        'loss': loss,
        'accuracy': accuracy
    }
    torch.save(checkpoint, checkpoint_path)

In [None]:
#it is the complementary function of save_checkpoint:
#it loads the checkpoint into the model and the optimizer to restart from the last learned parameters
def load_checkpoint(model, optimizer, checkpoint_path):
   checkpoint = torch.load(checkpoint_path, map_location=device) #map_location lets a checkpoint saved on GPU be loaded on a CPU-only runtime
   model.load_state_dict(checkpoint['model_state_dict'])
   optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
   epoch = checkpoint['epoch']
   val_loss = checkpoint['loss']
   accuracy = checkpoint['accuracy']
   return model, optimizer, epoch, val_loss, accuracy

In [None]:
#this function has been taken from the paper code.
#It applies a binary mask to each heatmap according to a threshold computed as the mean of the positive heatmap values plus two times their standard deviation.
#It returns 0 if the pixel is above the threshold, 1 otherwise.
def mask_heatmap_using_threshold(heat_maps):
    mask_mean_value = np.nanmean(np.where(heat_maps > 0, heat_maps, np.nan), axis=(1, 2))[:, None, None]
    mask_std_value = np.nanstd(np.where(heat_maps > 0, heat_maps, np.nan), axis=(1, 2))[:, None, None]
    mask_threshold_value = mask_mean_value + 2 * mask_std_value
    return np.where(heat_maps > mask_threshold_value, 0, 1)

In [None]:
#this function first generates the heatmaps using the XGradCAM heatmap generator
#(the global 'heat_map_generator', which must be defined before calling this function),
#then it generates masks with the previously defined function 'mask_heatmap_using_threshold'
#and it masks the original images with an element-wise multiplication.
#It finally saves the masked images according to their label in the specified path.
def mask_tune(images, masked_data_dir, images_paths, targets):
  heat_maps = heat_map_generator(images) #one heatmap per image
  masks = mask_heatmap_using_threshold(heat_maps).astype(np.uint8) #uint8 is natively supported by cv2.resize
  masks = np.repeat(masks[:, np.newaxis, :, :], 3, axis=1) #replicate the mask over the 3 RGB channels
  for image_path, mask, target, data_dir in zip(images_paths, masks, targets, masked_data_dir):
    original_image = Image.open(image_path).convert('RGB')
    mask = mask.transpose(1, 2, 0) #from (3, H, W) to (H, W, 3)
    image_mask = cv2.resize(mask, dsize=original_image.size, interpolation=cv2.INTER_NEAREST)
    masked_image = np.array(original_image) * image_mask
    target_dir = os.path.join(data_dir, str(target.item()))
    os.makedirs(target_dir, exist_ok=True)
    im = Image.fromarray(masked_image.astype(np.uint8))
    im.save(os.path.join(target_dir, os.path.basename(image_path)))

In [None]:
#MNIST function
#this function adds a small 4x4 blue square to the top-left corner of the image (the input tensor is modified in place and returned).
def bias_transform(x):
  new_value = torch.tensor([0.0, 0.0, 1.0])
  x[:, :4, :4] = new_value.view(3, 1, 1)
  return x

In [None]:
#CIFAR10
#this function keeps only the samples whose label is in 'classes' (by default the first 5 classes of the CIFAR10 dataset)
def filter_classes(dataset, classes=(0, 1, 2, 3, 4)): #a tuple avoids a mutable default argument
    indices = [i for i, (_, label) in enumerate(dataset) if label in classes]
    subset = torch.utils.data.Subset(dataset, indices)
    return subset

In [None]:
#CIFAR10
#this function implements a standard evaluation on the validation set
#(like the previously defined 'validate' function) but it also returns the probability values corresponding to the true class of each sample.
def validate_selective_classification(model, valloader, criterion):
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    all_probs = []

    with torch.no_grad():
        for inputs, labels in valloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
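            val_loss += loss.item() * inputs.size(0) #weight the batch-mean loss by the batch size (assumes the criterion averages over the batch)

            probs = torch.nn.functional.softmax(outputs, dim=1)
            selected_probs = probs.gather(1, labels.view(-1, 1)).cpu().numpy()
            all_probs.extend(selected_probs)

            _, predicted = torch.max(probs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    val_loss /= total #average loss per sample, consistent with 'validate'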
    accuracy = 100 * correct / total

    all_probs = np.array(all_probs).flatten()

    return val_loss, accuracy, all_probs

In [None]:
#generated by Chat GPT
#this class is used in order to retrieve previously transformed and preprocessed data which has been saved with torch.save function on google drive.
#We defined this class in order to avoid repeating the same steps between a colab runtime and another.
class CustomDataset(Dataset):
    def __init__(self, file_path):
        self.data_list = torch.load(file_path)

    def __len__(self):
        return len(self.data_list)

    def __getitem__(self, idx):
        return self.data_list[idx]


In [None]:
#MNIST class generated by Chat GPT
#this class is used to transform grayscale images into RGB ones. It is needed because, although MNIST is a grayscale dataset,
#the addition of the small blue square requires the images to have 3 channels.
class GrayscaleToRGB:
    def __call__(self, img):
        return img.repeat(3, 1, 1)

In [None]:
#MNIST model with the same hyperparameters of the paper
#this class implements a CNN with 4 convolutional layers (backbone) and 2 final fully connected layers.
#The backbone consists of a sequence of convolutional layers, batch normalization, ReLU activations, and max pooling.
class SimpleCNN(nn.Module):
    def __init__(self, num_classes):
        super(SimpleCNN, self).__init__()
        self.backbone=nn.Sequential( nn.Conv2d(3, 16, (3, 3), (1, 1)),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.Conv2d(16, 16, (3, 3), (1, 1)),
            nn.BatchNorm2d(num_features=16),
            nn.ReLU(),
            nn.MaxPool2d((2, 2), (2, 2)),
            nn.Conv2d(16, 32, (3, 3), (1, 1)),
            nn.BatchNorm2d(num_features=32),
            nn.ReLU(),
            nn.Conv2d(32, 32, (3, 3), (1, 1)),
            nn.BatchNorm2d(num_features=32),
            nn.ReLU(),
            nn.MaxPool2d((2, 2), (2, 2)),
            nn.Flatten())
        self.fc1 = nn.Linear(in_features=512, out_features=256)
        self.bnfc=nn.BatchNorm1d(256)
        self.fc2 = nn.Linear(in_features=256, out_features=num_classes)

    def forward(self, x):
        x=self.backbone(x)
        x=self.fc1(x)
        x = F.relu(self.bnfc(x))
        x = self.fc2(x)
        return x

    #this function is taken from the paper code and it is used by XGradCAM to generate the heatmaps from the features of the specified layer.
    def get_grad_cam_target_layer(self):
        return self.backbone[-3]

In [None]:
#CIFAR10
#this class wraps an arbitrary function as a module.
#It is used in BasicBlock to implement the downsampling shortcut when the stride is different from 1 or the number of input channels is different from the number of output channels.
class LambdaLayer(nn.Module):
    def __init__(self, lambd):
        super(LambdaLayer, self).__init__()
        self.lambd = lambd

    def forward(self, x):
        return self.lambd(x)

In [None]:
#CIFAR10
#this class defines the basic block used by the ResNet32 class.
#It consists of two convolutional layers, each followed by batch normalization, and a shortcut (identity or LambdaLayer downsampling).
#A ReLU is applied after the first batch normalization and again after the shortcut has been added.
class BasicBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.downsample = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.downsample = LambdaLayer(lambda x:
                                            F.pad(x[:, :, ::2, ::2], (0, 0, 0, 0, out_channels//4, out_channels//4), "constant", 0))


    def forward(self, x):
        identity = self.downsample(x)
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        out += identity
        out = self.relu(out)
        return out

In [None]:
#CIFAR10 model
#this class defines a resnet architecture which is constituted by residual blocks (BasicBlock).
#It first implements a convolutional layer, then it defines three layers, each one composed of 5 basic blocks. The first layer keeps the dimensionality unchanged, while the first block of the other two implements a downsampling.
#After the last residual block the network does an average pooling and a flattening.
class ResNet32(nn.Module):
    def __init__(self, num_classes=5):
        super(ResNet32, self).__init__()
        self.in_channels = 16

        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(16)
        self.relu = nn.ReLU(inplace=True)

        # Layer 1
        self.layer1_0 = BasicBlock(16, 16, stride=1)
        self.layer1_1 = BasicBlock(16, 16, stride=1)
        self.layer1_2 = BasicBlock(16, 16, stride=1)
        self.layer1_3 = BasicBlock(16, 16, stride=1)
        self.layer1_4 = BasicBlock(16, 16, stride=1)

        # Layer 2
        self.layer2_0 = BasicBlock(16, 32, stride=2)
        self.layer2_1 = BasicBlock(32, 32, stride=1)
        self.layer2_2 = BasicBlock(32, 32, stride=1)
        self.layer2_3 = BasicBlock(32, 32, stride=1)
        self.layer2_4 = BasicBlock(32, 32, stride=1)

        # Layer 3
        self.layer3_0 = BasicBlock(32, 64, stride=2)
        self.layer3_1 = BasicBlock(64, 64, stride=1)
        self.layer3_2 = BasicBlock(64, 64, stride=1)
        self.layer3_3 = BasicBlock(64, 64, stride=1)
        self.layer3_4 = BasicBlock(64, 64, stride=1)


        self.fc = nn.Linear(64, num_classes)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)

        # Layer 1
        x = self.layer1_0(x)
        x = self.layer1_1(x)
        x = self.layer1_2(x)
        x = self.layer1_3(x)
        x = self.layer1_4(x)

        # Layer 2
        x = self.layer2_0(x)
        x = self.layer2_1(x)
        x = self.layer2_2(x)
        x = self.layer2_3(x)
        x = self.layer2_4(x)

        # Layer 3
        x = self.layer3_0(x)
        x = self.layer3_1(x)
        x = self.layer3_2(x)
        x = self.layer3_3(x)
        x = self.layer3_4(x)

        x = F.avg_pool2d(x, x.size()[3])
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x

    def get_grad_cam_target_layer(self):
        return self.layer3_4

**CLASSIFICATION WITH SPURIOUS FEATURES**

MNIST

*   Dataset: The MNIST dataset contains 60000 training images (which we split into 48000 for training and 12000 for validation) and 10000 test images. It consists of low-resolution (28x28) grayscale images of white digits (from 0 to 9) on a black background and is often used for simple classification tasks.
*   Preprocessing: To show that MaskTune reduces the reliance on spurious features, we started with a basic example. We preprocessed the data first by merging the original 10 classes into two groups (digits 0 to 4 form the new class 0, digits 5 to 9 the new class 1) and then by adding a small 4x4 blue square (the spurious feature) to some of the images: 99% of the training images of class 0 and 1% of the training images of class 1. Since the blue square requires three channels, all images are converted from grayscale to RGB after the Image to Tensor transformation.
*   Architecture: The SimpleCNN model is a convolutional neural network designed to classify images into two classes. The network consists of four convolutional layers. The first two convolutional layers have 16 filters each, with a kernel size of 3x3 and a stride of 1. These layers are followed by batch normalization to stabilize and speed up training, and the ReLU activation function is applied to introduce non-linearity. After the second convolutional layer, a max-pooling operation with a 2x2 filter and a stride of 2 is used to downsample the feature maps, reducing their spatial dimensions. The third and fourth convolutional layers have 32 filters each, again with a 3x3 kernel size and a stride of 1, followed by batch normalization and ReLU activation. After the fourth convolutional layer, another 2x2 max-pooling operation with a stride of 2 is applied. The output from the final max-pooling layer is flattened into a 1D vector with 512 elements. This flattened vector is then passed through a fully connected layer with 256 units, followed by batch normalization and the ReLU activation function. Finally, the network includes a second fully connected layer that outputs logits corresponding to the two classes.
For training, the model uses the CrossEntropyLoss function, which is standard for classification tasks. The optimization is performed using Stochastic Gradient Descent (SGD) with a learning rate of 0.01, a momentum of 0.9, and a weight decay of 1e-4 to help prevent overfitting. A MultiStepLR scheduler halves the learning rate at the epochs listed in its milestones parameter (25, 50, 75 and 100). The model is trained for 100 epochs with a batch size of 128 for both the training and validation sets. After every epoch the model is evaluated on the validation set, and the checkpoint with the best validation accuracy is saved. At the end of the 100 epochs, a final checkpoint is also saved.
*   Results: the SimpleCNN model is first evaluated using the best checkpoint on both the original and biased test sets. The ERM model achieves an accuracy of 0.9352 on the original test set, while its performance on the biased test set is slightly lower, with an accuracy of 0.8977. This discrepancy highlights the model's sensitivity to bias in the data. However, after applying the MaskTune technique, the model's performance improves significantly on both test sets. The accuracy on the original one increases to 0.9550, and the accuracy on the biased test set also rises to 0.9540. This result demonstrates that MaskTune effectively reduces the accuracy gap between the original and biased test sets, indicating enhanced robustness and fairness in the model's predictions compared to the initial results using ERM.
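
Two quantities from the architecture and training description above can be checked with plain Python: the 512 elements of the flattened vector and the learning rate in use at each epoch. The helpers `conv_out` and `lr_at_epoch` are hypothetical names introduced only for this check; the schedule assumes the scheduler is stepped once at the end of every epoch, as in the training cell of this notebook.

```python
from bisect import bisect_right

def conv_out(size, kernel=3, stride=1, padding=0):
    """Spatial size after a convolution or pooling layer."""
    return (size + 2 * padding - kernel) // stride + 1

# SimpleCNN on a 28x28 input: conv, conv, pool, conv, conv, pool
size = 28
for kernel, stride in [(3, 1), (3, 1), (2, 2), (3, 1), (3, 1), (2, 2)]:
    size = conv_out(size, kernel, stride)
flattened = 32 * size * size  # 32 channels after the last convolution

def lr_at_epoch(epoch, base_lr=0.01, milestones=(25, 50, 75, 100), gamma=0.5):
    """Learning rate in use during a 0-based epoch, stepping once per epoch."""
    return base_lr * gamma ** bisect_right(milestones, epoch)

print(size, flattened)
print([lr_at_epoch(e) for e in (0, 24, 25, 50, 75, 99)])
```

The spatial size goes 28, 26, 24, 12, 10, 8, 4, which gives 32 x 4 x 4 = 512 inputs for the first fully connected layer. With 100 epochs, the learning rate is halved three times during training; the last milestone (100) is reached only after the final epoch.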



In [None]:
#composition of the transformations applied to the MNIST samples: Image to Tensor first, then grayscale to RGB.
transform = transforms.Compose([transforms.ToTensor(), GrayscaleToRGB()])

In [None]:
#downloading MNIST dataset from torchvision
mnist_trainset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
mnist_testset = datasets.MNIST(root='./data', train=False, download=True, transform=transform)

In [None]:
#dividing the 10 MNIST training set classes in 2 groups [0: 0-4 classes; 1: 5-9 classes]
class_0 = []
class_1 = []
for i in mnist_trainset:
  if(i[1] < 5):
    class_0.append(i)
  else:
    class_1.append(i)

In [None]:
#create the directory for saving the MNIST checkpoints
checkpoints_mnist = '/content/drive/My Drive/checkpoints_mnist/'
os.makedirs(checkpoints_mnist, exist_ok=True)

In [None]:
#small blue squares added to 99% of class 0 (1% of the samples is left unchanged)
class0 = []
num_unchanged=len(class_0)//100
unchanged_indices = set(random.sample(range(len(class_0)), num_unchanged)) #set for fast membership tests
for i in range(len(class_0)):
  if i in unchanged_indices:
    class0.append((class_0[i][0], 0))
  else:
    class0.append((bias_transform(class_0[i][0]),0))

In [None]:
#small blue squares added to 1% of class 1
class1 = []
num_changed=len(class_1)//100
changed_indices = set(random.sample(range(len(class_1)), num_changed)) #set for fast membership tests
for i in range(len(class_1)):
  if i in changed_indices:
    class1.append((bias_transform(class_1[i][0]), 1))
  else:
    class1.append((class_1[i][0],1))

In [None]:
#creation of the full dataset
total_trainset = class0 + class1

In [None]:
#we sampled 12000 indices randomly from the total trainset and saved them on google drive for repeatability
val_set_indices=random.sample(range(len(total_trainset)),k=12000)
file_path = 'val_set_indices.pkl'
with open(file_path, 'wb') as f:
    pickle.dump(val_set_indices, f)
drive_path = '/content/drive/My Drive/val_set_indices.pkl'
shutil.copy(file_path, drive_path)
os.remove(file_path)

In [None]:
#we define the val and train sets according to the previous random sampling
drive_path = '/content/drive/My Drive/val_set_indices.pkl'
with open(drive_path, 'rb') as f:
    val_set_indices = pickle.load(f)
val_set = [total_trainset[i] for i in val_set_indices]
val_index_set = set(val_set_indices) #set for fast membership tests (a list lookup would scan 12000 elements per sample)
train_set = [total_trainset[i] for i in range(len(total_trainset)) if i not in val_index_set]

In [None]:
#we saved on google drive the MNIST trainset
mnist_dataset = '/content/drive/My Drive/MNISTdatatrainset/'
os.makedirs(mnist_dataset, exist_ok=True)
to_pil = transforms.ToPILImage() #created once and reused for every image
for i, el in enumerate(train_set):
  to_pil(el[0]).save(mnist_dataset+str(i)+'.jpg')

In [None]:
#creation of a list containing the elements of the drive folder.
dir_mnist=os.listdir('/content/drive/My Drive/MNISTdatatrainset/')

In [None]:
#dividing the 10 MNIST original testing set classes in 2 groups [0: 0-4 classes; 1: 5-9 classes]
original_testset= []
for i in mnist_testset:
  if(i[1]<5):
   original_testset.append((i[0],0))
  else:
    original_testset.append((i[0],1))

In [None]:
#we added a small blue square to the test images of classes 5-9
biased_testset_initial = []
for i in mnist_testset:
  x,y = i
  if(i[1]>4):
    biased_testset_initial.append((bias_transform(x),y))
  else:
    biased_testset_initial.append((x,y))

In [None]:
#dividing the 10 MNIST biased testing set classes in 2 groups [0: 0-4 classes; 1: 5-9 classes]
biased_testset = []
for i in biased_testset_initial:
  if i[1]<5:
    biased_testset.append((i[0],0))
  else:
    biased_testset.append((i[0],1))

In [None]:
#SimpleCNN training with MNIST dataset
train_loader_MNIST = DataLoader(train_set, batch_size=128, shuffle=True, num_workers=8) #train data loader
val_loader_MNIST = DataLoader(val_set, batch_size=128, shuffle=False, num_workers=8) #val data loader

model = SimpleCNN(num_classes=2).to(device) #Model used for the training moved to Device
loss = nn.CrossEntropyLoss() #cross entropy loss
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum = 0.9, weight_decay = 1e-4) #SGD optimizer
lr_step = optim.lr_scheduler.MultiStepLR(optimizer, milestones=[25, 50, 75, 100], gamma=0.5) #MultiStepLR learning rate scheduler
best_accuracy = -math.inf #initialization of the best accuracy

num_epochs = 100 #number of epochs
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader_MNIST:
        inputs, labels = inputs.to(device), labels.to(device)

        outputs = model(inputs)
        loss_f = loss(outputs, labels)
        optimizer.zero_grad()
        loss_f.backward()
        optimizer.step()

        running_loss += loss_f.item() * inputs.size(0)

    epoch_loss = running_loss / len(train_loader_MNIST.dataset)
    val_loss, val_accuracy = validate(model, val_loader_MNIST, loss)
    lr_step.step()
    if val_accuracy > best_accuracy: #compute the best checkpoint
        best_accuracy = val_accuracy
        best_checkpoint_path = f'/content/drive/My Drive/checkpoints_mnist/best_checkpoint.pth'
        save_checkpoint(model, optimizer, epoch, val_loss, val_accuracy, best_checkpoint_path) #save the best checkpoint

    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}')
    if epoch == num_epochs - 1:
        last_checkpoint_path = f'/content/drive/My Drive/checkpoints_mnist/last_checkpoint.pth'
        save_checkpoint(model, optimizer, epoch, val_loss, val_accuracy, last_checkpoint_path) # save the last checkpoint
print('Training finished.')

In [None]:
#definition of the two data loaders for the two test sets
test_loader_MNIST = DataLoader(original_testset, batch_size=128, shuffle=False, num_workers=8)
biased_test_loader_MNIST = DataLoader(biased_testset, batch_size=128, shuffle=False, num_workers=8)

In [None]:
#definition of the Model
model = SimpleCNN(num_classes=2).to(device)

In [None]:
#paths to the best and last checkpoints
best_checkpoint_mnist_path='/content/drive/My Drive/checkpoints_mnist/best_checkpoint.pth'
last_checkpoint_mnist_path='/content/drive/My Drive/checkpoints_mnist/last_checkpoint.pth'

In [None]:
#Testing ERM model (SimpleCNN) on the original test set.
model.load_state_dict(torch.load(best_checkpoint_mnist_path)['model_state_dict'])#loading the model parameters saved in the best checkpoint dict
model.eval() #setting the model to evaluation mode
model.to(device) #moving the model to device

all_predictions = []
all_labels = []

with torch.no_grad(): #disabling gradient calculation
    for inputs, labels in test_loader_MNIST:
        inputs, labels = inputs.to(device), labels.to(device) #inputs and labels moved to device
        outputs = model(inputs) #applying model to inputs
        _, predicted = torch.max(outputs, 1) #predictions
        all_predictions.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

accuracy = accuracy_score(all_labels, all_predictions) #computing accuracy

print(f'Accuracy: {accuracy:.4f}')

In [None]:
#Testing ERM model on the biased test set
model.load_state_dict(torch.load(best_checkpoint_mnist_path)['model_state_dict']) #loading the model parameters saved in the best checkpoint dict
model.eval() #evaluation mode, so that this cell gives the same result when run on its own
model.to(device)

all_predictions = []
all_labels = []

with torch.no_grad():
    for inputs, labels in biased_test_loader_MNIST: #now we use the biased test set
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        all_predictions.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

accuracy = accuracy_score(all_labels, all_predictions)

print(f'Accuracy: {accuracy:.4f}')

In [None]:
#loading the model parameters saved in the best checkpoint dict
model.load_state_dict(torch.load(best_checkpoint_mnist_path)['model_state_dict'])

In [None]:
#heat map generator using XGradCAM, taken from the GitHub package installed previously
heat_map_generator = XGradCAM(model=model, target_layers=[model.get_grad_cam_target_layer()])

In [None]:
#directory containing masked data
masked_data_dir_mnist = '/content/drive/My Drive/maskeddataMNIST/'
os.makedirs(masked_data_dir_mnist, exist_ok=True)

In [None]:
#we save in a list the training tensors, the directory where to save the corresponding masked data, the image path and the label.
mnist_list=[]
for i in range(len(dir_mnist)):
  mnist_list.append((train_set[i][0],masked_data_dir_mnist,mnist_dataset+str(i)+'.jpg', train_set[i][1]))

In [None]:
#directory to save the data to mask
trainset_mnist_to_mask = '/content/drive/My Drive/trainset_mnist_to_mask/'
os.makedirs(trainset_mnist_to_mask, exist_ok=True)

In [None]:
#we saved the MNIST training set with all the information needed for mask tune [tensor, directory of masked data, original_img_path, label]
torch.save(mnist_list, trainset_mnist_to_mask+'tensors_list.pth')

In [None]:
#definition of the loader used to generate the masked training data for MaskTune
mask_loader_MNIST = torch.utils.data.DataLoader(CustomDataset(trainset_mnist_to_mask+'tensors_list.pth'), batch_size=128, shuffle=False, num_workers=8)

In [None]:
#we apply the mask_tune function to all the data in the loader.
for data in mask_loader_MNIST:
  images, save_dir, images_pathes, targets = data[0], data[1], data[2], data[3]
  images = images.to(device) #move the tensors to device
  mask_tune(images, save_dir, images_pathes, targets)
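The mask_tune function called above is defined earlier in the notebook. As a reminder of the idea behind it, the following is a minimal sketch in plain Python (no PyTorch), under the assumption that a pixel is masked when its heat-map value exceeds the mean plus two standard deviations of the heat map, which is how we read the thresholding described in the paper. The name `mask_from_heatmap`, the parameter `k` and the toy 4x4 values are hypothetical and are not part of the notebook.

```python
from statistics import mean, pstdev

def mask_from_heatmap(image, heatmap, k=2.0):
    """Zero out the pixels whose heat-map value exceeds mean + k * std.

    `image` and `heatmap` are equally sized 2D lists of floats.
    The threshold rule (k=2) is an assumption of this sketch.
    """
    values = [v for row in heatmap for v in row]
    threshold = mean(values) + k * pstdev(values)
    return [
        [0.0 if h > threshold else px for px, h in zip(img_row, hm_row)]
        for img_row, hm_row in zip(image, heatmap)
    ]

# Toy example: a flat heat map with a single strongly activated pixel
heatmap = [[0.1] * 4 for _ in range(4)]
heatmap[1][2] = 1.0
image = [[0.5] * 4 for _ in range(4)]

masked = mask_from_heatmap(image, heatmap)
print(masked[1][2], masked[0][0])  # 0.0 0.5
```

Only the pixel the model attends to most is removed, while the rest of the image is left untouched, which is what forces the fine-tuned model to look at other features.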

In [None]:
dir0 ='/content/drive/My Drive/maskeddataMNIST/0/'
dir1 ='/content/drive/My Drive/maskeddataMNIST/1/'
dir0list = os.listdir(dir0)
dir1list = os.listdir(dir1)

In [None]:
#defining the image to tensor transform
transf=transforms.Compose([transforms.ToTensor()])

In [None]:
#for each element in class 0 of the masked data we apply the transformation from image to tensor
mask_data_mnist_transformed0 = []
target = 0

for el in dir0list:
  img = Image.open(dir0+el)
  img = transf(img)
  mask_data_mnist_transformed0.append((img, target)) #list with transformed images and target 0

In [None]:
#for each element in class 1 of the masked data we apply the transformation from image to tensor
mask_data_mnist_transformed1 = []
target = 1

for el in dir1list:
  img = Image.open(dir1+el)
  img = transf(img)
  mask_data_mnist_transformed1.append((img, target)) #list with transformed images and target 1

In [None]:
#union of the two lists
mask_data_mnist_transformed=mask_data_mnist_transformed0+mask_data_mnist_transformed1

In [None]:
masktuned_mnist='/content/drive/My Drive/masktuned_mnist/'
os.makedirs(masktuned_mnist, exist_ok=True)

In [None]:
torch.save(mask_data_mnist_transformed, '/content/drive/My Drive/masktuned_mnist/mask_data_mnist_transformed.pth')

In [None]:
transformed_mnist_mask_path = '/content/drive/My Drive/masktuned_mnist/mask_data_mnist_transformed.pth'
masked_loader_MNIST = torch.utils.data.DataLoader(CustomDataset(transformed_mnist_mask_path), batch_size=128, shuffle=True, num_workers=8)

In [None]:
#fine-tuning the SimpleCNN for one epoch on the new masked training dataset with the same hyperparameters specified in the paper
loss = nn.CrossEntropyLoss() #definition of the loss
model = SimpleCNN(num_classes=2).to(device) #creating the model and moving it to device
model.load_state_dict(torch.load(last_checkpoint_mnist_path)['model_state_dict']) #loading the model parameters saved in the last checkpoint
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum = 0.9, weight_decay = 1e-4) #SGD optimizer
num_epochs = 1

model.train() #setting the model to training mode
for epoch in range(num_epochs): #explicit loop, so that `epoch` is defined inside this cell
    running_loss = 0.0
    for inputs, labels in masked_loader_MNIST:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad() #Reset the gradients of all model parameters to zero before backpropagation

        outputs = model(inputs)
        loss_f = loss(outputs, labels)
        loss_f.backward() #backpropagation
        optimizer.step() #update the model parameters

        running_loss += loss_f.item() * inputs.size(0) #accumulating the batch loss weighted by the batch size

    epoch_loss = running_loss / len(masked_loader_MNIST.dataset) #computing the average loss per sample over the epoch
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}')

last_checkpoint_path = '/content/drive/My Drive/checkpoints_mnist/last_checkpoint_mask_mnist.pth'
torch.save(model.state_dict(), last_checkpoint_path) #saving the last checkpoint (the raw state_dict, without a wrapper dict)
print('Training finished.')

Epoch 1/1, Loss: 0.0362
Training finished.


In [None]:
#path to the last mnist checkpoint
checkpoint_masktune_mnist='/content/drive/My Drive/checkpoints_mnist/last_checkpoint_mask_mnist.pth'

In [None]:
#testing of Mask tune model for original test set
model.load_state_dict(torch.load(checkpoint_masktune_mnist)) #loading the model parameters saved in the last checkpoint
model.eval() #setting the model to evaluation mode
model.to(device) #moving the model to device
all_predictions = []
all_labels = []

with torch.no_grad(): #disabling gradient calculation during testing
    for inputs, labels in test_loader_MNIST:
        inputs, labels = inputs.to(device), labels.to(device) #moving inputs and labels to device
        outputs = model(inputs) #applying the model to the inputs
        _, predicted = torch.max(outputs, 1) #predictions
        all_predictions.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

accuracy = accuracy_score(all_labels, all_predictions)#computing the accuracy

print(f'Accuracy: {accuracy:.4f}')

In [None]:
#testing of Mask tune model for biased test set
model.load_state_dict(torch.load(checkpoint_masktune_mnist))
model.eval()
model.to(device)
all_predictions = []
all_labels = []

with torch.no_grad():
    for inputs, labels in biased_test_loader_MNIST:#now we use the biased test data loader
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        all_predictions.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

accuracy = accuracy_score(all_labels, all_predictions)

print(f'Accuracy: {accuracy:.4f}')

CELEBA

*   Dataset: The CelebA dataset is a large-scale face dataset consisting of over 200,000 celebrity images (178x218), each annotated with 40 binary attribute labels that describe facial features and characteristics such as gender, hair color, eyeglasses, smile, and more. The dataset is divided into three subsets: training, validation, and testing.
*   Preprocessing: Because of RAM issues, we split the training set (162770 samples) into 17 smaller groups (16 groups of 10000 samples each plus a last group with the remaining 2770 samples), and we trained our model on one of the 10000-sample groups. To keep the proportions, we used 1250 samples for the validation set and 1250 samples for the test set. For our task, we dropped all the attribute labels except the hair color. This was done to reduce our problem to a binary classification task, as the authors did. In the dataset, the column 'Blond_Hair' contains the value -1 when the feature is not present and 1 otherwise, so we replaced every -1 with 0 in order to obtain 0/1 class labels. Moreover, for the training set the images are randomly cropped and resized to the target resolution (224x224), with a scale factor between 0.7 and 1.0 and an aspect ratio between 1.0 and 1.33. Additionally, the images are randomly flipped horizontally to augment the dataset. After these augmentations, the images are converted to tensors and normalized using the ImageNet mean and standard deviation values. For the testing and validation sets, images are first center-cropped to the minimum dimension, then resized to the target resolution. As in the training set, they are converted to tensors and normalized. These transformations ensure that the images are prepared consistently for model input while also introducing variability in the training phase to improve generalization.
*   Architecture: We use a pre-trained ResNet-50 model and fine-tune it for our specific task by adjusting it to have two output classes. For training, the model is optimized using stochastic gradient descent (SGD) with a learning rate of 1e-4, momentum of 0.9, and weight decay of 1e-4 to help prevent overfitting. The training loop is run for 5 epochs. During each epoch, the model is put into training mode and iterates over the training dataset in batches. The forward pass runs under automatic mixed precision when CUDA is available, and the outputs of the model are compared with the true labels using the CrossEntropyLoss function. The loss is multiplied by the GradScaler's scale factor before backpropagation, so that small gradient values do not underflow in half precision; the scaler then takes care of the optimizer step that updates the model's weights. After each epoch, the average training loss is calculated, and the model's performance is evaluated on the validation set. If the accuracy on the validation set improves over previous epochs, the model is saved as the best checkpoint. At the end of the training process the final model checkpoint is saved as well.
*   Results: on the test set the ERM model achieves an accuracy of 0.88, which is only marginally above the 0.8768 obtained by always predicting the negative class (1096 of the 1250 test samples are negatives). Its F1 score is low (0.0506), indicating a poor balance between precision and recall. The precision is 1.0000 because the model makes no false positives, but the recall is extremely low at 0.0260, meaning that the model misses almost all true positives. The ROC AUC score, computed here from the hard 0/1 predictions rather than from predicted probabilities, is 0.5130, and the confusion matrix shows that the model correctly classifies 1096 negatives but only 4 positives, with 150 positives misclassified as negatives. In contrast, the MaskTune model shows improved overall performance. The accuracy increases to 0.8960, and the F1 score improves to 0.3011, reflecting a better trade-off between precision and recall. Precision drops to 0.8750, but recall improves to 0.1818, indicating that the model is better at identifying true positives than the ERM model. The ROC AUC score also increases to 0.5891, suggesting better discrimination between classes. The confusion matrix for MaskTune shows that the model correctly classifies 1092 negatives and 28 positives, with 4 negatives misclassified as positives and 126 positives misclassified as negatives. Overall, MaskTune demonstrates a better balanced performance than the ERM model, particularly in terms of recall and F1 score, although the recall remains low in absolute terms.
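The metrics quoted above can be recomputed directly from the two confusion matrices. The following plain-Python sketch does so; the helper name `binary_metrics` is hypothetical and is not part of the notebook, which relies on scikit-learn instead. It also shows that, when the ROC AUC is computed from hard 0/1 predictions, it coincides with the balanced accuracy, i.e. the mean of recall and specificity.

```python
def binary_metrics(tn, fp, fn, tp):
    """Compute standard binary classification metrics from confusion-matrix counts."""
    total = tn + fp + fn + tp
    accuracy = (tp + tn) / total
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = (2 * precision * recall / (precision + recall)
          if (precision + recall) else 0.0)
    specificity = tn / (tn + fp) if (tn + fp) else 0.0
    # With hard predictions the ROC curve has a single interior point,
    # so its area equals (recall + specificity) / 2.
    balanced_accuracy = (recall + specificity) / 2
    return {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1": f1,
        "balanced_accuracy": balanced_accuracy,
    }

# Counts taken from the confusion matrices reported in this section
erm = binary_metrics(tn=1096, fp=0, fn=150, tp=4)
masktune = binary_metrics(tn=1092, fp=4, fn=126, tp=28)

for name, metrics in (("ERM", erm), ("MaskTune", masktune)):
    print(name, {key: round(value, 4) for key, value in metrics.items()})
```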

In [None]:
#We downloaded on google drive the CELEBA dataset from the link provided by the paper authors
!unzip "/content/drive/My Drive/archive.zip" -d "/content/data/"

In [None]:
#reading the csv containing the unzipped dataset and take only the selected columns
dataset_celeba = pd.read_csv('/content/data/list_attr_celeba.csv', usecols = ['image_id','Blond_Hair'])
split = pd.read_csv('/content/data/list_eval_partition.csv')

In [None]:
#Since the first 162770 rows of the dataset are the training data, we dropped all the validation and test rows.
#Then, with the help of ChatGPT, we randomly divided the 162770 rows into 17 smaller subsets (10000 samples each except the last one, which has 2770) because of time and RAM issues.
#The rest of the code refers to the first subset of training data (dataset_celeba0)
dropfromindx = 162770
dataset_celeba_train = dataset_celeba.drop(dataset_celeba.index[dropfromindx:])
sample_size = 10000
num_samples = dataset_celeba_train.shape[0] // sample_size
sampled_dir = '/content/drive/My Drive/sampled_data_celeba/'
os.makedirs(sampled_dir, exist_ok=True) #to_csv does not create missing directories

for i in range(num_samples):
    if len(dataset_celeba_train) < sample_size:
        break
    sampled_df = dataset_celeba_train.sample(n=sample_size, random_state=42+i)
    dataset_celeba_train = dataset_celeba_train.drop(sampled_df.index) #sampling without replacement across subsets

    sampled_df.to_csv(f'{sampled_dir}sampled_data_{i}.csv') #the original index is kept: it is used later to look up the partition

if not dataset_celeba_train.empty:
    dataset_celeba_train.to_csv(f'{sampled_dir}sampled_data_{num_samples}.csv') #remaining rows

dataset_celeba0 = pd.read_csv(f'{sampled_dir}sampled_data_0.csv')
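As a quick sanity check on the subset sizes, the split can be reproduced on row indices alone. This is a minimal sketch in plain Python that uses a shuffle-then-slice strategy rather than the repeated `DataFrame.sample` calls of the cell above, so the resulting subsets are not the same ones; the function name `split_indices` and the seed are hypothetical.

```python
import random

def split_indices(n_rows, chunk_size, seed=42):
    """Shuffle the row indices and cut them into consecutive, disjoint chunks."""
    indices = list(range(n_rows))
    random.Random(seed).shuffle(indices)
    return [indices[i:i + chunk_size] for i in range(0, n_rows, chunk_size)]

chunks = split_indices(162770, 10000)
sizes = [len(chunk) for chunk in chunks]
print(len(chunks), sizes[0], sizes[-1])  # 17 10000 2770
```

The 162770 training rows therefore give 16 full subsets of 10000 samples and one final subset of 2770 samples.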

In [None]:
#the csv containing the original dataset has a 'partition' column which splits the data into train, validation and test according to 0, 1, 2.
#Moreover, the column 'Blond_Hair' contains the value -1 when the feature is not present. We substituted it with the value 0
valset_celeba = []
testset_celeba = []
slideindex=162770


for i in dataset_celeba.values[slideindex:]:
  ind=dataset_celeba.index[slideindex]
  splitcl=split['partition'][ind]
  if (splitcl==1):
    if i[1]==1:
     valset_celeba.append([i[0],i[1]])
    else:
      valset_celeba.append([i[0],0])
  elif (splitcl==2):
    if i[1]==1:
     testset_celeba.append([i[0],i[1]])
    else:
      testset_celeba.append([i[0],0])
  slideindex +=1

In [None]:
#we did the same substitution [-1 to 0] for the training set
trainset_celeba = []

for i in dataset_celeba0.values:
  ind=i[0]
  splitcl=split['partition'][ind]
  if(splitcl==0):
    if i[2]==1:
     trainset_celeba.append([i[1],i[2]])
    else:
      trainset_celeba.append([i[1],0])

In [None]:
#paths to folders where we save respectively the transformed training set, masked set, validation set and test set
dataset_celeba0_transformed_train = '/content/drive/My Drive/dataset_celeba0_transformed_train/'
os.makedirs(dataset_celeba0_transformed_train, exist_ok=True)
dataset_celeba0_5epochs_transformed_mask='/content/drive/My Drive/dataset_celeba0_5epochs_transformed_mask/'
os.makedirs(dataset_celeba0_5epochs_transformed_mask, exist_ok=True)
valset_celeba_transformed = '/content/drive/My Drive/valset_celeba_transformed/'
os.makedirs(valset_celeba_transformed, exist_ok=True)
testset_celeba_transformed = '/content/drive/My Drive/testset_celeba_transformed/'
os.makedirs(testset_celeba_transformed, exist_ok=True)

In [None]:
#we used the same transformations as the authors
orig_min_dim = min(178, 218) #side of the square center crop: the shorter side of the original 178x218 images
target_resolution = (224, 224) #target resolution


t_train = transforms.Compose([ #data augmentation
            transforms.RandomResizedCrop(
                target_resolution,
                scale=(0.7, 1.0),
                ratio=(1.0, 1.3333333333333333),
                interpolation=2),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(), #image to tensor
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) #normalization
        ])

t_test = transforms.Compose([
                transforms.CenterCrop(orig_min_dim),
                transforms.Resize(target_resolution),
                transforms.ToTensor(), #image to tensor
                transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
            ])

In [None]:
#applying transformations to train set
dir = '/content/data/img_align_celeba/img_align_celeba/'
train = []
for el in trainset_celeba:
  img = Image.open(dir+el[0])
  img = t_train(img)
  train.append((img, el[1]))

#saving the transformed tensors
torch.save(train, '/content/drive/My Drive/dataset_celeba0_transformed_train/tensors_list.pth')

In [None]:
#we sampled randomly 1250 samples from the total validation set and saved them in a csv file on google drive (Chat GPT)
valset_celeba_restr=random.sample(valset_celeba, k=1250)

image_ids_to_keep = set(item[0] for item in valset_celeba_restr) # Create a set of image IDs from the 'valset_celeba_restr' variable
filtered_df = split[split['image_id'].isin(image_ids_to_keep)] # Filter the 'split' DataFrame to include only rows where the 'image_id' is in the set of image IDs
output_path_filtered = '/content/drive/MyDrive/valset_celeba_restr.csv' # Define the output path where the filtered DataFrame will be saved as a CSV file
filtered_df.to_csv(output_path_filtered, index=False) # Save the filtered DataFrame to the specified output path without including the DataFrame index in the CSV file

filtered_val_df = pd.read_csv('/content/drive/MyDrive/valset_celeba_restr.csv')
valset_celeba_restr=[]
for i in filtered_val_df.values:
  for j in valset_celeba:
   if i[0]== j[0]: # Check if the 'image_id' in the filtered DataFrame matches the 'image_id' in the original DataFrame
    valset_celeba_restr.append(j) # Append the corresponding row from the original DataFrame to the new list

In [None]:
#applying transformations to validation set
dir = '/content/data/img_align_celeba/img_align_celeba/'
val = []
for el in valset_celeba_restr:
  img = Image.open(dir+el[0])
  img = t_test(img)
  val.append((img, el[1]))

#saving the transformed tensors
torch.save(val, '/content/drive/My Drive/valset_celeba_transformed/tensors_list.pth')

In [None]:
#we sampled randomly 1250 samples from the total test set and saved them in a csv file on google drive (Chat GPT)
testset_celeba_restr=random.sample(testset_celeba, k=1250)

image_ids_to_keep = set(item[0] for item in testset_celeba_restr)# Create a set of image IDs from the 'testset_celeba_restr' variable
filtered_df = split[split['image_id'].isin(image_ids_to_keep)]  # Filter the 'split' DataFrame to include only rows where the 'image_id' is in the set of image IDs
output_path_filtered = '/content/drive/MyDrive/testset_celeba_restr.csv' # Define the output path where the filtered DataFrame will be saved as a CSV file
filtered_df.to_csv(output_path_filtered, index=False) # Save the filtered DataFrame to the specified output path without including the DataFrame index in the CSV file

filtered_test_df = pd.read_csv('/content/drive/MyDrive/testset_celeba_restr.csv')
testset_celeba_restr=[]

for i in filtered_test_df.values:
  for j in testset_celeba:
   if i[0]== j[0]:# Check if the 'image_id' in the filtered DataFrame matches the 'image_id' in the original DataFrame
    testset_celeba_restr.append(j) # Append the corresponding row from the original DataFrame to the new list

In [None]:
#applying transformations to test set
dir = '/content/data/img_align_celeba/img_align_celeba/'
test = []
for el in testset_celeba_restr:
  img = Image.open(dir+el[0])
  img = t_test(img)
  test.append((img, el[1]))

#saving the transformed tensors
torch.save(test, '/content/drive/My Drive/testset_celeba_transformed/tensors_list.pth')

In [None]:
#we created a pretrained resnet50 model (as the authors did) and moved it to device
resnet50 = timm.create_model('resnet50', pretrained = True, num_classes = 2)
resnet50 = resnet50.to(device)

In [None]:
#paths to transformed train, validation and test tensors
transformed_data0_path = '/content/drive/My Drive/dataset_celeba0_transformed_train/tensors_list.pth'
train_loader = torch.utils.data.DataLoader(CustomDataset(transformed_data0_path), batch_size=8, shuffle=True, num_workers=2)
transformed_valset_path = '/content/drive/My Drive/valset_celeba_transformed/tensors_list.pth'
val_loader = torch.utils.data.DataLoader(CustomDataset(transformed_valset_path), batch_size=8, shuffle=False, num_workers=2)
transformed_testset_path = '/content/drive/My Drive/testset_celeba_transformed/tensors_list.pth'
test_loader = torch.utils.data.DataLoader(CustomDataset(transformed_testset_path), batch_size=8, shuffle=False, num_workers=2)

In [None]:
#alternative to the previous cell: creation of the train, validation and test data loaders directly from the in-memory lists, with batch size 8 and shuffling enabled only for the train loader
train_loader = DataLoader(train, batch_size=8, shuffle=True, num_workers=2)
val_loader = DataLoader(val, batch_size=8, shuffle=False, num_workers=2)
test_loader = DataLoader(test, batch_size=8, shuffle=False, num_workers=2)

In [None]:
#directory to checkpoints
checkpoint_dir = '/content/drive/My Drive/checkpoints'
os.makedirs(checkpoint_dir, exist_ok=True)

In [None]:
#paths to the best and last checkpoints
checkpoint_path_last = '/content/drive/MyDrive/checkpoints/last_checkpoint0_5epochs.pth'
checkpoint_path_best = '/content/drive/MyDrive/checkpoints/best_checkpoint0_5epochs.pth'

In [None]:
#Because of time and RAM issues on Colab which forced us to use only 10000 training samples, we trained the model only for 5 epochs instead of 20
scaler = GradScaler() # Initialize the gradient scaler (scales the loss so that small gradients do not underflow under mixed precision)
loss = nn.CrossEntropyLoss() #loss function
optimizer = optim.SGD(resnet50.parameters(), lr=1e-4, momentum = 0.9, weight_decay = 1e-4) #SGD optimizer

num_epochs = 5
best_accuracy = 0.0 #initialize the best accuracy tracker

for epoch in range(num_epochs):

    resnet50.train() # Set the model to training mode
    running_loss = 0.0
    for inputs, labels in train_loader:

        inputs, labels = inputs.to(device), labels.to(device)  # Move the inputs and labels to device
        optimizer.zero_grad() #Reset the gradients of all model parameters to zero before backpropagation

        with autocast(enabled=torch.cuda.is_available()): # Use automatic mixed precision if CUDA is available for faster computations and memory savings
            outputs = resnet50(inputs) # Forward pass: compute model predictions
            loss_f = loss(outputs, labels) # Compute the loss using the predictions and true labels
        scaler.scale(loss_f).backward() # Backward pass: scale the loss and compute gradients
        scaler.step(optimizer) # unscale the gradients and run the optimizer step
        scaler.update() # adjust the scale factor for the next iteration
        running_loss += loss_f.item() * inputs.size(0)

    epoch_loss = running_loss / len(train_loader.dataset) # Calculate the average loss for the epoch
    val_loss, val_accuracy = validate(resnet50, val_loader, loss) # Validate the model on the validation set and obtain validation loss and accuracy

    # Save the model checkpoint if the validation accuracy improves
    if val_accuracy > best_accuracy:
        best_accuracy = val_accuracy
        best_checkpoint_path = '/content/drive/My Drive/checkpoints/best_checkpoint0_5epochs.pth'
        save_checkpoint(resnet50, optimizer, epoch, val_loss, val_accuracy, best_checkpoint_path)

    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}')
    # Save the last checkpoint after the final epoch
    if epoch == num_epochs - 1:
        last_checkpoint_path = '/content/drive/My Drive/checkpoints/last_checkpoint0_5epochs.pth'
        save_checkpoint(resnet50, optimizer, epoch, val_loss, val_accuracy, last_checkpoint_path)
print('Training finished.')
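To illustrate why the loss is scaled before the backward pass, the following plain-Python sketch emulates half precision with the standard `struct` module. The gradient value and the scale factor of 2**16 are hypothetical numbers chosen for the illustration, and the sketch only mimics the rounding; it is not how GradScaler is implemented.

```python
import struct

def to_fp16(x):
    """Round a Python float to the nearest IEEE 754 half-precision value."""
    return struct.unpack('e', struct.pack('e', x))[0]

grad = 1e-8        # a small gradient value (hypothetical)
scale = 65536.0    # 2**16, scale factor chosen for this illustration

unscaled = to_fp16(grad)                    # too small for half precision: rounds to 0.0
recovered = to_fp16(grad * scale) / scale   # representable once scaled, then unscaled in full precision

print(unscaled)   # 0.0
print(recovered)  # close to 1e-8
```

Without scaling the gradient is lost entirely, whereas scaling it before the half-precision rounding and dividing afterwards recovers it up to a small relative error.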

In [None]:
#testing ERM model on CELEBA test set
resnet50.load_state_dict(torch.load(checkpoint_path_best )['model_state_dict']) #loading the model parameters saved in the best checkpoint
resnet50.eval() #setting the model to evaluation mode
all_predictions = []
all_labels = []

with torch.no_grad(): #disabling gradient calculation during testing
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)  #moving inputs and labels to device
        outputs = resnet50(inputs) #applying the model to the inputs
        _, predicted = torch.max(outputs, 1) #predictions
        all_predictions.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

#computing evaluation metrics
accuracy = accuracy_score(all_labels, all_predictions)
f1 = f1_score(all_labels, all_predictions)
precision = precision_score(all_labels, all_predictions)
recall = recall_score(all_labels, all_predictions)
conf_matrix = confusion_matrix(all_labels, all_predictions)
try:
  roc_auc = roc_auc_score(all_labels, all_predictions)
except ValueError:
  roc_auc = "Not applicable for multiclass"

print(f'Accuracy: {accuracy:.4f}')
print(f'F1 Score: {f1:.4f}')
print(f'Precision: {precision:.4f}')
print(f'Recall: {recall:.4f}')
print(f'ROC AUC Score: {roc_auc}')
print('Confusion Matrix:')
print(conf_matrix)

Accuracy: 0.8800
F1 Score: 0.0506
Precision: 1.0000
Recall: 0.0260
ROC AUC Score: 0.512987012987013
Confusion Matrix:
[[1096    0]
 [ 150    4]]


In [None]:
#directory to masked data
masked_data_dir0_5epochs = '/content/drive/My Drive/masked_data0_5epochs/'
os.makedirs(masked_data_dir0_5epochs, exist_ok=True)

In [None]:
#loading best checkpoint
resnet50.load_state_dict(torch.load(checkpoint_path_best, map_location=device)['model_state_dict'])

In [None]:
#definition of the XGradCAM heat map generator, targeting the last block of layer4 (the last convolutional block of the ResNet-50)
heat_map_generator = XGradCAM(model=resnet50, target_layers=[resnet50.layer4[-1]])

In [None]:
#applying the correct (the ones needed by mask tune) transformations to the train set and saving data in a list
dir = '/content/data/img_align_celeba/img_align_celeba/'
mask_set = []
for el in trainset_celeba:
  img_path = dir+el[0]
  target = el[1]
  img = Image.open(img_path)
  img = t_test(img)
  mask_set.append((img, masked_data_dir0_5epochs, img_path, target))

In [None]:
#saving the mask set list to the specified directory
trainset_celeba_to_mask = '/content/drive/My Drive/trainset_celeba_to_mask/'
os.makedirs(trainset_celeba_to_mask, exist_ok=True)
torch.save(mask_set, trainset_celeba_to_mask +'tensors_list.pth')

In [None]:
#creation of the mask data loader
mask_loader = torch.utils.data.DataLoader(CustomDataset(trainset_celeba_to_mask+'tensors_list.pth'), batch_size=8, shuffle=False, num_workers=2)

In [None]:
#applying MaskTune to the data in mask_loader
for data in mask_loader:
  images, save_dir, images_pathes, targets = data[0], data[1], data[2], data[3]
  images = images.to(device)
  mask_tune(images, save_dir, images_pathes, targets)

In [None]:
#definition of the directories and corresponding lists where to save masked data of classes 0 and 1
dir0 ='/content/drive/My Drive/masked_data0_5epochs/0/'
dir1 ='/content/drive/My Drive/masked_data0_5epochs/1/'
dir0list = os.listdir(dir0)
dir1list = os.listdir(dir1)

In [None]:
#for each element in class 0 of the masked data we apply the transformation taken from the paper
mask_data_transformed0 = []

for el in dir0list:
  target = 0
  img = Image.open(dir0+el)
  img = t_test(img)
  mask_data_transformed0.append((img, target))

In [None]:
#for each element in class 1 of the masked data we apply the transformation taken from the paper
mask_data_transformed1 = []

for el in dir1list:
  target = 1
  img = Image.open(dir1+el)
  img = t_test(img)
  mask_data_transformed1.append((img, target))

In [None]:
#summing the two lists containing the masked data of the two classes and saving the resulting list
mask_data_transformed=mask_data_transformed0+mask_data_transformed1
torch.save(mask_data_transformed, dataset_celeba0_5epochs_transformed_mask+'tensors_list.pth')

In [None]:
#definition of the data loader for the transformed masked data
transformed_data0_5epochs_mask_path = dataset_celeba0_5epochs_transformed_mask+'tensors_list.pth'
mask_transformed_loader = torch.utils.data.DataLoader(CustomDataset(transformed_data0_5epochs_mask_path), batch_size=8, shuffle=True, num_workers=2)

In [None]:
#fine-tuning the model (MaskTune) for one epoch on the new masked training dataset
scaler = GradScaler() # Initialize the gradient scaler (scales the loss so that small gradients do not underflow under mixed precision)
loss = nn.CrossEntropyLoss() #loss
resnet50.load_state_dict(torch.load(checkpoint_path_last)['model_state_dict']) #loading the last checkpoint
optimizer = optim.SGD(resnet50.parameters(), lr=1e-4, momentum = 0.9, weight_decay = 1e-4) #SGD optimizer

num_epochs = 1

resnet50.train() # Set the model to training mode
for epoch in range(num_epochs): # explicit loop, so that `epoch` is defined inside this cell
    running_loss = 0.0
    for inputs, labels in mask_transformed_loader:

        inputs, labels = inputs.to(device), labels.to(device) # Move the inputs and labels to device
        optimizer.zero_grad() #Reset the gradients of all model parameters to zero before backpropagation

        with autocast(enabled=torch.cuda.is_available()): # Use automatic mixed precision if CUDA is available for faster computations and memory savings
            outputs = resnet50(inputs) # Forward pass: compute model predictions
            loss_f = loss(outputs, labels) # Compute the loss using the predictions and true labels
        scaler.scale(loss_f).backward() # Backward pass: scale the loss and compute gradients
        scaler.step(optimizer)  # unscale the gradients and run the optimizer step
        scaler.update() # adjust the scale factor for the next iteration
        running_loss += loss_f.item() * inputs.size(0)

    epoch_loss = running_loss / len(mask_transformed_loader.dataset) # Calculate the average loss for the epoch
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}')

last_checkpoint_path = '/content/drive/My Drive/checkpoints/last_checkpoint_mask0_5epochs.pth'
torch.save(resnet50.state_dict(), last_checkpoint_path) # Save the last checkpoint (the raw state_dict) after the final epoch
print('Training finished.')

Epoch 1/1, Loss: 0.3151
Training finished.


In [None]:
#path to the last checkpoint
checkpoint_masktune='/content/drive/My Drive/checkpoints/last_checkpoint_mask0_5epochs.pth'

In [None]:
#testing mask tune
resnet50.load_state_dict(torch.load(checkpoint_masktune)) #loading the last checkpoint
resnet50.eval() #setting the model to evaluation mode
all_predictions = []
all_labels = []

with torch.no_grad():  #disabling gradient calculation during testing
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)  #moving inputs and labels to device
        outputs = resnet50(inputs)  #applying the model to the inputs
        _, predicted = torch.max(outputs, 1) #predictions
        all_predictions.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

#computing evaluation metrics
accuracy = accuracy_score(all_labels, all_predictions)
f1 = f1_score(all_labels, all_predictions)
precision = precision_score(all_labels, all_predictions)
recall = recall_score(all_labels, all_predictions)
conf_matrix = confusion_matrix(all_labels, all_predictions)
try:
  roc_auc = roc_auc_score(all_labels, all_predictions)
except ValueError:
  roc_auc = "Not applicable for multiclass"

print(f'Accuracy: {accuracy:.4f}')
print(f'F1 Score: {f1:.4f}')
print(f'Precision: {precision:.4f}')
print(f'Recall: {recall:.4f}')
print(f'ROC AUC Score: {roc_auc}')
print('Confusion Matrix:')
print(conf_matrix)

Accuracy: 0.8960
F1 Score: 0.3011
Precision: 0.8750
Recall: 0.1818
ROC AUC Score: 0.5890842733908428
Confusion Matrix:
[[1092    4]
 [ 126   28]]


**SELECTIVE CLASSIFICATION**

CIFAR-10

*   Dataset: The CIFAR-10 dataset consists of 60000 labeled color images, each with a resolution of 32x32 pixels. These images are evenly distributed across 10 different classes, with 6000 images per class. The classes represent everyday objects and animals, including airplanes, automobiles, birds, cats, deer, dogs, frogs, horses, ships, and trucks. The dataset is divided into a training set and a test set, with 50000 images allocated for training and 10000 images for testing.
*   Preprocessing: Because of RAM issues, we use only the first 5 classes. We split the corresponding training data into 20000 samples for the training set and 5000 for the validation set, and take the first 5 classes of the testing set (5000 samples). For the training set, we apply three transformations: a random horizontal flip, which augments the dataset by mirroring images, thereby increasing variability and reducing overfitting; a random affine transformation, allowing for rotations of up to 15 degrees and translations of up to 10% in both the horizontal and vertical directions, which further enhances the model's robustness by providing it with slightly altered versions of the original images; and finally the conversion of the transformed images to tensors. In contrast, on the testing and validation sets we apply a simpler transformation that only converts images to tensors without any augmentation. This ensures that the model is evaluated on the original image data, providing a fair assessment of its performance.
*   Architecture: The ResNet32 architecture consists of an initial convolutional layer followed by three stages of 5 Basic Blocks (residual blocks) each. Each Basic Block contains two convolutional layers with batch normalization and ReLU activations. Downsampling is handled by strided convolutions together with a LambdaLayer that adjusts the dimensions of the input when necessary. The model is trained for 50 epochs with stochastic gradient descent (SGD), using a learning rate of 0.1, momentum 0.9, and a weight decay of 1e-4 to mitigate overfitting. A multi-step learning rate scheduler with milestones at epochs 25 and 50 halves the learning rate at each milestone; since training stops after 50 epochs, only the first halving affects training. Cross-entropy is used as the loss function. At the end of each epoch the model is evaluated on the validation set, computing both the validation loss and accuracy. The best validation accuracy is tracked, and a checkpoint is saved for the best-performing epoch as well as for the last one. Additionally, the probabilities computed on the validation set at the final epoch are saved for selective classification: they are used to compute the threshold below which the 10% of samples with the lowest probabilities are discarded.
*   Results: we evaluate the ERM and MaskTune models with and without selective classification. Without selection, the ERM model achieves an accuracy of 0.8032, with an F1 score of 0.8022, precision of 0.8026, and recall of 0.8032. The confusion matrix shows that the model performs relatively well on classes 0, 1, and 4 but struggles with classes 2 and 3. With selective classification, the ERM model improves to an accuracy of 0.8665, an F1 score of 0.8639, precision of 0.8649, and recall of 0.8638. The confusion matrix indicates that selective classification helps reduce misclassifications across classes, particularly in classes 2 and 3, compared with the non-selective scenario. The MaskTune model without selective classification reaches an accuracy of 0.8104, with an F1 score of 0.8071, precision of 0.8100, and recall of 0.8104. Its confusion matrix shows gains on classes 0, 3, and 4, while class 2 remains the hardest (627 of 1000 test images classified correctly, against 730 for ERM). Finally, the MaskTune model with selective classification reaches an accuracy of 0.9159, with an F1 score of 0.9129, precision of 0.9134, and recall of 0.9126. The confusion matrix shows a substantial reduction in misclassifications across all classes, highlighting the effectiveness of combining MaskTune with selective classification. Overall, MaskTune alone gives a small gain over ERM (0.8032 to 0.8104), whereas the largest improvements come from selective classification, especially in combination with MaskTune (0.8665 for ERM against 0.9159 for MaskTune).
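
The thresholding step described above can be illustrated with a small, self-contained sketch. It is not the notebook's code: the confidence values are made up, the helper names `coverage_threshold` and `select` are hypothetical, and each prediction is assumed to come with a single confidence score (for example the softmax probability of the predicted class).

```python
def coverage_threshold(val_confidences, drop_fraction=0.1):
    """Return the validation confidence at the `drop_fraction` position of the sorted scores."""
    ordered = sorted(val_confidences)
    return ordered[int(drop_fraction * len(ordered))]

def select(predictions, labels, confidences, threshold):
    """Keep only the (prediction, label) pairs whose confidence is strictly above the threshold."""
    return [(p, y) for p, y, c in zip(predictions, labels, confidences) if c > threshold]

# Toy validation confidences (illustrative values only).
val_conf = [0.35, 0.99, 0.80, 0.55, 0.97, 0.91, 0.60, 0.88, 0.93, 0.75]
threshold = coverage_threshold(val_conf)

# Toy test predictions, true labels and confidences (illustrative values only).
test_pred = [0, 1, 2, 3, 4, 2]
test_true = [0, 1, 3, 3, 4, 2]
test_conf = [0.98, 0.95, 0.40, 0.90, 0.85, 0.50]

kept = select(test_pred, test_true, test_conf, threshold)
coverage = len(kept) / len(test_pred)
full_accuracy = sum(p == y for p, y in zip(test_pred, test_true)) / len(test_pred)
selective_accuracy = sum(p == y for p, y in kept) / len(kept)

print(f'Threshold: {threshold}')
print(f'Coverage: {coverage:.2f}')
print(f'Accuracy on all samples: {full_accuracy:.2f}')
print(f'Accuracy on kept samples: {selective_accuracy:.2f}')
```

In this toy case the low-confidence samples that are discarded include the only wrong prediction, which is the effect selective classification aims for: accuracy is traded against coverage.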

In [None]:
#defining the needed transformations
t_train=transforms.Compose([ #data augmentation
    transforms.RandomHorizontalFlip(),
    transforms.RandomAffine(degrees=15, translate=(0.1, 0.1)),
    transforms.ToTensor(), #image to tensor
    ])

t_test=transforms.Compose([transforms.ToTensor()]) #image to tensor

In [None]:
#loading the CIFAR10 dataset from torchvision according to the train and test split (validation set will be extracted from the train set but with a different transformation)
total_trainset_CIFAR10= CIFAR10(root='./data', train=True, download=True, transform=t_train)
total_valset_CIFAR10= CIFAR10(root='./data', train=True, download=True, transform=t_test)
testset_CIFAR10= CIFAR10(root='./data', train=False, download=True, transform=t_test)

In [None]:
#we use only 5 classes because of time and RAM issues and extract them with the filter_classes function
trainset_CIFAR10_subset = filter_classes(total_trainset_CIFAR10)
valset_CIFAR10_subset = filter_classes(total_valset_CIFAR10)
testset_CIFAR10_subset = filter_classes(testset_CIFAR10)

#converting the subsets to lists
trainset_CIFAR10_subset=list(trainset_CIFAR10_subset)
valset_CIFAR10_subset=list(valset_CIFAR10_subset)
testset_CIFAR10_subset=list(testset_CIFAR10_subset)

In [None]:
#saving the list containing the new test set
testset_CIFAR10_path = '/content/drive/My Drive/testset_CIFAR10_subset/'
os.makedirs(testset_CIFAR10_path, exist_ok=True)
torch.save(testset_CIFAR10_subset, testset_CIFAR10_path + 'tensors_list.pth')

In [None]:
#we take 5000 random samples from the training set and save the indices on google drive in order to retrieve always the same images to create the validation set
valset_CIFAR10_indices=random.sample(range(len(trainset_CIFAR10_subset)), 5000)
valset_CIFAR10_indices_path='/content/drive/MyDrive/valset_CIFAR10_subset_indices.pkl' #path of the file where the validation indices are saved
with open(valset_CIFAR10_indices_path, 'wb') as f:
    pickle.dump(valset_CIFAR10_indices, f) #serialize the 'valset_CIFAR10_indices' object and save it to the file 'f'

with open(valset_CIFAR10_indices_path, 'rb') as f:
    valset_CIFAR10_indices = pickle.load(f) #deserialize the data from the file 'f'
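
The split built in this part of the notebook can be summarised in a few lines. The sketch below is a toy version: it works on index ranges instead of the 25000-image subset, the helper `split_indices` is hypothetical, and it uses a seeded `random.Random` instance to make the split reproducible (an assumption; the notebook instead persists the sampled indices to Google Drive). The validation indices are stored in a `set` so that the membership test used to build the training split stays fast.

```python
import random

def split_indices(n_samples, n_val, seed=0):
    """Return (train_indices, val_indices) for a random, reproducible split."""
    rng = random.Random(seed)                 # local generator, leaves the global state untouched
    val_indices = rng.sample(range(n_samples), n_val)
    val_set = set(val_indices)                # constant-time membership checks
    train_indices = [i for i in range(n_samples) if i not in val_set]
    return train_indices, val_indices

train_idx, val_idx = split_indices(n_samples=25000, n_val=5000)
print(len(train_idx), len(val_idx))           # 20000 5000
```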

In [None]:
#we drop the images corresponding to the validation indices to obtain the train set
valset_CIFAR10_indices_set = set(valset_CIFAR10_indices) #set for fast membership tests
trainset_CIFAR10 = [sample for i, sample in enumerate(trainset_CIFAR10_subset) if i not in valset_CIFAR10_indices_set]

In [None]:
#saving the list containing the train tensors
trainset_CIFAR10_path = '/content/drive/My Drive/trainset_CIFAR10_subset/'
os.makedirs(trainset_CIFAR10_path, exist_ok=True)
torch.save(trainset_CIFAR10, trainset_CIFAR10_path + 'tensors_list.pth')

In [None]:
#selecting the elements of the valset subset corresponding to the indices defined before
valset_CIFAR10= [valset_CIFAR10_subset[i] for i in valset_CIFAR10_indices]

In [None]:
#saving the list containing the validation tensors
valset_CIFAR10_path = '/content/drive/My Drive/valset_CIFAR10_subset/'
os.makedirs(valset_CIFAR10_path, exist_ok=True)
torch.save(valset_CIFAR10, valset_CIFAR10_path + 'tensors_list.pth')

In [None]:
#we do the same as for the training set, but starting from the original set transformed with t_test (no augmentation)
valset_CIFAR10_indices_set = set(valset_CIFAR10_indices) #set for fast membership tests
trainset_CIFAR10_to_mask = [sample for i, sample in enumerate(valset_CIFAR10_subset) if i not in valset_CIFAR10_indices_set]

In [None]:
#saving the list containing the data to mask tensors
trainset_CIFAR10_to_mask_path = '/content/drive/My Drive/trainset_CIFAR10_to_mask_subset/'
os.makedirs(trainset_CIFAR10_to_mask_path, exist_ok=True)
torch.save(trainset_CIFAR10_to_mask, trainset_CIFAR10_to_mask_path + 'tensors_list.pth')

In [None]:
#defining the train, validation, test and data to mask data loaders
train_loader_CIFAR10=torch.utils.data.DataLoader(CustomDataset(trainset_CIFAR10_path+'tensors_list.pth'), batch_size=128, shuffle=True, num_workers=8)
val_loader_CIFAR10=torch.utils.data.DataLoader(CustomDataset(valset_CIFAR10_path+'tensors_list.pth'), batch_size=128, shuffle=False, num_workers=8)
test_loader_CIFAR10=torch.utils.data.DataLoader(CustomDataset(testset_CIFAR10_path+'tensors_list.pth'), batch_size=128, shuffle=False, num_workers=8)
train_loader_to_mask_CIFAR10=torch.utils.data.DataLoader(CustomDataset(trainset_CIFAR10_to_mask_path+'tensors_list.pth'), batch_size=128, shuffle=False, num_workers=8)

In [None]:
#directories to save checkpoints and probabilities of the selected class
checkpoints_CIFAR10_path = '/content/drive/My Drive/checkpoints_CIFAR10/'
os.makedirs(checkpoints_CIFAR10_path, exist_ok=True)
probabilities_CIFAR10_path = '/content/drive/My Drive/probabilities_CIFAR10/'
os.makedirs(probabilities_CIFAR10_path, exist_ok=True)

In [None]:
#training the ERM model on the CIFAR10 training set for 50 epochs instead of 300
model = ResNet32(num_classes=5).to(device) #move the model to device
loss = nn.CrossEntropyLoss() #loss
optimizer = optim.SGD(model.parameters(), lr=0.1, momentum = 0.9, weight_decay = 1e-4) #SGD optimizer
lr_step = optim.lr_scheduler.MultiStepLR(optimizer, milestones=[25, 50], gamma=0.5) #learning rate scheduler
best_accuracy = -math.inf #initialize the best accuracy tracker

num_epochs=50
final_selected_probs = None

for epoch in range(num_epochs):
    model.train() #setting the model to training mode
    running_loss = 0.0
    for inputs, labels in train_loader_CIFAR10:
        inputs, labels = inputs.to(device), labels.to(device) #moving inputs and labels to device
        optimizer.zero_grad() #reset the gradients to 0 before backpropagation
        outputs = model(inputs) #applying the model to the inputs
        loss_f = loss(outputs, labels)
        loss_f.backward()
        optimizer.step()

        running_loss += loss_f.item() * inputs.size(0)

    epoch_loss = running_loss / len(train_loader_CIFAR10.dataset) #calculate the average loss for the epoch
    val_loss, val_accuracy, val_probs = validate_selective_classification(model, val_loader_CIFAR10, loss) #validation loss, validation accuracy and probabilities of the selected class
    if epoch == num_epochs - 1: #we use the probabilities of the last epoch
        final_selected_probs = val_probs

    print(f'Epoch {epoch + 1}, Validation Loss: {val_loss:.3f}, Validation Accuracy: {val_accuracy:.2f}%')
    lr_step.step()

    #save the model checkpoint if the validation accuracy improves
    if val_accuracy > best_accuracy:
        best_accuracy = val_accuracy
        best_checkpoint_path = f'/content/drive/My Drive/checkpoints_CIFAR10/best_checkpoint_5classes50epochs.pth'
        save_checkpoint(model, optimizer, epoch, val_loss, val_accuracy, best_checkpoint_path)

    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}')
    #save the last checkpoint
    if epoch == num_epochs - 1:
        last_checkpoint_path = f'/content/drive/My Drive/checkpoints_CIFAR10/last_checkpoint_5classes50epochs.pth'
        save_checkpoint(model, optimizer, epoch, val_loss, val_accuracy, last_checkpoint_path)

#saving the probabilities of the selected class to a csv file
if final_selected_probs is not None:
    df = pd.DataFrame(final_selected_probs, columns=['Selected_Prob'])
    output_path = os.path.join(probabilities_CIFAR10_path, 'final_probabilities_5classes50epochs.csv')
    df.to_csv(output_path, index=False)
    print(f'Final epoch probabilities saved to: {output_path}')

#computing the threshold to drop the 10% with the lowest probabilities
sorted_probs = np.sort(final_selected_probs)
threshold = sorted_probs[int(0.1 * len(sorted_probs))]
print(f'Calculated threshold: {threshold}')
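
To make the effect of the scheduler explicit, the following sketch reproduces in plain Python the rule that `MultiStepLR` applies: the learning rate is multiplied by `gamma` each time a milestone is reached. The helper `multistep_lr` is hypothetical and only mirrors that rule; it does not call PyTorch. It assumes, as in the training loop above, that the scheduler is stepped once at the end of every epoch.

```python
from bisect import bisect_right

def multistep_lr(base_lr, milestones, gamma, epoch):
    """Learning rate in effect during 0-based `epoch` when the scheduler is stepped once per epoch."""
    return base_lr * gamma ** bisect_right(sorted(milestones), epoch)

# Same settings as the training cell: lr=0.1, milestones=[25, 50], gamma=0.5, 50 epochs.
schedule = [multistep_lr(0.1, [25, 50], 0.5, e) for e in range(50)]
print(sorted(set(schedule), reverse=True))          # [0.1, 0.05]
print(schedule.count(0.1), schedule.count(0.05))    # 25 25
```

With 50 training epochs the second milestone is reached only after the last epoch has finished, so the learning rate is 0.1 for the first 25 epochs and 0.05 for the remaining 25.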

In [None]:
model = ResNet32(num_classes=5).to(device) #moving the model to device

In [None]:
#paths to last and best checkpoints
last_checkpoint_path = '/content/drive/My Drive/checkpoints_CIFAR10/last_checkpoint_5classes50epochs.pth'
best_checkpoint_path = '/content/drive/My Drive/checkpoints_CIFAR10/best_checkpoint_5classes50epochs.pth'

In [None]:
#reading the csv probabilities file, selecting the column containing probabilities and flattening it to a list
probabilities_df = pd.read_csv('/content/drive/My Drive/probabilities_CIFAR10/final_probabilities_5classes50epochs.csv')
probabilities = probabilities_df['Selected_Prob'].values.flatten()
#sorting the list
sorted_probs = np.sort(probabilities) #named sorted_probs to avoid shadowing the built-in sorted()
threshold = sorted_probs[int(0.1 * len(sorted_probs))] #computing the threshold to drop the 10% with the lowest probabilities
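
A small detail of this indexing rule is worth checking: the threshold is the element at position `int(0.1 * n)` of the sorted values, and the test cells keep only samples strictly above it. The sketch below counts how many samples survive on a toy list of 5000 distinct values standing in for the validation probabilities; the helper `count_kept` and the values are illustrative assumptions.

```python
def count_kept(confidences, drop_fraction=0.1):
    """Count samples kept by the rule `confidence > sorted[int(drop_fraction * n)]`."""
    ordered = sorted(confidences)
    threshold = ordered[int(drop_fraction * len(ordered))]
    kept = sum(c > threshold for c in confidences)
    return kept, threshold

# 5000 distinct toy values, one per validation sample.
toy_confidences = [i / 5000 for i in range(5000)]
kept, threshold = count_kept(toy_confidences)
print(kept, threshold)   # 4499 0.1
```

Because the comparison is strict, the sample sitting exactly at the threshold is discarded as well. The fraction kept on the test set can also differ from 90%, since the threshold is estimated on the validation set.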

In [None]:
# testing ERM model
model.load_state_dict(torch.load(best_checkpoint_path )['model_state_dict']) #loading the best checkpoint
model.eval() #setting the model to evaluation mode

all_predictions = []
all_labels = []


with torch.no_grad(): #disabling gradient calculation during testing
    for inputs, labels in test_loader_CIFAR10:
        inputs, labels = inputs.to(device), labels.to(device) #moving inputs and labels to device

        outputs = model(inputs) #applying the model to the inputs
        _, predicted = torch.max(outputs, 1) #predictions
        all_predictions.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

#computing evaluation metrics
accuracy = accuracy_score(all_labels, all_predictions)
f1 = f1_score(all_labels, all_predictions, average='macro')
precision = precision_score(all_labels, all_predictions, average='macro')
recall = recall_score(all_labels, all_predictions, average='macro')
conf_matrix = confusion_matrix(all_labels, all_predictions)
try:
  roc_auc = roc_auc_score(all_labels, all_predictions)
except ValueError:
  roc_auc = "Not applicable for multiclass"


print(f'Accuracy: {accuracy:.4f}')
print(f'F1 Score: {f1:.4f}')
print(f'Precision: {precision:.4f}')
print(f'Recall: {recall:.4f}')
print(f'ROC AUC Score: {roc_auc}')
print('Confusion Matrix:')
print(conf_matrix)

Accuracy: 0.8032
F1 Score: 0.8022
Precision: 0.8026
Recall: 0.8032
ROC AUC Score: Not applicable for multiclass
Confusion Matrix:
[[868  39  56  23  14]
 [ 21 956   7  13   3]
 [ 75   8 730  87 100]
 [ 49  20 144 701  86]
 [ 43   7 109  80 761]]
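
The macro-averaged metrics reported above follow directly from the confusion matrix. The sketch below recomputes them from the printed counts, assuming the scikit-learn layout (rows are true classes, columns are predicted classes). It also shows why recall and accuracy coincide here: every class has the same number of test samples, so the mean of the per-class recalls equals the overall accuracy.

```python
# ERM confusion matrix copied from the output above.
conf_matrix = [
    [868,  39,  56,  23,  14],
    [ 21, 956,   7,  13,   3],
    [ 75,   8, 730,  87, 100],
    [ 49,  20, 144, 701,  86],
    [ 43,   7, 109,  80, 761],
]
n_classes = len(conf_matrix)
diag = [conf_matrix[k][k] for k in range(n_classes)]
row_sums = [sum(row) for row in conf_matrix]                                # samples per true class
col_sums = [sum(row[k] for row in conf_matrix) for k in range(n_classes)]   # samples per predicted class

accuracy = sum(diag) / sum(row_sums)
recalls = [d / r for d, r in zip(diag, row_sums)]
precisions = [d / c for d, c in zip(diag, col_sums)]
f1s = [2 * p * r / (p + r) for p, r in zip(precisions, recalls)]

macro_recall = sum(recalls) / n_classes
macro_precision = sum(precisions) / n_classes
macro_f1 = sum(f1s) / n_classes

print(f'Samples per class: {row_sums}')
print(f'Accuracy: {accuracy:.4f}')
print(f'Macro F1: {macro_f1:.4f}')
print(f'Macro precision: {macro_precision:.4f}')
print(f'Macro recall: {macro_recall:.4f}')
```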


In [None]:
# testing ERM model dropping 10% of samples
model.load_state_dict(torch.load(best_checkpoint_path )['model_state_dict']) #loading the best checkpoint
model.eval() #setting the model to evaluation mode
all_predictions = []
all_labels = []


with torch.no_grad(): #disabling gradient calculation during testing
    for inputs, labels in test_loader_CIFAR10:
        inputs, labels = inputs.to(device), labels.to(device) #moving inputs and labels to device

        outputs = model(inputs) #applying the model to the inputs
        probs = F.softmax(outputs, dim=1)  #computing the probabilities
        selected_probs = probs.gather(1, labels.view(-1, 1)).cpu().numpy() #probability that the model assigns to the true class of each sample (this uses the ground-truth labels)
        selected_probs = selected_probs.flatten() #flattening to a 1D NumPy array
        _, predicted = torch.max(probs, 1) #predictions


        mask = selected_probs > threshold # Create a boolean mask where 'selected_probs' are greater than the specified 'threshold'
        filtered_predictions = predicted[mask] # Filter the 'predicted' array using the mask to keep only the predictions where the condition is true
        filtered_labels = labels[mask] # Filter the 'labels' array using the same mask to keep only the corresponding labels

        print(f'Batch size: {inputs.size(0)}')
        print(f'Mask shape: {mask.shape}, Number of filtered predictions: {filtered_predictions.size(0)}')

        all_predictions.extend(filtered_predictions.cpu().numpy())
        all_labels.extend(filtered_labels.cpu().numpy())

#computing the evaluation metrics
accuracy = accuracy_score(all_labels, all_predictions)
f1 = f1_score(all_labels, all_predictions, average='macro')
precision = precision_score(all_labels, all_predictions, average='macro')
recall = recall_score(all_labels, all_predictions, average='macro')
conf_matrix = confusion_matrix(all_labels, all_predictions)
try:
  roc_auc = roc_auc_score(all_labels, all_predictions)
except ValueError:
  roc_auc = "Not applicable for multiclass"

print(f'Accuracy: {accuracy:.4f}')
print(f'F1 Score: {f1:.4f}')
print(f'Precision: {precision:.4f}')
print(f'Recall: {recall:.4f}')
print(f'ROC AUC Score: {roc_auc}')
print('Confusion Matrix:')
print(conf_matrix)

In [None]:
model.load_state_dict(torch.load(best_checkpoint_path )['model_state_dict']) #loading the best checkpoint

In [None]:
heat_map_generator = XGradCAM(model=model, target_layers=[model.get_grad_cam_target_layer()]) #creating the XGradCAM heatmap generator

In [None]:
#path to the dataset to mask
CIFAR10_dataset_to_mask_path = '/content/drive/My Drive/CIFAR10datasettomask_5classes/'
os.makedirs(CIFAR10_dataset_to_mask_path, exist_ok=True)

In [None]:
#we save the trainset to mask on google drive after applying the ToPILImage transformation
to_pil = transforms.ToPILImage()
for i, el in enumerate(trainset_CIFAR10_to_mask):
  to_pil(el[0]).save(CIFAR10_dataset_to_mask_path+str(i)+'.jpg')

In [None]:
#directory to save the masked data
masked_data_dir_CIFAR10 = '/content/drive/My Drive/maskeddatasetCIFAR10_5classes50epochs/'
os.makedirs(masked_data_dir_CIFAR10, exist_ok=True)

In [None]:
#creation of a list with the format expected by the mask_tune function: (image tensor, output directory, path of the original image, label)
CIFAR10_list=[]
for i, el in enumerate(trainset_CIFAR10_to_mask):
  CIFAR10_list.append((el[0], masked_data_dir_CIFAR10, CIFAR10_dataset_to_mask_path+str(i)+'.jpg', el[1]))

In [None]:
#definition of the mask data loader
mask_loader_CIFAR10 = DataLoader(CIFAR10_list, batch_size=128, shuffle=False, num_workers=8)

In [None]:
#computing the mask_tune function for the mask data loader
for data in mask_loader_CIFAR10:
  images, save_dir, images_pathes, targets = data[0], data[1], data[2], data[3]
  images = images.to(device)
  mask_tune(images, save_dir, images_pathes, targets)
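
The `mask_tune` function called above is defined elsewhere in the notebook; the sketch below is not that function and only illustrates the general idea of the masking step on a toy single-channel image. It assumes a saliency map with the same spatial size as the image (such as an upsampled XGradCAM heatmap) and zeroes out every pixel whose saliency exceeds the mean of the map plus `k` standard deviations. The helper name `mask_salient_pixels` and the default `k = 2` are assumptions made for illustration.

```python
from statistics import mean, pstdev

def mask_salient_pixels(image, heatmap, k=2.0):
    """Zero out pixels whose heatmap value exceeds mean + k * std of the heatmap.

    `image` and `heatmap` are 2D lists of equal shape (toy single-channel case).
    """
    flat = [v for row in heatmap for v in row]
    threshold = mean(flat) + k * pstdev(flat)
    return [
        [0.0 if h > threshold else px for px, h in zip(img_row, heat_row)]
        for img_row, heat_row in zip(image, heatmap)
    ]

# Toy 4x4 example: the heatmap peaks in the top-left corner.
image = [[1.0] * 4 for _ in range(4)]
heatmap = [
    [0.9, 0.1, 0.1, 0.1],
    [0.1, 0.1, 0.1, 0.1],
    [0.1, 0.1, 0.1, 0.1],
    [0.1, 0.1, 0.1, 0.1],
]
masked = mask_salient_pixels(image, heatmap)
for row in masked:
    print(row)
```

Only the most salient pixel is removed, so a model fine-tuned on such images has to rely on the remaining regions.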

In [None]:
#definition of the directories where the masked data of the 5 classes are saved
class_dirs = [f'/content/drive/My Drive/maskeddatasetCIFAR10_5classes50epochs/{c}/' for c in range(5)]

In [None]:
#applying the test transformation to the masked images of each class and collecting them in a single list of (tensor, label) pairs
mask_data_transformed = []

for target, class_dir in enumerate(class_dirs):
  for el in os.listdir(class_dir):
    img = Image.open(class_dir+el)
    img = t_test(img)
    mask_data_transformed.append((img, target))

In [None]:
#saving the list of masked data tensors
masked_data_transformed_CIFAR10 = '/content/drive/My Drive/maskeddatasettransformedCIFAR10_5classes50epochs/'
os.makedirs(masked_data_transformed_CIFAR10, exist_ok=True)
torch.save(mask_data_transformed, masked_data_transformed_CIFAR10+'tensors_list.pth')

In [None]:
#definition of the mask transformed data loader
mask_transformed_loader_CIFAR10 = torch.utils.data.DataLoader(CustomDataset(masked_data_transformed_CIFAR10+'tensors_list.pth'), batch_size=128, shuffle=True, num_workers=8)

In [None]:
#directory to the MaskTune probabilities
probabilities_CIFAR10_masktune_path = '/content/drive/My Drive/probabilities_masktune_CIFAR10/'
os.makedirs(probabilities_CIFAR10_masktune_path, exist_ok=True)

In [None]:
#training mask tune and saving probabilities
loss = nn.CrossEntropyLoss() #loss
model = ResNet32(num_classes=5).to(device) #moving the model to device
optimizer = optim.SGD(model.parameters(), lr=0.1, momentum = 0.9, weight_decay = 1e-4) #SGD optimizer
model, optimizer, start_epoch, val_loss, accuracy = load_checkpoint(model, optimizer, last_checkpoint_path) #loading the last checkpoint

num_epochs=1
final_selected_probs = None

for epoch in range(num_epochs):
    model.train() #setting the model to train mode
    running_loss = 0.0
    for inputs, labels in mask_transformed_loader_CIFAR10:
        inputs, labels = inputs.to(device), labels.to(device) #moving the inputs and labels to device
        optimizer.zero_grad() #reset the gradients to 0 before backpropagation
        outputs = model(inputs) #applying the model to the inputs
        loss_f = loss(outputs, labels)
        loss_f.backward()
        optimizer.step()

        running_loss += loss_f.item() * inputs.size(0)

    epoch_loss = running_loss / len(mask_transformed_loader_CIFAR10.dataset) #calculate the average loss for the epoch
    val_loss, val_accuracy, val_probs = validate_selective_classification(model, val_loader_CIFAR10, loss) #validation loss, validation accuracy and probabilities of the selected class
    if epoch == num_epochs - 1: #we use the probabilities of the last epoch
        final_selected_probs = val_probs

    print(f'Epoch {epoch + 1}, Validation Loss: {val_loss:.3f}, Validation Accuracy: {val_accuracy:.2f}%')


    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}')
    #save the last checkpoint
    if epoch == num_epochs - 1:
        last_checkpoint_path = f'/content/drive/My Drive/checkpoints_CIFAR10/last_checkpoint_mask_CIFAR10_5classes50epochs_prob.pth'
        torch.save(model.state_dict(), last_checkpoint_path)

#saving the probabilities of the selected class to a csv file
if final_selected_probs is not None:
    df = pd.DataFrame(final_selected_probs, columns=['Selected_Prob'])
    output_path = os.path.join(probabilities_CIFAR10_masktune_path, 'final_probabilities_masktune_5classes50epochs.csv')
    df.to_csv(output_path, index=False)
    print(f'Final epoch probabilities saved to: {output_path}')
#computing the threshold to drop the 10% with the lowest probabilities
sorted_probs = np.sort(final_selected_probs)
threshold = sorted_probs[int(0.1 * len(sorted_probs))]
print(f'Calculated threshold: {threshold}')

In [None]:
#path to the last checkpoint
checkpoint_masktune_CIFAR10_prob='/content/drive/My Drive/checkpoints_CIFAR10/last_checkpoint_mask_CIFAR10_5classes50epochs_prob.pth'

In [None]:
#testing mask tune with no selective classification
model.load_state_dict(torch.load(checkpoint_masktune_CIFAR10_prob)) #loading the last checkpoint
model.eval() #setting the model to evaluation mode
all_predictions = []
all_labels = []

with torch.no_grad():#disabling gradient calculation during testing
    for inputs, labels in test_loader_CIFAR10:
        inputs, labels = inputs.to(device), labels.to(device) #moving inputs and labels to device

        outputs = model(inputs) #applying the model to inputs
        _, predicted = torch.max(outputs, 1) #predictions
        all_predictions.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

#computing evaluation metrics
accuracy = accuracy_score(all_labels, all_predictions)
f1 = f1_score(all_labels, all_predictions, average='macro')
precision = precision_score(all_labels, all_predictions, average='macro')
recall = recall_score(all_labels, all_predictions, average='macro')
conf_matrix = confusion_matrix(all_labels, all_predictions)
try:
  roc_auc = roc_auc_score(all_labels, all_predictions)
except ValueError:
  roc_auc = "Not applicable for multiclass"

print(f'Accuracy: {accuracy:.4f}')
print(f'F1 Score: {f1:.4f}')
print(f'Precision: {precision:.4f}')
print(f'Recall: {recall:.4f}')
print(f'ROC AUC Score: {roc_auc}')
print('Confusion Matrix:')
print(conf_matrix)

Accuracy: 0.8104
F1 Score: 0.8071
Precision: 0.8100
Recall: 0.8104
ROC AUC Score: Not applicable for multiclass
Confusion Matrix:
[[907  31  22  19  21]
 [ 33 956   0   9   2]
 [108  17 627 136 112]
 [ 44  35  68 766  87]
 [ 41  10  59  94 796]]
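
Since overall accuracy changes only slightly between the two models, it helps to compare them class by class. The sketch below computes the per-class recall from the ERM and MaskTune confusion matrices printed in this section (rows are true classes, columns are predicted classes). The helper `per_class_recall` is introduced here for illustration.

```python
# Confusion matrices copied from the ERM and MaskTune test outputs (no selective classification).
erm = [
    [868,  39,  56,  23,  14],
    [ 21, 956,   7,  13,   3],
    [ 75,   8, 730,  87, 100],
    [ 49,  20, 144, 701,  86],
    [ 43,   7, 109,  80, 761],
]
masktune = [
    [907,  31,  22,  19,  21],
    [ 33, 956,   0,   9,   2],
    [108,  17, 627, 136, 112],
    [ 44,  35,  68, 766,  87],
    [ 41,  10,  59,  94, 796],
]

def per_class_recall(cm):
    """Fraction of each true class (row) that is predicted correctly (diagonal entry)."""
    return [row[k] / sum(row) for k, row in enumerate(cm)]

erm_recall = per_class_recall(erm)
masktune_recall = per_class_recall(masktune)

for k, (a, b) in enumerate(zip(erm_recall, masktune_recall)):
    print(f'class {k}: ERM {a:.3f} -> MaskTune {b:.3f} ({b - a:+.3f})')
```

The comparison shows that the gain comes from classes 0, 3 and 4, class 1 is unchanged, and recall on class 2 drops from 0.730 to 0.627.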


In [None]:
#testing mask tune with selective classification
model = ResNet32(num_classes=5).to(device) #moving the model to device
model.load_state_dict(torch.load(checkpoint_masktune_CIFAR10_prob)) #loading the last checkpoint
model.eval() #setting the model to evaluation mode
all_predictions = []
all_labels = []

with torch.no_grad():#disabling gradient calculation during testing
    for inputs, labels in test_loader_CIFAR10:
        inputs, labels = inputs.to(device), labels.to(device) #moving inputs and labels to device

        outputs = model(inputs) #applying the model to inputs
        probs = F.softmax(outputs, dim=1)  #computing the probabilities
        selected_probs = probs.gather(1, labels.view(-1, 1)).cpu().numpy() #probability that the model assigns to the true class of each sample (this uses the ground-truth labels)
        selected_probs = selected_probs.flatten() #flattening to a 1D NumPy array
        _, predicted = torch.max(probs, 1) #predictions

        mask = selected_probs > threshold # Create a boolean mask where 'selected_probs' are greater than the specified 'threshold'
        filtered_predictions = predicted[mask] # Filter the 'predicted' array using the mask to keep only the predictions where the condition is true
        filtered_labels = labels[mask] # Filter the 'labels' array using the same mask to keep only the corresponding labels

        print(f'Batch size: {inputs.size(0)}')
        print(f'Mask shape: {mask.shape}, Number of filtered predictions: {filtered_predictions.size(0)}')

        all_predictions.extend(filtered_predictions.cpu().numpy())
        all_labels.extend(filtered_labels.cpu().numpy())

#computing evaluation metrics
accuracy = accuracy_score(all_labels, all_predictions)
f1 = f1_score(all_labels, all_predictions, average='macro')
precision = precision_score(all_labels, all_predictions, average='macro')
recall = recall_score(all_labels, all_predictions, average='macro')
conf_matrix = confusion_matrix(all_labels, all_predictions)
try:
  roc_auc = roc_auc_score(all_labels, all_predictions)
except ValueError:
  roc_auc = "Not applicable for multiclass"


print(f'Accuracy: {accuracy:.4f}')
print(f'F1 Score: {f1:.4f}')
print(f'Precision: {precision:.4f}')
print(f'Recall: {recall:.4f}')
print(f'ROC AUC Score: {roc_auc}')
print('Confusion Matrix:')
print(conf_matrix)

**MAIN RESULTS AND FINAL CONSIDERATIONS**

The final analysis across different tasks and datasets highlights the effectiveness of MaskTune in improving model performance, particularly in addressing issues related to spurious features and selective classification.

In the classification tasks with spurious features, MaskTune demonstrates its capability to significantly enhance model robustness and fairness. On MNIST, MaskTune decreases the accuracy gap between the original and biased test sets, showing a marked improvement in both, which underscores its ability to reduce the model's sensitivity to bias. Similarly, on CelebA, MaskTune not only improves overall accuracy but also achieves a better balance between precision and recall, as evidenced by the improved F1 score and ROC AUC score, indicating enhanced discrimination and a more reliable identification of true positives.

In the selective classification task on CIFAR-10, MaskTune again proves useful. The ERM model already benefits from selective classification (accuracy from 0.8032 to 0.8665), and MaskTune improves on it both without selection (0.8104) and, more markedly, with selection (0.9159). The improvements in accuracy, F1 score, precision, and recall, together with a more favorable confusion matrix, show that MaskTune helps reduce misclassifications, especially among the predictions retained after thresholding.

Overall, these results underscore MaskTune's effectiveness in addressing spurious correlations and improving model reliability through selective classification, making it a valuable technique for enhancing the performance of deep learning models across the datasets and tasks considered here.