In [None]:
#1. Bibliotecas - Roboflow
!pip install roboflow
from roboflow import Roboflow
import os

In [None]:
#2. Exporto las máscaras de segmentación
rf = Roboflow(api_key="VN4WNXMOAA6kMFJVi4w0")
project = rf.workspace("prueba-segformer").project("my-first-project-w4jed")
version = project.version(1)
dataset = version.download("png-mask-semantic")

#3. Verifico la ubicación de las máscaras
print(dataset.location)

from torch.utils.data import Dataset, DataLoader
from PIL import Image

In [None]:
#4. Clase para obtener Dataset
class SemanticSegmentationDataset(Dataset):
    """Image (semantic) segmentation dataset."""

    def __init__(self, root_dir, feature_extractor):
        self.root_dir = root_dir
        self.feature_extractor = feature_extractor
        self.classes_csv_file = os.path.join(self.root_dir, "_classes.csv")
        with open(self.classes_csv_file, 'r') as fid:
            data = [l.split(',') for i,l in enumerate(fid) if i !=0]
        self.id2label = {x[0]:x[1] for x in data}

        image_file_names = [f for f in os.listdir(self.root_dir) if '.jpg' in f]
        mask_file_names = [f for f in os.listdir(self.root_dir) if '.png' in f]

        self.images = sorted(image_file_names)
        self.masks = sorted(mask_file_names)

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):

        image = Image.open(os.path.join(self.root_dir, self.images[idx]))
        segmentation_map = Image.open(os.path.join(self.root_dir, self.masks[idx]))

        encoded_inputs = self.feature_extractor(image, segmentation_map, return_tensors="pt")

        for k,v in encoded_inputs.items():
          encoded_inputs[k].squeeze_()

        return encoded_inputs

In [None]:
#5. Extracción de los datasets para entrenamiento, validación y test
from transformers import SegformerFeatureExtractor, SegformerImageProcessor

feature_extractor = SegformerImageProcessor.from_pretrained("nvidia/segformer-b0-finetuned-ade-512-512")
feature_extractor.reduce_labels = False
#feature_extractor.size = 128
feature_extractor.size = 512

dataset_root = "/content/My-First-Project-1"

train_dataset = SemanticSegmentationDataset(os.path.join(dataset_root, "train"), feature_extractor)
val_dataset = SemanticSegmentationDataset(os.path.join(dataset_root, "valid"), feature_extractor)
test_dataset = SemanticSegmentationDataset(os.path.join(dataset_root, "test"), feature_extractor)

batch_size = 8
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=3, prefetch_factor=8)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, num_workers=3, prefetch_factor=8)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, num_workers=3, prefetch_factor=8)

In [None]:
# Instalación de PyTorch Lightning
!pip install pytorch-lightning

# Importación de bibliotecas necesarias
import pytorch_lightning as pl
from transformers import SegformerForSemanticSegmentation  # Modelo SegFormer para segmentación
from datasets import load_metric  # Para cargar métricas de evaluación
import torch
from torch import nn  # Módulo de redes neuronales de PyTorch
import numpy as np

# Definición de la clase principal para fine-tuning de SegFormer
class SegformerFinetuner(pl.LightningModule):
    """
    Clase que implementa el fine-tuning de SegFormer para segmentación semántica
    usando PyTorch Lightning para manejar el ciclo de entrenamiento.
    """

    def __init__(self, id2label, train_dataloader=None, val_dataloader=None, test_dataloader=None, metrics_interval=100):
        """
        Inicializa el fine-tuner de SegFormer.

        Args:
            id2label (dict): Diccionario de mapeo de IDs de clase a etiquetas
            train_dataloader (DataLoader): DataLoader para datos de entrenamiento
            val_dataloader (DataLoader): DataLoader para datos de validación
            test_dataloader (DataLoader): DataLoader para datos de prueba
            metrics_interval (int): Cada cuántos batches calcular métricas
        """
        super(SegformerFinetuner, self).__init__()  # Inicializa la clase padre
        self.id2label = id2label  # Diccionario para convertir IDs a nombres de clase
        self.metrics_interval = metrics_interval  # Frecuencia para calcular métricas
        self.train_dl = train_dataloader  # DataLoader de entrenamiento
        self.val_dl = val_dataloader  # DataLoader de validación
        self.test_dl = test_dataloader  # DataLoader de prueba
        self.test_losses = []  # Lista para almacenar pérdidas durante prueba

        # Calcula el número de clases y crea mapeo inverso (etiqueta -> ID)
        self.num_classes = len(id2label.keys())
        self.label2id = {v: k for k, v in self.id2label.items()}

        # Carga el modelo pre-entrenado SegFormer con configuración personalizada
        self.model = SegformerForSemanticSegmentation.from_pretrained(
            "nvidia/segformer-b0-finetuned-ade-512-512",  # Modelo base pre-entrenado
            return_dict=False,  # Devuelve tuplas en lugar de diccionarios
            num_labels=self.num_classes,  # Número de clases personalizado
            id2label=self.id2label,  # Mapeo de IDs a etiquetas
            label2id=self.label2id,  # Mapeo de etiquetas a IDs
            ignore_mismatched_sizes=True,  # Permite ajustar tamaño de salida
        )

        self.model.train()  # Establece el modelo en modo entrenamiento

        # Inicializa métricas para entrenamiento, validación y prueba
        self.train_mean_iou = load_metric("mean_iou")  # Métrica IoU para entrenamiento
        self.val_mean_iou = load_metric("mean_iou")  # Métrica IoU para validación
        self.test_mean_iou = load_metric("mean_iou")  # Métrica IoU para prueba

        self.val_losses = []  # Lista para acumular pérdidas de validación

    def forward(self, images, masks):
        """
        Paso forward del modelo.

        Args:
            images (Tensor): Tensor de imágenes de entrada
            masks (Tensor): Tensor de máscaras de ground-truth

        Returns:
            tuple: (loss, logits) - Pérdida y logits del modelo
        """
        outputs = self.model(pixel_values=images, labels=masks)
        return outputs  # Retorna las salidas del modelo

    def training_step(self, batch, batch_nb):
        """
        Paso de entrenamiento para un batch de datos.

        Args:
            batch (dict): Batch de datos con 'pixel_values' y 'labels'
            batch_nb (int): Número del batch actual

        Returns:
            dict: Diccionario con métricas de entrenamiento
        """
        # Extrae imágenes y máscaras del batch
        images, masks = batch['pixel_values'], batch['labels']

        # Pasa los datos por el modelo
        outputs = self(images, masks)
        loss, logits = outputs[0], outputs[1]  # Separa pérdida y logits

        # Interpola los logits al tamaño original de las máscaras
        upsampled_logits = nn.functional.interpolate(
            logits,
            size=masks.shape[-2:],  # Tamaño objetivo (H, W)
            mode="bilinear",  # Método de interpolación
            align_corners=False  # Alineación de esquinas
        )
        predicted = upsampled_logits.argmax(dim=1)  # Obtiene predicciones

        # Agrega el batch actual a las métricas de entrenamiento
        self.train_mean_iou.add_batch(
            predictions=predicted.detach().cpu().numpy(),  # Predicciones
            references=masks.detach().cpu().numpy()  # Verdades terreno
        )

        # Calcula métricas cada cierto intervalo de batches
        if batch_nb % self.metrics_interval == 0:
            metrics = self.train_mean_iou.compute(
                num_labels=self.num_classes,  # Número de clases
                ignore_index=255,  # Ignora píxeles sin etiqueta (generalmente 255)
                reduce_labels=False,  # No reduce etiquetas
            )
            # Prepara métricas para logging
            metrics = {
                'loss': loss,
                "mean_iou": metrics["mean_iou"],  # IoU promedio
                "mean_accuracy": metrics["mean_accuracy"]  # Precisión promedio
            }
            # Registra métricas
            for k, v in metrics.items():
                self.log(k, v)
            return metrics
        else:
            return {'loss': loss}  # Retorna solo la pérdida

    def validation_step(self, batch, batch_nb):
        """
        Paso de validación para un batch de datos.

        Args:
            batch (dict): Batch de datos de validación
            batch_nb (int): Número del batch actual

        Returns:
            dict: Diccionario con pérdida de validación
        """
        images, masks = batch['pixel_values'], batch['labels']
        outputs = self(images, masks)
        loss, logits = outputs[0], outputs[1]

        # Interpola logits y obtiene predicciones
        upsampled_logits = nn.functional.interpolate(
            logits,
            size=masks.shape[-2:],
            mode="bilinear",
            align_corners=False
        )
        predicted = upsampled_logits.argmax(dim=1)

        # Agrega batch a métricas de validación
        self.val_mean_iou.add_batch(
            predictions=predicted.detach().cpu().numpy(),
            references=masks.detach().cpu().numpy()
        )

        # Acumula pérdida para cálculo posterior
        self.val_losses.append(loss.detach())

        return {'val_loss': loss}

    def on_validation_epoch_end(self):
        """
        Método llamado al final de cada época de validación.
        Calcula y registra métricas agregadas.
        """
        # Calcula métricas IoU para toda la época
        metrics = self.val_mean_iou.compute(
            num_labels=self.num_classes,
            ignore_index=255,
            reduce_labels=False,
        )

        # Calcula pérdida promedio de validación
        avg_val_loss = torch.stack(self.val_losses).mean() if self.val_losses else torch.tensor(0.0)
        self.val_losses.clear()  # Limpia para próxima época

        # Extrae métricas importantes
        val_mean_iou = metrics["mean_iou"]
        val_mean_accuracy = metrics["mean_accuracy"]

        # Registra métricas (aparecen en la barra de progreso)
        self.log("val_loss", avg_val_loss, prog_bar=True, on_epoch=True)
        self.log("val_mean_iou", val_mean_iou, prog_bar=True, on_epoch=True)
        self.log("val_mean_accuracy", val_mean_accuracy, prog_bar=True, on_epoch=True)

        # Prepara diccionario de métricas
        metrics = {
            "val_loss": avg_val_loss,
            "val_mean_iou": val_mean_iou,
            "val_mean_accuracy": val_mean_accuracy
        }

        # Registra todas las métricas
        for k, v in metrics.items():
            self.log(k, v, prog_bar=True, on_epoch=True)

    def test_step(self, batch, batch_nb):
        """
        Paso de prueba para un batch de datos.

        Args:
            batch (dict): Batch de datos de prueba
            batch_nb (int): Número del batch actual

        Returns:
            dict: Diccionario con pérdida de prueba
        """
        images, masks = batch['pixel_values'], batch['labels']
        outputs = self(images, masks)
        loss, logits = outputs[0], outputs[1]

        # Interpola logits y obtiene predicciones
        upsampled_logits = nn.functional.interpolate(
            logits,
            size=masks.shape[-2:],
            mode="bilinear",
            align_corners=False
        )
        predicted = upsampled_logits.argmax(dim=1)

        # Agrega batch a métricas de prueba
        self.test_mean_iou.add_batch(
            predictions=predicted.detach().cpu().numpy(),
            references=masks.detach().cpu().numpy()
        )

        # Acumula pérdida para cálculo posterior
        self.test_losses.append(loss.detach())

        return {'test_loss': loss}

    def on_test_epoch_end(self):
        """
        Método llamado al final de la prueba.
        Calcula y registra métricas finales de prueba.
        """
        # Calcula métricas IoU para todo el conjunto de prueba
        metrics = self.test_mean_iou.compute(
            num_labels=self.num_classes,
            ignore_index=255,
            reduce_labels=False,
        )

        # Calcula pérdida promedio de prueba
        avg_test_loss = torch.stack(self.test_losses).mean() if self.test_losses else torch.tensor(0.0)
        self.test_losses.clear()  # Limpia para próxima ejecución

        # Extrae métricas importantes
        test_mean_iou = metrics["mean_iou"]
        test_mean_accuracy = metrics["mean_accuracy"]

        # Registra métricas finales (aparecen en la barra de progreso)
        self.log("test_loss", avg_test_loss, prog_bar=True)
        self.log("test_mean_iou", test_mean_iou, prog_bar=True)
        self.log("test_mean_accuracy", test_mean_accuracy, prog_bar=True)

    def configure_optimizers(self):
        """
        Configura el optimizador para el entrenamiento.

        Returns:
            Optimizer: Optimizador Adam con learning rate bajo
        """
        return torch.optim.Adam(
            [p for p in self.parameters() if p.requires_grad],  # Solo parámetros entrenables
            lr=2e-05,  # Tasa de aprendizaje pequeña para fine-tuning
            eps=1e-08  # Término épsilon para estabilidad numérica
        )

    def train_dataloader(self):
        """Retorna el DataLoader de entrenamiento."""
        return self.train_dl

    def val_dataloader(self):
        """Retorna el DataLoader de validación."""
        return self.val_dl

    def test_dataloader(self):
        """Retorna el DataLoader de prueba."""
        return self.test_dl

# Instanciación del modelo fine-tuner
segformer_finetuner = SegformerFinetuner(
    train_dataset.id2label,  # Mapeo de IDs a etiquetas
    train_dataloader=train_dataloader,  # DataLoader de entrenamiento
    val_dataloader=val_dataloader,  # DataLoader de validación
    test_dataloader=test_dataloader,  # DataLoader de prueba
    metrics_interval=10,  # Calcula métricas cada 10 batches
)

In [None]:
# ==================== CONFIGURACIÓN INICIAL ====================
# Instalación de dependencias (para Colab/Jupyter)
!pip install pytorch-lightning transformers datasets

# Importaciones esenciales
import os
import numpy as np
import torch
from torch import nn
from PIL import Image
from matplotlib import pyplot as plt
from google.colab import files  # Solo para Colab
from IPython.display import display

# Frameworks principales
import pytorch_lightning as pl
from transformers import SegformerForSemanticSegmentation, SegformerFeatureExtractor

# ==================== DEFINICIÓN DE CALLBACKS ====================
"""
Callbacks son funciones que se ejecutan durante el entrenamiento para:
1. EarlyStopping: Detener el entrenamiento si no hay mejora
2. ModelCheckpoint: Guardar los mejores modelos
"""
early_stop_callback = EarlyStopping(
    monitor="val_mean_iou",  # Métrica a monitorear (IoU en validación)
    min_delta=0.00,         # Cambio mínimo para considerar mejora
    patience=3,             # Épocas sin mejora antes de parar
    verbose=True,           # Mostrar mensajes
    mode="max"              # Objetivo: maximizar la métrica
)

checkpoint_callback = ModelCheckpoint(
    save_top_k=1,                          # Guardar solo el mejor modelo
    monitor="val_mean_iou",                # Métrica de referencia
    mode="max",                            # Maximizar el IoU
    filename="best-model-{epoch:02d}-{val_mean_iou:.2f}",  # Nombre del archivo
    verbose=True                           # Mostrar mensajes
)

# ==================== CONFIGURACIÓN DEL ENTRENADOR ====================
trainer = pl.Trainer(
    accelerator="auto",          # Usar GPU si está disponible
    devices="auto",             # Usar todos los dispositivos
    callbacks=[early_stop_callback, checkpoint_callback],  # Callbacks
    max_epochs=20,              # Límite de épocas
    val_check_interval=len(train_dataloader),  # Validar después de cada época
)

# ==================== ENTRENAMIENTO Y EVALUACIÓN ====================
# Iniciar entrenamiento
trainer.fit(segformer_finetuner)

# Evaluar con el mejor modelo guardado
test_results = trainer.test(ckpt_path="best")
print("Resultados de prueba:", test_results)

# ==================== VISUALIZACIÓN DE RESULTADOS ====================
# Mapa de colores para las clases (personalizar según dataset)
color_map = {
    0: (0, 0, 0),      # Fondo - Negro
    1: (255, 0, 0),    # Clase 1 - Rojo
    2: (0, 255, 0),    # Clase 2 - Verde
    3: (0, 0, 255)     # Clase 3 - Azul
}

def prediction_to_vis(prediction):
    """Convierte máscara de predicción a imagen RGB coloreada"""
    vis_shape = prediction.shape + (3,)  # Añadir canales RGB
    vis = np.zeros(vis_shape, dtype=np.uint8)
    for class_id, color in color_map.items():
        vis[prediction == class_id] = color
    return Image.fromarray(vis)

# Visualización comparativa (predicción vs ground truth)
def visualize_predictions(dataloader, model, num_samples=3):
    model.eval()
    with torch.no_grad():
        for batch in dataloader:
            images, masks = batch['pixel_values'], batch['labels']
            outputs = model(images, masks)
            logits = outputs[1] if isinstance(outputs, tuple) else outputs.logits

            # Upsample logits al tamaño original
            upsampled_logits = nn.functional.interpolate(
                logits,
                size=masks.shape[-2:],
                mode="bilinear",
                align_corners=False
            )
            preds = upsampled_logits.argmax(dim=1).cpu().numpy()
            masks = masks.cpu().numpy()

            # Mostrar resultados
            fig, ax = plt.subplots(num_samples, 2, figsize=(10, num_samples*5))
            for i in range(num_samples):
                ax[i,0].imshow(prediction_to_vis(preds[i]))
                ax[i,0].set_title("Predicción")
                ax[i,1].imshow(prediction_to_vis(masks[i]))
                ax[i,1].set_title("Ground Truth")
            plt.show()
            break

visualize_predictions(test_dataloader, segformer_finetuner.model)

# ==================== INFERENCIA EN IMAGEN PERSONALIZADA ====================
def predict_and_visualize(image_path, model, feature_extractor):
    """Proceso completo para una imagen personalizada"""
    # 1. Cargar y preprocesar imagen
    image = Image.open(image_path).convert("RGB")
    inputs = feature_extractor(images=image, return_tensors="pt")

    # 2. Predicción
    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits if hasattr(outputs, 'logits') else outputs[0]

    # 3. Postprocesamiento
    upsampled_logits = nn.functional.interpolate(
        logits,
        size=image.size[::-1],  # (height, width)
        mode="bilinear",
        align_corners=False
    )
    pred_mask = upsampled_logits.argmax(dim=1).squeeze().cpu().numpy()

    # 4. Visualización
    fig, ax = plt.subplots(1, 2, figsize=(15, 7))
    ax[0].imshow(image)
    ax[0].set_title("Imagen Original")
    ax[1].imshow(prediction_to_vis(pred_mask))
    ax[1].set_title("Segmentación Predicha")
    plt.show()

    return pred_mask

# ==================== INTERFAZ PARA USUARIO ====================
def run_inference():
    """Función para cargar imagen y mostrar resultados"""
    # Subir imagen (en Colab)
    uploaded = files.upload()
    if uploaded:
        image_path = next(iter(uploaded))
        print(f"\nProcesando imagen: {image_path}")

        # Realizar predicción
        pred_mask = predict_and_visualize(
            image_path,
            segformer_finetuner.model,
            feature_extractor
        )

        # Opcional: Guardar resultado
        result_img = prediction_to_vis(pred_mask)
        result_path = "resultado_segmentacion.png"
        result_img.save(result_path)
        print(f"Resultado guardado como: {result_path}")
        return result_img
    else:
        print("No se subió ninguna imagen")
        return None

# Ejecutar interfaz (descomentar para usar)
# result_image = run_inference()
# if result_image:
#     display(result_image)

# ==================== FUNCIONES ADICIONALES ====================
def create_prediction_overlay(image_path, mask):
    """Crea una superposición de la imagen original con la máscara"""
    original_img = Image.open(image_path).convert("RGBA")
    mask_img = prediction_to_vis(mask).convert("RGBA")
    mask_img = mask_img.resize(original_img.size)

    # Ajustar transparencia
    overlay = Image.blend(original_img, mask_img, alpha=0.5)
    return overlay

# Ejemplo de uso:
# overlay = create_prediction_overlay("mi_imagen.jpg", pred_mask)
# display(overlay)