# Dependencies

In [None]:
import os
import numpy as np
import cv2
from glob import glob
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from sklearn.model_selection import train_test_split
from tensorflow.keras.metrics import MeanAbsoluteError, MeanIoU


import matplotlib.pyplot as plt


In [None]:
# Imprimir las GPUs disponibles
print("GPUs disponibles:", tf.config.list_physical_devices('GPU'))

# Configurar la GPU
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        # Configura el crecimiento de memoria para evitar la asignación total
        tf.config.experimental.set_memory_growth(gpus[0], True)  # Habilitar crecimiento de memoria dinámico
    except RuntimeError as e:
        print(f"Error al configurar el crecimiento de memoria: {e}")

# Leer archivos

## Funciones de lectura

In [None]:
IMG_HEIGHT = 512
IMG_WIDTH = 288  # Ancho menor que altura (formato celular)

input_dir = "input/combined/"
target_dir = "target/semantic_annotations"

def count_unique_colors(image_path):
    img = cv2.imread(image_path)  # Leer en BGR
    if img is None:
        return 0
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # Convertir a RGB
    img = img.reshape(-1, 3)  # Aplanar
    unique_colors = np.unique(img, axis=0)
    return len(unique_colors)

def is_well_segmented(image_path, threshold=2):
    unique_colors = count_unique_colors(image_path)
    return unique_colors > threshold

def load_image_pair(num_image):
    input_path = os.path.join(input_dir, f"{num_image}.jpg")
    target_path = os.path.join(target_dir, f"{num_image}.png")
    
    if not is_well_segmented(target_path):
        return None, None

    input_img = cv2.imread(input_path)
    if input_img is None:
        return None, None
    input_img = cv2.cvtColor(input_img, cv2.COLOR_BGR2RGB)
    input_img = cv2.resize(input_img, (IMG_WIDTH, IMG_HEIGHT)).astype(np.float32) / 255.0

    target_img = cv2.imread(target_path)
    if target_img is None:
        return None, None
    target_img = cv2.cvtColor(target_img, cv2.COLOR_BGR2RGB)
    target_img = cv2.resize(target_img, (IMG_WIDTH, IMG_HEIGHT)).astype(np.float32) / 255.0

    return input_img, target_img

def load_image_pair_gray(num_image):
    input_path = os.path.join(input_dir, f"{num_image}.jpg")
    target_path = os.path.join(target_dir, f"{num_image}.png")
    
    if not is_well_segmented(target_path):
        return None, None

    input_img = cv2.imread(input_path, cv2.IMREAD_GRAYSCALE)
    target_img = cv2.imread(target_path, cv2.IMREAD_GRAYSCALE)

    if input_img is None or target_img is None:
        return None, None

    input_img = cv2.resize(input_img, (IMG_WIDTH, IMG_HEIGHT)).astype(np.float32) / 255.0
    target_img = cv2.resize(target_img, (IMG_WIDTH, IMG_HEIGHT)).astype(np.float32) / 255.0

    input_img = np.expand_dims(input_img, axis=-1)
    target_img = np.expand_dims(target_img, axis=-1)

    return input_img, target_img

## Ejemplo de lectura

In [None]:
# Ejemplo de visualización
X_example, Y_example = load_image_pair(3)

if X_example is not None and Y_example is not None:
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.title("Input")
    plt.imshow(X_example)
    plt.axis('off')
    plt.subplot(1, 2, 2)
    plt.title("Ground Truth")
    plt.imshow(Y_example)
    plt.axis('off')
    plt.show()
else:
    print("La imagen no está bien segmentada o no se pudo cargar.")

# Modelo

## Creacion modelo

### Auxiliar functions

In [None]:
def encoder_block(inputs, num_filters):

    # Convolution with 3x3 filter followed by ReLU activation
    x = tf.keras.layers.Conv2D(num_filters, 3, padding='same')(inputs)
    x = tf.keras.layers.Activation('relu')(x)
    
    # Convolution with 3x3 filter followed by ReLU activation
    x = tf.keras.layers.Conv2D(num_filters, 3, padding='same')(x)
    x = tf.keras.layers.Activation('relu')(x)

    # Convolution with 3x3 filter followed by ReLU activation    
    p = tf.keras.layers.MaxPool2D(pool_size=(2, 2), strides=2)(x)
    return x, p  # <-- x es skip connection, p va al siguiente encoder


def decoder_block(inputs, skip_features, num_filters):
    
    # Upsampling layer with 2x2 filter and stride of 2
    x = tf.keras.layers.Conv2DTranspose(num_filters, (2, 2), strides=2, padding='same')(inputs)
    
    # Concatenate the skip features with the upsampled input
    x = tf.keras.layers.Concatenate()([x, skip_features])
    
    # Convolution with 3x3 filter followed by ReLU activation
    x = tf.keras.layers.Conv2D(num_filters, 3, padding='same')(x)
    x = tf.keras.layers.Activation('relu')(x)

    # Convolution with 3x3 filter followed by ReLU activation
    x = tf.keras.layers.Conv2D(num_filters, 3, padding='same')(x)
    x = tf.keras.layers.Activation('relu')(x)
    
    return x


### Model construction

In [None]:
def unet_model(input_shape = (512, 288, 3), num_classes = 1, base_filters = 16):
    inputs = tf.keras.layers.Input(input_shape) # Shape: (512, 288, 3)

    # Contracting Path
    s1, p1 = encoder_block(inputs, base_filters) # Shape: (512, 288, 3) -> (256, 144, 16)
    s2, p2 = encoder_block(p1, base_filters * 2) # Shape: (256, 144, 16) -> (128, 72, 32)
    s3, p3 = encoder_block(p2, base_filters * 4) # Shape: (128, 72, 32) -> (64, 36, 64)
    s4, p4 = encoder_block(p3, base_filters * 8) # Shape: (64, 36, 64) -> (32, 18, 128)
    
    # Bottleneck
    b1 = tf.keras.layers.Conv2D(base_filters * 16, 3, padding='same')(p4)
    b1 = tf.keras.layers.Activation('relu')(b1) # Shape: (32, 18, 128) -> (32, 18, 256)
    b1 = tf.keras.layers.Conv2D(base_filters * 16, 3, padding='same')(b1)
    b1 = tf.keras.layers.Activation('relu')(b1) # Shape: (32, 18, 256) -> (32, 18, 256)
    
    # Expansive Path
    d1 = decoder_block(b1, s4, base_filters * 8) # Shape: (32, 18, 256) -> (64, 36, 128)
    d2 = decoder_block(d1, s3, base_filters * 4) # Shape: (64, 36, 128) -> (128, 72, 64)
    d3 = decoder_block(d2, s2, base_filters * 2) # Shape: (128, 72, 64) -> (256, 144, 32)
    d4 = decoder_block(d3, s1, base_filters) # Shape: (256, 144, 32) -> (512, 288, 16)
    
    # Output
    outputs = tf.keras.layers.Conv2D(num_classes, 1, padding = 'same', activation = 'softmax')(d4) # Shape: (512, 288, 16) -> (512, 288, 1)
    
    model = tf.keras.models.Model(inputs = inputs, outputs = outputs, name = 'U-Net')
    return model

## Compilacion del modelo

In [None]:
model = unet_model(input_shape=(IMG_HEIGHT, IMG_WIDTH, 3), num_classes=24, base_filters=16)
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=[MeanIoU(num_classes=24)]
)
model.summary()

## Entrenamiento

In [None]:
# --- Configuración de entrenamiento y checkpoints globales ---
save_dir = 'model/rico/unet'
os.makedirs(save_dir, exist_ok=True)

best_model_path = os.path.join(save_dir, 'best_model_overall.keras')
last_model_path = os.path.join(save_dir, 'last_batch_model.keras')

# Callback para guardar el mejor modelo de TODOS los batches
global_callbacks = [
    EarlyStopping(patience=5, restore_best_weights=True, monitor='val_loss'),
    ModelCheckpoint(best_model_path, save_best_only=True, monitor='val_loss')
]

total_images   = 20000
batch_size     = 10
ignored_images = 0

for batch_start in range(0, total_images, batch_size):
    print(f"\n--- Procesando lote {batch_start} a {batch_start + batch_size} ---")

    X_batch, Y_batch = [], []
    for i in range(batch_start, batch_start + batch_size):
        if i % 10 == 0:
            print(f"  Procesando imagen {i}")
        x, y = load_image_pair(i)
        if x is None or y is None:
            ignored_images += 1
            continue
        X_batch.append(x)
        Y_batch.append(y)

    X_batch = np.array(X_batch, dtype=np.float32)
    Y_batch = np.array(Y_batch, dtype=np.float32)
    print(f"  Total imágenes cargadas: {len(X_batch)} (Ignoradas hasta ahora: {ignored_images})")

    # Split train/val
    X_train, X_val, Y_train, Y_val = train_test_split(
        X_batch, Y_batch, test_size=0.2, random_state=42
    )

    # Entrena sobre este batch, pero usa callbacks globales para actualizar el mejor modelo
    model.fit(
        X_train, Y_train,
        validation_data=(X_val, Y_val),
        batch_size=4,
        epochs=50,
        callbacks=global_callbacks,
        verbose=1
    )

# Al terminar TODOS los batches, guardamos también el estado final (último batch)
model.save(last_model_path)
print(f"\nMejor modelo guardado en: {best_model_path}")
print(f"Modelo del último batch guardado en: {last_model_path}")


In [None]:
# 1. Ruta a los modelos guardados
model_dir = "model/rico"
model_files = sorted([
    f for f in os.listdir(model_dir)
    if f.endswith(".keras") and f.startswith("best_model_batch_")
])

# 2. Cargar imágenes de test (una sola vez)
print("\n--- Cargando imágenes de test ---")
X_test, Y_test = [], []
eval_start = total_images  # Asegúrate que esta variable esté definida
eval_end = total_images + 10  # Evaluaremos con 500 imágenes
ignored_eval = 0

for i in range(eval_start, eval_end):
    x, y = load_image_pair(i)
    if x is None or y is None:
        ignored_eval += 1
        continue
    X_test.append(x)
    Y_test.append(y)

X_test = np.array(X_test, dtype=np.float32)
Y_test = np.array(Y_test, dtype=np.float32)
print(f"  Total test cargadas: {len(X_test)} (Ignoradas en test: {ignored_eval})")

# 3. Evaluar todos los modelos
resultados = []

for model_file in model_files:
    model_path = os.path.join(model_dir, model_file)
    print(f"\n--- Evaluando modelo: {model_file} ---")
    try:
        model = tf.keras.models.load_model(model_path, compile=True)
        metrics = model.evaluate(X_test, Y_test, verbose=0)  # silencioso
        resultados.append((model_file, metrics))
        print(f"  Métricas: {metrics}")
    except Exception as e:
        print(f"  ❌ Error al cargar o evaluar {model_file}: {e}")

# 4. Ordenar modelos por la primera métrica (usualmente loss)
resultados_ordenados = sorted(resultados, key=lambda x: x[1][0])  # menor loss

# 5. Mostrar ranking
print("\n📊 Ranking de modelos por pérdida (menor es mejor):")
for rank, (name, metrics) in enumerate(resultados_ordenados):
    print(f"{rank+1}. {name}: loss={metrics[0]:.4f}, otras métricas={metrics[1:]}")

# 6. (Opcional) Guardar en archivo
with open("ranking_modelos.txt", "w") as f:
    f.write("Modelo\tLoss\tMétricas adicionales\n")
    for name, metrics in resultados_ordenados:
        f.write(f"{name}\t{metrics[0]:.4f}\t{metrics[1:]}\n")

In [None]:
model = tf.keras.models.load_model("model/rico/best_model_batch_1500.keras", compile=True)
def show_prediction(idx):
    x, y = load_image_pair(idx)
    if x is None or y is None:
        print("No se pudo cargar la imagen o no está bien segmentada.")
        return

    print(f"Formato de entrada (original): {x.shape}, Formato de salida: {y.shape}")
    print("Tipo de x:", type(x), "| dtype:", x.dtype)

    # Verifica que esté en el formato correcto
    if x.shape != (512, 288, 3):
        print("Redimensionando x...")
        x = tf.image.resize(x, (512, 288)).numpy()
    if y.shape != (512, 288, 3):
        print("Redimensionando y...")
        y = tf.image.resize(y, (512, 288)).numpy()

    # Normaliza si es necesario
    if x.max() > 1.0:
        x = x / 255.0

    # Asegura que tenga la dimensión de batch
    x_batch = np.expand_dims(x, axis=0)
    print("Forma para predicción:", x_batch.shape)

    # Predicción
    pred = model.predict(x_batch)[0]

    plt.figure(figsize=(12, 4))
    plt.subplot(1, 3, 1)
    plt.title("Input")
    plt.imshow(x)
    plt.subplot(1, 3, 2)
    plt.title("Ground Truth")
    plt.imshow(y)
    plt.subplot(1, 3, 3)
    plt.title("Prediction")
    plt.imshow(pred)
    plt.show()

show_prediction(3)

In [None]:
# Evaluación final con imágenes no vistas
print("\n--- Evaluando modelo con 500 imágenes nuevas ---")
X_test, Y_test = [], []
eval_start = total_images
eval_end = total_images + 10
ignored_eval = 0

for i in range(eval_start, eval_end):
    x, y = load_image_pair(i)
    if x is None or y is None:
        ignored_eval += 1
        continue
    X_test.append(x)
    Y_test.append(y)

X_test = np.array(X_test, dtype=np.float32)
Y_test = np.array(Y_test, dtype=np.float32)

print(f"  Total test cargadas: {len(X_test)} (Ignoradas en test: {ignored_eval})")

# Cargar el último modelo guardado para evaluar
model = tf.keras.models.load_model(checkpoint_path, compile=True)
metrics = model.evaluate(X_test, Y_test, verbose=1)
print(f"\nResultados de evaluación: {metrics}")

# Reporte final
print(f"\nTotal de imágenes ignoradas en entrenamiento: {ignored_images}")

def show_prediction(idx):
    x, y = load_image_pair(idx)
    if x is None or y is None:
        print("No se pudo cargar la imagen o no está bien segmentada.")
        return

    print(f"Formato de entrada (original): {x.shape}, Formato de salida: {y.shape}")
    print("Tipo de x:", type(x), "| dtype:", x.dtype)

    # Verifica que esté en el formato correcto
    if x.shape != (512, 288, 3):
        print("Redimensionando x...")
        x = tf.image.resize(x, (512, 288)).numpy()
    if y.shape != (512, 288, 3):
        print("Redimensionando y...")
        y = tf.image.resize(y, (512, 288)).numpy()

    # Normaliza si es necesario
    if x.max() > 1.0:
        x = x / 255.0

    # Asegura que tenga la dimensión de batch
    x_batch = np.expand_dims(x, axis=0)
    print("Forma para predicción:", x_batch.shape)

    # Predicción
    pred = model.predict(x_batch)[0]

    plt.figure(figsize=(12, 4))
    plt.subplot(1, 3, 1)
    plt.title("Input")
    plt.imshow(x)
    plt.subplot(1, 3, 2)
    plt.title("Ground Truth")
    plt.imshow(y)
    plt.subplot(1, 3, 3)
    plt.title("Prediction")
    plt.imshow(pred)
    plt.show()

show_prediction(3)

