In [None]:
# Clone the DPT repository into ./DPT; later cells edit files under that directory in place.
!git clone https://github.com/isl-org/DPT.git

# Download models and weights
# Only the dpt_large ADE20K checkpoint is downloaded; the other release assets are disabled.
#!wget https://github.com/intel-isl/DPT/releases/download/1_0/dpt_hybrid-midas-501f0c75.pt
#!wget https://github.com/intel-isl/DPT/releases/download/1_0/dpt_large-midas-2f21e586.pt
#!wget https://github.com/intel-isl/DPT/releases/download/1_0/dpt_hybrid-ade20k-53898607.pt
!wget https://github.com/intel-isl/DPT/releases/download/1_0/dpt_large-ade20k-b12dca68.pt
    
# Import weights
# Move the downloaded checkpoint to ./DPT/weights.
#!mv ./dpt_hybrid-ade20k-53898607.pt ./DPT/weights
!mv ./dpt_large-ade20k-b12dca68.pt ./DPT/weights
#!mv ./dpt_large-midas-2f21e586.pt ./DPT/weights
#!mv ./dpt_hybrid-midas-501f0c75.pt ./DPT/weights

# Pip install required libraries with the latest releases
# NOTE(review): no versions are pinned, so the installed releases depend on when this cell runs.
!pip install torch
!pip install torchvision
!pip install opencv-python
!pip install timm

In [None]:
# Show the contents of the current working directory.
!ls

In [None]:
# Show the contents of the dataset-v1/d1 input directory.
!cd ../input/dataset-v1/d1/ && ls

In [None]:
import os 
def _rewrite_file(filename, replacements):
    """Apply ``(old, new)`` literal string replacements to a text file in place.

    The file is read and written through context managers, so each handle is
    closed (and the written text flushed) before the function returns.
    """
    with open(filename) as handle:
        text = handle.read()
    for old, new in replacements:
        text = text.replace(old, new)
    with open(filename, "w") as handle:
        handle.write(text)


def initialize_system(dpt_dir="/kaggle/working/DPT", output_dir="/kaggle/working/output"):
    """Create the output directory and make the DPT run scripts use absolute paths.

    In ``run_monodepth.py`` and ``run_segmentation.py`` under ``dpt_dir`` the
    quoted literals for the output folder, the ``"input"`` folder and the
    ``"weights/`` prefix are rewritten to point inside ``dpt_dir``.

    Args:
        dpt_dir: Directory holding the cloned DPT scripts. The default matches
            the path this notebook originally hard-coded.
        output_dir: Directory to create if it does not exist yet.

    Raises:
        OSError: If either script cannot be read or written.
    """
    # exist_ok makes re-running the cell harmless.
    os.makedirs(output_dir, exist_ok=True)

    # Replacements that are identical for both scripts.
    shared = [
        ('"input"', f'"{dpt_dir}/input/"'),
        ('"weights/', f'"{dpt_dir}/weights/'),
    ]

    # Each script differs only in the name of its default output folder.
    per_script = {
        "run_monodepth.py": "output_monodepth",
        "run_segmentation.py": "output_semseg",
    }
    for script_name, output_name in per_script.items():
        output_rule = (f'"{output_name}"', f'"{dpt_dir}/{output_name}"')
        _rewrite_file(os.path.join(dpt_dir, script_name), [output_rule] + shared)

In [None]:
# Create the output directory and rewrite the path literals in the cloned DPT run scripts.
initialize_system()

In [None]:
# Rewrite DPT/dpt/base_model.py so that its `self.load_state_dict(parameters)`
# call becomes a non-strict load restricted to checkpoint entries whose key and
# shape both match the model's own state dict.
with open("DPT/dpt/base_model.py", "r") as f:
    code = f.read().replace(
        "self.load_state_dict(parameters)",
        """own_state = self.state_dict()
        filtered = {k: v for k, v in parameters.items() if k in own_state and v.shape == own_state[k].shape}
        print(f"Cargando {len(filtered)} de {len(own_state)} parámetros del checkpoint.")
        self.load_state_dict(filtered, strict=False)""",
    )

# Write the patched source back to the same path.
with open("DPT/dpt/base_model.py", "w") as f:
    f.write(code)

In [None]:
# Load the current source of DPT/dpt/models.py.
with open("DPT/dpt/models.py", "r") as f:
    code = f.read()

# Append the shape-filtered loader once; the substring check keeps re-runs of
# this cell from appending it again.
# NOTE(review): the snippet is indented four spaces, so it only becomes a method
# if the file ends inside a class body — presumably DPTSegmentationModel, since
# a later cell calls model.load_partial_weights; verify against the cloned file.
already_patched = "def load_partial_weights" in code
if not already_patched:
    code = code + """

    def load_partial_weights(self, path):
        parameters = torch.load(path, map_location="cpu")
        if "model" in parameters:
            parameters = parameters["model"]

        own_state = self.state_dict()
        filtered = {
            k: v for k, v in parameters.items()
            if k in own_state and v.shape == own_state[k].shape
        }

        print(f"Cargando {len(filtered)} de {len(own_state)} parámetros del checkpoint.")
        self.load_state_dict(filtered, strict=False)
"""

# Save the (possibly unchanged) source back to disk.
with open("DPT/dpt/models.py", "w") as f:
    f.write(code)

print("Archivo 'models.py' modificado exitosamente.")


# Print the patched models.py for visual inspection.
!cat DPT/dpt/models.py

In [None]:
# Print the patched base_model.py for visual inspection.
!cat DPT/dpt/base_model.py

In [None]:
# ===============================
# IMPORTS
# ===============================
import os
import numpy as np
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader,random_split
from tqdm import tqdm
import matplotlib.pyplot as plt
import albumentations as A
from albumentations.pytorch import ToTensorV2
from sklearn.metrics import f1_score
from DPT.dpt.models import DPTSegmentationModel

In [None]:
# ===============================
# DATASET CON ALBUMENTATIONS
# ===============================
class RoadSignSegmentationDataset(Dataset):
    """Image/mask pairs discovered one directory level below ``root_dir``.

    Every immediate sub-directory of ``root_dir`` is scanned for files named
    ``<base>_img.png``. A sample is registered only when the sibling
    ``<base>_label.png`` exists. ``<base>_label_names.txt`` is optional; when
    present it is read as one stripped label name per line.

    Sub-directories and file names are visited in sorted order so that sample
    indices are stable between runs (``os.listdir`` gives no ordering
    guarantee), which index-based splits rely on.

    Args:
        root_dir: Directory whose immediate sub-directories hold the samples.
        transform: Optional callable invoked as ``transform(image=..., mask=...)``
            and expected to return a mapping with ``"image"`` and ``"mask"``
            entries, where the mask supports ``.long()``.
    """

    IMAGE_SUFFIX = "_img.png"
    MASK_SUFFIX = "_label.png"
    LABEL_NAMES_SUFFIX = "_label_names.txt"

    def __init__(self, root_dir, transform=None):
        self.samples = []
        self.transform = transform

        subdirs = sorted(
            path
            for path in (os.path.join(root_dir, entry) for entry in os.listdir(root_dir))
            if os.path.isdir(path)
        )

        for subdir in subdirs:
            for fname in sorted(os.listdir(subdir)):
                if not fname.endswith(self.IMAGE_SUFFIX):
                    continue

                # Strip only the trailing suffix; str.replace would also remove
                # any earlier occurrence of "_img.png" inside the name.
                base = fname[: -len(self.IMAGE_SUFFIX)]
                mask_path = os.path.join(subdir, base + self.MASK_SUFFIX)
                if not os.path.exists(mask_path):
                    # Images without a mask cannot be used for training.
                    continue

                label_names = []
                label_names_path = os.path.join(subdir, base + self.LABEL_NAMES_SUFFIX)
                if os.path.exists(label_names_path):
                    with open(label_names_path, 'r') as f:
                        label_names = [line.strip() for line in f]

                self.samples.append({
                    "image": os.path.join(subdir, fname),
                    "mask": mask_path,
                    "labels": label_names
                })

    def __len__(self):
        """Return the number of registered image/mask pairs."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, mask, label_names)``.

        Without a transform, image and mask are NumPy arrays; with one, they
        are whatever the transform returns, with the mask converted via
        ``.long()``.
        """
        sample = self.samples[idx]

        # Context managers close the file handles promptly; np.array copies
        # the pixel data, so the arrays stay valid after the files are closed.
        with Image.open(sample["image"]) as img_file:
            image = np.array(img_file.convert("RGB"))
        with Image.open(sample["mask"]) as mask_file:
            mask = np.array(mask_file)

        if self.transform:
            augmented = self.transform(image=image, mask=mask)
            image = augmented["image"]
            mask = augmented["mask"].long()

        return image, mask, sample["labels"]

In [None]:
# ===============================
# TRANSFORMS
# ===============================
# Training pipeline: resize to 512x512, random flip / brightness-contrast /
# Gaussian noise / affine augmentations, then normalisation with mean 0.5 and
# std 0.5 per channel and conversion to a tensor.
train_transform = A.Compose([
    A.Resize(512, 512),
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(p=0.2),
    A.GaussNoise(p=0.2),
    A.Affine(
        scale=(0.8, 1.2),
        translate_percent={"x": 0.05, "y": 0.05},
        rotate=(-15, 15),
        p=0.5
    ),
    A.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ToTensorV2()
])

In [None]:
# ===============================
# MÉTRICAS
# ===============================
def compute_iou(pred, target, num_classes=2):
    """Return the mean intersection-over-union across class ids ``0..num_classes-1``.

    A class that appears in neither ``pred`` nor ``target`` contributes NaN
    and is therefore ignored by the final ``np.nanmean``.
    """
    per_class = []
    for class_id in range(num_classes):
        in_pred = pred == class_id
        in_target = target == class_id
        overlap = (in_pred & in_target).sum()
        combined = (in_pred | in_target).sum()
        # Guard the division: an absent class has no defined IoU.
        per_class.append(np.nan if combined == 0 else overlap / combined)
    return np.nanmean(per_class)

def pixel_accuracy(pred, target):
    """Return the fraction of positions where ``pred`` equals ``target``."""
    matches = pred == target
    return matches.sum() / target.size


In [None]:
# ===============================
# DATASET
# ===============================
# Training data comes from dataset-v2/d2 and is wrapped with the training
# augmentation pipeline.
dataset = RoadSignSegmentationDataset(
    root_dir="../input/dataset-v2/d2/",
    transform=train_transform
)

In [None]:
# Sanity check: print the label names of the first sample only.
for image, mask, labels in dataset:
    print(labels)
    break

In [None]:
# ===============================
# CONFIGURATION
# ===============================
import copy

NUM_CLASSES = 4
BATCH_SIZE = 4
NUM_EPOCHS = 15
LEARNING_RATE = 1e-4
VAL_SPLIT = 0.2
SPLIT_SEED = 42  # fixes the train/validation partition for a given dataset ordering
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ===============================
# DATALOADER
# ===============================
val_size = int(VAL_SPLIT * len(dataset))
train_size = len(dataset) - val_size

# A seeded generator makes the split repeatable instead of depending on the
# global RNG state at the time the cell runs.
train_dataset, _val_split = random_split(
    dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# Both halves of random_split wrap the same dataset object, so the validation
# images would otherwise go through the random training augmentations. Build a
# shallow copy that shares the sample list but applies a deterministic
# transform, and index it with the validation indices.
val_transform = A.Compose([
    A.Resize(512, 512),
    A.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ToTensorV2()
])
_val_view = copy.copy(dataset)
_val_view.transform = val_transform
val_dataset = torch.utils.data.Subset(_val_view, _val_split.indices)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

print(len(train_dataset))
print(len(val_dataset))

In [None]:
# ===============================
# MODEL
# ===============================
# Build the segmentation model with NUM_CLASSES outputs; no checkpoint path is
# passed to the constructor here.
model = DPTSegmentationModel(
    num_classes=NUM_CLASSES,
    backbone="vitl16_384",
    readout="project",
    features=256,
    use_bn=True
)

# Load the ADE20K checkpoint through load_partial_weights, the method an
# earlier cell appends to models.py.
model.load_partial_weights("DPT/weights/dpt_large-ade20k-b12dca68.pt")
model.to(DEVICE)

In [None]:
# ===============================
# FREEZE BACKBONE
# ===============================
# Disable gradients for every parameter under model.pretrained.
for param in model.pretrained.parameters():
    param.requires_grad = False

In [None]:
# Show which device (cuda or cpu) was selected.
print(DEVICE)

In [None]:
# ===============================
# TRAINING (FROZEN BACKBONE)
# ===============================
def _append_batch_metrics(preds, masks, ious, accs, f1s, num_classes):
    """Append per-image IoU, pixel accuracy and macro-F1 for one batch to the given lists."""
    for p, t in zip(preds, masks):
        p_np, t_np = p.cpu().numpy(), t.cpu().numpy()
        # Pass the real class count: compute_iou defaults to 2 classes, which
        # would leave classes 2..NUM_CLASSES-1 out of the IoU.
        ious.append(compute_iou(p_np, t_np, num_classes))
        accs.append(pixel_accuracy(p_np, t_np))
        f1s.append(f1_score(t_np.flatten(), p_np.flatten(), average="macro"))


criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
# Halve the learning rate every 5 epochs.
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

# One list per metric; each epoch appends one value.
metrics = {
    "train_loss": [], "train_iou": [], "train_acc": [], "train_f1": [],
    "val_loss": [], "val_iou": [], "val_acc": [], "val_f1": []
}

for epoch in range(NUM_EPOCHS):
    model.train()
    running_loss = 0.0
    train_ious, train_accs, train_f1s = [], [], []

    for images, masks, _ in tqdm(train_loader, desc=f"[Train] Epoch {epoch+1}"):
        images, masks = images.to(DEVICE), masks.to(DEVICE)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

        preds = torch.argmax(outputs, dim=1)
        _append_batch_metrics(preds, masks, train_ious, train_accs, train_f1s, NUM_CLASSES)

    metrics["train_loss"].append(running_loss / len(train_loader))
    metrics["train_iou"].append(np.nanmean(train_ious))
    metrics["train_acc"].append(np.mean(train_accs))
    metrics["train_f1"].append(np.mean(train_f1s))

    # ===============================
    # VALIDATION
    # ===============================
    model.eval()
    val_loss = 0.0
    ious, accs, f1s = [], [], []

    with torch.no_grad():
        for images, masks, _ in val_loader:
            images, masks = images.to(DEVICE), masks.to(DEVICE)
            outputs = model(images)
            loss = criterion(outputs, masks)
            val_loss += loss.item()

            preds = torch.argmax(outputs, dim=1)
            _append_batch_metrics(preds, masks, ious, accs, f1s, NUM_CLASSES)

    metrics["val_loss"].append(val_loss / len(val_loader))
    metrics["val_iou"].append(np.nanmean(ious))
    metrics["val_acc"].append(np.mean(accs))
    metrics["val_f1"].append(np.mean(f1s))

    print(f"[Train] Epoch {epoch+1} | Loss: {metrics['train_loss'][-1]:.4f} | IoU: {metrics['train_iou'][-1]:.4f} | Acc: {metrics['train_acc'][-1]:.4f} | F1: {metrics['train_f1'][-1]:.4f}")
    print(f"[Val]   Epoch {epoch+1} | Loss: {metrics['val_loss'][-1]:.4f} | IoU: {metrics['val_iou'][-1]:.4f} | Acc: {metrics['val_acc'][-1]:.4f} | F1: {metrics['val_f1'][-1]:.4f}")

    scheduler.step()

# ===============================
# SAVE MODEL
# ===============================
os.makedirs("checkpoints", exist_ok=True)
torch.save(model.state_dict(), "checkpoints/dpt_finetuned.pt")
print("Modelo guardado en checkpoints/dpt_finetuned.pt")

# ===============================
# PLOTS
# ===============================
# All eight curves on one figure, saved next to the checkpoint.
plt.figure(figsize=(12, 8))
plt.plot(metrics["train_loss"], label="Train Loss")
plt.plot(metrics["val_loss"], label="Val Loss")
plt.plot(metrics["train_iou"], label="Train IoU")
plt.plot(metrics["val_iou"], label="Val IoU")
plt.plot(metrics["train_acc"], label="Train Accuracy")
plt.plot(metrics["val_acc"], label="Val Accuracy")
plt.plot(metrics["train_f1"], label="Train F1")
plt.plot(metrics["val_f1"], label="Val F1")
plt.xlabel("Epoch")
plt.ylabel("Valor")
plt.title("Métricas de Entrenamiento y Validación")
plt.grid(True)
plt.legend()
plt.savefig("checkpoints/metrics_plot.png")
plt.show()

In [None]:
import matplotlib.pyplot as plt
import os

# Make sure the output folder exists
os.makedirs("checkpoints/metric_plots", exist_ok=True)

def plot_metric(train_values, val_values, title, ylabel, filename):
    """Plot a training curve against its validation curve, save it and show it.

    The figure is written to ``checkpoints/metric_plots/<filename>.png``; that
    directory must already exist.
    """
    destination = f"checkpoints/metric_plots/{filename}.png"

    plt.figure(figsize=(10, 6))
    for series, series_label in ((train_values, "Train"), (val_values, "Validation")):
        plt.plot(series, label=series_label)
    plt.title(title)
    plt.xlabel("Epoch")
    plt.ylabel(ylabel)
    plt.grid(True)
    plt.legend()
    plt.savefig(destination)
    plt.show()

# 1. Loss, to spot overfitting/underfitting
plot_metric(
    metrics["train_loss"],
    metrics["val_loss"],
    "Evolución de la Pérdida - ¿Hay overfitting o underfitting?",
    "CrossEntropy Loss",
    "loss_comparison"
)

# 2. IoU (intersection over union) - key metric for segmentation
plot_metric(
    metrics["train_iou"],
    metrics["val_iou"],
    "Evolución del IoU - Precisión espacial de la segmentación",
    "IoU",
    "iou_comparison"
)

# 3. Pixel accuracy - share of correctly classified pixels
plot_metric(
    metrics["train_acc"],
    metrics["val_acc"],
    "Exactitud por píxel - ¿Qué tan bien clasifica cada píxel?",
    "Pixel Accuracy",
    "accuracy_comparison"
)

# 4. F1 score - balance between precision and recall
plot_metric(
    metrics["train_f1"],
    metrics["val_f1"],
    "Evolución del F1 Score - Equilibrio entre precisión y cobertura",
    "F1 Score",
    "f1_comparison"
)


In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import numpy as np
import matplotlib.pyplot as plt

def plot_confusion_matrix(model, dataloader, device, num_classes=2, normalize=True, title="Matriz de Confusión"):
    """Run ``model`` over ``dataloader`` and plot the pixel-level confusion matrix.

    Args:
        model: Model called as ``model(images)``; predictions are the argmax
            over dimension 1 of its output.
        dataloader: Iterable yielding ``(images, masks, _)`` batches.
        device: Device the image batches are moved to before inference.
        num_classes: Class ids ``0..num_classes-1`` are used as matrix labels.
        normalize: When true, rows are normalised (``normalize='true'``).
        title: Figure title.

    Raises:
        ValueError: If ``dataloader`` yields no batches.

    The figure is saved to ``checkpoints/confusion_matrix.png`` and shown.
    """
    import os

    pred_chunks = []
    target_chunks = []

    model.eval()
    with torch.no_grad():
        for images, masks, _ in dataloader:
            images, masks = images.to(device), masks.to(device)
            outputs = model(images)
            preds = torch.argmax(outputs, dim=1)

            # Keep one array per batch; extending a Python list with the
            # flattened array would create one Python object per pixel.
            pred_chunks.append(preds.cpu().numpy().ravel())
            target_chunks.append(masks.cpu().numpy().ravel())

    if not pred_chunks:
        raise ValueError("dataloader yielded no batches; cannot build a confusion matrix")

    all_preds = np.concatenate(pred_chunks)
    all_targets = np.concatenate(target_chunks)

    cm = confusion_matrix(all_targets, all_preds, labels=list(range(num_classes)), normalize='true' if normalize else None)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=[f"Clase {i}" for i in range(num_classes)])
    disp.plot(cmap=plt.cm.Blues)
    plt.title(title)
    plt.grid(False)
    # Do not rely on another cell having created the output directory.
    os.makedirs("checkpoints", exist_ok=True)
    plt.savefig("checkpoints/confusion_matrix.png")
    plt.show()

# Use the configured class count: the model is built with NUM_CLASSES outputs,
# and a hard-coded 2 would leave the remaining classes out of the matrix.
plot_confusion_matrix(model, val_loader, DEVICE, num_classes=NUM_CLASSES)


In [None]:
# ===============================
# EVALUADOR DE MODELO
# ===============================
def evaluate_model(model_path, dataloader, device, num_classes=2, pretrained=True):
    """Load a DPT segmentation model from ``model_path`` and score it on ``dataloader``.

    Args:
        model_path: Checkpoint file to load.
        dataloader: Iterable yielding ``(images, masks, labels)`` batches.
        device: Device used for the model and the batches.
        num_classes: Number of output classes; also forwarded to ``compute_iou``.
        pretrained: When truthy, the checkpoint path is handed to the model
            constructor; otherwise the model is built with ``path=None`` and
            the checkpoint is loaded through ``load_partial_weights``.

    Returns:
        Dict with the per-image averages under ``"IoU"``, ``"Pixel Accuracy"``
        and ``"F1 Score"``.
    """
    model = DPTSegmentationModel(
        num_classes=num_classes,
        path=model_path if pretrained else None,
        backbone="vitl16_384",
    )
    if not pretrained:
        model.load_partial_weights(model_path)

    model.to(device)
    model.eval()

    # One (iou, pixel accuracy, f1) triple per evaluated image.
    per_image = []

    with torch.no_grad():
        for images, masks, labels in tqdm(dataloader, desc=f"Evaluando {model_path}"):
            images, masks = images.to(device), masks.to(device)
            batch_preds = torch.argmax(model(images), dim=1)

            for pred_mask, true_mask in zip(batch_preds, masks):
                pred_np = pred_mask.cpu().numpy()
                true_np = true_mask.cpu().numpy()
                per_image.append((
                    compute_iou(pred_np, true_np, num_classes),
                    pixel_accuracy(pred_np, true_np),
                    f1_score(true_np.flatten(), pred_np.flatten(), average="macro"),
                ))

    return {
        "IoU": np.nanmean([scores[0] for scores in per_image]),
        "Pixel Accuracy": np.mean([scores[1] for scores in per_image]),
        "F1 Score": np.mean([scores[2] for scores in per_image])
    }

In [None]:
# Deterministic evaluation pipeline: resize, normalise and convert to tensor,
# with no random augmentation.
eval_transform = A.Compose([
    A.Resize(512, 512),
    A.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ToTensorV2()
])

# Evaluation uses dataset-v1/d1, a different directory from the dataset-v2/d2
# data used for training.
DATASET_PATH = "../input/dataset-v1/d1/"

eval_dataset = RoadSignSegmentationDataset(DATASET_PATH, transform=eval_transform)
eval_loader = DataLoader(eval_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Evaluation
# The ADE20K checkpoint is passed to the model constructor (pretrained=True).
print("Evaluando modelo preentrenado...")
results_pre = evaluate_model("DPT/weights/dpt_large-ade20k-b12dca68.pt", eval_loader, DEVICE, NUM_CLASSES, pretrained=True)
print("Modelo Preentrenado:", results_pre)

# The fine-tuned checkpoint is loaded through load_partial_weights (pretrained=False).
print("Evaluando modelo fine-tuned...")
results_fine = evaluate_model("checkpoints/dpt_finetuned.pt", eval_loader, DEVICE, NUM_CLASSES, pretrained=False)
print("Modelo Fine-Tuned:", results_fine)


In [None]:
# ================================================
# INTERACTIVO: ELECCIÓN DE MODELO + SUBIDA DE IMAGEN
# ================================================
from IPython.display import display
from ipywidgets import Dropdown, FileUpload
import io
from PIL import Image
import torch
import torchvision.transforms as T
import numpy as np
import matplotlib.pyplot as plt

# -------- Dropdown to choose the model ----------
# The option values are the names of the notebook variables expected to hold each model.
model_selector = Dropdown(
    options=[('Fine-tuned (model)', 'model'), ('Preentrenado (model2)', 'model2')],
    value='model',
    description='Modelo:',
)
display(model_selector)

# -------- Image upload ----------
# Accepts a single image file.
uploader = FileUpload(accept='image/*', multiple=False)
display(uploader)


In [None]:
# ================================================
# PROCESSING AND PREDICTION - ROBUST VERSION
# ================================================

if uploader.value:
    # FileUpload.value is handled both as a dict and as a tuple.
    if isinstance(uploader.value, dict):
        uploaded_file = list(uploader.value.values())[0]
    elif isinstance(uploader.value, tuple):
        uploaded_file = uploader.value[0]
    else:
        raise ValueError("Formato de archivo no reconocido")

    # Decode the uploaded bytes into an RGB image
    image_bytes = uploaded_file['content']
    image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
    orig_image = image.copy()

    # Same resize and normalisation constants as the evaluation transform
    transform = T.Compose([
        T.Resize((512, 512)),
        T.ToTensor(),
        T.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ])
    input_tensor = transform(image).unsqueeze(0).to(DEVICE)

    # Model selection
    if model_selector.value == 'model':
        selected_model = model
    else:
        # `model2` is not defined by any cell of this notebook, so selecting it
        # raised NameError on a fresh kernel. Build it on first use.
        # NOTE(review): assumes "Preentrenado (model2)" means the ADE20K
        # checkpoint loaded the same way evaluate_model does for
        # pretrained=True — confirm this is the intended baseline.
        if 'model2' not in globals():
            model2 = DPTSegmentationModel(
                num_classes=NUM_CLASSES,
                path="DPT/weights/dpt_large-ade20k-b12dca68.pt",
                backbone="vitl16_384",
            )
        selected_model = model2
    selected_model.to(DEVICE)
    selected_model.eval()

    with torch.no_grad():
        output = selected_model(input_tensor)
        pred_mask = torch.argmax(output, dim=1).squeeze().cpu().numpy()

    # Show the original image next to the predicted mask
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.imshow(orig_image)
    plt.title("Imagen Original")
    plt.axis("off")

    plt.subplot(1, 2, 2)
    plt.imshow(pred_mask, cmap="gray")
    plt.title("Máscara Predicha")
    plt.axis("off")

    plt.tight_layout()
    plt.show()

else:
    print("⬆️ Subí una imagen para visualizar la predicción.")


In [None]:
# Unfreeze everything
for param in model.parameters():
    param.requires_grad = True

# Or: unfreeze only the encoder
# NOTE(review): both loops run as written even though the comment presents them
# as alternatives; if model.parameters() already covers model.pretrained this
# second loop changes nothing — confirm which variant is intended.
for param in model.pretrained.parameters():
    param.requires_grad = True

# New optimizer with a lower lr (1e-5 here versus LEARNING_RATE = 1e-4 earlier)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)
