In [1]:
import os
import cv2
import numpy as np
import pandas as pd
from PIL import Image
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
import random
import torch
import shutil

In [2]:
class PreprocessDataset(Dataset):
    def __init__(self, dir_dataset, df_labels, dir_new_dataset, set_type="train", with_brightness_contrast=True, var_gaussian=10, amount = 0.02, prob_gen=0.4, partition=3): 
        self.with_brightness_contrast = with_brightness_contrast
        self.var_gaussian = var_gaussian
        self.amount = amount
        self.dir_dataset = dir_dataset
        self.prob_gen = prob_gen
        self.set_type = set_type
        self.dir_new_dataset = dir_new_dataset
        self.partition = partition
        self.rgb_to_index, self.index_to_label = self.dict_labels(df_labels)
        self.df_labels = df_labels
        self.dim_images = None
        self.num_images = None
        self.image_list = [] 
        self.label_list = [] 
        self.num_classes = len(self.df_labels)
        self.load_images_from_folder()
    def __len__(self):
        return len(self.image_list)
    
    def __getitem__(self, idx):
        # Carregar a imagem e o rótulo baseado no índice `idx`
        img_path = self.image_list[idx]
        lbl_path = self.label_list[idx]

        # Carregar a imagem e o rótulo do disco
        img = cv2.imread(img_path)
        lbl = cv2.imread(lbl_path)

        # Normalizar a imagem (opcional)
        img = img / 255.0

        # Converter os dados para tensores
        img = torch.tensor(img, dtype=torch.float32).permute(2, 0, 1)  # De (H, W, C) para (C, H, W)
        lbl = torch.tensor(lbl, dtype=torch.long).permute(2, 0, 1)  # De (H, W, C) para (C, H, W)

        return img, lbl

    def create_or_reset_directory(self):
        new_dir_img = os.path.join(self.dir_new_dataset, self.set_type)
        new_dir_lbl = os.path.join(self.dir_new_dataset, f"{self.set_type}_labels")
        
        # Verificar se o diretório de imagens já existe; se não, criar
        if not os.path.exists(new_dir_img):
            os.makedirs(new_dir_img)  # Cria o diretório se não existir
        
        # Verificar se o diretório de rótulos já existe; se não, criar
        if not os.path.exists(new_dir_lbl):
            os.makedirs(new_dir_lbl)  # Cria o diretório se não existir
        
    def load_images_from_folder(self):
        # Definir os caminhos originais dos datasets
        images_dir = os.path.join(self.dir_dataset, self.set_type)
        labels_dir = os.path.join(self.dir_dataset, f"{self.set_type}_labels")
        
        dim_images = self.load_images_and_labels(images_dir, labels_dir)
        self.dim_images = dim_images
            
    def load_images_and_labels(self, images_dir, labels_dir):
        self.create_or_reset_directory()

        # Listar arquivos nos diretórios
        image_files = sorted(os.listdir(images_dir))
        label_files = sorted(os.listdir(labels_dir))

        prob_to_transform = self.prob_gen
        img_lbl_files = zip(image_files, label_files)

        for img_name, lbl_name in img_lbl_files:
            transform_img = random.choices([True, False], weights=[prob_to_transform, 1 - prob_to_transform], k=1)[0]

            # Definir caminho para a imagem e rótulo
            img_path = os.path.join(images_dir, img_name)
            lbl_path = os.path.join(labels_dir, lbl_name)

            # Carregar imagem e rótulo
            img = cv2.imread(img_path)
            lbl = cv2.imread(lbl_path)

            # Reduzir a resolução da imagem
            img = img[::self.partition, ::self.partition, :]
            lbl = lbl[::self.partition, ::self.partition, :]

            # Converter de BGR para RGB
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            lbl = cv2.cvtColor(lbl, cv2.COLOR_BGR2RGB)

            # Aplicar Gaussian blur
            img = cv2.GaussianBlur(img, (3, 3), 0)

            # Transformação aleatória
            if transform_img:
                img_transform = img.copy()
                lbl_transform = lbl.copy()
                img_transform, lbl_transform, trans = self.apply_random_transformation(img_transform, lbl_transform)

                # Salvar imagem e rótulo transformado com sufixo de transformação
                transformed_img_name = f"{img_name}_{trans}.png"
                transformed_lbl_name = f"{lbl_name}_{trans}.png"

                processed_img_path = os.path.join(self.dir_new_dataset, self.set_type, transformed_img_name)
                processed_lbl_path = os.path.join(self.dir_new_dataset, f"{self.set_type}_labels", transformed_lbl_name)

                # Verifique se o arquivo já existe antes de salvar
                if not os.path.exists(processed_img_path):
                    cv2.imwrite(processed_img_path, img_transform)
                    cv2.imwrite(processed_lbl_path, lbl_transform)

                    # Adicionar as imagens e rótulos transformados às listas
                    self.image_list.append(processed_img_path)
                    self.label_list.append(processed_lbl_path)

            # Criar caminho para salvar a imagem transformada
            processed_img_path = os.path.join(self.dir_new_dataset, self.set_type, img_name)
            processed_lbl_path = os.path.join(self.dir_new_dataset, f"{self.set_type}_labels", lbl_name)

            cv2.imwrite(processed_img_path, img)
            cv2.imwrite(processed_lbl_path, lbl)
            
            # Adicionar as imagens e rótulos processados às listas
            self.image_list.append(processed_img_path)
            self.label_list.append(processed_lbl_path)

        return img.shape

    def apply_random_transformation(self, img, lbl):
        if self.with_brightness_contrast:
            random_transform = random.randint(0, 9)
        else:
            random_transform = random.randint(0, 4)
        if random_transform == 0:
            arg_1, arg_2 = self.augment(img, lbl, "rotation")
            return arg_1, arg_2, "rotation"
        elif random_transform == 1:
            arg_1, arg_2 = self.augment(img, lbl, "flip")
            return arg_1, arg_2, "flip"
        elif random_transform == 2:
            arg_1, arg_2 = self.augment(img, lbl, "flip_rotation")
            return arg_1, arg_2, "flip_rotation"
        elif random_transform == 3:
            arg_1, arg_2 = self.equalize_histogram(img, lbl)
            return arg_1, arg_2, "equalize_histogram"
        elif random_transform == 4:
            arg_1, arg_2 = self.add_noise(img, lbl, noise_type="gaussian")
            return arg_1, arg_2, "gaussian_noise"
        elif random_transform == 5:
            arg_1, arg_2 = self.add_noise(img, lbl, noise_type="salt_pepper")
            return arg_1, arg_2, "sal_pepper_noise"
        elif random_transform == 6:
            arg_1, arg_2 = self.adjust_brightness_contrast(img, lbl, brightness=0, contrast=50)
            return arg_1, arg_2, "b_c_0_50"
        elif random_transform == 7:
            arg_1, arg_2 = self.adjust_brightness_contrast(img, lbl, brightness=-10, contrast=30)
            return arg_1, arg_2, "b_c_-10_30"
        elif random_transform == 8:
            arg_1, arg_2 = self.adjust_brightness_contrast(img, lbl, brightness=60, contrast=60)
            return arg_1, arg_2, "b_c_60_60"
        else:
            arg_1, arg_2 = self.adjust_brightness_contrast(img, lbl, brightness=-20, contrast=0)
            return arg_1, arg_2, "b_c_-20_0"

    def rgb_to_label(self, mask):
        unique_colors = np.unique(mask.reshape(-1, mask.shape[2]), axis=0)
        label_mask = np.zeros((mask.shape[0], mask.shape[1]), dtype=np.int32)
        for color in unique_colors:
            color_tuple = tuple(color)
            if color_tuple in self.rgb_to_index:
                label = self.rgb_to_index[color_tuple]
                matches = np.all(mask == color, axis=-1)
                label_mask[matches] = label
        return label_mask

    def dict_labels(self, df_labels):
        rgb_to_index = {}
        index_to_label = {}
        for count, row in df_labels.iterrows():
            color = (row['r'], row['g'], row['b'])
            label = row['name']
            rgb_to_index[color] = count
            index_to_label[count] = label
        return rgb_to_index, index_to_label

    def augment(self, img, lbl, transform):
        if transform == "rotation":
            rotation = cv2.getRotationMatrix2D((img.shape[1] // 2, img.shape[0] // 2), 180, 1)
            rotated_image = cv2.warpAffine(img, rotation, (img.shape[1], img.shape[0]))
            rotated_label = cv2.warpAffine(lbl, rotation, (lbl.shape[1], lbl.shape[0]), flags=cv2.INTER_NEAREST)
            return rotated_image, rotated_label
        elif transform == "flip":
            flipped_image = cv2.flip(img, 1)
            flipped_label = cv2.flip(lbl, 1)
            return flipped_image, flipped_label
        elif transform == "flip_rotation":
            flipped_image = cv2.flip(img, 0)
            flipped_label = cv2.flip(lbl, 0)
            return flipped_image, flipped_label
        
   
    def equalize_histogram(self, img, lbl):
        img_yuv = cv2.cvtColor(img, cv2.COLOR_BGR2YUV)
        img_yuv[:, :, 0] = cv2.equalizeHist(img_yuv[:, :, 0])
        img = cv2.cvtColor(img_yuv, cv2.COLOR_YUV2BGR)
        return img, lbl
        
    def adjust_brightness_contrast(self, img, lbl, brightness=0, contrast=0):
        img = np.int16(img)
        img = img * (contrast / 127 + 1) - contrast + brightness
        img = np.clip(img, 0, 255)
        img = np.uint8(img)
        return img, lbl
    
    def add_noise(self, img, lbl, noise_type="gaussian"):
        if noise_type == "gaussian":
            mean = 0
            var = self.var_gaussian
            sigma = var ** 0.5
            gauss = np.random.normal(mean, sigma, img.shape)
            noisy = img + gauss
            noisy = np.clip(noisy, 0, 255)
            return noisy, lbl
    
        elif noise_type == "salt_pepper":
            s_vs_p = 0.5
            amount = self.amount
            out = np.copy(img)
    
            # Salt mode
            num_salt = np.ceil(amount * img.size * s_vs_p)
            coords = [np.random.randint(0, i, int(num_salt)) for i in img.shape]
            out[tuple(coords)] = 255
    
            # Pepper mode
            num_pepper = np.ceil(amount * img.size * (1.0 - s_vs_p))
            coords = [np.random.randint(0, i, int(num_pepper)) for i in img.shape]
            out[tuple(coords)] = 0
    
            return out, lbl

In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class UNet(nn.Module):
    def __init__(self, image_dim, n_channels=64, n_classes=32, depth=5, conv_kernel_size=3, conv_stride=1, conv_padding=1, pool_kernel_size=2, pool_stride=2, pool_padding=0, transpose_kernel_size=3, transpose_stride=2, transpose_padding=1):
        super(UNet, self).__init__()

        self.image_dim = image_dim  # Dimensões da imagem de entrada (C, H, W)
        self.depth = depth 
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.conv_kernel_size = conv_kernel_size
        self.conv_stride = conv_stride
        self.conv_padding = conv_padding
        self.pool_kernel_size = pool_kernel_size
        self.pool_stride = pool_stride
        self.pool_padding = pool_padding
        self.transpose_kernel_size = transpose_kernel_size
        self.transpose_stride = transpose_stride
        self.transpose_padding = transpose_padding

        # Encoder
        self.encoders = nn.ModuleList([self.conv_block(3 if i == 0 else self.n_channels * (2 ** (i-1)), self.n_channels * (2 ** i)) for i in range(self.depth)])
        self.pool = nn.MaxPool2d(kernel_size=self.pool_kernel_size, stride=self.pool_stride, padding=self.pool_padding)

        # Bottleneck
        self.bottleneck = self.conv_block(self.n_channels * (2 ** (self.depth-1)), self.n_channels * (2 ** self.depth))

        # Decoder
        self.decoders = nn.ModuleList([self.conv_transpose(self.n_channels * (2 ** (i+2)), self.n_channels * (2 ** i)) for i in range(self.depth-2, -1, -1)])

        # Final conv layer
        self.final_conv = nn.Conv2d(self.n_channels, n_classes, kernel_size=1)

    def conv_block(self, in_channels, out_channels):
        # Camada convolucional com normalização e função de ativação; 2 vezes
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=self.conv_kernel_size, stride=self.conv_stride, padding=self.conv_padding),
            nn.BatchNorm2d(out_channels),  # Normalização para acelerar o treinamento
            nn.ReLU(inplace=True),  # Função de ativação (zera os valores negativos)
            nn.Conv2d(out_channels, out_channels, kernel_size=self.conv_kernel_size, stride=self.conv_stride, padding=self.conv_padding),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def crop(self, encoder_feature, decoder_feature):
        _, _, h, w = decoder_feature.size()
        encoder_feature = F.interpolate(encoder_feature, size=(h, w), mode='bilinear', align_corners=False)  # Redimensiona a feature map do encoder
        return encoder_feature

    def conv_transpose(self, in_channels, out_channels):
        return nn.Sequential(
            nn.ConvTranspose2d(in_channels, out_channels, kernel_size=self.transpose_kernel_size, stride=self.transpose_stride, padding=self.transpose_padding),
            self.conv_block(out_channels, out_channels)
        )
    
    def forward(self, x):
        print(f"Input shape: {x.shape}")
        encoders_features = []

        # Encoder pass
        for idx, encoder in enumerate(self.encoders):
            print("idx: ", idx)
            print("encoder: ", encoder)
            x = encoder(x)
            encoders_features.append(x)
            print(f"After encoder block {idx+1}: {x.shape}")
            x = self.pool(x)
            print(f"After pooling {idx+1}: {x.shape}")

        # Bottleneck
        x = self.bottleneck(x)
        print(f"After bottleneck: {x.shape}")
        # Doubled the block
        print("Starting decoder pass")
        # Decoder pass
        for i, decoder in enumerate(self.decoders):
            print("i: ", i)
            print("decoder: ", decoder)
            encoder_feature = encoders_features[-(i+1)]
            encoder_feature = self.crop(encoder_feature, x)  # Aplica o crop nas feature maps
            print(f"Encoder feature {i+1} after crop: {encoder_feature.shape}")

            if i != 0:
                x = torch.cat([encoder_feature, x], dim=1)  # Concatena encoder com decoder
                print(f"After concatenation with encoder feature {i+1}: {x.shape}")

            x = decoder(x)

        # Final convolution
        x = self.final_conv(x)
        print(f"Output shape after final convolution: {x.shape}")

        return x

In [5]:
# Definindo os caminhos para cada conjunto
dir_dataset = "data\CamVid"

class_dict = os.path.join(dir_dataset, 'class_dict.csv')
df_labels = pd.read_csv(class_dict)

  dir_dataset = "data\CamVid"


In [6]:
# Função para remover todos subdiretórios e arquivos de um diretório
def remove_all_files_from_dir(dir_path):
    for item in os.listdir(dir_path):
        item_path = os.path.join(dir_path, item)
        if os.path.isfile(item_path):
            os.remove(item_path)
        elif os.path.isdir(item_path):
            shutil.rmtree(item_path)

In [7]:
remove_all_files_from_dir("debug_processed_data")
# Parâmetros
partition = 3  # Reduzir o tamanho da imagem por um fator de 4
prob_train = 0.25  # Probabilidade de aplicar transformações no conjunto de treino
prob_others = 0.10  # Probabilidade de aplicar transformações nos conjuntos de validação e teste
dir_new_dataset = "debug_processed_data"  # Diretório onde os conjuntos processados serão salvos
with_brightness_contrast = True  # Aplicar transformações de brilho e contraste
var_gaussian = 20  # Variância do ruído gaussiano
amount = 0.02  # Quantidade de ruído sal e pimenta

# Criando o dataset para o conjunto de treino
train_dataset = PreprocessDataset(
    dir_dataset=dir_dataset,
    df_labels=df_labels,
    dir_new_dataset=dir_new_dataset,
    set_type='train',  # Carrega o conjunto de treino
    partition=partition  # Reduzir a imagem por um fator de 4
)

# Criando o dataset para o conjunto de validação
val_dataset = PreprocessDataset(
    dir_dataset=dir_dataset,
    df_labels=df_labels,
    dir_new_dataset=dir_new_dataset,
    set_type='val',  # Carrega o conjunto de validação
    partition=partition
)

# Criando o dataset para o conjunto de teste
test_dataset = PreprocessDataset(
    dir_dataset=dir_dataset,
    df_labels=df_labels,
    dir_new_dataset=dir_new_dataset,
    set_type='test',  # Carrega o conjunto de teste
    partition=partition
)

# Definindo o tamanho do batch
batch_size = 24

# Criando o DataLoader para o conjunto de treino
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Criando o DataLoader para o conjunto de validação
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# Criando o DataLoader para o conjunto de teste
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

print(f'Número de batches de treino: {len(train_loader)}')
print(f'Número de batches de validação: {len(val_loader)}')
print(f'Número de batches de teste: {len(test_loader)}')

# Iterando sobre o DataLoader de treino
for images, labels in train_loader:
    print(f'Batch de imagens: {images.shape}')
    print(f'Batch de labels: {labels.shape}')
    # Aqui você pode passar as imagens e labels para o seu modelo
    break  # Apenas um exemplo, interrompendo após o primeiro batch

Número de batches de treino: 22
Número de batches de validação: 6
Número de batches de teste: 14
Batch de imagens: torch.Size([24, 3, 240, 320])
Batch de labels: torch.Size([24, 3, 240, 320])


In [8]:
# Instanciando o modelo U-Net com as dimensões das imagens
model = UNet(image_dim=(3, 240, 320), n_channels=64, n_classes=32, depth=5, conv_kernel_size=3, conv_stride=1, conv_padding=1, pool_kernel_size=2, pool_stride=2, pool_padding=0, transpose_kernel_size=3, transpose_stride=2, transpose_padding=1)
# Configurando o dispositivo (GPU, se disponível)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Dispositivo: {device}")
model = model.to(device)  # Mover o modelo para o dispositivo

# Testando com um batch de dados do DataLoader
for images, labels in train_loader:
    # Move os dados para o dispositivo (GPU/CPU)
    images = images.to(device)
    labels = labels.to(device)
    print("Imagens e rótulos movidos para o dispositivo.")
    print(images.shape)
    print(labels.shape)
    print("Iniciando forward pass...")
    # Passa as imagens pelo modelo U-Net
    output = model(images)
    
    # Exibe as dimensões das imagens, labels e da saída do modelo
    print(f"Imagens: {images.shape}")
    print(f"Labels: {labels.shape}")
    print(f"Saída do modelo: {output.shape}")
    
    # Quebrar após o primeiro batch, apenas para teste
    break

Dispositivo: cuda
Imagens e rótulos movidos para o dispositivo.
torch.Size([24, 3, 240, 320])
torch.Size([24, 3, 240, 320])
Iniciando forward pass...
Input shape: torch.Size([24, 3, 240, 320])
idx:  0
encoder:  Sequential(
  (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (2): ReLU(inplace=True)
  (3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (5): ReLU(inplace=True)
)
After encoder block 1: torch.Size([24, 64, 240, 320])
After pooling 1: torch.Size([24, 64, 120, 160])
idx:  1
encoder:  Sequential(
  (0): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (2): ReLU(inplace=True)
  (3): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (4): Ba