In [12]:
# PyTorch
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F # FFFFF

# Data loading
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, random_split
from torch.utils.data.sampler import SubsetRandomSampler

# Auxiliary functions
from torch.utils.tensorboard import SummaryWriter  # Used for Tensorboard logging
import os
import numpy as np
import matplotlib.pyplot as plt
from math import floor, ceil
import datetime

import json
import dataset_semseg

# Carga de dataloaders: baja resolución, 10 clases

Leemos las imágenes y sus máscaras con la clase `SupermarketSemSeg` proporcionada en el módulo `dataset_semseg`.

In [None]:
# Build the training and test datasets; each call receives an images folder and its masks folder.
# NOTE(review): the global name `train` is rebound later in this notebook by `def train(...)`.
# The notebook therefore only works when run top to bottom; re-running this cell or the
# split cell out of order will clash with the training function.
train=dataset_semseg.SupermarketSemSeg("dataset_res_144_192_10classes/train/images","dataset_res_144_192_10classes/train/masks")
test_dataset=dataset_semseg.SupermarketSemSeg("dataset_res_144_192_10classes/test/images","dataset_res_144_192_10classes/test/masks")

Separamos el conjunto de entrenamiento en entrenamiento/validación (80/20), fijando una semilla para obtener siempre la misma partición.

In [None]:
# 80 % of the samples go to training; the remainder (not a second rounding) goes to
# validation, so the two sizes always add up to the full dataset length.
total_size = len(train)
train_size = int(0.8 * total_size)
val_size = total_size - train_size

# Dedicated, seeded generator so the partition is the same on every run.
generator = torch.Generator().manual_seed(42)

train_dataset, val_dataset = random_split(
    train, 
    [train_size, val_size], 
    generator=generator
)

Creamos los dataloaders

In [None]:
batch_size = 35  # Samples per batch
num_workers = 0  # Worker processes loading data in parallel (0 = load in the main process, to save resources)

# Training batches are reshuffled on every epoch.
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=num_workers
)

# Validation is evaluated in a fixed order: shuffling gives no benefit when no
# gradients are computed, and it made the per-batch-averaged validation loss vary
# from run to run (the composition of the last, smaller batch changed).
val_loader = DataLoader(
    dataset=val_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=num_workers
)

test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=num_workers
)

# Segmentación de imagen mediante arquitectura U-Net

In [2]:
# Project root: the directory the notebook is run from.
PATH_ROOT = '.'
# Input data lives directly under the root.
PATH_DATA = os.path.join(PATH_ROOT, 'data')
# Saved models, evaluation results and TensorBoard runs each get a folder under 'reports'.
PATH_MODELS, PATH_RESULTS, PATH_RUNS = (
    os.path.join(PATH_ROOT, 'reports', subfolder)
    for subfolder in ('models', 'results', 'runs')
)

In [3]:
# Each session gets its own directory, named after the date and time it was run:
date = datetime.datetime.now()
test_name = f'{date.year}_{date.month}_{date.day}__{date.hour}_{date.minute}'
print('Nombre del directorio de pruebas: {}'.format(test_name))


def _crear_carpeta_sesion(base_path, session_name):
    """Create `base_path/session_name` (with parents) and return its path.

    Only the "already exists" case is tolerated and reported; any other OSError
    (permissions, invalid path, ...) propagates instead of being mislabelled as
    an existing folder, which is what the previous bare `except:` did.
    """
    folder = os.path.join(base_path, session_name)
    try:
        os.makedirs(folder)
    except FileExistsError:
        print(f'Folder {folder} already existed.')
    return folder


models_folder = _crear_carpeta_sesion(PATH_MODELS, test_name)
results_folder = _crear_carpeta_sesion(PATH_RESULTS, test_name)
runs_folder = _crear_carpeta_sesion(PATH_RUNS, test_name)


Nombre del directorio de pruebas: 2025_11_19__10_0


In [10]:
# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(device)

cuda


In [22]:
def leer_mapeado(ruta):
    """Read a label-mapping JSON file and return a dict {index: name}.

    Args:
        ruta: path to a JSON file containing a list of objects, each with an
            'index' key and a 'name' key.

    Returns:
        dict mapping every entry's 'index' to its 'name'. If an index appears
        more than once, the last entry wins.
    """
    # JSON is UTF-8 by specification; do not depend on the platform default encoding.
    with open(ruta, 'r', encoding='utf-8') as f:
        data = json.load(f)
    return {entrada['index']: entrada['name'] for entrada in data}
# Show the index -> class-name mapping read from the label file (displayed as the cell output).
leer_mapeado('dataset_res_144_192_10classes/label_mapping_10classes.json')

{0: 'background',
 1: 'coca_cola_05',
 2: 'koelln_muesli_schoko',
 3: 'apple_granny_smith',
 4: 'banana_single',
 5: 'oranges',
 6: 'pasta_reggia_spaghetti',
 7: 'gepa_bio_und_fair_fencheltee',
 8: 'cucumber',
 9: 'carrot',
 10: 'lettuce'}

## Definición del modelo

Como vamos a repetir la estructura de Convolución, Convolución, pooling, hacemos una clase para ello

In [5]:
class dobleConvolucionMaxPool(nn.Module):
    """Encoder stage: two 3x3 convolutions, each followed by ReLU, then a 2x2 max-pool.

    The 3x3 convolutions use padding=1, so they keep the spatial size; the
    pooling (kernel 2, stride 2) halves it.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        conv_kwargs = dict(kernel_size=3, padding=1)
        self.convolucion_1 = nn.Conv2d(in_channels, out_channels, **conv_kwargs)
        self.convolucion_2 = nn.Conv2d(out_channels, out_channels, **conv_kwargs)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

    def forward(self, x):
        """Return `(pooled, skip_connection)`.

        `skip_connection` is the activation map before pooling; `pooled` is the
        same map after the max-pool.
        """
        skip_connection = F.relu(self.convolucion_2(F.relu(self.convolucion_1(x))))
        return self.pool(skip_connection), skip_connection

Hacemos lo mismo para el bloque del decoder: una deconvolución (convolución transpuesta) seguida de dos convoluciones.

In [6]:
class deconvolucionDobleConvolucion(nn.Module):
    """Decoder stage: 2x2 transposed convolution, concatenation with the skip map, two 3x3 conv + ReLU.

    Args:
        in_channels: channels of the tensor coming from the level below.
        out_channels: channels produced by the transposed convolution and by the
            stage as a whole. The skip connection must also carry `out_channels`
            channels, because the first convolution expects `out_channels * 2`.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.deconvolucion = nn.ConvTranspose2d(in_channels, out_channels, kernel_size=2, stride=2)
        self.convolucion_1 = nn.Conv2d(out_channels * 2, out_channels, kernel_size=3, padding=1)  # *2 because of the concatenation
        self.convolucion_2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)

    def forward(self, x, skip_connection):
        """Upsample `x`, merge it with `skip_connection` along the channel axis and refine."""
        x = self.deconvolucion(x)
        # A stride-2 pool floors odd sizes, so the upsampled map can be one pixel
        # smaller than the skip map. Zero-pad it to the skip's spatial size so the
        # concatenation also works for inputs not divisible by 2**levels.
        diff_h = skip_connection.size(2) - x.size(2)
        diff_w = skip_connection.size(3) - x.size(3)
        if diff_h != 0 or diff_w != 0:
            x = F.pad(x, [diff_w // 2, diff_w - diff_w // 2,
                          diff_h // 2, diff_h - diff_h // 2])
        x = F.relu(self.convolucion_1(torch.cat([x, skip_connection], dim=1)))
        x = F.relu(self.convolucion_2(x))
        return x

In [7]:
def calcular_num_filtros(num_base_filtros, num_niveles):
    """Return the filter count of each level, doubling from one level to the next.

    Level `i` (0-based) gets `num_base_filtros * 2**i` filters, so
    `calcular_num_filtros(64, 3)` is `[64, 128, 256]`.
    """
    return [num_base_filtros * (2**nivel) for nivel in range(num_niveles)]

In [None]:
class MLP(nn.Module):
    """Two-layer perceptron: Linear -> ReLU -> Linear (no activation on the output).

    Args:
        in_dim: number of input features.
        hidden_sizes: sequence whose first item is the hidden width and whose
            second item is the output width.
        bias: whether both linear layers use a bias term.

    Both layers are created on the notebook-level `device`, which must already
    be defined when the class is instantiated.
    """

    def __init__(self, in_dim, hidden_sizes=[10, 10], bias=True):
        super().__init__()
        ancho_oculto, ancho_salida = hidden_sizes[0], hidden_sizes[1]
        self.capa_1 = nn.Linear(in_dim, ancho_oculto, bias=bias, device=device)
        self.capa_2 = nn.Linear(ancho_oculto, ancho_salida, bias=bias, device=device)

    def forward(self, x):
        """Apply the hidden layer with ReLU, then the linear output layer."""
        return self.capa_2(F.relu(self.capa_1(x)))

In [None]:
class UnetModel(nn.Module):
    """U-Net for semantic segmentation built from the encoder/decoder stages above.

    Channel layout with the defaults (filtros = [64, 128, 256, 512, 1024]):
        encoders:   in_channels -> 64 -> 128 -> 256 -> 512   (skips: 64, 128, 256, 512)
        bottleneck: 512 -> 1024
        decoders:   1024 -> 512 -> 256 -> 128 -> 64          (each concatenated with its skip)
        head:       64 -> num_clases + 1

    The previous wiring started the encoders at `filtros[0]` input channels and
    shifted every encoder output one level up, so the first decoder concatenated
    512 + 1024 channels into a convolution expecting 1024.

    Args:
        num_base_filtros: number of filters of the first level; doubled at each level.
        num_niveles: number of encoder/decoder levels.
        num_clases: number of foreground classes (one extra output channel is
            added for the background).
        in_channels: channels of the input image. The default of 3 assumes RGB
            images — TODO confirm against what `SupermarketSemSeg` returns.
    """

    def __init__(self, num_base_filtros = 64, num_niveles = 4, num_clases = 10, in_channels = 3):
        super().__init__()
        self.encoders = nn.ModuleList()
        self.decoders = nn.ModuleList()
        # One entry per level plus one for the bottleneck; more levels can be tried to compare performance.
        self.filtros = calcular_num_filtros(num_base_filtros, num_niveles + 1)
        canales_entrada = in_channels
        for i in range(num_niveles):
            # Encoder i outputs filtros[i] channels, which is also the width of skip i.
            self.encoders.append(dobleConvolucionMaxPool(in_channels=canales_entrada, out_channels=self.filtros[i]))
            canales_entrada = self.filtros[i]
            # Decoders are stored deepest-first: decoder i consumes filtros[num_niveles - i]
            # channels and is paired with the skip of width filtros[num_niveles - 1 - i].
            self.decoders.append(deconvolucionDobleConvolucion(in_channels=self.filtros[num_niveles-i], out_channels=self.filtros[num_niveles-1-i]))
        # The bottleneck widens the deepest encoder output to the widest filter count.
        self.cuello_botella = nn.Conv2d(canales_entrada, self.filtros[-1], kernel_size=3, padding=1)
        # TODO: decide whether the bottleneck / upsampling should follow the slides or the PDF
        # variant (an MLP bottleneck over the flattened features was the alternative considered).
        self.convolucion_final = nn.Conv2d(self.filtros[0], num_clases+1, kernel_size=1)  # +1 for the background

    def forward(self, X):
        """Return per-pixel class logits with `num_clases + 1` channels."""
        skip_connections = []
        for encoder in self.encoders:
            X, skip_connection = encoder(X)
            skip_connections.append(skip_connection)
        X = self.cuello_botella(X)
        # The deepest skip is consumed first.
        for decoder, skip_connection in zip(self.decoders, reversed(skip_connections)):
            X = decoder(X, skip_connection)
        return self.convolucion_final(X)

# Entrenamiento del modelo

Durante el entrenamiento, para saber si este ha sido fructífero, registraremos las siguientes métricas:

- Salida del modelo para algunas imágenes
- Loss durante el entrenamiento
- Accuracy durante el entrenamiento

In [None]:
def train(model, train_loader, criterion, optimizer, summary_writer, val_loader=None, num_epochs=20, device='cuda'):
    """Train `model` and log metrics to TensorBoard once per epoch.

    Args:
        model: network to optimise; must already live on `device`.
        train_loader: iterable of batches; item 0 is the input tensor and item 1
            the label tensor of each batch.
        criterion: loss called as `criterion(outputs, labels)`.
        optimizer: optimizer over the model parameters.
        summary_writer: object exposing `add_scalar` and `add_histogram`.
        val_loader: optional iterable with the same batch layout, evaluated
            without gradients after every epoch.
        num_epochs: number of passes over `train_loader`.
        device: device the batches are moved to.

    Returns:
        The trained model (the same object that was passed in).

    Notes:
        Labels are cast to long, as `test` does. Accuracy is the fraction of
        correctly classified label elements, i.e. pixels when the labels are
        (N, H, W) class-index masks — assumed layout, TODO confirm.
    """
    # Per-epoch history, kept for logging:
    train_acc, train_loss = [], []
    val_acc, val_loss = [], []

    for epoch in range(num_epochs):
        # Validation leaves the model in eval mode, so training mode is restored every epoch.
        model.train()
        running_loss = 0.0  # Sum of the batch losses of this epoch
        num_batches = 0
        count_evaluated = 0
        count_correct = 0
        outputs = None
        for data in train_loader:
            inputs, labels = data[0].to(device), data[1].long().to(device)
            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            ### Logging ###
            running_loss += loss.item()
            num_batches += 1
            # Count label elements, not images: predictions are compared element-wise,
            # so dividing by the number of images produced "accuracies" above 1.
            count_evaluated += labels.numel()
            count_correct += (outputs.argmax(dim=1) == labels).sum().item()

        if outputs is not None:  # Log the predictions of the last batch of the epoch
            summary_writer.add_histogram("predicciones", outputs.detach().cpu(), epoch)

        # Mean loss and accuracy of the epoch (NaN when the loader yielded nothing)
        train_loss.append(running_loss / num_batches if num_batches else float('nan'))
        train_acc.append(count_correct / count_evaluated if count_evaluated else float('nan'))
        print('Training: [%d, %5d] loss: %.3f' % (epoch + 1, num_batches, train_loss[-1]))

        summary_writer.add_scalar("coste_train", train_loss[-1], epoch)
        summary_writer.add_scalar("acc_train", train_acc[-1], epoch)

        ### Validation ###
        if val_loader is not None:
            running_loss_val = 0.0
            num_batches_val = 0
            count_evaluated = 0
            count_correct = 0
            model.eval()
            with torch.no_grad():
                for data_val in val_loader:
                    inputs_val, labels_val = data_val[0].to(device), data_val[1].long().to(device)
                    outputs_val = model(inputs_val)
                    running_loss_val += criterion(outputs_val, labels_val).item()
                    num_batches_val += 1
                    count_evaluated += labels_val.numel()
                    count_correct += (outputs_val.argmax(dim=1) == labels_val).sum().item()
            # Validation summary of the epoch:
            val_loss.append(running_loss_val / num_batches_val if num_batches_val else float('nan'))
            acc_val = count_correct / count_evaluated if count_evaluated else float('nan')
            val_acc.append(acc_val)

            summary_writer.add_scalar("coste_val", val_loss[-1], epoch)
            summary_writer.add_scalar("acc_val", acc_val, epoch)
            print('Validation: epoch %d - acc: %.3f' %
                        (epoch + 1, acc_val))

        # Parameter and gradient distributions, to spot vanishing or exploding values
        for name, param in model.named_parameters():
            summary_writer.add_histogram(f"param_{name}", param, epoch)
            if param.grad is not None:
                summary_writer.add_histogram(f"grad_{name}", param.grad, epoch)

    # Only the trained model is returned; the metrics are available through the summary writer.
    return model

In [None]:
def test(model, test_loader, criterion, device='cuda'):
    with torch.no_grad():
        number_samples = 0
        number_correct = 0
        running_loss_test = 0.0
        for test_batch_idx, data_test in enumerate(test_loader, 0):
            inputs_test, labels_test = data_test[0].to(device), data_test[1].long().to(device)
            imgs, masks = inputs_test
            outputs_test = model(inputs_test)
            loss = criterion(outputs_test, labels_test)
            running_loss_test += loss.cpu().numpy()
            # Accuracy:
            _, outputs_class = torch.max(outputs_test, dim=1)
            number_correct += torch.sum(outputs_class == labels_test).cpu().numpy()
            number_samples += len(labels_test)
        acc_test = number_correct / number_samples
        print('Test - Accuracy: %.3f' % acc_test)
        print('Test - CrossEntropy: %.3f' % (running_loss_test / (test_batch_idx+1)))

Creamos el modelo, usamos un optimizador arbitrario y lo entrenamos para el dataset con 10 clases.

In [None]:
# The model must live on the same device the batches are moved to inside train();
# otherwise the first forward pass fails with a device mismatch.
model = UnetModel(num_base_filtros=64, num_clases=10, num_niveles=4).to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9, weight_decay=0.0001)
# Log into this session's own folder so that runs from different sessions do not get mixed together.
writer = SummaryWriter(log_dir=runs_folder)
model = train(model, train_loader, criterion, optimizer, writer, val_loader = val_loader, device=device)
writer.flush()  # make sure every pending event is written to disk

## Función de coste

## Optimizador

### Evolución de factor de aprendizaje