<a href="https://colab.research.google.com/github/giacomobinco/FDS-Final-Project/blob/main/CNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Loading all the necessary libraries

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import seaborn as sns

import random

import zipfile
import os
from tqdm import tqdm
import shutil
import warnings

from google.colab import files
from google.colab import drive

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Subset
from torchvision import datasets, transforms
from torch.utils.data import random_split
from torch.nn.functional import softmax

from sklearn.model_selection import KFold, StratifiedShuffleSplit
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support, roc_curve, auc
from sklearn.preprocessing import label_binarize

In [None]:
# Enabling the use of the new API settings for the matmul precision
# NOTE(review): "ieee" presumably requests full IEEE FP32 (i.e. no TF32) for the CUDA matmuls, and the attribute
# presumably exists only in recent PyTorch releases -- confirm against the installed version
torch.backends.cuda.matmul.fp32_precision = "ieee"

In [None]:
# Report whether a CUDA device is visible to PyTorch and, when it is, print its name
# (the device name is queried only if a GPU is actually available)
cuda_ready = torch.cuda.is_available()
print(cuda_ready)
print("No GPU" if not cuda_ready else torch.cuda.get_device_name(0))

If the cell above prints *False* and *No GPU*, enable the GPU from the Colab menu:

Runtime > Change runtime type ("Cambia tipo di runtime") > Hardware accelerator ("Acceleratore hardware") > select *T4 GPU*

In [None]:
# Define utility functions to control in the best way possible the reproducibility of future results
# (A complete reproducibility is not guaranteed though)

# Function 1

def seed_everything(seed: int):
    """Seed every random number generator used by the notebook with the same value.

    The Python, NumPy and PyTorch generators are seeded; the CUDA generators are seeded
    as well when a GPU is available. cuDNN is additionally set to its deterministic mode
    with the auto-tuner ("benchmark") switched off.
    """

    # CPU-side generators (Python, NumPy, PyTorch), seeded in this order
    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(seed)

    # GPU-side generators (current device first, then all devices)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)

    # cuDNN flags
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

# Function 2

def seed_worker(worker_id):
    """Re-seed NumPy and Python's "random" from the current PyTorch seed.

    The function has the signature expected by the "worker_init_fn" argument of "DataLoader()".
    "worker_id" is accepted but not used: the seed is derived from "torch.initial_seed()",
    reduced modulo 2**32.
    """

    derived_seed = torch.initial_seed() % (2 ** 32)

    for reseed in (np.random.seed, random.seed):
        reseed(derived_seed)

In [None]:
# Executing these functions
# (the trailing ";" only prevents the notebook from echoing the value of the last expression)

seed_everything(42);

g = torch.Generator(); # "g" will be passed to the "DataLoader()" calls (argument "generator") ...
g.manual_seed(42);     # ... together with the just defined function "seed_worker()" (argument "worker_init_fn")

In [None]:
# Uploading the zip archive ("fma_spectrograms.zip") with all the Mel-spectrograms generated from the "fma_small" dataset
# [ !! It takes about 25 minutes !! ]
# The returned object is used in the next cell as a dictionary whose keys are the names of the uploaded files

spectograms = files.upload()

In [None]:
# Extracting all these images (spectrograms) into "/content/dataset"

zip_path = list(spectograms.keys())[0] # name of the first (presumably the only) uploaded file
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall('/content/dataset')

In [None]:
# Create the folder in which all the spectrograms are saved "flat" (without sub-folders)
output_directory = "fma_spectrograms_organized"
os.makedirs(output_directory, exist_ok = True)

# Open the previously uploaded zip archive in "reading" ("r") mode
with zipfile.ZipFile("fma_spectrograms.zip", "r") as zip_ref:

    # "namelist()" returns the full internal path of every entry of the archive (files and folders)
    for member in zip_ref.namelist():

        # Remove every sub-folder from the path, keeping only the file name
        filename = os.path.basename(member)

        # Folder entries end with "/" and thus have an empty base name: skip them
        if not filename:
            continue

        # Path in which the spectrogram (file) will be saved
        # (entries sharing the same file name in different sub-folders overwrite each other)
        target_path = os.path.join(output_directory, filename)

        # Both handles are managed by the same "with", so the archive member is closed even if the
        # destination can't be opened; the content is streamed in chunks instead of being read at once
        with zip_ref.open(member) as source, open(target_path, "wb") as target:
            shutil.copyfileobj(source, target)

In [None]:
# Connecting Google Drive to the present notebook (mounted under "/content/drive") ...
drive.mount('/content/drive')

In [None]:
# ... to load the metadata of the audio tracks
# The CSV is read with a two-level column header (header = [0,1]) and its first column as index;
# the index is later looked up by track ID and the column ('track', 'genre_top') gives the main genre
tracks = pd.read_csv(r"/content/drive/MyDrive/tracks.csv", index_col = 0, header = [0,1])

In [None]:
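# The next cell organizes the spectrograms in one sub-folder per genre: this is the layout expected by
# "datasets.ImageFolder()" (used later to build the PyTorch datasets), which reads the class of an image from its folder.
# The following structure is in fact expected: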
#
# dataset/
#    Rock/
#        1234.png
#        9876.png
#        ...
#    Classical/
#        8765.png
#        ...
#    Hip-Hop/
#        ...
#    .../

In [None]:
# "fma_spectrograms_organized" (assigned to the variable "output_directory") is the flat folder created in the previous
# part of code; the final dataset, with one sub-folder per genre, is saved in the following folder:
dataset_directory = "dataset"
os.makedirs(dataset_directory, exist_ok = True)

# Counters
copied = 0
skipped = 0

# For every file (spectrogram) saved in "output_directory" ...
for fname in os.listdir(output_directory):

    # ... consider only the .png files (there shouldn't be any others though)
    if not fname.lower().endswith(".png"):
        continue

    # Get the audio track ID from the file name: the extension is removed first, so that both
    # "<id>_<suffix>.png" and a plain "<id>.png" are recognized
    track_id_str = os.path.splitext(fname)[0].split("_")[0]

    # Convert the ID (a sequence of digits) in an "int" object
    try:
        track_id = int(track_id_str)

    # If the conversion fails, the file is skipped (only the conversion error is caught, any other error is a real bug)
    except ValueError:
        skipped += 1
        continue

    # If the audio track ID doesn't exist in the audio tracks dataset (file "tracks.csv"), the file is skipped
    if track_id not in tracks.index:
        skipped += 1
        continue

    # From said dataset, collect the audio track's main genre
    genre = tracks.loc[track_id, ('track','genre_top')]

    # A track without a usable genre (missing value, or anything that isn't a non-empty string)
    # can't be assigned to a folder: it is skipped instead of crashing "os.path.join()"
    if not isinstance(genre, str) or not genre:
        skipped += 1
        continue

    # Create the genre folder (see the structure that the dataset should have)
    genre_directory = os.path.join(dataset_directory, genre)
    os.makedirs(genre_directory, exist_ok = True)

    # Copy the spectrogram in the corresponding genre folder
    shutil.copy(os.path.join(output_directory, fname), os.path.join(genre_directory, fname))
    copied += 1

# Visual check
print(f"✅ {copied} files has been copied in the genres' sub-folders")
if skipped > 0:
    print(f"\n⚠️ {skipped} files with no correspondece has been found")

In [None]:
# To remove a directory (two specific ones in this case):
# - "dataset/fma_spectrograms/": presumably the raw folder produced by extracting the zip archive into "/content/dataset";
#   if left in place, it would be listed as a genre folder when "dataset" is scanned -- TODO confirm the folder name
# - "sample_data/": presumably Colab's default sample folder, which this notebook never reads
!rm -rf dataset/fma_spectrograms/
!rm -rf sample_data/

In [None]:
# Starting from the dataset organized in one sub-folder per genre, two sets are built:
# - the training set, used to train the model, evaluate each execution and consequently update it
# - the test set, used only to compute the final evaluation of the model
# The dataset is almost perfectly balanced, and the split is stratified to keep it so in both sets
# --> with 8000 images (1000 per genre), the training set gets 80% of them (6400 images, 800 per genre)
#     and the test set the remaining 20% (1600 images, 200 per genre)

# Destination folders of the copies taken from "dataset" (the latter has been created in a previous cell)
TRAIN_DIRECTORY = "/content/dataset_train"
TEST_DIRECTORY  = "/content/dataset_test"
for split_directory in (TRAIN_DIRECTORY, TEST_DIRECTORY):
    os.makedirs(split_directory, exist_ok = True)

# 1. Collecting the path of every image (spectrogram) together with its label (genre)

genres = sorted(os.listdir(dataset_directory))

file_paths = [] # paths of all the images
labels = []     # corresponding genres, in the same order

for genre in genres:

    genre_dir = os.path.join(dataset_directory, genre)
    genre_images = [os.path.join(genre_dir, img) for img in os.listdir(genre_dir)]

    file_paths.extend(genre_images)
    labels.extend([genre] * len(genre_images))

file_paths = np.array(file_paths)
labels = np.array(labels)

# 2. Stratified 80/20 split: "StratifiedShuffleSplit()" keeps the genre proportions in both sets
#    (only one split is requested, so the first - and only - pair of index arrays is taken)

splitter = StratifiedShuffleSplit(n_splits = 1, test_size = 0.20, random_state = 42)
train_idx, test_idx = next(splitter.split(file_paths, labels))

train_files, train_labels = file_paths[train_idx], labels[train_idx] # training spectrograms and their genres
test_files, test_labels = file_paths[test_idx], labels[test_idx]     # test spectrograms and their genres

# 3. Copying the files in the corresponding folders (created at the beginning of this cell)

def copy_files(files, labels, destination_root):
    # Copy every file in the sub-folder of "destination_root" named after its label,
    # creating the sub-folder when it doesn't exist yet
    for source_path, genre_name in zip(files, labels):
        genre_folder = os.path.join(destination_root, genre_name)
        os.makedirs(genre_folder, exist_ok = True)
        shutil.copy(source_path, genre_folder)

for split_files, split_labels, split_root in ((train_files, train_labels, TRAIN_DIRECTORY),
                                              (test_files, test_labels, TEST_DIRECTORY)):
    copy_files(split_files, split_labels, split_root)

In [None]:
# @title
# Per-channel mean and standard deviation of the training set, computed on the images converted to tensors
# without any normalization. Both statistics are first computed per image (over all its pixels)
# and then averaged over the images.

transform_no_norm = transforms.Compose([transforms.ToTensor()])
dataset_no_norm = datasets.ImageFolder(TRAIN_DIRECTORY, transform = transform_no_norm)
loader = DataLoader(dataset_no_norm, batch_size = 64, shuffle = False, num_workers = 2)

# Running sums (one value per channel) and number of images seen so far
mean = 0.0
std = 0.0
total_images = 0

for batch, _ in loader:

    # A batch has shape (batch, channels, H, W): the two spatial dimensions are merged into a single one
    n_images, n_channels = batch.size(0), batch.size(1)
    pixels = batch.view(n_images, n_channels, -1)

    mean = mean + pixels.mean(2).sum(0)
    std = std + pixels.std(2).sum(0)
    total_images = total_images + n_images

mean = mean / total_images
std = std / total_images

print("Training set\n")
print("Mean per channel =", mean)
print("Standard Deviation per channel =", std)

In [None]:
# @title
# Same computation as for the training set, applied to the test set: the images are only converted to tensors
# (no normalization), the mean and standard deviation are computed per image and per channel,
# and finally averaged over all the images

transform_no_norm = transforms.Compose([transforms.ToTensor()])
dataset_no_norm = datasets.ImageFolder(TEST_DIRECTORY, transform = transform_no_norm)
loader = DataLoader(dataset_no_norm, batch_size = 64, shuffle = False, num_workers = 2)

mean, std, total_images = 0.0, 0.0, 0

for batch, _ in loader:

    # (batch, channels, H, W) --> (batch, channels, H*W)
    batch_samples = batch.size(0)
    flat_batch = batch.view(batch_samples, batch.size(1), -1)

    per_image_mean = flat_batch.mean(2) # shape (batch, channels)
    per_image_std = flat_batch.std(2)   # shape (batch, channels)

    mean = mean + per_image_mean.sum(0)
    std = std + per_image_std.sum(0)
    total_images = total_images + batch_samples

mean = mean / total_images
std = std / total_images

print("Test set\n")
print("Mean per channel =", mean)
print("Standard Deviation per channel =", std)

In [None]:
# Creating a dataset of images for PyTorch, ready to be wrapped in a DataLoader object for the training of a CNN

# Both sets go through exactly the same preprocessing. In particular, the test set is standardized with the
# statistics of the TRAINING set: the model only ever learns inputs scaled in that way, and using the statistics of
# the test set would let information about the held-out data leak into the evaluation pipeline.

training_transform = transforms.Compose([
    transforms.Resize((224, 224)),                   # Bring every image to a standard size (224*224 in this case)
    transforms.ToTensor(),                           # Transform every image into a PyTorch tensor, mapping the values from [0, 255] to [0, 1]
    transforms.Normalize((0.6556, 0.2476, 0.2829),   # Standardize every channel with the mean ...
                         (0.2269, 0.1693, 0.1223))]) # ... and std of the training set (values printed by a previous cell)
                                                     # (the result is roughly zero-centered; it is NOT bounded to [-1, 1])

test_transform = transforms.Compose([
    transforms.Resize((224, 224)),                   # Same size as the training images
    transforms.ToTensor(),                           # Same conversion to a tensor with values in [0, 1]
    transforms.Normalize((0.6556, 0.2476, 0.2829),   # Same mean and std as above: the ones of the training set
                         (0.2269, 0.1693, 0.1223))])

# The datasets are now generated (the folder structure required by "ImageFolder()" is verified)
train_dataset = datasets.ImageFolder(TRAIN_DIRECTORY, transform = training_transform)
test_dataset = datasets.ImageFolder(TEST_DIRECTORY, transform = test_transform)

# Check if everything went good
print("Training set ------------------------------")
print("\nNumber of categories found:", len(train_dataset.classes)) # they should be 8
print("Total number of images uploaded:", len(train_dataset))    # they should be 6397
print("\nTest set ----------------------------------")
print("\nNumber of categories found:", len(test_dataset.classes)) # they should be 8
print("Total number of images uploaded:", len(test_dataset))    # they should be 1600

In [None]:
# Define the Convolutional Neural Network (CNN)

class simple_SpectrogramCNN(nn.Module):
    """CNN classifier for spectrogram images.

    Four convolutional stages (Conv2d -> BatchNorm2d -> activation -> Dropout2d -> MaxPool2d) are followed
    by a global average pooling and by two fully-connected layers that return one raw score (logit) per class.

    Args:
        num_classes: number of output categories.
        input_shape: (channels, height, width) of the input images. It sets the number of input channels
            of the first convolution and the size of the dummy input used to size the first linear layer.
            Defaults to (3, 224, 224).
    """

    def __init__(self, num_classes, input_shape = (3, 224, 224)):

        super().__init__()

        # Define the Convolutional Layers

        self.conv_layers = nn.Sequential(

            # First Convolutional Layer
            nn.Conv2d(input_shape[0], 32, kernel_size = 5, stride = 1, padding = 1),
            # Only in this initial layer the kernel is a 5*5 block instead of a 3*3, in order to capture more information from the spectrograms in input
            nn.BatchNorm2d(32),
            nn.LeakyReLU(negative_slope = 0.01),
            # The activation function LeakyReLU() is preferred to ReLU() because it lets negative values through, instead of forcing them to zero
            # (however, they have a very low weight, set to 0.01)
            nn.Dropout2d(0.2), # Usually, the dropout rate lies in the interval [0.1, 0.3] for the Convolutional Layers,
                               # and in [0.4, 0.6] for the Fully-connected Layers
            nn.MaxPool2d(2),   # "MaxPool2d()" is implemented using 2*2 blocks, and preferred to "AvgPool2d()" in the case of image classification

            # Second Convolutional Layer
            nn.Conv2d(32, 64, kernel_size = 3, stride = 1, padding = 1),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(negative_slope = 0.01),
            nn.Dropout2d(0.2),
            nn.MaxPool2d(2),

            # Third Convolutional Layer
            nn.Conv2d(64, 128, kernel_size = 3, stride = 1, padding = 1),
            nn.BatchNorm2d(128),
            nn.GELU(),         # For this Convolutional Layer GELU() is used, whereas ReLU() will be used in the FC Layers, ...
            nn.Dropout2d(0.3), # ... and the Dropout rate is increased by 0.1 to avoid overfitting
            nn.MaxPool2d(2),

            # Fourth Convolutional Layer
            nn.Conv2d(128, 256, kernel_size = 3, stride = 1, padding = 1),
            nn.BatchNorm2d(256),
            nn.GELU(),
            nn.Dropout2d(0.3),
            nn.MaxPool2d(2),

            nn.AdaptiveAvgPool2d((1,1)) # Reduce drastically the number of parameters, to avoid having an incredibly long vector with nn.Flatten()
                                        # (With (2,2) or (3,3) as arguments, the reduction is less pronounced)
        )

        # Computing the output's dimension of the final Convolutional Layer
        # It will be the input dimension of the first Linear Fully-connected Layer
        #
        # The dummy forward pass is executed in evaluation mode: in training mode it would update the running
        # statistics of every BatchNorm layer (with an all-zeros image) and draw random numbers for the Dropout
        # layers, silently altering the state of a model that hasn't seen any data yet

        was_training = self.conv_layers.training
        self.conv_layers.eval()

        with torch.no_grad():
            dummy = torch.zeros(1, *input_shape)
            conv_out = self.conv_layers(dummy)
            conv_out_size = conv_out.view(1, -1).size(1)

        self.conv_layers.train(was_training) # restore the original mode

        # Define the Fully-connected Layers

        self.fc_layers = nn.Sequential(
            nn.Flatten(),                  # Transform the feature maps of the final Convolutional Layer in a vector of C*H*W length
            nn.Linear(conv_out_size, 256), # Reduce dimensionality from C*H*W to 256 with a linear transformation
            nn.ReLU(),                     # ReLU is used as Activation Function
            nn.Dropout(0.5),               # A "bigger" dropout is used to avoid overfitting
            nn.Linear(256, num_classes)    # Reduce dimensionality from 256 to the number of categories with a linear transformation
            # The softmax function is not applied here: the model returns raw scores (logits)
        )

    def forward(self, x):
        """Return the raw class scores (logits), one row per image of the batch "x"."""
        x = self.conv_layers(x)
        x = self.fc_layers(x)
        return x

In [None]:
# Count the number of parameters in the model

# Utility to measure the size of a PyTorch model: only the parameters that are updated
# during training (those with "requires_grad" set to True) are counted.
def count_parameters(model):
    """Return the total number of trainable scalar parameters of "model"."""
    trainable_total = 0
    for parameter in model.parameters():
        if parameter.requires_grad:
            trainable_total += parameter.numel()
    return trainable_total

# Instantiate the CNN for the 8 genres and report its number of trainable parameters
# (the label now names the right kind of model: it is a CNN, not an MLP)
CNN_model = simple_SpectrogramCNN(num_classes = 8)
print("CNN Parameters:", count_parameters(CNN_model))

In [None]:
# Define a function that computes the necessary metrics to evaluate the model

def model_metrics(model, dataloader, device, num_classes, top_k = 3):
    """Evaluate "model" on "dataloader" and return a dictionary with the evaluation metrics.

    Args:
        model: the network (CNN) to evaluate; it is left in evaluation mode.
        dataloader: a "DataLoader" yielding (features, labels) batches.
        device: the "torch.device" on which the batches are moved before the forward pass.
        num_classes: number of categories; the confusion matrix and the per-class report
            always cover the classes 0 ... num_classes - 1, even if some of them are absent from the data.
        top_k: value of k for the Top-k Accuracy (must be at least 1; values above the number
            of predicted classes are equivalent to considering every class).

    Returns:
        A dictionary with the keys "accuracy", "precision", "recall", "f1-score" (macro averages),
        "per_class" (dictionary returned by "classification_report()"), "confusion_matrix",
        "topk_accuracy", "y_true", "y_predictions" and "y_probabilities" (NumPy arrays).

    Raises:
        ValueError: if "top_k" is smaller than 1 or if the dataloader yields no samples.
    """

    # "[-0:]" selects the whole array, so k = 0 would silently return a Top-k Accuracy of 100%
    if top_k < 1:
        raise ValueError("top_k must be at least 1")

    class_ids = np.arange(num_classes)

    model.eval() # set the model in modality "evaluation"

    # ------------------------------
    # 1. Saving the labels, predictions and vectors of probabilities (one array per batch)
    # ------------------------------

    label_batches = []       # true values (labels indeed)
    prediction_batches = []  # predictions
    probability_batches = [] # vector of probabilities for every image

    with torch.no_grad(): # block the tracking of the gradients (they're not needed during the evaluation phase)

        for X, y in dataloader: # "X" is a tensor containing the features, while "y" contains the labels (targets)

            X, y = X.to(device), y.to(device)

            outputs = model(X)                        # For every image, generate the scores for every class ...
            predictions = outputs.argmax(dim = 1)     # ... the top-score is the prediction ...
            probabilities = softmax(outputs, dim = 1) # ... and the softmax gives the probability of belonging to every class

            label_batches.append(y.cpu().numpy())
            prediction_batches.append(predictions.cpu().numpy())
            probability_batches.append(probabilities.cpu().numpy())

    # An empty dataloader would otherwise end in an obscure division by zero
    if not label_batches:
        raise ValueError("model_metrics() received a dataloader without samples")

    all_labels = np.concatenate(label_batches)
    all_predictions = np.concatenate(prediction_batches)
    all_probabilities = np.concatenate(probability_batches)

    # ------------------------------
    # 2. Computing the Top-k Accuracy (a sample is correct when its label is among the k most likely
    # categories, not only when it is the top-1, aka the prediction)
    # ------------------------------

    topk_classes = np.argsort(all_probabilities, axis = 1)[:, -top_k:]  # k most likely classes of every sample
    topk_hits = (topk_classes == all_labels[:, None]).any(axis = 1)
    topk_accuracy = int(np.count_nonzero(topk_hits)) / len(all_labels)

    # ------------------------------
    # 3. Confusion Matrix
    # ------------------------------

    conf_matrix = confusion_matrix(all_labels, all_predictions, labels = class_ids)

    # ------------------------------
    # 4. Per-class Metrics ("classification_report()" computes precision, recall and f1-score)
    # ------------------------------

    # "labels" makes the report cover the same classes as the confusion matrix, whatever the classes present in
    # the data; "output_dict" returns a Dictionary instead of a formatted string, so that it can be used for
    # additional computations
    per_class = classification_report(
        all_labels, all_predictions, labels = class_ids, output_dict = True, zero_division = 0)

    # ------------------------------
    # 5. Saving all the metrics in a Dictionary
    # ------------------------------

    metrics = {

        # Overall metrics (between three different options to compute these metrics - macro, micro and weighted - the first one was selected,
        # since it's generally better for balanced datasets, such as the one we're considering)
        "accuracy": accuracy_score(all_labels, all_predictions),
        "precision": precision_score(all_labels, all_predictions, average = "macro", zero_division = 0),
        "recall": recall_score(all_labels, all_predictions, average = "macro", zero_division = 0),
        "f1-score": f1_score(all_labels, all_predictions, average = "macro", zero_division = 0),

        "per_class": per_class,
        "confusion_matrix": conf_matrix,
        "topk_accuracy": topk_accuracy,

        "y_true": all_labels,
        "y_predictions": all_predictions,
        "y_probabilities": all_probabilities
    }

    return metrics

In [None]:
# To avoid a specific warning during the execution of the next cell
# NOTE(review): this sets a private TorchInductor option, presumably read by "torch.compile()" (used in the
# next cell); private settings may change between PyTorch releases -- confirm it is still needed
torch._inductor.config.max_autotune = False

In [None]:
# Cross-Validation is used to compute the metrics defined in the previous cell, together with many others:
# for every fold a new model is created, trained on the training part of the fold and evaluated on its validation part

num_classes = len(train_dataset.classes) # number of categories in the dataset
batch_size = 128                         # number of images per batch (it will be used in the "DataLoader()" function)

k_folds = 4
# The "KFold()" function generates an object that automatically manages the Cross-Validation
# (the folds are shuffled but not stratified)
kfold = KFold(n_splits = k_folds, shuffle = True, random_state = 42)

all_metrics = []       # it will contain the metrics computed for every fold through the "model_metrics()" function (one dictionary per fold)
accuracy_per_fold = [] # it will contain the Accuracy computed for every fold (one value per fold)

global_confusion_matrix = np.zeros((num_classes, num_classes), dtype = int) # initialize the confusion matrix as populated only of zeros

# As in the "model_metrics()" function:
all_labels = []        # it will contain the true values (labels indeed)
all_predictions = []   # it will contain all the predictions
all_probabilities = [] # it will contain the vectors of probabilities

histories = [] # it will contain the training loss and the validation loss for every fold

# In order to speed up a little the training of the CNN
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
scaler = torch.amp.GradScaler('cuda') # gradient scaler for the mixed-precision training; the same object is shared by all the folds

for fold, (train_idx, valid_idx) in enumerate(kfold.split(train_dataset)):

    # Create the Training and Validation sets ...
    train_subset = Subset(train_dataset, train_idx)
    valid_subset = Subset(train_dataset, valid_idx)

    # ... and transform them in a "DataLoader" object
    train_loader = DataLoader(train_subset, batch_size = batch_size, shuffle = True,
                              num_workers = 2, pin_memory = True, worker_init_fn = seed_worker, generator = g)
    valid_loader = DataLoader(valid_subset, batch_size = batch_size, shuffle = False,
                              num_workers = 2, pin_memory = True, worker_init_fn = seed_worker, generator = g)
    # About the "DataLoader()" function:
    # --> "batch_size": every batch will be formed by that number of images (a bigger value uses the hardware better, until the memory runs out at least)
    # --> "shuffle": if TRUE, the order will be shuffled for every epoch
    # --> "num_workers": number of sub-processes loading the data (in a "local execution", it can even be set to 4-8)
    # --> "pin_memory": accelerates the transfer CPU -> GPU
    # --> "worker_init_fn" and "generator": seed the workers and the shuffling (see "seed_worker()" and "g")

    # Initialize the model (CNN): a new one, with new weights, for every fold
    model = simple_SpectrogramCNN(num_classes).to(device)
    model = torch.compile(model)

    criterion = torch.nn.CrossEntropyLoss()                                           # Loss function selected (it works directly on the raw scores of the model)
    optimizer = torch.optim.Adam(model.parameters(), lr = 0.001, weight_decay = 1e-6) # Optimization technique implemented
    # (L2 regularization (through "weight_decay") is added to the optimization technique to prevent overfitting)

    history = {"train_loss": [], "valid_loss": []} # it will contain the training loss and the validation loss of the current fold

    # Training

    epochs = 10

    for epoch in range(epochs):

      model.train() # setting the model in "training modality"

      train_loss = 0 # it will contain the cumulative sum of the current epoch's losses (one per batch)

      for images, labels in train_loader:

        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad() # cleaning the gradient values of the previous batch: they're all set to zero

        with torch.amp.autocast('cuda'): # mixed precision, to accelerate the execution

            outputs = model(images)           # The output is computed ...
            loss = criterion(outputs, labels) # ... as well as the value of the loss

        # Backpropagation is implemented (on the scaled loss) ...
        scaler.scale(loss).backward()
        # ... and the CNN's parameters are updated accordingly
        scaler.step(optimizer)
        scaler.update()

        train_loss += loss.item() # Update the value with the current batch's loss

      train_loss_epoch = train_loss / len(train_loader) # ... compute the mean (over the batches) training loss of the current epoch ...
      history["train_loss"].append(train_loss_epoch)    # ... and add it to the current fold's container

      # The same reasoning can be used for the validation loss of the current epoch

      model.eval() # setting the model in "evaluation modality"

      valid_loss = 0

      with torch.no_grad():

        for images, labels in valid_loader:

          images, labels = images.to(device), labels.to(device)
          outputs = model(images)

          loss = criterion(outputs, labels)
          valid_loss += loss.item()

      valid_loss_epoch = valid_loss / len(valid_loader)
      history["valid_loss"].append(valid_loss_epoch)

    # Evaluation metrics (computed once per fold, after the last epoch, on the validation part of the fold)

    metrics = model_metrics(model, valid_loader, device, num_classes, top_k = 3) # Metrics of the current fold ...
    all_metrics.append(metrics)                                                  # ... added to the general container

    accuracy_per_fold.append(metrics["accuracy"]) # save the accuracy of the current fold

    global_confusion_matrix += metrics["confusion_matrix"] # update the confusion matrix

    histories.append(history) # add the loss values (training and validation) of the current fold to the general container

    # Saving the labels, predictions and probabilities of the current fold
    all_labels.extend(metrics["y_true"])
    all_predictions.extend(metrics["y_predictions"])
    all_probabilities.extend(metrics["y_probabilities"])

    print(f"\n✅ Training and Evaluation over Fold {fold+1}/{k_folds} completed") # visual check

In [None]:
# Overall results
# (the number of folds is read from "k_folds" instead of being hard-coded, so that the header always matches the CV)

print(f"Overall metrics over {k_folds}-folds Cross-Validation\n")

# Utility function for computing the mean value (over the folds) of a certain metric saved in "all_metrics"
def mean_metric(metric_name):
    return np.mean([m[metric_name] for m in all_metrics])

print("Accuracy in every fold:")
for i, acc in enumerate(accuracy_per_fold):
    print(f"Fold {i+1}: {acc:.4f}")

print(f"\nMean Accuracy: {mean_metric('accuracy'):.4f}")
print(f"Top-3 Mean Accuracy: {mean_metric('topk_accuracy'):.4f}")

print(f"\nMean Precision: {mean_metric('precision'):.4f}")
print(f"Mean Recall: {mean_metric('recall'):.4f}")
print(f"Mean F1-score: {mean_metric('f1-score'):.4f}")

In [None]:
# Heatmap of the Confusion Matrix obtained by summing the ones of all the folds
# (rows are the true classes, columns the predicted ones)

fig, ax = plt.subplots(figsize = (8,6))
sns.heatmap(global_confusion_matrix, annot = True, fmt = "d", cmap = "Blues", ax = ax)
ax.set_title("Aggregated Confusion Matrix")
ax.set_xlabel("Predicted values")
ax.set_ylabel("True values")
plt.show()

In [None]:
# Plotting the Learning Curve "Mean (over the folds) Training Loss vs Epochs"

# Extracting the training loss for every fold ...
all_train_losses = [h["train_loss"] for h in histories]  # [list of lists: one list of per-epoch losses per fold]
# ... and converting them into a NumPy array of shape (folds, epochs)
all_train_losses = np.array(all_train_losses)

# Computing, for every epoch, the mean value over the folds (axis 0 is the one of the folds)
mean_train_loss = np.mean(all_train_losses, axis = 0)

# Plotting the results (the number of folds in the title is read from "k_folds" instead of being hard-coded)
plt.plot(mean_train_loss)
plt.title(f"Mean Training Loss vs Epochs (over {k_folds}-folds CV)")
plt.xlabel("Epoch")
plt.ylabel("Mean Loss")
plt.grid()
plt.show()

In [None]:
# Plotting the Learning Curves "Training Loss vs Validation Loss" for every epoch in every fold

# Layout settings: two plots per row and as many rows as needed by the number of folds
# (with 4 folds this is the usual 2*2 grid; a fixed grid would fail with more than 4 folds)
n_plots = len(histories)
n_cols = 2
n_rows = max(1, (n_plots + n_cols - 1) // n_cols)

fig, axes = plt.subplots(n_rows, n_cols, figsize = (12, 5 * n_rows), squeeze = False)
axes = axes.flatten()

for i, h in enumerate(histories):
    ax = axes[i]
    ax.plot(h["train_loss"], label="Training Loss")
    ax.plot(h["valid_loss"], label="Validation Loss")
    ax.set_title(f"Fold {i+1}")
    ax.set_xlabel("Epoch")
    ax.set_ylabel("Loss")
    ax.legend()
    ax.grid(True)

# Hide the cells of the grid left without a fold (e.g. the last one when the number of folds is odd)
for unused_ax in axes[n_plots:]:
    unused_ax.set_visible(False)

plt.tight_layout()
plt.show()

In [None]:
# Plotting the aggregated per-class metrics (precision, recall, f1-score)

# Instead of averaging the values computed in every fold (saved in "all_metrics"), the three metrics are computed
# once on the labels and predictions collected from all the folds: every image of the training set belongs to the
# validation part of exactly one fold, so each of them contributes exactly one prediction
# ("zero_division = 0" is the same convention used in "model_metrics()" for classes that are never predicted)
precision, recall, f1, _ = precision_recall_fscore_support(
    all_labels, all_predictions, average = None, labels = np.arange(num_classes), zero_division = 0)

plt.figure(figsize = (10,5))
plt.bar(np.arange(num_classes) - 0.2, precision, width = 0.2, label = "Precision")
plt.bar(np.arange(num_classes), recall, width = 0.2, label = "Recall")
plt.bar(np.arange(num_classes) + 0.2, f1, width = 0.2, label = "F1-score")
plt.xticks(np.arange(num_classes))
plt.legend()
# The number of folds is read from "k_folds" instead of being hard-coded
plt.title(f"Aggregated per-class metrics (over {k_folds}-folds CV)")
plt.show()

In [None]:
# Plotting the ROC Curve for every class (One-vs-Rest), using the labels and probabilities collected from all the folds

# One binary column per class: 1 where the sample belongs to the class, 0 otherwise
y_true_bin = label_binarize(all_labels, classes = np.arange(num_classes))
# Matrix (samples, classes) with the predicted probabilities
all_probabilities = np.array(all_probabilities)

plt.figure(figsize = (10,7))

for i in range(num_classes):
    # The i-th class against all the others
    fpr, tpr, _ = roc_curve(y_true_bin[:,i], all_probabilities[:,i])
    roc_auc = auc(fpr, tpr)
    plt.plot(fpr, tpr, label = f"Class {i} (AUC = {roc_auc:.4f})")

plt.plot([0,1], [0,1], '--') # diagonal: the performance of a random classifier
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
# The number of folds is read from "k_folds" instead of being hard-coded
plt.title(f"Aggregated ROC curves One-vs-Rest (over a {k_folds}-fold CV)")
plt.legend()
plt.show()

In [None]:
# After training and evaluating the model on 80% of the data through Cross-Validation, this portion of the dataset
# is used one last time, in its entirety, to train the CNN. The loss on the test set (the remaining 20% of the data)
# is tracked after every epoch, and the final metrics are computed on it in the following cells.
#
# NOTE(review): "model", "optimizer", "criterion" and "scaler" are not re-created here: they are the ones left by
# the last fold of the Cross-Validation, so the training continues from the weights of that fold -- confirm that
# this is intended (the alternative is to create a new model and a new optimizer before the loop)

# 1. Preparing the two sets

train_loader = DataLoader(train_dataset, batch_size = 128, shuffle = True,
                              num_workers = 2, pin_memory = True, worker_init_fn = seed_worker, generator = g)
# The order of the test images has no role in the evaluation, so the test set is not shuffled
# (as for the validation sets of the Cross-Validation)
test_loader = DataLoader(test_dataset, batch_size = 128, shuffle = False,
                              num_workers = 2, pin_memory = True, worker_init_fn = seed_worker, generator = g)

# 2. Training and evaluating the model one last time

epochs = 10
all_losses = [] # one dictionary {"train_loss", "test_loss"} per epoch

for epoch in range(epochs):

    train_loss = 0.0
    test_loss = 0.0

    # Training

    # The model must be put back in "training modality" at the beginning of EVERY epoch: it is left in
    # "evaluation modality" both by the last evaluation of the Cross-Validation and by the evaluation at the end
    # of the previous epoch, and in that modality the Dropout layers are disabled and the BatchNorm layers use
    # (without updating them) their running statistics
    model.train()

    for images, labels in train_loader:

        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad() # reset the gradients of the previous batch

        with torch.amp.autocast('cuda'): # mixed precision, as in the Cross-Validation
            outputs = model(images)
            loss = criterion(outputs, labels)

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        train_loss += loss.item()

    train_loss_epoch = train_loss / len(train_loader) # mean (over the batches) training loss of the epoch

    # Evaluation

    model.eval()

    with torch.no_grad():

        for images, labels in test_loader:

            images, labels = images.to(device), labels.to(device)
            outputs = model(images)

            loss = criterion(outputs, labels)
            test_loss += loss.item()

    test_loss_epoch = test_loss / len(test_loader) # mean (over the batches) test loss of the epoch

    epoch_loss = {"train_loss": train_loss_epoch, "test_loss": test_loss_epoch}
    all_losses.append(epoch_loss)

In [None]:
# Computing the final Accuracy on the Test set

model.eval()

correct = 0

# The gradients are not needed to make predictions: without "torch.no_grad()" every forward pass
# would keep its whole computational graph in memory
with torch.no_grad():

    for images, labels in test_loader:

        predictions = model(images.to(device)).argmax(dim = 1) # class with the highest score for every image
        correct += (predictions == labels.to(device)).sum().item()

total = len(test_dataset)

print(f"Accuracy on the Test set: {correct / total:.4f}")

In [None]:
# Learning Curves of the final training: Training Loss and Test Loss for every epoch

# "all_losses" has one dictionary per epoch: the two series are extracted from it
train_losses, test_losses = [], []
for epoch_record in all_losses:
    train_losses.append(epoch_record["train_loss"])
    test_losses.append(epoch_record["test_loss"])

# Plotting the results (one line per series, with a marker for every epoch)
plt.figure(figsize = (8,5))
for series, series_name in ((train_losses, "Training Loss"), (test_losses, "Test Loss")):
    plt.plot(series, label = series_name, marker = 'o')
plt.title("Training Loss vs Test Loss per Epoch")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.grid(True)
plt.legend()
plt.show()