## Machine Learning exam, Andrea Milici

In [1]:
from data.DataExplorator import DataExplorator
from data.DataLoader import DataLoader
from data.torch_Dataset import torch_Dataset
from torch.utils.data import random_split

import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn

from utils.conv import output_dim
from utils.model_utils import count_parameters,reset_weights
from utils.seed import set_seed
set_seed(42)

In [2]:
# Carica il dataset, di default lo converte in (N,24,15,15) e ritorna un dataframe pandas
dataloader = DataLoader("/auto_atlas/atlas/atlas_gen_fs/amilici/other_samples/ml_exam_dataset/data_quench.json") # transform_to_2d= True di default
data= dataloader.get_data()

In [3]:
data[data.label==1]

Unnamed: 0,sequence,label,quench,num_quench
8,"[[[22.582478486789327, 22.904808739314365, 22....",1,"[{'step': 18, 'pixel': [2, 1], 'temp': 25.1909...",1.0
9,"[[[22.0373877189044, 18.636944579237863, 21.76...",1,"[{'step': 8, 'pixel': [13, 6], 'temp': 25.0312...",7.0
20,"[[[21.411886934355916, 23.16618378647687, 20.1...",1,"[{'step': 14, 'pixel': [7, 7], 'temp': 24.8999...",1.0
21,"[[[21.276068279076245, 20.97700816985905, 21.2...",1,"[{'step': 8, 'pixel': [13, 6], 'temp': 24.4713...",2.0
23,"[[[21.69330864976891, 19.39115826850467, 20.55...",1,"[{'step': 15, 'pixel': [10, 13], 'temp': 25.69...",1.0
...,...,...,...,...
2946,"[[[21.740674455238747, 19.547169172311275, 20....",1,"[{'step': 11, 'pixel': [8, 4], 'temp': 24.7140...",4.0
2966,"[[[19.118253683343585, 21.739494679368477, 21....",1,"[{'step': 5, 'pixel': [9, 5], 'temp': 24.73965...",3.0
2977,"[[[20.366059839869674, 21.313603736769316, 20....",1,"[{'step': 18, 'pixel': [6, 13], 'temp': 24.451...",1.0
2980,"[[[20.94590826562088, 22.970579566231, 21.8976...",1,"[{'step': 7, 'pixel': [9, 10], 'temp': 25.7973...",5.0


# Data exploration

In [None]:
exploration = DataExplorator(data) 
# Vediamo una sequenza di heatmap con label == 1
exploration.visualize_sequence(quenched=True,debug=True)
# Vediamo una sequenza di heatmap con label == 0
exploration.visualize_sequence(quenched=False)
exploration.more_data_exploration()

In [None]:
#Preprocessing: Normalizzazione dei dati
# Trova min e max globali su tutto il dataset

all_values = np.concatenate([sequence.flatten() for sequence in data.sequence])
global_min = np.min(all_values)
global_max = np.max(all_values)
denom = global_max - global_min
data.sequence = (data.sequence- global_min)/denom

In [None]:
from sklearn.utils import resample
import pandas as pd

downsapmping = False

if downsapmping:
    # Esegui il downsampling per bilanciare le classi
    print("Eseguo il downsampling per bilanciare le classi")
    # Supponiamo che df sia il tuo DataFrame con una colonna 'label'
    class0 = data[data.label == 0]
    class1 = data[data.label == 1]

    # Sottocampiona la classe 0
    class0_downsampled = resample(class0, 
                                       replace=False,     # no replacement
                                       n_samples=300,     # come la classe minoritaria
                                       random_state=42)

    # Combina le due classi
    df_balanced = pd.concat([class0_downsampled, class1], ignore_index=True)

    # Shuffle (facoltativo ma consigliato)
    data = df_balanced.sample(frac=1, random_state=42).copy()


In [None]:
'''
from data.torch_Dataset import torch_Dataset
# Creazione del dataset PyTorch
full_dataset = torch_Dataset(data, grid_size=15)


# Creazione del DataLoader

# Train/test split
test_ratio = 0.2
test_size = int(len(full_dataset) * test_ratio)
train_size = len(full_dataset) - test_size
train_dataset, test_dataset = random_split(full_dataset, [train_size, test_size])

# Creazione dei DataLoader per il training e il test
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


from models.CNN_3D import CNN_3D
model = CNN_3D()

# Definizione della loss
pos_weight= torch.tensor([len(data[data.label==0])/len(data[data.label==1])])  # Peso per la classe positiva, da regolare in base al dataset
print("Pos weight:", pos_weight)

loss_BCEWithLogits= nn.BCEWithLogitsLoss(pos_weight=None)  # Per classificazione binaria


total_params, trainable_params = count_parameters(model)
print(f"Parametri totali: {total_params}")
print(f"Parametri trainabili: {trainable_params}")

#---Training---

from train.trainer import TrainerCNN3D
from train.trainer import Results

trainer = TrainerCNN3D(model, loss_BCEWithLogits,optimizer='Adam',lr=1e-3, num_epochs=100, train_loader=train_loader ,val_loader=test_loader)
results=trainer.run()
'''

In [None]:
'''
def plot_results(results,trainer):

    # Estrai le metriche dai risultati
    train_losses_per_batch = results.get_train_losses_per_batch()
    train_losses = results.get_train_loss()
    test_losses = results.get_test_loss()
    test_accuracy_per_epoch = results.get_test_accuracy()
    num_epochs = trainer.num_epochs


    # Calcola il numero di batch per epoca
    num_batches_per_epoch = len(train_losses_per_batch) // num_epochs

    # Crea un asse x "frazionario" per le batch, spostato in modo che l'epoca 1 inizi da x=1
    batch_x_axis = np.linspace(1, num_epochs, len(train_losses_per_batch))

    # Asse x per le medie per epoca, da 1 a num_epochs inclusi
    epoch_x_axis = np.arange(1, num_epochs + 1)

    # Plotting delle loss
    plt.figure(figsize=(12, 6))

    # Loss per batch (sfumata)
    plt.plot(batch_x_axis, np.array(train_losses_per_batch), label='Train Loss per Batch', alpha=0.3, color='orange')

    # Loss medie per epoca
    plt.plot(epoch_x_axis, train_losses, label='Avg Train Loss per Epoch', color='blue', linewidth=2)
    plt.plot(epoch_x_axis, test_losses, label='Test Loss per Epoch', color='red', linestyle='--', linewidth=2)

    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.ylim(0, max(max(train_losses), max(test_losses)) * 2)  # Imposta un limite superiore dinamico
    plt.title('Training and Test Losses')
    plt.legend()
    plt.grid(True)

    plt.show()
    
    # Accuracy
    plt.figure(figsize=(10, 5))
    plt.plot(epoch_x_axis, test_accuracy_per_epoch, label='Test Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy (%)')
    plt.title('Test Accuracy per Epoch')
    plt.legend()
    plt.grid(True)
    
    plt.show()

x_all = torch.stack([x for x, y in test_dataset])
y_all = torch.stack([y for x, y in test_dataset])

model.eval()
model.to('cpu')
with torch.no_grad():
    logits = model(x_all)
    probs = torch.sigmoid(logits)
    preds = (probs >= 0.5).float()


from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, precision_recall_curve, roc_curve
import matplotlib.pyplot as plt



# Classificazione standard
print(classification_report(y_all, preds))
print(confusion_matrix(y_all, preds))

# ROC AUC
print("ROC AUC:", roc_auc_score(y_all,probs))

# Curve
fpr, tpr, _ = roc_curve(y_all, probs)
precision, recall, _ = precision_recall_curve(y_all, probs)

plt.figure(figsize=(12,5))

# ROC
plt.subplot(1,2,1)
plt.plot(fpr, tpr, label='ROC Curve')
plt.plot([0,1],[0,1],'--',color='gray')
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate (Recall)")
plt.title("ROC Curve")
plt.legend()

# PR Curve
plt.subplot(1,2,2)
plt.plot(recall, precision, label='PR Curve', color='green')
plt.xlabel("Recall")
plt.ylabel("Precision")
plt.title("Precision-Recall Curve")
plt.legend()

plt.tight_layout()
plt.show()
from utils.plot import plot_confusion_matrix

plot_confusion_matrix(y_all, preds, labels=["Unquenched", "Quenched"])

torch.cuda.empty_cache()

'''

## Loss function del VAE

La funzione di loss di un VAE è composta da due termini:

- **Errore di ricostruzione**: misura quanto bene il decoder riesce a ricostruire l'input a partire dal vettore latente.
- **Div. di Kullback-Leibler (KL)**: regolarizza la distribuzione latente per essere vicina a una normale standard \( \mathcal{N}(0, I) \).

La formula completa è:

$$
\mathcal{L}_{\text{VAE}} = \underbrace{\text{MSE}(x, \hat{x})}_{\text{ricostruzione}} + \underbrace{D_{\text{KL}}\left( \mathcal{N}(\mu, \sigma^2) \,\|\, \mathcal{N}(0, 1) \right)}_{\text{regolarizzazione}}
$$

Esplicitando il termine KL:

$$
D_{\text{KL}} = -\frac{1}{2} \sum_{i=1}^d \left( 1 + \log(\sigma_i^2) - \mu_i^2 - \sigma_i^2 \right)
$$

Dove:
- \( \mu \) e \( \sigma \) sono i parametri della distribuzione latente \( q(z|x) \)
- \( d \) è la dimensione dello spazio latente


In [None]:
from models.VAE import VAE
from models.ConvAE import ConvAE

In [None]:
def vae_loss(x, x_hat, mu, logvar,beta=1):
    # Ricostruzione (mean squared error)
    recon_loss = nn.functional.mse_loss(x_hat, x, reduction='mean')

    # KL divergence (differenza tra z ~ q(z|x) e N(0,I))
    kl_div = -0.5 * torch.mean(1 + logvar - mu.pow(2) - logvar.exp())
    
    return recon_loss + beta*kl_div


In [None]:
def cae_loss(x, x_hat):
    # Ricostruzione (mean squared error)
    return nn.functional.mse_loss(x_hat, x, reduction='mean')

In [None]:
is_quenched = data.label == 1
is_unquenched = data.label == 0

quenched_dataset = torch_Dataset(data[is_quenched], grid_size=15)
unquenched_dataset = torch_Dataset(data[is_unquenched], grid_size=15)

num_test_normal = 300  # Numero di sequenze normali per il test
num_train= int((len(unquenched_dataset) - num_test_normal)*0.90)
num_val = int((len(unquenched_dataset) - num_test_normal)*0.10)

from torch.utils.data import DataLoader
normal_train, normal_val, normal_test = random_split(unquenched_dataset, [num_train, num_val, num_test_normal])
batch_size = 10
train_loader = DataLoader(normal_train, batch_size=batch_size, shuffle=True)
val_loader   = DataLoader(normal_val, batch_size=batch_size, shuffle=False)

test_dataset = quenched_dataset + normal_test
test_loader  = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)
print(f"Train size: {len(train_loader.dataset)}, Val size: {len(val_loader.dataset)}, Test size: {len(test_loader.dataset)}")



In [None]:
from train.trainer import TrainerVAE

model_vae = VAE(latent_dim=2,k=1)
count_parameters(model_vae)

trainer_vae = TrainerVAE(model_vae, vae_loss, optimizer='Adam', lr=1e-4, num_epochs=50, train_loader=train_loader, val_loader=val_loader)
results_vae = trainer_vae.run()



In [None]:
# Plotting the training and validation losses
plt.figure(figsize=(10, 5))
plt.plot(results_vae.get_val_loss(), label='Validation Loss', color='orange')
plt.plot(results_vae.get_train_loss(), label='Train Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('VAE Training Loss')
plt.legend()
plt.grid(True)
plt.savefig(f"plots/vae_training_loss_{trainer_vae.num_epochs}_epochs_{trainer_vae.lr}_lr.png")
plt.show()

In [None]:

import torch.nn.functional as F
from sklearn.metrics import roc_auc_score, roc_curve, accuracy_score, precision_score, recall_score, f1_score#
from utils.plot import plot_latent_space_2d
from utils.plot import plot_confusion_matrix


def test_anomaly_detection(model, trainer,test_loader,model_name):
    lr = trainer.lr
    num_epochs = trainer.num_epochs
    device= trainer.device

    model.eval()
    scores = []
    labels = []
    mus = []

    with torch.no_grad():
        for x, y in test_loader:
            x = x.to(device)
            x_hat = model(x)
            mu, logvar = model.encode(x)
            
            # Calcola loss di ricostruzione sommata per ogni esempio (batch element)
            recon_loss = F.mse_loss(x_hat, x, reduction='none')
            recon_loss = recon_loss.view(x.size(0), -1).sum(dim=1)  # somma su tutti i pixel/timepoint
            scores.append(recon_loss.cpu())
            labels.append(y)
            mus.append(mu.cpu())

    scores = torch.cat(scores).numpy()
    labels = torch.cat(labels).numpy().reshape(-1)
    mus = torch.cat(mus).numpy()
    # Visualizzazione dello spazio latente
    plot_latent_space_2d(mus,labels,lr,num_epochs, only_quenched=False)

    # Histograms of scores
    plt.figure(figsize=(10, 5))
    plt.hist(scores[labels == 0], bins=40, alpha=0.5, label='Normal', color='blue')
    plt.hist(scores[labels == 1], bins=40, alpha=0.5, label='Anomalous', color='red')
    plt.xlabel('scores')
    plt.ylabel('Frequency')
    plt.title('VAE Reconstruction scores')
    plt.legend()
    plt.grid(True)
    mean_normal = scores[labels == 0].mean()
    mean_anomalous = scores[labels == 1].mean()

    plt.text(0.05, 0.95,f"lr={lr}, epochs={num_epochs} \nMean Normal: {mean_normal:.4f} \nMean Anomalous: {mean_anomalous:.4f} \nDistance: {abs(mean_normal - mean_anomalous):.4f}",
             transform=plt.gca().transAxes, fontsize=10, verticalalignment='top')
    plt.savefig(f"plots/{model_name}_hist_scores_lr_{lr}_epochs_{num_epochs}.png")
    plt.show()


    # Calcolo ROC AUC
    auc = roc_auc_score(labels, scores)
    print(f"ROC AUC: {auc:.4f}")

    # Calcolo ROC curve per trovare soglia ottimale (Youden’s J statistic)
    fpr, tpr, thresholds = roc_curve(labels, scores)
    youden_j = tpr - fpr
    best_idx = np.argmax(youden_j)
    best_threshold = thresholds[best_idx]

    print(f"Soglia ottimale (Youden's J): {best_threshold:.4f}")

    # Classificazione binaria con soglia ottimale
    preds = (scores > best_threshold).astype(int)

    acc = accuracy_score(labels, preds)
    prec = precision_score(labels, preds)
    rec = recall_score(labels, preds)
    f1 = f1_score(labels, preds)

    print(f"Accuracy: {acc:.4f}")
    print(f"Precision: {prec:.4f}")
    print(f"Recall: {rec:.4f}")
    print(f"F1-score: {f1:.4f}")


    # Plot ROC curve
    plt.figure(figsize=(6,6))
    plt.plot(fpr, tpr, label=f'ROC curve (AUC = {auc:.3f})')
    plt.plot([0,1], [0,1], 'k--')
    plt.scatter(fpr[best_idx], tpr[best_idx], marker='o', color='red', label='Best threshold')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curve per Anomaly Detection')
    plt.legend()
    plt.grid(True)

    plot_confusion_matrix(labels, preds, labels=["Normal", "Anomalous"],title=f"{model_name}-lr_{lr}_epochs_{num_epochs}_confusion_matrix")
    
    plt.show()
    return best_threshold, auc, acc, prec, rec, f1


In [None]:
best_threshold, auc, acc, prec, rec, f1 = test_anomaly_detection(model_vae, trainer_vae,test_loader,'VAE')
torch.cuda.empty_cache() 

In [None]:
from train.trainer import TrainerConvAE
model_cae = ConvAE(latent_dim=2,k=1)
count_parameters(model_cae)

trainer_cae = TrainerConvAE(model_cae, cae_loss, optimizer='Adam', lr=1e-4, num_epochs=50, train_loader=train_loader, val_loader=val_loader)
results_cae = trainer_cae.run()
# Plotting the training and validation losses
plt.figure(figsize=(10, 5))
plt.plot(results_cae.get_val_loss(), label='Validation Loss', color='orange')
plt.plot(results_cae.get_train_loss(), label='Train Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('ConvAE Training Loss')
plt.legend()
plt.grid(True)
plt.savefig(f"plots/conv_ae_training_loss_{trainer_cae.num_epochs}_epochs_{trainer_cae.lr}_lr.png")
plt.show()
best_threshold, auc, acc, prec, rec, f1 = test_anomaly_detection(model_cae, trainer_cae,test_loader,'ConvAE')

