<a href="https://colab.research.google.com/github/DanilaMos/Storage/blob/main/Assignment_01/UQ_CIFAR-10N_Ensembling.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Uncertainty Quantification with CIFAR-10N and Ensembling
By *First name* *Second name*.

*Month, Day, 2025.*

## Problem Statement

CIFAR-N provides re-annotated versions of the CIFAR-10 and CIFAR-100 data that contain real-world human annotation errors. We show how these noise patterns deviate from the classically assumed ones and what new challenges they pose. The dataset is available from the [cifar-10-100n](https://github.com/UCSC-REAL/cifar-10-100n/tree/main) project.
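To make "noise pattern" concrete, the sketch below estimates an overall noise rate and a class-conditional transition matrix from a pair of clean and noisy label arrays. The labels are tiny synthetic arrays invented for illustration (they are not CIFAR-10N data), and `transition_matrix` is a hypothetical helper name. Classical synthetic-noise setups often assume a simple structure, such as flipping to every other class with equal probability; a matrix estimated from human labels need not look like that.

```python
import numpy as np

def transition_matrix(clean, noisy, num_classes):
    """Row i holds the empirical P(noisy label = j | clean label = i)."""
    counts = np.zeros((num_classes, num_classes), dtype=float)
    for c, n in zip(clean, noisy):
        counts[c, n] += 1
    row_sums = counts.sum(axis=1, keepdims=True)
    # Leave rows of classes that never occur at zero instead of dividing by 0
    return np.divide(counts, row_sums, out=np.zeros_like(counts), where=row_sums > 0)

# Synthetic toy labels for 3 classes (assumption: illustration only)
clean = np.array([0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2])
noisy = np.array([0, 0, 0, 1, 1, 1, 1, 1, 2, 2, 0, 0])

noise_rate = float(np.mean(clean != noisy))
T = transition_matrix(clean, noisy, num_classes=3)
print(f"noise rate: {noise_rate:.3f}")
print(T)
```

In this toy example class 2 is confused only with class 0, so the off-diagonal mass is asymmetric rather than spread evenly.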

# Preparation of simulation models

## Import and Install Libraries

In [21]:
!pip install pytorch-lightning clearml



In [22]:
#Pytorch modules
import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import DataLoader, random_split, TensorDataset
from torchvision.datasets import CIFAR10
from torchvision import datasets, transforms
#scipy
from scipy.stats import mode
#sklearn
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score
#Numpy
import numpy as np
#Pandas
import pandas as pd
#Lightning & logging
import pytorch_lightning as pl
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import ModelCheckpoint
#Data observation
import os
import sys
import pickle
import requests
from pathlib import Path
#Plotting
import matplotlib.pyplot as plt
import seaborn as sns
#Logging
from clearml import Task

## Set the Models

### Simulation Settings

Check the current directory

In [23]:
os.getcwd() #returns the current working directory

'/content'

In [24]:
# Path to the folder where the pretrained models are saved
CHECKPOINT_PATH = os.environ.get("PATH_CHECKPOINT", "saved_models/")
print(f'CHECKPOINT_PATH: {CHECKPOINT_PATH}')

os.makedirs(CHECKPOINT_PATH, exist_ok=True)

CHECKPOINT_PATH: saved_models/


Set the reproducibility options

In [25]:
# Seeds for the ensemble members (one model is trained per seed)
SEEDS = [42, 0, 17, 9, 3, 16, 2]
SEED = 42 # default random seed
pl.seed_everything(SEED)

# Determine the device (GPU if available, otherwise CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Prioritizes speed but may reduce precision
torch.set_float32_matmul_precision('high')

# # Ensure that all operations are deterministic on GPU (if used) for reproducibility
# torch.backends.cudnn.deterministic = True
# torch.backends.cudnn.benchmark = False
# torch.use_deterministic_algorithms(True)

# torch.manual_seed(SEED)
# np.random.seed(SEED)

INFO:lightning_fabric.utilities.seed:Seed set to 42


### Logging

To configure ClearML in your Colab environment, follow these steps:

---

*Step 1: Create a ClearML Account*
1. Go to the [ClearML website](https://clear.ml/).
2. Sign up for a free account if you don’t already have one.
3. Once registered, log in to your ClearML account.

---

*Step 2: Get Your ClearML Credentials*
1. After logging in, open the **Settings** page (click your profile icon in the top-right corner and select **Settings**).
2. In the **Workspace** section, click **+ Create new credentials**.
3. Copy the credentials shown for a Jupyter notebook into the code cell below.

---

*Step 3: Accessing the ClearML Dashboard*
1. Go to your ClearML dashboard (https://app.clear.ml).
2. Navigate to the **Projects** section to see your experiments.
3. Click on the experiment (e.g., `Lab_1`) to view detailed metrics, logs, and artifacts.
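
Before starting a run, it can help to confirm that the credentials from Step 2 are actually present in the environment. The following is a minimal sketch using only the standard library; `missing_clearml_vars` is a hypothetical helper written for this notebook and is not part of the ClearML package. The variable names are the ones set in the credentials cell.

```python
import os

# Variable names as used in the credentials cell of this notebook
REQUIRED_VARS = (
    "CLEARML_WEB_HOST",
    "CLEARML_API_HOST",
    "CLEARML_FILES_HOST",
    "CLEARML_API_ACCESS_KEY",
    "CLEARML_API_SECRET_KEY",
)

def missing_clearml_vars(env=None):
    """Return the required variable names that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]

# Example with a hypothetical, incomplete environment
example_env = {"CLEARML_API_HOST": "https://api.clear.ml"}
print(missing_clearml_vars(example_env))
```

Calling `missing_clearml_vars()` with no argument checks the real process environment; an empty list means all five variables are set.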

---

In [26]:
# Paste the credentials from Step 2 here, in the form shown below
%env CLEARML_WEB_HOST=https://app.clear.ml/
%env CLEARML_API_HOST=https://api.clear.ml
%env CLEARML_FILES_HOST=https://files.clear.ml
%env CLEARML_API_ACCESS_KEY=<your-access-key>
%env CLEARML_API_SECRET_KEY=<your-secret-key>

env: CLEARML_WEB_HOST=https://app.clear.ml/
env: CLEARML_API_HOST=https://api.clear.ml
env: CLEARML_FILES_HOST=https://files.clear.ml
env: CLEARML_API_ACCESS_KEY=<your-access-key>
env: CLEARML_API_SECRET_KEY=<your-secret-key>


### Dataset

Summary

In [27]:
DATASET = 'CIFAR10N' # dataset with the real-world noise
# Can be 'clean_label', 'worse_label', 'aggre_label', 'random_label1', 'random_label2', 'random_label3'
NOISE_TYPE = 'worse_label'

NS = {
    'train': 45000,
    'val': 5000,
    'test': 10000
} # for CIFAR-10: 50,000 training images split 45,000/5,000, plus 10,000 test images

SIZE = 32 #image size
NUM_CLASSES = 10
CLASS_NAMES = ['plane', 'car', 'bird', 'cat',
               'deer', 'dog', 'frog', 'horse', 'ship', 'truck']

Normalization parameters

In [28]:
#For the CIFAR-10 dataset
MEAN = np.array([0.491,0.482,0.447])
STD  = np.array([0.247,0.243,0.261])
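
Values such as `MEAN` and `STD` are per-channel statistics of images scaled to `[0, 1]`, presumably taken over the CIFAR-10 training set. As a rough sketch of how such numbers can be obtained, the block below computes them for random stand-in data; `channel_stats` and `fake_images` are hypothetical names, and the real training images would have to be substituted to reproduce the constants above.

```python
import numpy as np

def channel_stats(images):
    """Per-channel mean and std for images shaped (N, H, W, C) with values in [0, 1]."""
    mean = images.mean(axis=(0, 1, 2))
    std = images.std(axis=(0, 1, 2))
    return mean, std

# Random stand-in data (assumption: replace with the real training images / 255)
rng = np.random.default_rng(0)
fake_images = rng.random((16, 32, 32, 3))

mean, std = channel_stats(fake_images)
# Normalizing with these statistics gives zero mean and unit std per channel
normalized = (fake_images - mean) / std
print(mean.shape, std.shape)
```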

Transforms

### Collect parameters

In [29]:
#Model parameters
LOSS_FUN = 'CE' # 'CE','CELoss'(custom), 'N', 'B', etc.
ARCHITECTURE = 'CNN' # 'CNN', 'ResNet50', 'ViT', etc.

#Collect the parameters (hyperparams and others)
hparams = {
    "seed": SEED,
    "lr": 0.001,
    'weight_decay': 0.0,
    "dropout": 0.0,
    "bs": 128,
    "num_workers": 0, #set 2 in Colab, or 0 in InnoDataHub
    "num_epochs": 20,
    "criterion": LOSS_FUN,
    "architecture": ARCHITECTURE,
    "num_samples": NS,
    "im_size": SIZE,
    "mean": np.array([0.4914, 0.4822, 0.4465]),
    "std": np.array([0.2470, 0.2435, 0.2616]),
    'randResCrop': {'size': (SIZE, SIZE), 'scale': (0.8, 1.0), 'ratio': (0.9, 1.1)},
    "n_classes": NUM_CLASSES,
    "noise_path": './data/CIFAR-10_human.pt',
    "noise_type": NOISE_TYPE  # Can be 'clean_label', 'worse_label', 'aggre_label', etc.
}

#Visualization
vis_params = {
    'fig_size': 5,
    'num_samples': 5,
    'num_bins': 50,
}

## Functions

### Lightning

Data module

In [30]:
def download_file(url, save_path):
    """Download a file from a URL and save it to the specified path."""
    response = requests.get(url, stream=True)
    if response.status_code == 200:
        os.makedirs(os.path.dirname(save_path), exist_ok=True)  # Ensure directory exists
        with open(save_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f"File downloaded and saved to {save_path}")
    else:
        raise Exception(f"Failed to download file from {url}. Status code: {response.status_code}")

In [31]:
class CIFAR10(datasets.CIFAR10):
    """CIFAR10 dataset with noisy labels."""
    def __init__(self, root, train=True, transform=None, target_transform=None,
                 download=False, noise_type=None, noise_path=None, is_human=True):
        super().__init__(root, train=train, transform=transform,
                         target_transform=target_transform, download=download)
        self.noise_type = noise_type
        self.noise_path = noise_path
        self.is_human = is_human

        if self.train and self.noise_type is not None:
            self.load_noisy_labels()

    def load_noisy_labels(self):
        noise_file = torch.load(self.noise_path)
        if isinstance(noise_file, dict):
            if "clean_label" in noise_file.keys():
                clean_label = torch.tensor(noise_file['clean_label'])
                assert torch.sum(torch.tensor(self.targets) - clean_label) == 0
                print(f'Loaded {self.noise_type} from {self.noise_path}.')
                print(f'The overall noise rate is {1 - np.mean(clean_label.numpy() == noise_file[self.noise_type])}')
            self.noisy_labels = noise_file[self.noise_type].reshape(-1)
        else:
            raise Exception('Input Error')

    def __getitem__(self, index):
        img, target = super().__getitem__(index)
        if self.train and self.noise_type is not None:
            target = self.noisy_labels[index]
        return img, target, index

In [32]:
class CIFAR10DataModule(pl.LightningDataModule):
    def __init__(self, params):
        super().__init__()
        self.seed = params['seed']
        self.batch_size = params['bs']
        self.num_workers = params['num_workers']
        self.mean = params['mean']
        self.std = params['std']
        self.ns = params['num_samples']
        self.rand_res_crop = params['randResCrop']
        self.noise_path = params.get('noise_path', './data/CIFAR-10_human.pt')
        self.noise_type = params.get('noise_type', 'worse_label')  # Default to 'worse_label'

        # Ensure the data directory exists
        os.makedirs(os.path.dirname(self.noise_path), exist_ok=True)

        # Download the CIFAR-10_human.pt file if it doesn't exist
        if not os.path.exists(self.noise_path):
            print(f"Downloading CIFAR-10_human.pt from GitHub...")
            download_file(
                url="https://github.com/UCSC-REAL/cifar-10-100n/raw/main/data/CIFAR-10_human.pt",
                save_path=self.noise_path
            )

        self.transform = transforms.Compose([
            transforms.RandomResizedCrop(size=self.rand_res_crop['size'],
                                         scale=self.rand_res_crop['scale'],
                                         ratio=self.rand_res_crop['ratio']),
            transforms.ToTensor(),
            transforms.Normalize(self.mean, self.std)
        ])

    def prepare_data(self):
        # Download CIFAR-10 dataset
        datasets.CIFAR10(root='./data', train=True, download=True)
        datasets.CIFAR10(root='./data', train=False, download=True)

    def setup(self, stage=None):
        # Load noisy labels
        noise_file = torch.load(self.noise_path)
        clean_label = noise_file['clean_label']
        noisy_label = noise_file[self.noise_type]

        # Split dataset into train and validation sets
        cifar10_full = CIFAR10(root='./data', train=True, transform=self.transform,
                               noise_type=self.noise_type, noise_path=self.noise_path, is_human=True)
        pl.seed_everything(self.seed)
        self.cifar10_train, self.cifar10_val = random_split(cifar10_full,
                                                            [self.ns['train'],
                                                             self.ns['val']])
        self.cifar10_test = CIFAR10(root='./data', train=False, transform=self.transform)

    def train_dataloader(self):
        return DataLoader(self.cifar10_train, batch_size=self.batch_size,
                          num_workers=self.num_workers, shuffle=True)

    def val_dataloader(self):
        return DataLoader(self.cifar10_val, batch_size=self.batch_size,
                          num_workers=self.num_workers)

    def test_dataloader(self):
        return DataLoader(self.cifar10_test, batch_size=self.batch_size,
                          shuffle=False)

Training module

In [33]:
class train_model(pl.LightningModule):
    def __init__(self, model=None, loss=None, hparams=hparams):
        super().__init__()
        self.save_hyperparameters(hparams)
        self.model = model
        self.loss_fn = loss
        self.nc = hparams['n_classes']
        self.lr = hparams['lr']
        self.wd = hparams['weight_decay']

    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        x, y, _ = batch  # Unpack batch (ignore indices for now)
        logits = self(x)
        loss = self.loss_fn(logits, y)

        # Log training loss and accuracy
        # preds = torch.argmax(logits[:, :self.nc], dim=1)
        # acc = (preds == y).float().mean()
        self.log('train_loss', loss, on_step=True, on_epoch=True, prog_bar=True)
        # self.log('train_acc', acc, on_step=True, on_epoch=True, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y, _ = batch  # Unpack batch (ignore indices for now)
        logits = self(x)
        loss = self.loss_fn(logits, y)

        # Log validation loss and accuracy
        # preds = torch.argmax(logits[:, :self.nc], dim=1)
        # acc = (preds == y).float().mean()
        self.log('val_loss', loss, on_step=True, on_epoch=True, prog_bar=True)
        # self.log('val_acc', acc, on_step=True, on_epoch=True, prog_bar=True)
        return loss

    def test_step(self, batch, batch_idx):
        x, y, _ = batch  # Unpack batch (ignore indices for now)
        logits = self(x)
        loss = self.loss_fn(logits, y)

        # Log test loss and accuracy
        preds = torch.argmax(logits[:, :self.nc], dim=1)
        acc = (preds == y).float().mean()
        self.log('test_loss', loss, on_step=True, on_epoch=True, prog_bar=True)
        self.log('test_acc', acc, on_step=True, on_epoch=True, prog_bar=True)
        return {'loss': loss, 'preds': preds, 'y': y}

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=self.lr, weight_decay=self.wd)

        # Optionally, add a learning rate scheduler
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=1.0)
        return [optimizer], [scheduler]
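
The scheduler above uses `gamma=1.0`, so it leaves the learning rate unchanged. The sketch below reproduces the step-decay rule in plain Python to make that visible; `step_lr` is a hypothetical helper that mirrors the rule rather than calling PyTorch, and the `gamma=0.1` variant is only an illustrative alternative.

```python
def step_lr(initial_lr, epoch, step_size, gamma):
    """Learning rate after `epoch` scheduler steps under a step-decay rule."""
    return initial_lr * gamma ** (epoch // step_size)

# Values from the notebook: lr=0.001, step_size=10, gamma=1.0, 20 epochs
constant = [step_lr(0.001, e, step_size=10, gamma=1.0) for e in range(20)]
# Hypothetical alternative: gamma=0.1 would cut the rate tenfold at epoch 10
decayed = [step_lr(0.001, e, step_size=10, gamma=0.1) for e in range(20)]

print(constant[0], constant[19])
print(decayed[9], decayed[10])
```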

### Models

CNN from paper by [Xia](https://arxiv.org/abs/2106.00445)

In [34]:
#Copy the code from the paper
def call_bn(bn, x):
    return bn(x)

class CNN(nn.Module):
    def __init__(self, input_channel=3, n_outputs=10, dropout_rate=0.25, top_bn=False):
        self.dropout_rate = dropout_rate
        self.top_bn = top_bn
        super(CNN, self).__init__()
        self.c1=nn.Conv2d(input_channel,128,kernel_size=3,stride=1, padding=1)
        self.c2=nn.Conv2d(128,128,kernel_size=3,stride=1, padding=1)
        self.c3=nn.Conv2d(128,128,kernel_size=3,stride=1, padding=1)
        self.c4=nn.Conv2d(128,256,kernel_size=3,stride=1, padding=1)
        self.c5=nn.Conv2d(256,256,kernel_size=3,stride=1, padding=1)
        self.c6=nn.Conv2d(256,256,kernel_size=3,stride=1, padding=1)
        self.c7=nn.Conv2d(256,512,kernel_size=3,stride=1, padding=0)
        self.c8=nn.Conv2d(512,256,kernel_size=3,stride=1, padding=0)
        self.c9=nn.Conv2d(256,128,kernel_size=3,stride=1, padding=0)
        self.l_c1=nn.Linear(128,n_outputs)
        self.bn1=nn.BatchNorm2d(128)
        self.bn2=nn.BatchNorm2d(128)
        self.bn3=nn.BatchNorm2d(128)
        self.bn4=nn.BatchNorm2d(256)
        self.bn5=nn.BatchNorm2d(256)
        self.bn6=nn.BatchNorm2d(256)
        self.bn7=nn.BatchNorm2d(512)
        self.bn8=nn.BatchNorm2d(256)
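        self.bn9=nn.BatchNorm2d(128)
        # forward() applies bn_c1 to the logits when top_bn=True, so define it here
        if top_bn:
            self.bn_c1=nn.BatchNorm1d(n_outputs)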

    def forward(self, x,):
        h=x
        h=self.c1(h)
        h=F.leaky_relu(call_bn(self.bn1, h), negative_slope=0.01)
        h=self.c2(h)
        h=F.leaky_relu(call_bn(self.bn2, h), negative_slope=0.01)
        h=self.c3(h)
        h=F.leaky_relu(call_bn(self.bn3, h), negative_slope=0.01)
        h=F.max_pool2d(h, kernel_size=2, stride=2)
        h=F.dropout2d(h, p=self.dropout_rate, training=self.training)  # dropout only in train mode

        h=self.c4(h)
        h=F.leaky_relu(call_bn(self.bn4, h), negative_slope=0.01)
        h=self.c5(h)
        h=F.leaky_relu(call_bn(self.bn5, h), negative_slope=0.01)
        h=self.c6(h)
        h=F.leaky_relu(call_bn(self.bn6, h), negative_slope=0.01)
        h=F.max_pool2d(h, kernel_size=2, stride=2)
        h=F.dropout2d(h, p=self.dropout_rate, training=self.training)  # dropout only in train mode

        h=self.c7(h)
        h=F.leaky_relu(call_bn(self.bn7, h), negative_slope=0.01)
        h=self.c8(h)
        h=F.leaky_relu(call_bn(self.bn8, h), negative_slope=0.01)
        h=self.c9(h)
        h=F.leaky_relu(call_bn(self.bn9, h), negative_slope=0.01)
        h=F.avg_pool2d(h, kernel_size=h.data.shape[2])

        h = h.view(h.size(0), h.size(1))
        logit=self.l_c1(h)
        if self.top_bn:
            logit=call_bn(self.bn_c1, logit)
        return logit
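
The final linear layer takes 128 inputs because the spatial dimensions shrink to 1×1 before flattening. The sketch below traces the side length of a 32×32 input through the layers using the standard output-size formula; `conv_out` and `trace_cnn` are hypothetical helpers for illustration and do not touch the model itself.

```python
def conv_out(size, kernel=3, stride=1, padding=0):
    """Output side length of a square convolution or pooling window."""
    return (size + 2 * padding - kernel) // stride + 1

def trace_cnn(size=32):
    """Follow the spatial size through the nine conv layers and the poolings."""
    sizes = [size]
    for _ in range(3):                               # c1-c3: 3x3, padding 1
        size = conv_out(size, padding=1)
    size = conv_out(size, kernel=2, stride=2)        # first max pool
    sizes.append(size)
    for _ in range(3):                               # c4-c6: 3x3, padding 1
        size = conv_out(size, padding=1)
    size = conv_out(size, kernel=2, stride=2)        # second max pool
    sizes.append(size)
    for _ in range(3):                               # c7-c9: 3x3, no padding
        size = conv_out(size, padding=0)
        sizes.append(size)
    size = conv_out(size, kernel=size, stride=size)  # average pool over the whole map
    sizes.append(size)
    return sizes

print(trace_cnn(32))  # → [32, 16, 8, 6, 4, 2, 1]
```

With 128 channels after `c9` and a 1×1 map, the flattened feature vector has 128 entries, matching `nn.Linear(128, n_outputs)`.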

ResNet50

In [35]:
class ResNet50(nn.Module):
    def __init__(self, n_outputs):
        super(ResNet50, self).__init__()
        self.n_outputs = n_outputs
        # Define your ResNet50 layers here

ViT

In [36]:
class ViT(nn.Module):
    def __init__(self, n_outputs):
        super(ViT, self).__init__()
        self.n_outputs = n_outputs
        # Define your Vision Transformer layers here

### Loss functions

Create a loss function class, or use a standard one.

In [37]:
# Cross-entropy loss written from scratch (just in case)
class CELoss(nn.Module):
    def __init__(self, reduction='mean'):
        super(CELoss, self).__init__()
        self.reduction = reduction

    def forward(self, x, y):
        # Compute negative log probabilities with log-softmax, which is
        # numerically more stable than softmax followed by log
        log_prob = -F.log_softmax(x, dim=1)
        # Gather the log probabilities for the true labels
        loss = log_prob.gather(1, y.unsqueeze(1))
        # Apply reduction
        if self.reduction == 'mean':
            loss = loss.mean()
        elif self.reduction == 'sum':
            loss = loss.sum()
        elif self.reduction == 'none':
            loss = loss.squeeze()  # Remove extra dimension for consistency
        else:
            raise ValueError("Invalid reduction option.")

        return loss
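
Taking a softmax and then its logarithm can overflow or produce `log(0)` for large-magnitude logits, whereas a log-softmax that first subtracts the row maximum stays finite. The NumPy sketch below compares the two on made-up logits; both function names are hypothetical and the extreme values are chosen only to trigger the failure.

```python
import numpy as np

def cross_entropy(logits, labels):
    """Mean cross-entropy computed from a shifted log-softmax."""
    shifted = logits - logits.max(axis=1, keepdims=True)  # largest entry becomes 0
    log_prob = shifted - np.log(np.exp(shifted).sum(axis=1, keepdims=True))
    return float(-log_prob[np.arange(len(labels)), labels].mean())

def naive_cross_entropy(logits, labels):
    """Same quantity via softmax followed by log; can overflow or hit log(0)."""
    with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
        prob = np.exp(logits) / np.exp(logits).sum(axis=1, keepdims=True)
        return float(-np.log(prob[np.arange(len(labels)), labels]).mean())

small = np.array([[2.0, 1.0, 0.1], [0.5, 2.5, 0.0]])
large = np.array([[1000.0, 0.0, -1000.0]])  # extreme logits (illustrative)

print(cross_entropy(small, np.array([0, 1])), naive_cross_entropy(small, np.array([0, 1])))
print(cross_entropy(large, np.array([2])), naive_cross_entropy(large, np.array([2])))
```

On the moderate logits the two agree; on the extreme row the shifted version returns a finite loss while the naive one does not.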

In [38]:
class NLoss(nn.Module):
    def __init__(self, label_smoothing=0.1, num_classes=NUM_CLASSES):
        super(NLoss, self).__init__()
        self.label_smoothing = label_smoothing
        self.num_classes = num_classes
        self.inv_smoothing = 1.0 - label_smoothing  # Probability for the correct class
        self.smoothing = label_smoothing / (num_classes - 1)  # Probability for incorrect classes

    def forward(self, x, y):
        """
        x: Model output (logits + log variance)
            - x[:, :self.num_classes]: Logits for class probabilities (h)
            - x[:, self.num_classes:]: Logarithmic variance (s)
        y: Labels
        """
        # Split the model output into predictions (h) and log variance (s)
        logits = x[:, :self.num_classes]  # Predictions (h)
        log_var = x[:, self.num_classes:]  # Logarithmic variance (s)

        # Apply label smoothing to the one-hot encoded labels
        with torch.no_grad():
            yoh = torch.zeros_like(logits)
            yoh.fill_(self.smoothing)  # already label_smoothing / (num_classes - 1)
            yoh.scatter_(1, y.data.unsqueeze(1), self.inv_smoothing)

        # Compute the squared differences between predictions and smoothed labels
        squared_diff = torch.pow(yoh - logits, 2)  # (y_k - h_k)^2

        # Compute the exponential of the negative log variance (e^{-s})
        exp_neg_log_var = torch.exp(-log_var)

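        # Compute the first term of the loss: e^{-s} * sum((y_k - h_k)^2)
        # keepdim=True keeps shape (B, 1) so it lines up with log_var instead of broadcasting to (B, B)
        term1 = exp_neg_log_var * squared_diff.sum(dim=1, keepdim=True)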

        # Compute the second term of the loss: N * s
        term2 = self.num_classes * log_var

        # Combine the terms and compute the mean over the batch
        loss = (term1 + term2).mean()

        return loss
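
The loss described in the comments above, e^{-s} · Σ_k (y_k − h_k)² + N · s averaged over the batch, can be checked by hand on a small example. The NumPy sketch below is an independent restatement under the assumption that the last output column holds the log variance `s`; `smoothed_targets` and `n_loss` are hypothetical names, and the toy batch is invented.

```python
import numpy as np

def smoothed_targets(labels, num_classes, eps):
    """One-hot targets with 1 - eps on the label and eps / (K - 1) elsewhere."""
    targets = np.full((len(labels), num_classes), eps / (num_classes - 1))
    targets[np.arange(len(labels)), labels] = 1.0 - eps
    return targets

def n_loss(outputs, labels, num_classes, eps=0.1):
    """Mean over the batch of exp(-s) * sum_k (y_k - h_k)^2 + K * s."""
    h = outputs[:, :num_classes]   # predictions, shape (B, K)
    s = outputs[:, num_classes]    # log variance, shape (B,)
    sq = ((smoothed_targets(labels, num_classes, eps) - h) ** 2).sum(axis=1)  # (B,)
    return float((np.exp(-s) * sq + num_classes * s).mean())

# Toy batch: 2 samples, 3 classes, last column is the log variance (assumption)
outputs = np.array([[0.9, 0.05, 0.05, 0.0],
                    [0.2, 0.70, 0.10, 0.5]])
labels = np.array([0, 1])
print(smoothed_targets(labels, 3, 0.1))
print(n_loss(outputs, labels, 3))
```

Each row of the smoothed targets sums to 1, and every per-sample term has shape `(B,)` before the mean, so no sample's variance is mixed with another sample's error.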

In [39]:
class BLoss(nn.Module):
    def __init__(self, label_smoothing=0.1, num_classes=NUM_CLASSES):
        super(BLoss, self).__init__()
        self.inv_smoothing = 1.0 - label_smoothing
        self.smoothing = label_smoothing
        self.num_classes = num_classes

    def forward(self, x, y):
        # Extract certainty and probabilities from the model output
        pass
        #Enter your code here

### Models zoo

Architectures and loss functions

In [40]:
def get_arch_and_loss(hparams):
    """
    Returns the architecture and loss function based on the provided hparams.

    Args:
        hparams (dict): Hyperparameters dictionary, including 'architecture' and 'criterion'.

    Returns:
        arch: The model architecture.
        loss: The loss function.
    """
    # Determine the number of outputs based on the loss function
    if hparams['criterion'] in ['B', 'N']:
        n_outputs = hparams['n_classes'] + 1  # Add 1 output neuron for BLoss or NLoss
    else:
        n_outputs = hparams['n_classes']  # Default number of outputs

    # Define the architectures
    architectures = {
        'CNN': CNN(n_outputs=n_outputs),
        'ResNet50': ResNet50(n_outputs=n_outputs),
        'ViT': ViT(n_outputs=n_outputs),
    }

    # Define the loss functions
    losses = {
        'CE': nn.CrossEntropyLoss(),
        'B': BLoss(),
        'N': NLoss(),
    }

    # Get the architecture and loss based on hparams
    arch = architectures.get(hparams['architecture'])
    loss = losses.get(hparams['criterion'])

    if arch is None:
        raise ValueError(f"Architecture '{hparams['architecture']}' is not supported.")
    if loss is None:
        raise ValueError(f"Loss function '{hparams['criterion']}' is not supported.")

    return arch, loss
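
`get_arch_and_loss` builds every architecture and every loss before selecting one. A possible variation, shown here only as a sketch and not as what the function above does, is to store constructors and instantiate just the requested entry. `TinyModel`, `OtherModel`, `REGISTRY`, and `build` are hypothetical stand-ins so that the example runs without PyTorch.

```python
class TinyModel:
    """Stand-in for a real architecture (hypothetical, for illustration)."""
    built = 0  # counts how many models have been constructed

    def __init__(self, n_outputs):
        TinyModel.built += 1
        self.n_outputs = n_outputs

class OtherModel(TinyModel):
    """Second stand-in architecture."""

# Map names to constructors instead of ready-made instances
REGISTRY = {"tiny": TinyModel, "other": OtherModel}

def build(name, n_outputs):
    """Instantiate only the requested entry; unknown names raise ValueError."""
    try:
        factory = REGISTRY[name]
    except KeyError:
        raise ValueError(f"Architecture '{name}' is not supported.") from None
    return factory(n_outputs=n_outputs)

model = build("tiny", n_outputs=11)
print(type(model).__name__, model.n_outputs, TinyModel.built)
```

Only one model is constructed per call, which matters when the unused architectures are large.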


### Metrics

In [41]:
def metrics(dataloader, model, hparams=hparams, loss_fn_red=None):
    """Collect predictions and labels over a dataloader and compute accuracy."""
    # Run on whatever device the model lives on
    device = next(model.parameters()).device
    model.eval()  # use running BatchNorm statistics during evaluation
    preds  = []
    labels = []
    correct = 0
    total   = 0
    for batch in dataloader:
        x, y, _ = batch
        x, y = x.to(device), y.to(device)
        with torch.no_grad():
            logits = model(x)
            # Only the first n_classes outputs are class scores
            pred = torch.argmax(logits[:, :hparams['n_classes']], dim=1)
        correct += (pred == y).sum().item()  # Number of correct predictions
        total += y.size(0)  # Total number of samples

        preds.extend(pred.cpu().numpy())
        labels.extend(y.cpu().numpy())
    acc = correct / total
    return preds, labels, acc

# Ensembling
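Several copies of the same architecture are trained with different random seeds, and their test predictions are combined by majority vote. The diversity introduced by the seeds is expected to make the ensemble more robust and may improve overall accuracy on the test set compared with a single model.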
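
As a minimal illustration of combining hard predictions by majority vote, the sketch below counts votes per sample with NumPy. `majority_vote` is a hypothetical helper, the toy predictions are invented, and ties are resolved in favor of the smallest class index.

```python
import numpy as np

def majority_vote(predictions, num_classes):
    """Most frequent label per sample; ties go to the smallest class index."""
    predictions = np.asarray(predictions)           # (num_models, num_samples)
    num_samples = predictions.shape[1]
    votes = np.zeros((num_classes, num_samples), dtype=int)
    for member in predictions:
        # Each model adds one vote per sample to the class it predicted
        votes[member, np.arange(num_samples)] += 1
    return votes.argmax(axis=0)

# Three hypothetical models voting on four samples
toy = [[0, 1, 2, 2],
       [0, 2, 2, 1],
       [1, 2, 2, 0]]
print(majority_vote(toy, num_classes=3))  # → [0 2 2 0]
```

The last sample receives one vote for each class, so the tie-breaking rule decides the outcome there.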

## Create Dataset and Data Loaders

Initialization of the dataset, the dataloader, and the training module

In [42]:
# Enter your code here
data_module = CIFAR10DataModule(hparams)

Downloading CIFAR-10_human.pt from GitHub...
File downloaded and saved to ./data/CIFAR-10_human.pt


## Train the Ensemble

Loop over different seeds

In [None]:
# List to store predictions from each model
all_predictions = []

In [None]:
for seed in SEEDS:
    # Set seed for reproducibility at the VERY BEGINNING
    pl.seed_everything(seed)

    # Reinitialize the model architecture for each seed
    #arch, loss_fn =  #Enter your code here


    #checkpoint_callback_img = #Enter your code here

    #task = #Enter your code here

    # Initialize the model with the reinitialized architecture
    # model = #Enter your code here

    # Log hyperparameters to ClearML
    task.connect(model.hparams)

    #trainer = #Enter your code here

    # best_model_path = #Enter your code here
    task.update_output_model(model_path=best_model_path, auto_delete_file=False)
    # best_model = #Enter your code here

    # Test set
    # test_dataloader = #Enter your code here

    # Move the model to the correct device
    # best_model = #Enter your code here
    # predictions = #Enter your code here

    if seed != SEEDS[-1]:
        task.close()
        del model, best_model, task, arch, loss_fn

## Test the models and the ensemble of the models

In [None]:
all_predictions

[array([5, 8, 8, ..., 5, 1, 7]),
 array([3, 1, 8, ..., 5, 1, 7]),
 array([5, 8, 8, ..., 5, 1, 7]),
 array([5, 8, 8, ..., 5, 1, 7]),
 array([5, 8, 8, ..., 5, 1, 7]),
 array([5, 8, 8, ..., 5, 1, 7]),
 array([3, 8, 8, ..., 5, 1, 7])]

Individual models

In [None]:
# List to store individual model accuracies
individual_accuracies = []

# Compute accuracy for each model
for i, predictions in enumerate(all_predictions):
    # Get predictions for the current model
    model_predictions = predictions  # Shape: (num_samples,)

    # Get true labels (already collected earlier)
    true_labels = np.array(data_module.cifar10_test.targets)

    # Calculate accuracy for the current model
    accuracy = accuracy_score(true_labels, model_predictions)
    individual_accuracies.append(accuracy)
    print(f'Model {i+1} Accuracy: {accuracy:.4f}')

# Convert to numpy array for easier calculations
individual_accuracies = np.array(individual_accuracies)

# Compute mean accuracy
mean_accuracy = np.mean(individual_accuracies)

# Compute standard deviation of accuracy
std_accuracy = np.std(individual_accuracies)

print(f'Mean Accuracy: {mean_accuracy:.4f}')
print(f'Standard Deviation of Accuracy: {std_accuracy:.4f}')

Model 1 Accuracy: 0.7183
Model 2 Accuracy: 0.7062
Model 3 Accuracy: 0.7123
Model 4 Accuracy: 0.7151
Model 5 Accuracy: 0.7116
Model 6 Accuracy: 0.7174
Model 7 Accuracy: 0.7141
Mean Accuracy: 0.7136
Standard Deviation of Accuracy: 0.0038


Ensemble

In [None]:
# Stack predictions from all models
all_predictions = np.stack(all_predictions)  # Shape: (num_models, num_samples)

# Ensemble by majority voting over the predicted class labels
# (the entries are class indices, so averaging them would not be meaningful)
final_predictions, _ = mode(all_predictions, axis=0)  # Majority voting
final_predictions = np.asarray(final_predictions).flatten()  # Flatten to 1D array

# Get true labels from the CIFAR-10 data set
test_labels = np.array(data_module.cifar10_test.targets)
# test_labels = data_module.test_dataset.labels  # Adjust this based on your dataset

# Calculate accuracy
accuracy = accuracy_score(test_labels, final_predictions)
print(f'Ensemble Accuracy: {accuracy:.4f}')

# Compute confusion matrix
cm = confusion_matrix(test_labels, final_predictions)

Ensemble Accuracy: 0.7426
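
Beyond a single accuracy figure, the disagreement among ensemble members is one simple proxy for predictive uncertainty. The sketch below, which is not part of the original analysis, computes a per-sample agreement rate and the entropy of the vote distribution from stacked hard predictions. `vote_uncertainty` is a hypothetical helper and the toy predictions are invented.

```python
import numpy as np

def vote_uncertainty(predictions, num_classes):
    """Per-sample agreement rate and entropy (in nats) of the ensemble's votes."""
    predictions = np.asarray(predictions)           # (num_models, num_samples)
    # Fraction of models voting for each class, shape (num_samples, num_classes)
    freq = np.stack([(predictions == k).mean(axis=0) for k in range(num_classes)], axis=1)
    agreement = freq.max(axis=1)
    safe = np.where(freq > 0, freq, 1.0)            # log(1) = 0 for classes with no votes
    entropy = -(freq * np.log(safe)).sum(axis=1)
    return agreement, entropy

# Four hypothetical models, three samples
toy_votes = [[1, 0, 2],
             [1, 0, 1],
             [1, 2, 0],
             [1, 2, 0]]
agreement, entropy = vote_uncertainty(toy_votes, num_classes=3)
print(agreement)
print(entropy)
```

A sample on which all models agree has agreement 1 and entropy 0; lower agreement and higher entropy flag samples where the ensemble is less certain.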


In [None]:
# Summary of the test metrics computed above
test_metrics = {
    "Mean Accuracy (individual)": mean_accuracy,
    "Standard Deviation of Accuracy (individual)": std_accuracy,
    "Ensemble Accuracy": accuracy,
}

task.connect(test_metrics)

{'Mean Accuracy (individual)': 0.7135714285714287,
 'Standard Deviation of Accuracy (individual)': 0.0037696965719932834,
 'Ensemble Accuracy': 0.7426}

In [None]:
task.close()