In [None]:
# Import necessari
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, TensorDataset
import numpy as np
import pandas as pd
import json
import os
from sklearn.metrics import precision_recall_fscore_support, f1_score, precision_score, recall_score, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
from google.colab import drive

In [None]:
# Variabili globali
VERSION = "v2"
WINDOW_SIZE = 60
STRIDE = 1

# Iperparametri del modello e di addestramento
DROPOUT_RATE = 0.2              # Dropout rate
LEARNING_RATE = 1e-4            # Learning rate
WEIGHT_DECAY = 1e-4             # Regolarizzazione L2 per l'optimizer
NUM_EPOCHS = 20                 # Epoche massime di addestramento
EMBEDDING_DIM = 16              # Dimensione di default per gli embedding
HIDDEN_DIM = 128                # Dimensione dello stato nascosto
NUM_LAYERS = 1                  # LSTM Unidirezionale
OUTPUT_DIM = 1                  # Dimensione dell'output del modello (binaria)
USE_BATCH_NORM_EMB = True       # Applica BatchNorm dopo la concatenazione degli embedding
USE_BATCH_NORM_LSTM_OUT = True  # Applica BatchNorm dopo l'output LSTM (prima dei linear heads)
BATCH_SIZE = 64                 # Dimensione dei batch per il training e la valutazione
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Montaggio Drive
drive.mount('/content/drive')

# Percorsi
base_path = '/content/drive/MyDrive/Colab Notebooks/Deep_Learning_2025_IV'
sequences_path = f'{base_path}/Sequences'
embeddings_path = f'{base_path}/Embeddings'
models_path = f'{base_path}/Models'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Percorsi ai file .npy salvati
sequences_folder_name = f"sequences_{VERSION}_W{WINDOW_SIZE}_S{STRIDE}"
user_split_strategy_folder_name = f"user_split_W{WINDOW_SIZE}_S{STRIDE}"
split_data_dir = f'{sequences_path}/{sequences_folder_name}/{user_split_strategy_folder_name}'

X_train_path = f'{split_data_dir}/X_train_normal_only.npy'
y_train_path = f'{split_data_dir}/y_train_normal_only.npy' # Valori 0
X_val_path = f'{split_data_dir}/X_val_user_split.npy'
y_val_path = f'{split_data_dir}/y_val_user_split.npy' # Valori 0 e 1
X_test_path = f'{split_data_dir}/X_test_user_split.npy'
y_test_path = f'{split_data_dir}/y_test_user_split.npy' # Valori 0 e 1

# Caricamento degli array NumPy preprocessati
print(f"Caricamento X_train da: {X_train_path}")
X_train_np = np.load(X_train_path)
print(f"Caricamento y_train da: {y_train_path}")
y_train_np = np.load(y_train_path)

print(f"Caricamento X_val da: {X_val_path}")
X_val_np = np.load(X_val_path)
print(f"Caricamento y_val da: {y_val_path}")
y_val_np = np.load(y_val_path)

print(f"Caricamento X_test da: {X_test_path}")
X_test_np = np.load(X_test_path)
print(f"Caricamento y_test da: {y_test_path}")
y_test_np = np.load(y_test_path)

# Stampa delle dimensioni dei dati caricati
print(f"\nForme dei dati caricati:")
print(f"X_train: {X_train_np.shape}, y_train: {y_train_np.shape}")
print(f"X_val:   {X_val_np.shape},  y_val:   {y_val_np.shape}")
print(f"X_test:  {X_test_np.shape}, y_test:  {y_test_np.shape}")

# File metadati embedding
# Percorso al file JSON con i dati degli embedding (vocab sizes)
embedding_data_path = f'{embeddings_path}/embedding_data_{VERSION}.json'

# Carica embedding_data per ottenere le vocab_sizes
with open(embedding_data_path, 'r') as f:
    embedding_data_json = json.load(f)

In [None]:
# Determina le feature da usare per le sequenze
# Lista delle feature usate per creare X_final_sequences
# Questa lista DEVE corrispondere a 'feature_cols_for_sequences' nel codice di preprocessing
categorical_feature_names = [
    'src_user', 'dst_user', 'src_comp', 'dst_comp',
    'auth_type', 'logon_type', 'auth_orientation', 'status'
]

# Nomi delle feature continue
continuous_feature_names = ['time_scaled_per_user'] # Assumendo che sia l'ultima/e colonna/e

# Determina il numero di feature categoriche e continue
num_categorical_features = len(categorical_feature_names)
num_continuous_features = len(continuous_feature_names)

print(f"Nomi feature categoriche (per embedding e vocab_size): {categorical_feature_names}")
print(f"Nomi feature continue: {continuous_feature_names}")

# Verifica la coerenza con X_train_np.shape[2]
# Il numero totale di feature deve corrispondere al numero di feature
# effettivamente presenti nei dati caricati
expected_total_features = num_categorical_features + num_continuous_features
if X_train_np.shape[2] != expected_total_features:
    print(f"ATTENZIONE: Il numero totale di feature attese ({expected_total_features})")
    print(f"non corrisponde alla terza dimensione di X_train ({X_train_np.shape[2]})!")
    print("Verifica l'ordine e i nomi in 'categorical_feature_names' e 'continuous_feature_names'.")
else:
    print(f"OK: Numero totale di feature ({expected_total_features}) corrisponde a X_train.shape[2].")

# Estrai le vocab_sizes solo per le feature categoriche
# Crea la lista 'ordered_vocab_sizes' che verrà passata al modello
ordered_vocab_sizes = []
# Itera su 'categorical_feature_names'
for original_col_name in categorical_feature_names:
    vocab_size_key = f'{original_col_name}_vocab_size'
    if vocab_size_key in embedding_data_json:
        ordered_vocab_sizes.append(embedding_data_json[vocab_size_key])
    else: # Fallback se la chiave non è trovata (improbabile)
        vocab_size_key_alt = f'{original_col_name.replace("_encoded", "")}_vocab_size'
        if vocab_size_key_alt in embedding_data_json:
            ordered_vocab_sizes.append(embedding_data_json[vocab_size_key_alt])
        else:
            raise ValueError(f"Vocab size non trovata per {original_col_name} in {embedding_data_path} (chiavi provate: {vocab_size_key}, {vocab_size_key_alt})")

print(f"\nDimensioni dei vocabolari (solo per feature categoriche): {ordered_vocab_sizes}")

In [None]:
class LSTM(nn.Module):
    def __init__(
        self,
        vocab_sizes,              # Lista delle dimensioni del vocabolario (per ogni feature categorica)
        embedding_dim,            # Dimensione dei vettori di embedding
        num_continuous_features,  # Numero di feature continue da concatenare
        hidden_dim,               # Numero di unità nel layer LSTM (dimensione dello stato nascosto)
        output_dim,               # Dimensione dell'output (1 per la classificazione binaria)
        num_layers,               # Numero di layer LSTM stacked
        dropout_rate=0.2,         # Dropout rate
        batch_first=True,         # Formato dell'input e dell'output dei tensori: (batch, seq, feature)
        use_batch_norm_emb=False,
        use_batch_norm_lstm_out=False
        ):

        super(LSTM, self).__init__()

        self.num_categorical_features = len(vocab_sizes)
        self.num_continuous_features = num_continuous_features
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.batch_first = batch_first
        self.use_batch_norm_emb = use_batch_norm_emb
        self.use_batch_norm_lstm_out = use_batch_norm_lstm_out

        # Creazione lista di Embedding layers, uno per ogni feature categorica di input
        # Ogni embedding layer ha la sua vocab_size specifica
        self.embeddings = nn.ModuleList([
            nn.Embedding(num_embeddings=v_size, embedding_dim=embedding_dim) for v_size in vocab_sizes
        ])

        # Calcola la dimensione totale degli embedding concatenati
        total_embedding_dim = self.num_categorical_features * embedding_dim

        # L'input_size effettivo per l'LSTM sarà la somma delle dimensioni
        # degli embedding e del numero di feature continue
        lstm_input_size = total_embedding_dim + self.num_continuous_features

        print(f"LSTM input size calcolata: {lstm_input_size} (Embeddings: {total_embedding_dim} + Continue: {self.num_continuous_features})")

        # Definizione del layer BatchNorm1d
        if self.use_batch_norm_emb and lstm_input_size  > 0 :
             self.bn_emb_concat = nn.BatchNorm1d(lstm_input_size) # Normalizza sull'intera dimensione delle feature concatenate

        # Definizione del layer LSTM
        self.lstm = nn.LSTM(input_size=lstm_input_size,
                            hidden_size=hidden_dim,
                            num_layers=self.num_layers,
                            dropout=dropout_rate , # if self.num_layers > 1 else 0,
                            batch_first=batch_first)

        # Definizione del layer BatchNorm1d da applicare all'output dell'LSTM
        if self.use_batch_norm_lstm_out:
            self.bn_lstm_out = nn.BatchNorm1d(hidden_dim) # Applicato all'output dell'LSTM

        # Layer di Dropout separato, applicato dopo l'LSTM e BatchNorm
        self.dropout = nn.Dropout(dropout_rate)

        # AI: Fully connected layer lineare per la classificazione finale
        self.fc = nn.Linear(in_features=hidden_dim, out_features=output_dim)

    def forward(
        self,
        x, # L'input tensor (batch_size, seq_length, num_input_features) se batch_first=True
        hidden_state=None # Opzionale per passare stati nascosti. Se None, viene inizializzato a zero
        ):

        batch_size = x.size(0)

        # Dividi l'input x in componenti categoriche e continue
        # Assumiamo che le prime 'num_categorical_features' colonne siano categoriche
        # e le successive 'num_continuous_features' siano continue.
        x_categorical = x[:, :, :self.num_categorical_features].long() # .long() per i layer di embedding
        x_continuous = x[:, :, self.num_categorical_features:] # Feature continue

        # Processa ogni feature categorica attraverso il suo layer di embedding
        embedded_features_list = []
        for i in range(self.num_categorical_features):
            feature_column = x_categorical[:, :, i]
            embedded_feature = self.embeddings[i](feature_column)
            embedded_features_list.append(embedded_feature)

        # Concatena gli embedding risultanti
        if self.num_categorical_features > 0:
            combined_embeddings = torch.cat(embedded_features_list, dim=2) # (batch, seq, num_cat_feat * emb_dim)
             # Concatena le feature continue (se presenti) agli embedding
            if self.num_continuous_features > 0:
                # Assicurati che x_continuous sia float
                lstm_input_features = torch.cat((combined_embeddings, x_continuous.float()), dim=2)
            else:
                lstm_input_features = combined_embeddings
        elif self.num_continuous_features > 0: # Solo feature continue
            lstm_input_features = x_continuous.float()
        else: # Nessuna feature, errore
            raise ValueError("Il modello deve avere almeno una feature categorica o continua.")

        # Batch Normalization dopo la concatenazione degli embedding e delle feature concatenate
        if hasattr(self, 'bn_emb_concat') and self.use_batch_norm_emb:
            # BatchNorm1d si aspetta (N, C) o (N, C, L)
            # Qui abbiamo (N, L, C). Occorre permutare
            lstm_input_features = lstm_input_features.permute(0, 2, 1) # (batch, features, seq_len)
            lstm_input_features = self.bn_emb_concat(lstm_input_features)
            lstm_input_features = lstm_input_features.permute(0, 2, 1) # (batch, seq_len, features)

        # Inizializzazione a zero degli stati nascosti (h0, c0) per l'LSTM se non forniti
        if hidden_state is None:
            h0 = torch.zeros(self.num_layers, batch_size, self.hidden_dim).to(x.device)
            c0 = torch.zeros(self.num_layers, batch_size, self.hidden_dim).to(x.device)
            hidden_state = (h0, c0)

        # Passaggio attraverso il layer LSTM
        # lstm_out contiene gli output dell'LSTM per ogni time step
        # hidden_state contiene l'ultimo stato nascosto e di cella
        lstm_out, hidden_state = self.lstm(lstm_input_features, hidden_state)

        # Per la classificazione della sequenza si usa l'output dell'LSTM all'ultimo time step
        last_time_step_out = lstm_out[:, -1, :] # (batch_size, hidden_dim)

        # Applica BatchNorm1d opzionale all'output dell'ultimo time step dell'LSTM
        if hasattr(self, 'bn_lstm_out') and self.use_batch_norm_lstm_out: # Aggiunto self.use_batch_norm_lstm_out
            last_time_step_out = self.bn_lstm_out(last_time_step_out)

        # Applica il Dropout
        out = self.dropout(last_time_step_out)

        # Passa attraverso il layer finale per ottenere il logit di output
        out = self.fc(out) # (batch_size, output_dim)

        # Restituisce i logits e l'ultimo stato nascosto
        return out, hidden_state

    # Metodo helper per inizializzare gli stati nascosti (non usato esplicitamente
    # perché l'LSTM li inizializza a zero di default se hidden_state=None)
    def init_hidden(self, batch_size, device):
        weight = next(self.parameters()).data
        hidden = (weight.new(self.num_layers, batch_size, self.hidden_dim).zero_().to(device),
                  weight.new(self.num_layers, batch_size, self.hidden_dim).zero_().to(device))
        return hidden

print("Modello LSTM definito.")

Modello LSTM definito.


In [None]:
print(f"Dimensioni dei vocabolari (nell'ordine delle feature): {ordered_vocab_sizes}")

# Istanzia il modello
model = LSTM(
    vocab_sizes=ordered_vocab_sizes,
    embedding_dim=EMBEDDING_DIM,
    num_continuous_features=num_continuous_features,
    hidden_dim=HIDDEN_DIM,
    output_dim=OUTPUT_DIM,
    num_layers=NUM_LAYERS,
    dropout_rate=DROPOUT_RATE,
    use_batch_norm_emb=USE_BATCH_NORM_EMB,
    use_batch_norm_lstm_out=USE_BATCH_NORM_LSTM_OUT
    )

print("\nModello LSTM istanziato:")
print(model) # Stampa la struttura del modello

# Sposta il modello sulla GPU se disponibile
model.to(device)
print(f"Modello spostato su: {device}")

# Creazione DataLoaders
# Conversione degli array NumPy in Tensori PyTorch
# Gli input X devono essere LongTensor per i layer di embedding
X_train_tensor = torch.from_numpy(X_train_np).float()
y_train_tensor = torch.from_numpy(y_train_np).float() # Per BCEWithLogitsLoss

X_val_tensor = torch.from_numpy(X_val_np).float()
y_val_tensor = torch.from_numpy(y_val_np).float()

X_test_tensor = torch.from_numpy(X_test_np).float()
y_test_tensor = torch.from_numpy(y_test_np).float()

# Creazione di TensorDataset, che incapsula tensori con la stessa prima dimensione
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

# Creazione dei DataLoader, che gestiscono il batching, lo shuffle e il caricamento parallelo dei dati
# shuffle=True per il train_loader per ridurre la varianza del gradiente e migliorare la generalizzazione
# drop_last=True per il train_loader per evitare batch di dimensioni diverse che potrebbero dare problemi con alcuni layer
# shuffle=False per val_loader e test_loader perché l'ordine non deve cambiare per una valutazione consistente
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, drop_last=True) # Shuffle per il training
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

print("\nDataLoaders creati.")
print(f"Numero di batch in train_loader: {len(train_loader)}")
print(f"Numero di batch in val_loader: {len(val_loader)}")

# Esempio di un batch dal train_loader
# Utile per verificare le dimensioni dei tensori prodotti dal DataLoader
if len(train_loader) > 0: # Controllo per evitare errori se il loader è vuoto
    dataiter = iter(train_loader)
    sample_x, sample_y = next(dataiter)
    print(f"\nForma di un batch di input X: {sample_x.shape}") # (BATCH_SIZE, WINDOW_SIZE, num_total_features=9)
    print(f"Forma di un batch di output y: {sample_y.shape}") # (BATCH_SIZE)
else:
    print("Train loader è vuoto.")

In [None]:
# Per classificazione binaria con logits in output
# Il modello impara cosa è "normale" cercando di predire 0 (o un logit molto negativo)
# Le anomalie (non viste in training) dovrebbero produrre logits meno negativi o positivi
criterion = nn.BCEWithLogitsLoss() # Combina la sigmoide e la BCE

optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY)
print("Loss function e optimizer definiti.")

# Setup per l'Early Stopping
best_val_loss = float('inf') # Inizializza la miglior loss di validazione a infinito
epochs_no_improve = 0 # Contatore per le epoche senza miglioramento
patience = 2 # Numero di epoche da attendere prima di fermare il training se la Val Loss non migliora
best_model_weights_path = f"{models_path}/lstm_best_model_weights.pt"

print(f"\nInizio training per {NUM_EPOCHS} epoche...")

for epoch in range(NUM_EPOCHS):
    model.train() # Mette il modello in modalità training
    train_loss_accum = 0

    # Itera sui batch del training set
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        # Azzera i gradienti accumulati dal batch precedente
        optimizer.zero_grad()

        # Forward pass: ottiene i logits dal modello
        # Il secondo output (stato nascosto) dell'LSTM non è usato qui
        outputs, _ = model(inputs)

        # Calcola la loss
        # .squeeze() rimuove la dimensione superflua (es. da [64,1] a [64])
        loss = criterion(outputs.squeeze(), labels)

        # Calcola i gradienti della loss rispetto ai parametri del modello (backpropagation)
        loss.backward()

        # Aggiorna i pesi del modello usando i gradienti calcolati
        optimizer.step()

        # Accumula la loss del batch
        train_loss_accum += loss.item()

    # Calcolo loss media di training per l'epoca
    avg_train_loss = train_loss_accum / len(train_loader)

    # VALIDATION LOOP
    model.eval() # Mette il modello in modalità valutazione
    val_loss_accum = 0
    all_val_preds = []
    all_val_labels = []

    with torch.no_grad(): # Disabilita il calcolo dei gradienti durante la validazione
        for inputs_val, labels_val in val_loader:
            inputs_val, labels_val = inputs_val.to(device), labels_val.to(device)

            outputs_val, _ = model(inputs_val)

            val_loss = criterion(outputs_val.squeeze(), labels_val)
            val_loss_accum += val_loss.item()

    avg_val_loss = val_loss_accum / len(val_loader)

    print(f"Epoch {epoch+1}/{NUM_EPOCHS} | Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}")

    # Logica di Early Stopping
    # Se la Val Loss attuale è migliore della migliore finora
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        # Salva i pesi del modello
        torch.save(model.state_dict(), best_model_weights_path)
        print(f"  Miglioramento Val Loss: {best_val_loss:.4f}. Modello salvato in {best_model_weights_path}")
        # Resetta il contatore delle epoche senza miglioramento
        epochs_no_improve = 0
    else: # Se la Val Loss non è migliorata
        epochs_no_improve += 1
        print(f"  Nessun miglioramento Val Loss per {epochs_no_improve} epoche.")
    # Se la Val Loss non migliora per 'patience' epoche consecutive
    if epochs_no_improve >= patience:
        print(f"Early stopping attivato dopo {epoch+1} epoche.")
        break # Interrompi il training

print("\nTraining completato.")

print(f"Caricamento del modello con la migliore Val Loss da: {best_model_weights_path}")
model.load_state_dict(torch.load(best_model_weights_path))
print("Modello migliore caricato.")

In [None]:
import torch
import numpy as np
import torch.nn.functional as F

def get_predictions_and_labels(model, data_loader, device_eval):
    model.eval()  # Mette il modello in modalità valutazione
    all_labels_eval = []
    all_probs_eval = []

    with torch.no_grad():  # Disabilita il calcolo dei gradienti
        for inputs_eval, labels_eval in data_loader:
            inputs_eval = inputs_eval.to(device_eval)

            # Ottieni l'output del modello
            model_output = model(inputs_eval)

            # Gestisce il caso in cui il modello restituisca una tupla (es. logits, attention_weights)
            # o solo i logits (per il modello LSTM attuale, model_output è già una tupla) (outputs, hidden_state)
            if isinstance(model_output, tuple):
                outputs_logits_eval = model_output[0] # Prende il primo elemento (i logits)
            else: # Il modello restituisce solo un tensore (i logits)
                outputs_logits_eval = model_output

            # Rimuove dimensioni superflue
            squeezed_logits = outputs_logits_eval.squeeze()
            # Converte i logits in probabilità
            probs_for_batch = torch.sigmoid(squeezed_logits)

            # Raccoglie le etichette vere
            all_labels_eval.extend(labels_eval.cpu().numpy().tolist())

            # Converti le probabilità in NumPy e gestisci il caso 0-D
            np_probs_for_batch = probs_for_batch.cpu().numpy()

            # Caso in cui probs_for_batch è uno scalare (se batch_size=1 e squeeze() rimuove tutte le dim)
            if np_probs_for_batch.ndim == 0:  # È uno scalare (array 0-D)
                all_probs_eval.append(float(np_probs_for_batch))
            else:
                all_probs_eval.extend(np_probs_for_batch.tolist())
    return np.array(all_labels_eval), np.array(all_probs_eval)

print("Funzione get_predictions_and_labels definita.")

Funzione get_predictions_and_labels definita.


In [None]:
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns

# Ottieni predizioni una sola volta
print("Ottenimento predizioni finali dal modello migliore...")
# Carica il modello migliore e ottiene le predizioni (probabilità)
# e le etichette vere per il validation e test set
val_labels_final, val_probs_final = get_predictions_and_labels(model, val_loader, device)
test_labels_final, test_probs_final = get_predictions_and_labels(model, test_loader, device)
print("Predizioni finali ottenute.")

# Visualizzazione della distribuzione delle probabilità (VALIDATION SET)
# Questa sezione serve a capire come il modello distribuisce le probabilità
# per le classi normali e anomale sul validation set. Serve una buona separazione
print("\nDistribuzione delle probabilità predette sul Validation Set:")
val_probs_normal = val_probs_final[val_labels_final == 0]
val_probs_anomalous = val_probs_final[val_labels_final == 1]

plt.figure(figsize=(12, 6))
plt.hist(val_probs_normal, bins=50, alpha=0.7, label=f'Normali (Validation) (n={len(val_probs_normal)})', color='blue', density=True)
if len(val_probs_anomalous) > 0:
    plt.hist(val_probs_anomalous, bins=50, alpha=0.7, label=f'Anomale (Validation) (n={len(val_probs_anomalous)})', color='red', density=True)
else:
    print("Nessuna anomalia nel validation set per plottare il suo istogramma.")
plt.title('Distribuzione delle Probabilità Predette sul Validation Set (dal Modello Migliore)')
plt.xlabel('Probabilità Predetta di Anomalia')
plt.ylabel('Densità')
plt.legend()
plt.yscale('log') # Utile se le probabilità sono molto piccole
plt.show()

# Stampa statistiche descrittive delle probabilità per le due classi
print(f"Statistiche val_probs_normal: N={len(val_probs_normal)}, Min={np.min(val_probs_normal):.2e}, Max={np.max(val_probs_normal):.2e}, Mean={np.mean(val_probs_normal):.2e}, Median={np.median(val_probs_normal):.2e}")
if len(val_probs_anomalous) > 0:
    print(f"Statistiche val_probs_anomalous: N={len(val_probs_anomalous)}, Min={np.min(val_probs_anomalous):.2e}, Max={np.max(val_probs_anomalous):.2e}, Mean={np.mean(val_probs_anomalous):.2e}, Median={np.median(val_probs_anomalous):.2e}")

# Determinazione della soglia ottimale con il metodo del percentile (VALIDATION SET)
print("\n--- Determinazione Soglia Ottimale con Metodo Percentile (su Validation Set) ---")
percentiles_to_try = [85, 90, 95, 97, 98, 99, 99.5, 99.9]
best_f1_val_percentile = -1 # Inizializza a -1.0 per assicurare che qualsiasi F1 valido sia maggiore
final_optimal_threshold = 0.5 # Soglia di default se non si trovano anomalie
best_percentile_chosen = 0
num_anomalies_val = np.sum(val_labels_final == 1)
print(f"Numero di anomalie effettive nel Validation Set: {num_anomalies_val}")

# Cerca la soglia percentile che massimizza l'F1-score sulla classe anomala
if len(val_probs_final) > 0 and num_anomalies_val > 0:
    for p_val in percentiles_to_try:
        # Calcola la soglia per il percentile corrente
        current_threshold = np.percentile(val_probs_final, p_val)
        # Classifica in base alla soglia
        val_preds_p = (val_probs_final >= current_threshold).astype(int)

        precision_p = precision_score(val_labels_final, val_preds_p, pos_label=1, zero_division=0)
        recall_p = recall_score(val_labels_final, val_preds_p, pos_label=1, zero_division=0)
        f1_p = f1_score(val_labels_final, val_preds_p, pos_label=1, zero_division=0)

        print(f"  Percentile: {p_val:>5}% -> Soglia: {current_threshold:.2e} | P: {precision_p:.4f}, R: {recall_p:.4f}, F1: {f1_p:.4f}")

        # Ottimizza per F1-score
        if f1_p > best_f1_val_percentile:
            best_f1_val_percentile = f1_p
            final_optimal_threshold = current_threshold
            best_percentile_chosen = p_val
    print(f"\nMiglior Soglia (Percentile) scelta: {final_optimal_threshold:.2e} (dal {best_percentile_chosen}° percentile, F1 su Val: {best_f1_val_percentile:.4f})")
elif num_anomalies_val == 0:
    print("Nessuna anomalia nel validation set, non posso ottimizzare la soglia percentile in modo significativo. Uso default 0.5.")
else: # val_probs_final è vuoto
    print("val_probs_final è vuoto, non posso calcolare la soglia. Uso default 0.5.")

# Valutazione sul validation set con la soglia scelta
print(f"\n--- Performance sul Validation Set con Soglia Finale ({final_optimal_threshold:.2e}) ---")
val_predicted_labels_final = (val_probs_final >= final_optimal_threshold).astype(int)
print(classification_report(val_labels_final, val_predicted_labels_final, target_names=['Normale (0)', 'Anomalo (1)'], zero_division=0))
cm_val = confusion_matrix(val_labels_final, val_predicted_labels_final)
plt.figure(figsize=(6,4))
sns.heatmap(cm_val, annot=True, fmt='d', cmap='BuGn', xticklabels=['Pred. Normale', 'Pred. Anomalo'], yticklabels=['Vero Normale', 'Vero Anomalo'])
plt.title(f'Validation Set - Confusion Matrix (Soglia={final_optimal_threshold:.2e})')
plt.xlabel('Etichetta Predetta'); plt.ylabel('Etichetta Vera'); plt.show()

# Valutazione finale sul test set con la soglia scelta
print(f"\n--- Performance Finale sul Test Set con Soglia ({final_optimal_threshold:.2e}) ---")
test_predicted_labels_final = (test_probs_final >= final_optimal_threshold).astype(int)

print("\nClassification Report completo sul Test Set:")
print(classification_report(test_labels_final, test_predicted_labels_final, target_names=['Normale (0)', 'Anomalo (1)'], zero_division=0))

print("\nConfusion Matrix sul Test Set:")
cm_test = confusion_matrix(test_labels_final, test_predicted_labels_final)
plt.figure(figsize=(6,4))
sns.heatmap(cm_test, annot=True, fmt='d', cmap='BuGn', xticklabels=['Pred. Normale', 'Pred. Anomalo'], yticklabels=['Vero Normale', 'Vero Anomalo'])
plt.title(f'Test Set - Confusion Matrix (Soglia={final_optimal_threshold:.2e})')
plt.xlabel('Etichetta Predetta'); plt.ylabel('Etichetta Vera'); plt.show()

# Conteggio finale dei veri positivi sul test set
num_anomalies_test = np.sum(test_labels_final == 1)
true_positives_test = cm_test[1, 1] if num_anomalies_test > 0 and cm_test.shape == (2,2) else 0
print(f"Numero di anomalie effettive nel Test Set: {num_anomalies_test}")
if num_anomalies_test > 0:
    print(f"Anomalie correttamente identificate (TP) nel Test Set: {true_positives_test} su {num_anomalies_test}")