# 1. Imports and Setup

In this cell, we import all the necessary libraries and set up the working environment.

In [None]:
# Import necessary libraries
import os
import json
import numpy as np
import pandas as pd
import seaborn as sns
from PIL import Image

# TensorFlow and Keras modules
import tensorflow as tf
from tensorflow.keras import layers, models, Input, Model, regularizers
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, CSVLogger, ModelCheckpoint
from tensorflow.keras.preprocessing.image import ImageDataGenerator, image_dataset_from_directory
from tensorflow.keras.layers import Conv2D, MaxPooling2D, UpSampling2D, Concatenate
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import BinaryCrossentropy

# scikit-learn for data splitting
from sklearn.model_selection import train_test_split

In [None]:
# Function to load configuration from a JSON file
def load_config(filename: str = "config-Unet.json") -> dict:
    """
    Loads the configuration from a JSON file.

    Parameters:
    - filename (str): The path to the configuration file.

    Returns:
    - dict: Configuration parameters loaded from the JSON file.
    """
    with open(filename, 'r') as f:
        return json.load(f)

# Load configuration
config = load_config()

In [None]:
# Parameter to select dataset
DATASET_TYPE = "GASF"  # Change to "MARKOV" or "RGB" to switch dataset

# Paths based on dataset type
if DATASET_TYPE == "GASF":
    data_dir = "/media/neurone-pc6/Volume/Michele/Prog_GAF_Michele/pythonProject/data/GASF"
    logger_path = os.path.join("results", "training_u-net_32x32_gasf.log")
    model_save_path = os.path.join("models", "u-net_gasf.h5")
    test_results_path = os.path.join("results", "test_results_u-net_32x32_gasf.txt")
elif DATASET_TYPE == "GADF":
    data_dir = "/media/neurone-pc6/Volume/Michele/Prog_GAF_Michele/pythonProject/data/GADF"
    logger_path = os.path.join("results", "training_u-net_32x32_gadf.log")
    model_save_path = os.path.join("models", "u-net_gadf.h5")
    test_results_path = os.path.join("results", "test_results_u-net_32x32_gadf.txt")
elif DATASET_TYPE == "MARKOV":
    data_dir = "/media/neurone-pc6/Volume/Michele/Prog_GAF_Michele/pythonProject/data/MARKOV"
    logger_path = os.path.join("results", "training_u-net_32x32_markov.log")
    model_save_path = os.path.join("models", "u-net_markov.h5")
    test_results_path = os.path.join("results", "test_results_u-net_32x32_markov.txt")
elif DATASET_TYPE == "RGB":
    data_dir = "/media/neurone-pc6/Volume/Michele/Prog_GAF_Michele/pythonProject/data/RGB"
    logger_path = os.path.join("results", "training_u-net_32x32_rgb.log")
    model_save_path = os.path.join("models", "u-net_rgb.h5")
    test_results_path = os.path.join("results", "test_results_u-net_32x32_rgb.txt")

# Directories for labels
label_0_folder = os.path.join(data_dir, "Label_0")
label_1_folder = os.path.join(data_dir, "Label_1")

In [None]:
# Training parameters
BATCH_SIZE = config["unet"]["training"]["batch_size"]
AUTOTUNE = tf.data.experimental.AUTOTUNE
INPUT_SHAPE = (32, 32, 1)
EPOCHS = config["unet"]["training"]["epochs"]


# Define steps per epoch and validation steps based on dataset cardinality
STEPS_PER_EPOCH = tf.data.experimental.cardinality(train_ds).numpy()
VALIDATION_STEPS = tf.data.experimental.cardinality(val_ds).numpy()

# Define optimizer, loss, and metrics from configuration
OPTIMIZER = tf.keras.optimizers.get({
    "class_name": config["unet"]["training"]["optimizer"],
    "config": {
        "learning_rate": config["unet"]["training"]["learning_rate"]
    }
})

LOSS = config["unet"]["training"]["loss"]
METRICS = config["unet"]["training"]["metrics"]

# Configure callbacks for training
early_stopping = EarlyStopping(
    monitor=config["unet"]["training"]["early_stopping"]["monitor"],
    patience=config["unet"]["training"]["early_stopping"]["patience"],
    restore_best_weights=config["unet"]["training"]["early_stopping"]["restore_best_weights"]
)

model_checkpoint = ModelCheckpoint(
    filepath=config["unet"]["training"]["model_checkpoint"]["filepath"],
    monitor=config["unet"]["training"]["model_checkpoint"]["monitor"],
    save_best_only=config["unet"]["training"]["model_checkpoint"]["save_best_only"]
)

reduce_lr = ReduceLROnPlateau(
    monitor=config["unet"]["training"]["lr_scheduler"]["monitor"],
    factor=config["unet"]["training"]["lr_scheduler"]["factor"],
    patience=config["unet"]["training"]["lr_scheduler"]["patience"]
)

csv_logger = CSVLogger(logger_path)

In [None]:
# Preprocessing function
def preprocess_image(image_path, dataset_type):
    image = np.load(image_path).astype(np.float32)
    if dataset_type == "GASF":
        # Normalize values from [-1, 1] to [0, 1]
        image = (image + 1) / 2.0
        if image.ndim == 2:  # Grayscale images
            image = np.expand_dims(image, axis=-1)
    elif dataset_type == "GADF":
        # Preprocessing specific for GADF: normalize to [0, 1]
        image = (image - np.min(image)) / (np.max(image) - np.min(image))
        if image.ndim == 2:  # Ensure channel dimension for grayscale images
            image = np.expand_dims(image, axis=-1)
    elif dataset_type == "MARKOV":
        # Custom preprocessing for MARKOV (if needed)
        image = (image - np.min(image)) / (np.max(image) - np.min(image))
        if image.ndim == 2:
            image = np.expand_dims(image, axis=-1)
    elif dataset_type == "RGB":
         image = np.load(image_path.decode('utf-8')).astype(np.float32)
         image = np.expand_dims(image, axis=-1)  # Add channel dimension
    return image

    image = tf.numpy_function(_load_image, [image_path], tf.float32)
    image.set_shape([32, 32, 3])  # Explicitly set shape for TensorFlow compatibility
    # Normalizzazione dei pixel da [0, 255] a [0, 1]
    image = image / 255.0
    return image, image

In [None]:
# Example usage of preprocess function
# image_path = "example_path.npy"
# processed_image = preprocess_image(image_path, DATASET_TYPE)

In [None]:
def unet(input_shape: tuple[int, int, int]) -> Model:
    """
    Builds a U-Net model for image segmentation.

    Parameters:
    - input_shape (tuple[int, int, int]): Shape of the input image (height, width, channels).

    Returns:
    - Model: A compiled U-Net model.
    """
    inputs = Input(shape=input_shape)

    # Encoder
    conv1 = Conv2D(32, (3, 3), activation='relu', padding='same')(inputs)
    conv1 = Conv2D(32, (3, 3), activation='relu', padding='same')(conv1)
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)

    conv2 = Conv2D(64, (3, 3), activation='relu', padding='same')(pool1)
    conv2 = Conv2D(64, (3, 3), activation='relu', padding='same')(conv2)
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)

    conv3 = Conv2D(128, (3, 3), activation='relu', padding='same')(pool2)
    conv3 = Conv2D(128, (3, 3), activation='relu', padding='same')(conv3)

    # Decoder
    up4 = UpSampling2D(size=(2, 2))(conv3)
    up4 = Conv2D(64, (2, 2), activation='relu', padding='same')(up4)
    up4 = Concatenate()([up4, conv2])

    conv4 = Conv2D(64, (3, 3), activation='relu', padding='same')(up4)
    conv4 = Conv2D(64, (3, 3), activation='relu', padding='same')(conv4)

    up5 = UpSampling2D(size=(2, 2))(conv4)
    up5 = Conv2D(32, (2, 2), activation='relu', padding='same')(up5)
    up5 = Concatenate()([up5, conv1])

    conv5 = Conv2D(32, (3, 3), activation='relu', padding='same')(up5)
    conv5 = Conv2D(32, (3, 3), activation='relu', padding='same')(conv5)

    outputs = Conv2D(1, (1, 1), activation='sigmoid')(conv5)

    # Create model
    model = Model(inputs=[inputs], outputs=[outputs])
    return model

In [None]:
# Creation of the pandas DataFrame with image paths and labels
df = create_dataframe(label_0_folder, label_1_folder)
df["label"] = df["label"].astype(np.float32)  # Convert labels to float32 for compatibility with TensorFlow

# Split into training, validation, and test sets
train_df, temp_df = train_test_split(df, test_size=0.2, random_state=42)
val_df, test_df = train_test_split(temp_df, test_size=0.5, random_state=42)

# Function to create a TensorFlow dataset from a DataFrame
def create_tf_dataset(df: pd.DataFrame, batch_size: int, autotune: int) -> tf.data.Dataset:
    """
    Converts a DataFrame containing image paths and labels into a TensorFlow dataset.

    Parameters:
    - df (pd.DataFrame): DataFrame with columns 'image_path' and 'label'.
    - batch_size (int): Number of samples per batch.
    - autotune (int): Number of parallel calls to optimize performance.

    Returns:
    - tf.data.Dataset: A batched and prefetched TensorFlow dataset ready for training.
    """
    ds = tf.data.Dataset.from_tensor_slices((df["image_path"].values, df["label"].values))
    ds = ds.map(load_and_preprocess_image, num_parallel_calls=autotune)
    ds = ds.batch(batch_size).prefetch(autotune)
    return ds

# Create TensorFlow datasets for training, validation, and testing
train_ds = create_tf_dataset(train_df, BATCH_SIZE, AUTOTUNE)
val_ds = create_tf_dataset(val_df, BATCH_SIZE, AUTOTUNE)
test_ds = create_tf_dataset(test_df, 1, AUTOTUNE)  # Batch size of 1 for test set

In [None]:
def train_unet_model(model: Model, train_ds: tf.data.Dataset, val_ds: tf.data.Dataset, 
                     epochs: int, optimizer: tf.keras.optimizers.Optimizer, 
                     loss: str, metrics: list[str], callbacks: list[tf.keras.callbacks.Callback]) -> tf.keras.callbacks.History:
    """
    Compiles and trains a U-Net model.

    Parameters:
    - model (Model): The U-Net model to train.
    - train_ds (tf.data.Dataset): The training dataset.
    - val_ds (tf.data.Dataset): The validation dataset.
    - epochs (int): Number of epochs to train the model.
    - optimizer (tf.keras.optimizers.Optimizer): The optimizer to use for training.
    - loss (str): The loss function to use for training.
    - metrics (list[str]): A list of metrics to evaluate during training.
    - callbacks (list[tf.keras.callbacks.Callback]): A list of callbacks to use during training.

    Returns:
    - tf.keras.callbacks.History: The history object containing the training details.
    """
    model.compile(optimizer=optimizer, loss=loss, metrics=metrics)

    # Train the model
    history = model.fit(
        train_ds,
        epochs=epochs,
        validation_data=val_ds,
        callbacks=callbacks
    )

    return history

# Model generation and compilation
model = unet(INPUT_SHAPE)

# Train the U-Net model using the defined function
history = train_unet_model(
    model=model,
    train_ds=train_ds,
    val_ds=val_ds,
    epochs=EPOCHS,
    optimizer=OPTIMIZER,
    loss=LOSS,
    metrics=METRICS,
    callbacks=[early_stopping, model_checkpoint, reduce_lr, csv_logger]
)

In [None]:
def evaluate_and_save_results(model: Model, test_ds: tf.data.Dataset, metrics: list[str], results_dir: str) -> None:
    """
    Evaluates the model on the test dataset and saves the results to a file.

    Parameters:
    - model (Model): The trained model to evaluate.
    - test_ds (tf.data.Dataset): The test dataset.
    - metrics (list[str]): A list of metrics to include in the evaluation.
    - results_dir (str): Directory to save the evaluation results.
    """
    # Evaluate the model on the test dataset
    test_results = model.evaluate(test_ds)

    # Print the results to the console
    print("Test Loss:", test_results[0])
    print("Test Accuracy:", test_results[1])

    # Save the results to a text file
    test_results_path = os.path.join(results_dir, 'test_results_u-net_32x32_markov.txt')
    with open(test_results_path, 'w') as f:
        f.write(f"Test Loss: {test_results[0]}\n")
        for i, metric in enumerate(metrics):
            f.write(f"Test {metric}: {test_results[i + 1]}\n")

# Evaluate the model and save the results
evaluate_and_save_results(model, test_ds, METRICS, results_dir)

In [None]:
def save_model(model: Model, model_save_path: str) -> None:
    """
    Saves the trained model to the specified path.

    Parameters:
    - model (Model): The trained model to save.
    - model_save_path (str): Path where the model will be saved.
    """
    # Save the model to the specified path
    model.save(model_save_path)
    print(f"Model saved to {model_save_path}")

# Save the trained model
save_model(model, model_save_path)