<a href="https://colab.research.google.com/github/RAvila-bioeng/DeepLearningModel_Evaluation/blob/main/DeepLearningModel_Evaluation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import kagglehub
import tensorflow as tf
from tensorflow.keras.layers import (Dense, Conv2D, MaxPooling2D, Dropout,
    BatchNormalization, Conv2DTranspose, Activation, concatenate, Input)
from tensorflow.keras import Model
import numpy as np
from tensorflow.keras.preprocessing.image import load_img, img_to_array
import os
import pandas as pd
import time
from google.colab import drive
import gc

In [None]:
drive.mount('/content/drive')
SAVE_DIR = "/content/drive/MyDrive/unet_experimentos"
os.makedirs(SAVE_DIR, exist_ok=True)

print("Guardando resultados en:", SAVE_DIR)
download_path = kagglehub.dataset_download(
    "nikitamanaenkov/fundus-image-dataset-for-vessel-segmentation"
)
print("Dataset descargado en:", download_path)

Mounted at /content/drive
Guardando resultados en: /content/drive/MyDrive/unet_experimentos
Using Colab cache for faster access to the 'fundus-image-dataset-for-vessel-segmentation' dataset.
Dataset descargado en: /kaggle/input/fundus-image-dataset-for-vessel-segmentation


In [None]:
def conv_block(input_tensor, filters, kernel_size=3, activation="relu",
               batch_norm=True, dropout_rate=0.0):

    x = Conv2D(filters, kernel_size, padding="same")(input_tensor)
    if batch_norm:
        x = BatchNormalization()(x)
    x = Activation(activation)(x)

    x = Conv2D(filters, kernel_size, padding="same")(x)
    if batch_norm:
        x = BatchNormalization()(x)
    x = Activation(activation)(x)

    if dropout_rate > 0:
        x = Dropout(dropout_rate)(x)

    return x


def encoder_block(input_tensor, filters, **kwargs):
    x = conv_block(input_tensor, filters, **kwargs)
    p = MaxPooling2D((2,2))(x)
    return x, p


def decoder_block(input_tensor, skip_tensor, filters, **kwargs):
    x = Conv2DTranspose(filters, 2, strides=2, padding="same")(input_tensor)
    x = concatenate([x, skip_tensor])
    x = conv_block(x, filters, **kwargs)
    return x


In [None]:
def build_model(img_size=(512,512),
                filters_first_layer=64,
                activation="relu",
                kernel_size=3,
                batch_norm=True,
                dropout_rate=0.0):
    """
    Misma arquitectura que el notebook original:
    4 bloques de encoder, bottleneck, 4 bloques de decoder.
    Solo hacemos algunos parámetros configurables.
    """
    inp = Input(shape=(*img_size, 3))

    # --- Encoder ---
    x1, p1 = encoder_block(
        inp,
        filters_first_layer,
        activation=activation,
        kernel_size=kernel_size,
        batch_norm=batch_norm,
        dropout_rate=dropout_rate
    )

    x2, p2 = encoder_block(
        p1,
        filters_first_layer * 2,
        activation=activation,
        kernel_size=kernel_size,
        batch_norm=batch_norm,
        dropout_rate=dropout_rate
    )

    x3, p3 = encoder_block(
        p2,
        filters_first_layer * 4,
        activation=activation,
        kernel_size=kernel_size,
        batch_norm=batch_norm,
        dropout_rate=dropout_rate
    )

    x4, p4 = encoder_block(
        p3,
        filters_first_layer * 8,
        activation=activation,
        kernel_size=kernel_size,
        batch_norm=batch_norm,
        dropout_rate=dropout_rate
    )

    # --- Bottleneck ---
    b = conv_block(
        p4,
        filters_first_layer * 16,
        activation=activation,
        kernel_size=kernel_size,
        batch_norm=batch_norm,
        dropout_rate=dropout_rate
    )

    # --- Decoder ---
    d1 = decoder_block(
        b, x4,
        filters_first_layer * 8,
        activation=activation,
        kernel_size=kernel_size,
        batch_norm=batch_norm,
        dropout_rate=dropout_rate
    )

    d2 = decoder_block(
        d1, x3,
        filters_first_layer * 4,
        activation=activation,
        kernel_size=kernel_size,
        batch_norm=batch_norm,
        dropout_rate=dropout_rate
    )

    d3 = decoder_block(
        d2, x2,
        filters_first_layer * 2,
        activation=activation,
        kernel_size=kernel_size,
        batch_norm=batch_norm,
        dropout_rate=dropout_rate
    )

    d4 = decoder_block(
        d3, x1,
        filters_first_layer,
        activation=activation,
        kernel_size=kernel_size,
        batch_norm=batch_norm,
        dropout_rate=dropout_rate
    )

    out = Conv2D(1, 1, padding="same", activation="sigmoid")(d4)
    model = Model(inp, out)

    return model


In [None]:
test_model = build_model()
print(test_model is None)
test_model.summary()


False


In [None]:
def load_data(path, img_size=(512,512), num_images=100):
    images, masks = [], []

    originals = sorted(os.listdir(path + "/Original"))[:num_images]
    grounds   = sorted(os.listdir(path + "/Ground truth"))[:num_images]

    for img_file, mask_file in zip(originals, grounds):

        img = load_img(os.path.join(path, "Original", img_file),
                       target_size=img_size)
        mask = load_img(os.path.join(path, "Ground truth", mask_file),
                        target_size=img_size, color_mode="grayscale")

        images.append(img_to_array(img)/255)
        masks.append(img_to_array(mask)/255)

    return np.array(images), np.array(masks)


def augment(image, mask, level="medium"):

    if level in ["medium", "strong"]:
        image = tf.image.random_flip_left_right(image)
        mask  = tf.image.random_flip_left_right(mask)

    if level == "strong":
        image = tf.image.random_brightness(image, 0.1)
        image = tf.image.random_contrast(image, 0.8, 1.2)

    return image, mask


def tf_dataset(x, y, batch_size=8, shuffle_size=8, aug_level="medium"):

    ds = tf.data.Dataset.from_tensor_slices((x, y))
    ds = ds.shuffle(shuffle_size).batch(batch_size)

    def aug_fn(img, mask):
        return augment(img, mask, aug_level)

    if aug_level != "none":
        ds = ds.map(aug_fn, num_parallel_calls=tf.data.AUTOTUNE)

    ds = ds.prefetch(tf.data.AUTOTUNE)
    return ds

In [None]:
def save_epoch_results(parameter, value, history, save_dir, config):
    """Guarda un CSV con los resultados por época."""
    rows = []

    epochs = len(history.history["loss"])

    for e in range(epochs):
        rows.append({
            "parameter": parameter,
            "value": value,
            "epoch": e+1,
            "train_loss": history.history["loss"][e],
            "train_acc": history.history["accuracy"][e],
            "val_loss": history.history["val_loss"][e],
            "val_acc": history.history["val_accuracy"][e],
            **config
        })

    df = pd.DataFrame(rows)
    df.to_csv(f"{save_dir}/epochs_{parameter}_{value}.csv", index=False)


In [None]:
base_config = {
    "filters_first_layer": 64,
    "kernel_size": 3,
    "activation": "relu",
    "batch_norm": True,
    "dropout_rate": 0.0,
    "batch_size": 8,
    "learning_rate": 1e-3,
    "optimizer": "adam",
    "epochs": 10,
    "shuffle_size": 8,
    "data_augmentation": "medium",
    "threshold": 0.5,
    "num_images": 100,
    "img_size": (512, 512)
}

In [None]:
parameters_change = [
    {"param": "filters_first_layer", "values": [32, 64, 128]},
    {"param": "unet_depth", "values": [3, 4, 5]},
    {"param": "kernel_size", "values": [3, 5, 7]},
    {"param": "activation", "values": ["relu", "elu", "leaky_relu"]},
    {"param": "batch_norm", "values": [True, False]},
    {"param": "dropout_rate", "values": [0.0, 0.2, 0.4]},
    {"param": "batch_size", "values": [2, 4, 8]},
    {"param": "learning_rate", "values": [1e-3, 1e-4, 1e-5]},
    {"param": "optimizer", "values": ["adam", "rmsprop", "sgd"]},
    {"param": "epochs", "values": [30, 60, 100]},
    {"param": "shuffle_size", "values": [8, 32, 128]},
    {"param": "image_size", "values": [(256,256), (384,384), (512,512)]},
    {"param": "threshold", "values": [0.3, 0.5, 0.7]},
    {"param": "data_augmentation", "values": ["light", "medium", "strong"]}
]


In [None]:
for exp in parameters_change:

    param = exp["param"]
    values = exp["values"]

    for value in values:

        print(f"\n\n>>> EXPERIMENTO: {param} = {value}")

#Vaciamos lo anterior antes de cada bucle, si no se queda ocupando espacio y la GPU no llega
        tf.keras.backend.clear_session()
        gc.collect()


        conf = base_config.copy()
        conf[param] = value

        # Cargar datos
        x_train, y_train = load_data(
            f"{download_path}/train",
            img_size=conf["img_size"],
            num_images=conf["num_images"]
        )

        x_test, y_test = load_data(
            f"{download_path}/test",
            img_size=conf["img_size"],
            num_images=conf["num_images"]
        )

        # Dataset
        train_ds = tf_dataset(x_train, y_train,
                              batch_size=conf["batch_size"],
                              shuffle_size=conf["shuffle_size"],
                              aug_level=conf["data_augmentation"])

        test_ds = tf_dataset(x_test, y_test,
                             batch_size=conf["batch_size"],
                             shuffle_size=1,
                             aug_level="none")

        # Modelo
        model = build_model(
            img_size=conf["img_size"],
            filters_first_layer=conf["filters_first_layer"],
            activation=conf["activation"],
            kernel_size=conf["kernel_size"],
            batch_norm=conf["batch_norm"],
            dropout_rate=conf["dropout_rate"]
        )

        # Optimizador
        if conf["optimizer"] == "adam":
            opt = tf.keras.optimizers.Adam(conf["learning_rate"])
        else:
            opt = tf.keras.optimizers.RMSprop(conf["learning_rate"])

        model.compile(loss="binary_crossentropy",
                      optimizer=opt,
                      metrics=["accuracy"])

        # Callbacks
        checkpoint_path = f"{SAVE_DIR}/best_{param}_{value}.keras"
        checkpoint = tf.keras.callbacks.ModelCheckpoint(
            checkpoint_path, save_best_only=True, monitor="val_loss"
        )

        early = tf.keras.callbacks.EarlyStopping(
            patience=8, restore_best_weights=True, monitor="val_loss"
        )

        # Entrenar
        history = model.fit(
            train_ds,
            epochs=conf["epochs"],
            validation_data=test_ds,
            callbacks=[checkpoint, early],
            verbose=1
        )

        # Guardar métricas por época
        save_epoch_results(param, value, history, SAVE_DIR, conf)

        print(f"✓ DONE: {param}={value}")

print("\n===== TODOS LOS EXPERIMENTOS COMPLETADOS =====\n")



>>> EXPERIMENTO: data_augmentation = medium
Epoch 1/10
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m256s[0m 10s/step - accuracy: 0.6853 - loss: 0.6082 - val_accuracy: 0.0711 - val_loss: 401.7237
Epoch 2/10
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m35s[0m 3s/step - accuracy: 0.9311 - loss: 0.2753 - val_accuracy: 0.0726 - val_loss: 53.1305
Epoch 3/10
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m34s[0m 3s/step - accuracy: 0.9317 - loss: 0.2514 - val_accuracy: 0.6515 - val_loss: 1.7390
Epoch 4/10
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m33s[0m 3s/step - accuracy: 0.9313 - loss: 0.2413 - val_accuracy: 0.8960 - val_loss: 1.3819
Epoch 5/10
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m39s[0m 2s/step - accuracy: 0.9315 - loss: 0.2364 - val_accuracy: 0.8349 - val_loss: 3.0002
Epoch 6/10
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m43s[0m 3s/step - accuracy: 0.9307 - loss: 0.2341 - val_accuracy: 0.9280 - val_lo