In [None]:
import os
import re
import pandas as pd
from tensorflow.keras.callbacks import Callback
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping


def test(): return 'prova'

class CSVLoggerCallback(Callback):
    def __init__(self, path):
        super(CSVLoggerCallback, self).__init__()
        self.path = path
        self.history_file = os.path.join(self.path, 'training_history.csv')

        # Crea la directory se non esiste
        if not os.path.exists(self.path):
            os.makedirs(self.path)

        # Controllo se il file CSV esiste già
        self.file_exists = os.path.exists(self.history_file)

    def on_epoch_end(self, epoch, logs=None):
        # Estrazione dei dati di interesse alla fine dell'epoca
        logs = logs or {}
        epoch_data = {
            'epoch': [epoch + 1],
            'training_accuracy': [logs.get('accuracy')],
            'training_loss': [logs.get('loss')],
            'validation_accuracy': [logs.get('val_accuracy')],
            'validation_loss': [logs.get('val_loss')]
        }

        # Creiamo un DataFrame per la singola epoca
        epoch_df = pd.DataFrame(epoch_data)

        # Se il file esiste già, appendo i nuovi dati senza intestazione
        if self.file_exists:
            epoch_df.to_csv(self.history_file, mode='a', header=False, index=False)
        else:
            # Se il file non esiste, lo creo con l'intestazione
            epoch_df.to_csv(self.history_file, index=False)
            self.file_exists = True  # Ora il file esiste

        print(f"Salvati i dati dell'epoca {epoch + 1} in {self.history_file}")



def generate_plot(path, epoch=0):
    # Percorso del file CSV contenente la storia dell'addestramento
    history_file = os.path.join(path, 'training_history.csv')

    # Controlla se il file CSV esiste
    if not os.path.exists(history_file):
        print(f"File {history_file} non trovato!")
        return

    # Carica i dati dal file CSV
    try:
        history_df = pd.read_csv(history_file)
    except Exception as e:
        print(f"Errore durante la lettura del CSV: {e}")
        return

    # Verifica che le colonne necessarie esistano nel file CSV
    required_columns = ['epoch', 'training_accuracy', 'validation_accuracy', 'training_loss', 'validation_loss']
    if not all(col in history_df.columns for col in required_columns):
        print("Il file CSV non contiene tutte le colonne richieste.")
        return

    # Determina il numero massimo di epoche presenti nel file
    max_epoch_in_data = history_df['epoch'].max()

    # Se l'epoch specificata è maggiore del numero di epoche disponibili, usa quella massima
    if epoch > 0:
        epoch = min(epoch, max_epoch_in_data)
        history_df = history_df[history_df['epoch'] <= epoch]
    else:
        epoch = max_epoch_in_data  # Usa tutte le epoche se epoch è 0

    # Generazione del grafico di accuracy
    plt.plot(history_df['epoch'], history_df['training_accuracy'], label='Train')
    plt.plot(history_df['epoch'], history_df['validation_accuracy'], label='Validation')
    plt.title('Model accuracy')
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    plt.legend(loc='upper left')
    accuracy_plot_path = os.path.join(path, 'accuracy_plot.png')
    plt.savefig(accuracy_plot_path)
    plt.show()  # Mostra il grafico di accuracy
    plt.close()  # Chiude il grafico corrente
    print(f"Salvato grafico di accuracy in: {accuracy_plot_path}")

    # Generazione del grafico di loss
    plt.plot(history_df['epoch'], history_df['training_loss'], label='Train')
    plt.plot(history_df['epoch'], history_df['validation_loss'], label='Validation')
    plt.title('Model loss')
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.legend(loc='upper left')
    loss_plot_path = os.path.join(path, 'loss_plot.png')
    plt.savefig(loss_plot_path)
    plt.show()  # Mostra il grafico di loss
    plt.close()  # Chiude il grafico corrente
    print(f"Salvato grafico di loss in: {loss_plot_path}")




def show_plots(path):
    # Percorso dei file di grafici salvati
    accuracy_plot_path = os.path.join(path, 'accuracy_plot.png')
    loss_plot_path = os.path.join(path, 'loss_plot.png')

    # Controlla e mostra il grafico di accuracy
    if os.path.exists(accuracy_plot_path):
        accuracy_img = mpimg.imread(accuracy_plot_path)
        plt.imshow(accuracy_img)
        plt.axis('off')  # Rimuove gli assi
        plt.title('Model Accuracy')
        plt.show()  # Mostra il grafico di accuracy
    else:
        print(f"Grafico di accuracy non trovato in: {accuracy_plot_path}")

    # Controlla e mostra il grafico di loss
    if os.path.exists(loss_plot_path):
        loss_img = mpimg.imread(loss_plot_path)
        plt.imshow(loss_img)
        plt.axis('off')  # Rimuove gli assi
        plt.title('Model Loss')
        plt.show()  # Mostra il grafico di loss
    else:
        print(f"Grafico di loss non trovato in: {loss_plot_path}")




def get_latest_epoch_number(folder_path):
    # Percorso del file CSV
    history_file = os.path.join(folder_path, 'training_history.csv')

    # Controlla se il file CSV esiste
    if not os.path.exists(history_file):
        return None  # Se il file non esiste, restituisci None

    # Legge il file CSV
    try:
        history_df = pd.read_csv(history_file)

        # Controlla se ci sono dati nel file CSV
        if history_df.empty:
            return None  # Se il CSV è vuoto, restituisci None

        # Ottiene l'ultima epoca dal campo 'epoch'
        last_epoch = history_df['epoch'].iloc[-1]  # Legge l'ultima riga del campo 'epoch'

        return f"{int(last_epoch):02d}"  # Restituisce l'epoca formattata con due cifre
    except Exception as e:
        print(f"Errore durante la lettura del CSV: {e}")
        return None




def get_best_epoch_number(folder_path):
    # Espressione regolare per trovare i file con il pattern "model_epoch_0N.keras"
    pattern = r'model_epoch_(\d+)\.keras'

    # Lista per tenere traccia dei numeri di epoca
    epoch_numbers = []

    # Cerca i file nella cartella
    for filename in os.listdir(folder_path):
        # Cerca una corrispondenza con il pattern
        match = re.match(pattern, filename)
        if match:
            epoch_num = int(match.group(1))  # Estrae il numero dell'epoca
            epoch_numbers.append(epoch_num)

    # Controlla se ci sono file corrispondenti
    if not epoch_numbers:
        return None  # Nessun file trovato con il pattern

    # Restituisce il numero di epoca maggiore, formattato sempre con due cifre
    latest_epoch = max(epoch_numbers)
    return f"{latest_epoch:02d}"  # Formatta il numero con due cifre




def load_model(model, model_path):
    # Carica il modello dai pesi salvati
    best_model = get_best_epoch_number(model_path)
    model.load_weights(os.path.join(model_path, "model_epoch_" + best_model + ".keras"))

    # Stampare il sommario del modello
    print(model.summary())

    # Percorsi dei grafici salvati
    accuracy_path = os.path.join(os.path.dirname(model_path), 'accuracy_plot.png')
    loss_path = os.path.join(os.path.dirname(model_path), 'loss_plot.png')

    # Mostra i plot
    show_plots(model_path)

    # Percorso del file CSV con la storia del training
    history_csv_path = os.path.join(model_path, 'training_history.csv')

    # Controlla se il file CSV esiste
    if os.path.exists(history_csv_path):
        # Carica il file CSV
        training_history = pd.read_csv(history_csv_path)
        print("\nContenuto di training_history.csv:\n")
        print(training_history)
    else:
        print(f"File CSV 'training_history.csv' non trovato in {history_csv_path}")




def train_model(model, path, train_gen, val_gen, epochs=50):
    if not os.path.exists(path):
        os.makedirs(path)

    # EarlyStopping per fermare l'addestramento se il modello non migliora
    stop_early = EarlyStopping(monitor="val_loss", patience=5, verbose=1)

    # ModelCheckpoint per salvare il miglior modello basato su val_accuracy
    checkpoint = ModelCheckpoint(
        filepath=os.path.join(path, "model_epoch_{epoch:02d}.keras"),
        monitor='val_accuracy',
        verbose=1,
        save_best_only=True,
        mode='max'
    )

    # CSVLoggerCallback per monitorare come cambia l'accuracy e la loss ad ogni epoch
    csv_logger = CSVLoggerCallback(path)


    # Eseguo il training con il model e i dataset di training e validation
    history = model.fit(
        train_gen,
        validation_data=val_gen,
        epochs=epochs,
        callbacks=[csv_logger, checkpoint, stop_early]
    )

    # Salvataggio dei grafici di accuratezza
    generate_plot(path)


# Carica il modello dal checkpoint (l'ultimo salvato)
def continue_model(model, model_path, train_gen, val_gen, epochs=50, from_best=0):
  load_model(model, model_path)

  # EarlyStopping per fermare l'addestramento se il modello non migliora
  stop_early = EarlyStopping(monitor="val_loss", patience=5, verbose=1)

  # ModelCheckpoint per salvare il miglior modello basato su val_accuracy
  checkpoint = ModelCheckpoint(
      filepath=os.path.join(model_path, "model_epoch_{epoch:02d}.keras"),
      monitor='val_accuracy',
      verbose=1,
      save_best_only=True,
      mode='max'
  )

  # CSVLoggerCallback per monitorare come cambia l'accuracy e la loss ad ogni epoch
  csv_logger = CSVLoggerCallback(model_path)

  if (from_best): best_model = get_best_epoch_number(model_path)
  else: best_model = get_latest_epoch_number(model_path)
  start_epoch = int(best_model)

  # Continua il training da dove è stato interrotto
  history = model.fit(
      train_gen,
      validation_data=val_gen,
      initial_epoch=start_epoch,  # Imposta l'epoca iniziale a 26 (ultimo checkpoint salvato)
      epochs=epochs,         # Continua fino all'epoca 50 o più, se desiderato
      callbacks=[csv_logger, checkpoint, stop_early]  # Mantieni le callback originali
  )

  # Salvataggio dei grafici di accuratezza
  generate_plot(model_path)


print("ml_utils.py caricato correttamente!")

Writing ml_utils.py
