In [2]:
import os
import numpy as np
from PIL import Image
import scipy.io
from datasets import load_dataset

num_samples = 200
mat_data = scipy.io.loadmat("classMapping40.mat")
old_map = mat_data["mapClass"].squeeze()  
old_map = old_map - 1 
padded_map = np.zeros(895, dtype=old_map.dtype)
padded_map[:894] = old_map
padded_map[894] = 39
class_mapping = padded_map

dataset = load_dataset(
    "0jl/NYUv2",
    trust_remote_code=True,
    split="train",
)

# Seleziona num_samples indici casuali e distinti nell'intervallo da 0 a 1448
indices = np.random.choice(range(1448), size=num_samples, replace=False)
subset = dataset.select(indices)

os.makedirs("rgb_images", exist_ok=True)
os.makedirs("depth_maps", exist_ok=True)
os.makedirs("labels", exist_ok=True)

rgb_count = 0
depth_count = 0
label_count = 0

for idx, example in enumerate(subset):
    try:
        image_pil = example["image"]  
        image_pil.save(f"rgb_images/rgb_{idx}.png")
        rgb_count += 1

        depth_map = example["depth"]  
        depth_norm = (depth_map / np.max(depth_map) * 255).astype(np.uint8)
        depth_image = Image.fromarray(depth_norm)
        depth_image.save(f"depth_maps/depth_{idx}.png")
        depth_count += 1

        label_map = example["label"] 
        label_map_40 = class_mapping[label_map] 
        np.save(f"labels/label_{idx}.npy", label_map_40)
        label_count += 1

    except Exception as e:
        print(f"Errore al salvataggio del dato idx={idx}: {e}")

print("Data extraction completed!")
print(f"Total RGB images saved: {rgb_count}")
print(f"Total depth maps saved: {depth_count}")
print(f"Total labels saved: {label_count}")

Data extraction completed!
Total RGB images saved: 200
Total depth maps saved: 200
Total labels saved: 200


In [1]:
import os
import glob
import numpy as np
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms

# Definizione della U-Net

class DoubleConv(nn.Module):
    """
    Modulo che esegue due convoluzioni consecutive con kernel 3, ciascuna seguita da
    Batch Normalization e attivazione ReLU.
    """
    def __init__(self, in_channels, out_channels):
        super(DoubleConv, self).__init__()
        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )
        
    def forward(self, x):
        return self.double_conv(x)

class Down(nn.Module):
    """
    Modulo per la fase di downsampling: max pooling seguito da doppia convoluzione.
    """
    def __init__(self, in_channels, out_channels):
        super(Down, self).__init__()
        self.maxpool_conv = nn.Sequential(
            nn.MaxPool2d(kernel_size=2),
            DoubleConv(in_channels, out_channels)
        )
        
    def forward(self, x):
        return self.maxpool_conv(x)

class Up(nn.Module):
    """
    Modulo per l'upsampling. Può utilizzare l'interpolazione bilineare o la convoluzione trasposta.
    Dopo l'upsampling il modulo concatena la feature map proveniente dalla fase di encoder (skip connection)
    e applica una doppia convoluzione.
    """
    def __init__(self, in_channels, out_channels, bilinear=True):
        super(Up, self).__init__()
        if bilinear:
            self.up = nn.Sequential(
                nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True),
                nn.Conv2d(in_channels, in_channels // 2, kernel_size=1)
            )
        else:
            self.up = nn.ConvTranspose2d(in_channels, in_channels // 2, kernel_size=2, stride=2)
            
        self.conv = DoubleConv(in_channels, out_channels)
        
    def forward(self, x1, x2):
        x1 = self.up(x1)
        # Aggiustamento dimensioni per la concatenazione
        diffY = x2.size()[2] - x1.size()[2]
        diffX = x2.size()[3] - x1.size()[3]
        x1 = F.pad(x1, [diffX // 2, diffX - diffX // 2,
                        diffY // 2, diffY - diffY // 2])
        x = torch.cat([x2, x1], dim=1)
        return self.conv(x)

class OutConv(nn.Module):
    """
    Strato finale che riduce il numero di canali all’output desiderato (40 classi).
    """
    def __init__(self, in_channels, out_channels):
        super(OutConv, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)
        
    def forward(self, x):
        return self.conv(x)

class UNet(nn.Module):
    """
    Implementazione della U-Net con fase di encoder, decoder e relative skip connection.
    """
    def __init__(self, n_channels, n_classes, bilinear=True):
        super(UNet, self).__init__()
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.bilinear = bilinear
        
        self.inc = DoubleConv(n_channels, 64)
        self.down1 = Down(64, 128)
        self.down2 = Down(128, 256)
        self.down3 = Down(256, 512)
        factor = 2 if bilinear else 1
        self.down4 = Down(512, 1024 // factor)
        self.up1 = Up(1024, 512 // factor, bilinear)
        self.up2 = Up(512, 256 // factor, bilinear)
        self.up3 = Up(256, 128 // factor, bilinear)
        self.up4 = Up(128, 64, bilinear)
        self.outc = OutConv(64, n_classes)
        
    def forward(self, x):
        x1 = self.inc(x)
        x2 = self.down1(x1)
        x3 = self.down2(x2)
        x4 = self.down3(x3)
        x5 = self.down4(x4)
        x = self.up1(x5, x4)
        x = self.up2(x, x3)
        x = self.up3(x, x2)
        x = self.up4(x, x1)
        logits = self.outc(x)
        return logits

# Definizione del Dataset

class SegmentationDataset(Dataset):
    """
    Dataset per la semantic segmentation che carica le immagini RGB da file PNG e
    le relative label da file .npy. Si assume che il nome delle immagini e delle label
    condivida lo stesso suffisso numerico (ad esempio: rgb_1.png e labels_1.npy).
    """
    def __init__(self, rgb_dir, labels_dir, transform=None, target_transform=None):
        super(SegmentationDataset, self).__init__()
        self.rgb_dir = rgb_dir
        self.labels_dir = labels_dir
        # Ordinamento dei file in modo che corrispondano
        self.rgb_files = sorted(glob.glob(os.path.join(rgb_dir, "rgb_*.png")))
        self.label_files = sorted(glob.glob(os.path.join(labels_dir, "labels_*.npy")))
        self.transform = transform
        self.target_transform = target_transform
        
        if len(self.rgb_files) != len(self.label_files):
            raise ValueError("Il numero di immagini RGB e di label non coincide.")
        
    def __len__(self):
        return len(self.rgb_files)
    
    def __getitem__(self, idx):
        # Caricamento immagine RGB
        rgb_path = self.rgb_files[idx]
        image = Image.open(rgb_path).convert("RGB")
        if self.transform is not None:
            image = self.transform(image)
        else:
            image = transforms.ToTensor()(image)
        
        # Caricamento label (file .npy)
        label_path = self.label_files[idx]
        label = np.load(label_path)
        # Convertiamo la label in tensore Long (necessario per la CrossEntropyLoss)
        label = torch.from_numpy(label).long()
        if self.target_transform is not None:
            label = self.target_transform(label)
        return image, label

if __name__ == '__main__':
    # Impostazioni: percorso delle cartelle e parametri di training
    rgb_dir = os.path.join("data", "rgb_images")
    labels_dir = os.path.join("data", "labels")
    num_classes = 40
    batch_size = 2
    num_epochs = 1  # Per esempio, una sola epoca per testare il funzionamento
    
    # Definizione delle trasformazioni:
    # Le immagini nativamente 480x640 vengono ridimensionate a 320x240 (240,320 in HxW)
    # e successivamente center-cropped a 304x228 (228,304 in HxW)
    transform = transforms.Compose([
        transforms.Resize((240, 320)),
        transforms.CenterCrop((228, 304)),
        transforms.ToTensor()
    ])
    
    # Creazione del dataset e del dataloader
    dataset = SegmentationDataset(rgb_dir, labels_dir, transform=transform)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=0)
    
    # Creazione del modello, definizione dell'ottimizzatore e della loss
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = UNet(n_channels=3, n_classes=num_classes, bilinear=True).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
    criterion = nn.CrossEntropyLoss()  # CrossEntropyLoss si aspetta label in [0, num_classes-1]
    
    # Ciclo di training di base
    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        for i, (images, labels) in enumerate(dataloader):
            images = images.to(device)
            labels = labels.to(device)
            
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
            print(f"Epoch [{epoch+1}/{num_epochs}], Batch [{i+1}/{len(dataloader)}], Loss: {loss.item():.4f}")
        print(f"Epoch [{epoch+1}] terminata. Loss media: {running_loss/len(dataloader):.4f}")
    
    print("Training completato.")

ValueError: num_samples should be a positive integer value, but got num_samples=0