In [1]:
# Importando bibliotecas
import os
import torch
import numpy as np
import matplotlib.pyplot as plt
import cv2

%matplotlib inline

# Importando bibliotecas úteis para a rede
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
import torchvision.utils as torch_utils
import torch.nn as nn

# Importando TravNet e seu dataset
import sys

sys.path.append('../src')

from models.TravNet_WOBlocks import TravNet
# from models.TravNet_ViT import VisualTransformer # ADICIONAR ViT
# from models.TravNet_ViT import CustomTransformer # ADICIONAR ViT
from utils.TravDataloader import TravNetDataset

## Initialization

In [2]:
class Object(object):
    """Empty attribute container; instances below are used as plain namespaces for settings."""
    pass

params = Object() # Creates an object to store the parameters
# Dataset parameters
params.data_path        = r'../../data/' 
params.csv_path         = os.path.join(params.data_path, 'data.csv')
params.preproc          = True  # Vertical flip augmentation - flips the image vertically
params.depth_mean       = 3.5235  # presumably depth normalization statistics used by TravNetDataset - TODO confirm
params.depth_std        = 10.6645

# Training parameters
params.seed             = 230 # Seed for the random number generator - open question: how to pick the best seed for the model?
params.epochs           = 25 # CHANGE THE NUMBER OF EPOCHS HERE
params.batch_size       = 16
params.learning_rate    = 1e-4
params.weight_decay     = 1e-5

# Model parameters
params.pretrained = True
params.load_network_path = None 
# Indexed as (width, height): the shape-check cell builds its tensors as [N, C, input_size[1], input_size[0]]
params.input_size       = (424, 240)
params.output_size      = (424, 240)
params.output_channels  = 1
params.bottleneck_dim   = 256

# ViT parameters (the ViT imports are currently commented out, so nothing in the visible cells reads these)
transformer_params = Object()
transformer_params.in_channels = 3
transformer_params.hidden_dim = 256
transformer_params.num_heads = 8
transformer_params.num_layers = 12

In [3]:
# Seed PyTorch's random number generator (and the CUDA one when a GPU is present)
cuda_available = torch.cuda.is_available()
torch.manual_seed(params.seed)
if cuda_available:
    torch.cuda.manual_seed(params.seed)

# Select GPU or CPU: first CUDA device when available, CPU otherwise
device = torch.device("cuda:0") if cuda_available else torch.device("cpu")
print(f"Using {device} device")

Using cuda:0 device


In [4]:
def load_data():
    """Build the training and validation DataLoaders.

    Reads the notebook-level ``params`` object: it is handed to
    ``TravNetDataset`` together with the augmentation pipeline, and
    ``params.batch_size`` sets the batch size of both loaders.

    Returns:
        tuple: ``(train_loader, val_loader)`` covering an 80/20 random split
        of the dataset.
    """
    # Dataset transformations
    # Ablation pre-study - change here
    transform = transforms.Compose([
        transforms.ToPILImage(),  # tensor/array -> PIL image
        transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1),
        # Exactly one of the three augmentations below is picked per sample
        transforms.RandomChoice([
            transforms.RandomHorizontalFlip(p=1),
            transforms.RandomRotation(15),
            transforms.RandomInvert(0.5),
        ]),
        transforms.ToTensor(),  # PIL image -> tensor
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    # Ablation baseline (same pipeline without the RandomChoice step):
    # transform = transforms.Compose([
    #     transforms.ToPILImage(),
    #     transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1),
    #     transforms.ToTensor(),
    #     transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    # ])

    dataset = TravNetDataset(params, transform)

    # 80/20 split. The validation size is taken as the remainder so the two sizes
    # always add up to len(dataset), which random_split requires; rounding the two
    # fractions independently does not guarantee that by construction.
    total_size = len(dataset)
    train_size = int(0.8 * total_size)
    val_size = total_size - train_size
    # NOTE(review): both subsets wrap the same dataset object, so the validation
    # samples presumably go through the random augmentations too - confirm intent.
    train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])

    train_loader = DataLoader(train_dataset, batch_size=params.batch_size, shuffle=True, num_workers=2)
    # Validation needs no shuffling; a fixed order also keeps the sample that
    # fit() plots every few epochs comparable across epochs.
    val_loader = DataLoader(val_dataset, batch_size=params.batch_size, shuffle=False, num_workers=2)

    print('Total loaded %d images' % len(dataset))
    print('Loaded %d train images' % train_size)
    print('Loaded %d valid images' % val_size)

    return train_loader, val_loader

In [5]:
def get_bounding_box(binary_image):
    """Return the bounding rectangles of the external contours of a mask.

    Args:
        binary_image: tensor that is detached, moved to the CPU and thresholded
            at 0.5 (values above 0.5 become 255, everything else 0).

    Returns:
        list: one ``(x, y, width, height)`` tuple per external contour found
        by OpenCV, in the order OpenCV returns the contours.
    """
    mask = binary_image.detach().cpu().numpy()
    # OpenCV's contour finder works on an 8-bit image
    mask_u8 = np.where(mask > 0.5, 255, 0).astype(np.uint8)

    contours, _ = cv2.findContours(mask_u8, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    return [tuple(cv2.boundingRect(contour)) for contour in contours]

In [6]:
def get_iou(ground_truth, predicted):
    """Compute the intersection-over-union of boxes paired by position.

    Boxes are ``(x, y, width, height)`` tuples. The i-th ground-truth box is
    compared with the i-th predicted box; when the lists differ in length the
    extra boxes of the longer one are ignored.

    Args:
        ground_truth: list of reference boxes.
        predicted: list of predicted boxes.

    Returns:
        list: one IoU value per pair; 0.0 when either box has zero area.
    """
    def _pair_iou(box_a, box_b):
        ax, ay, aw, ah = box_a
        bx, by, bw, bh = box_b

        area_a = aw * ah
        area_b = bw * bh
        if area_a == 0 or area_b == 0:
            return 0.0  # avoid division by zero

        # Overlap extent along each axis, clamped at zero for disjoint boxes
        overlap_w = max(0, min(ax + aw, bx + bw) - max(ax, bx))
        overlap_h = max(0, min(ay + ah, by + bh) - max(ay, by))
        shared = overlap_w * overlap_h

        return shared / (area_a + area_b - shared)

    return [_pair_iou(gt_box, pred_box) for gt_box, pred_box in zip(ground_truth, predicted)]

In [7]:
def draw_bounding_boxes(path_img, pred_img):
    """Plot the bounding boxes of the target mask and of the prediction.

    Only the first sample and first channel of each input are used
    (``[0, 0, :, :]``). Three figures are shown: the target with its boxes in
    red, the prediction with its boxes in blue, and a blended image carrying
    both sets of boxes.

    Args:
        path_img: 4-D tensor holding the target mask.
        pred_img: 4-D tensor holding the network prediction.
    """
    path_mask = path_img[0, 0, :, :]
    pred_mask = pred_img[0, 0, :, :]

    bounding_box_path = get_bounding_box(path_mask)
    bounding_box_pred = get_bounding_box(pred_mask)
    iou_val = get_iou(bounding_box_path, bounding_box_pred)

    # detach() mirrors get_bounding_box, so tensors still attached to the
    # autograd graph can be converted to NumPy instead of raising.
    path_img_np = path_mask.detach().cpu().numpy()
    pred_img_np = pred_mask.detach().cpu().numpy()

    # Figure 1: target mask with its boxes
    plt.figure(figsize=(8, 8))
    plt.imshow(path_img_np, cmap='gray')
    plt.title("path_img com Bounding Boxes")
    _outline_boxes(bounding_box_path, 'red')
    plt.show()

    # Figure 2: prediction with its boxes
    plt.figure(figsize=(8, 8))
    plt.imshow(pred_img_np, cmap='gray')
    plt.title("pred_img com Bounding Boxes")
    _outline_boxes(bounding_box_pred, 'blue')
    plt.show()

    # Figure 3: blend both images inside their boxes (prediction first, then
    # target, each at 50% over whatever is already in the canvas).
    combined_image = np.zeros_like(path_img_np)
    for source, boxes in ((pred_img_np, bounding_box_pred), (path_img_np, bounding_box_path)):
        for x, y, w, h in boxes:
            region = (slice(y, y + h), slice(x, x + w))
            combined_image[region] = 0.5 * source[region] + 0.5 * combined_image[region]

    plt.figure(figsize=(8, 8))
    plt.imshow(combined_image, cmap='gray')

    # Only the IoU of the first (target, prediction) box pair is reported
    if iou_val:
        plt.title(f"IoU = {iou_val[0]:.2f}")
    else:
        plt.title("IoU não disponível")

    _outline_boxes(bounding_box_path, 'red')
    _outline_boxes(bounding_box_pred, 'blue')

    plt.show()


def _outline_boxes(boxes, color):
    """Draw each ``(x, y, w, h)`` box as an unfilled rectangle on the current axes."""
    axes = plt.gca()
    for x, y, w, h in boxes:
        axes.add_patch(plt.Rectangle((x, y), w, h, fill=False, edgecolor=color))

In [8]:
# Pré-estudo ablativo - Alterar aqui
def fit(net, criterion, optimizer, scheduler, train_loader, val_loader):
    """Train ``net`` with per-epoch validation, checkpointing and early stopping.

    Each batch is unpacked into ``(color_img, depth_img, path_img, mu_img,
    nu_img, weight)``. The loss is the mean of
    ``weight * criterion(pred * path_img, mu_img)``, so ``criterion`` is
    expected to return an unreduced loss (the notebook passes
    ``L1Loss(reduction='none')``). ``nu_img`` is not used.

    The number of epochs comes from the notebook-level ``params.epochs`` and
    tensors are moved to the notebook-level ``device``.

    Args:
        net: model called as ``net(color_img, depth_img)``.
        criterion: loss callable applied to ``(pred * path_img, mu_img)``.
        optimizer: optimizer stepped once per training batch.
        scheduler: learning-rate scheduler stepped once per epoch.
        train_loader: iterable of training batches.
        val_loader: iterable of validation batches.

    Side effects:
        Writes the best weights (lowest validation loss) to
        ``checkpoints/best_predictor_depth.pth``, prints progress, shows
        qualitative plots every 5 epochs and the loss curves at the end.
        On early stopping the best checkpoint is loaded back into ``net``.
    """
    # Ablation pre-study - change here
    patience = 10  # epochs without validation improvement before training stops
    counter = 0    # consecutive epochs without improvement

    # One definition of the checkpoint location for both save and restore; the
    # folder is created up front so the first torch.save cannot fail on a
    # missing directory.
    checkpoint_path = 'checkpoints/best_predictor_depth.pth'
    os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)

    best_val_loss = np.inf
    train_loss_list = []
    val_loss_list = []

    for epoch in range(params.epochs):
        net.train()
        train_loss = 0.0

        for data in train_loader:
            data = (item.to(device).type(torch.float32) for item in data)
            color_img, depth_img, path_img, mu_img, nu_img, weight = data

            # Forward pass
            pred = net(color_img, depth_img)  # EDIT HERE TO TEST WITH/WITHOUT DEPTH
            label = mu_img

            # The prediction is multiplied element-wise by path_img before the loss
            loss = weight*criterion(pred*path_img, label)
            loss = torch.mean(loss)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        train_loss /= len(train_loader)
        train_loss_list.append(train_loss)

        if (epoch) % 10 == 0:
            outstring = 'Epoch [%d/%d], Loss: ' % (epoch+1, params.epochs)
            print(outstring, train_loss)
            print('Learning Rate for this epoch: {}'.format(optimizer.param_groups[0]['lr']))

        # Evaluate the model on the validation set
        with torch.no_grad():
            net.eval()

            val_loss = 0.0

            for data in val_loader:
                data = (item.to(device).type(torch.float32) for item in data)
                color_img, depth_img, path_img, mu_img, nu_img, weight = data

                pred = net(color_img, depth_img)  # EDIT HERE TO TEST WITH/WITHOUT DEPTH
                label = mu_img

                loss = weight*criterion(pred*path_img, label)
                loss = torch.mean(loss)

                val_loss += loss.item()

            val_loss /= len(val_loader)
            val_loss_list.append(val_loss)

            # Ablation pre-study - change here
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                counter = 0

                # Save the model
                print('Updating best validation loss: %.5f' % best_val_loss)
                torch.save(net.state_dict(), checkpoint_path)

            else:
                counter += 1
                print('No improvement since last epoch: %d' % counter)
                if counter >= patience:
                    # Restore into the same object that produced the checkpoint.
                    # The weights were saved from `net` itself, so when `net` is a
                    # DataParallel wrapper the keys carry a "module." prefix and
                    # loading them into `net.module` fails with a key mismatch
                    # (and `net.module` does not exist for an unwrapped model).
                    net.load_state_dict(torch.load(checkpoint_path))
                    print('Early stopping!')
                    break

        # Ablation pre-study - change here
        scheduler.step()

        # Every 5 epochs show the first sample of the last validation batch
        if (epoch + 1) % 5 == 0:
            plt.figure(figsize = (14,14))
            plt.subplot(1, 3, 1)
            plt.imshow(color_img[0].permute(1, 2, 0).cpu().numpy())
            plt.subplot(1, 3, 2)
            plt.imshow(255*pred[0,0,:,:].detach().cpu().numpy(), vmin=0, vmax=255)
            plt.show(block=False)

            draw_bounding_boxes(path_img, pred)

    print('Training Loss list: ', train_loss_list)
    print('Validation Loss list: ', val_loss_list)

    plt.plot(train_loss_list, label='Treinamento')
    plt.plot(val_loss_list, label='Validação')
    plt.xlabel('Época')
    plt.ylabel('Perda')
    plt.legend()
    plt.show()

In [9]:
# RE-INITIALIZE HERE
net = TravNet(params) # Instantiate the network - EDIT HERE TO TEST WITH/WITHOUT DEPTH

# Used to load a saved model
if params.load_network_path is not None:
    print('Loading saved network from {}'.format(params.load_network_path))
    # map_location lets a checkpoint written on a GPU load on a CPU-only machine
    state_dict = torch.load(params.load_network_path, map_location=device)
    # fit() saves the DataParallel-wrapped model, whose keys all start with
    # "module."; strip that prefix so those checkpoints load into the bare
    # network too. Checkpoints saved without the wrapper pass through untouched.
    if state_dict and all(key.startswith('module.') for key in state_dict):
        state_dict = {key[len('module.'):]: value for key, value in state_dict.items()}
    net.load_state_dict(state_dict)

print("Let's use", torch.cuda.device_count(), "GPUs!")
net = torch.nn.DataParallel(net).to(device)

Let's use 1 GPUs!


In [10]:
# Shape check: push one random batch of two samples through the network
# EDIT HERE TO TEST WITH/WITHOUT DEPTH
input_w, input_h = params.input_size
dummy_color = torch.rand([2, 3, input_h, input_w]).to(device)
dummy_depth = torch.rand([2, 1, input_h, input_w]).to(device)
test = net(dummy_color, dummy_depth)
print('test.shape:', test.shape)

test.shape: torch.Size([2, 1, 240, 424])


In [11]:
# Build the training and validation DataLoaders (despite the names, both are DataLoader objects)
train_data, val_data = load_data() 

Initializing dataset


ParserError: Error tokenizing data. C error: Calling read(nbytes) on source failed. Try engine='python'.

In [None]:
# Pull one batch from the training loader and preview its first color image
data_iterator = iter(train_data)
data = next(data_iterator)
first_image = data[0][0]  # item 0 of the batch is the color image (see the unpacking in fit)
# The augmentation pipeline ends with Normalize, so the values presumably fall
# outside [0, 1] and imshow would clip them; normalize=True rescales the image
# to [0, 1] for display only.
first_image = torch_utils.make_grid(first_image, normalize=True)

plt.figure(figsize=(10, 10))
plt.imshow(first_image.permute(1, 2, 0))
plt.title('First image')
plt.show()

## Set up training tools and fitting

In [None]:
# L1 (absolute error) loss, left unreduced: fit() multiplies it by the batch's
# weight tensor and takes the mean itself.
criterion = torch.nn.L1Loss(reduction='none')

# Adam optimizer - TODO: check which optimizer suits the model best
optimizer = torch.optim.Adam(
    net.parameters(),
    lr=params.learning_rate,
    weight_decay=params.weight_decay,
)

# Ablation pre-study - change here
# The learning rate is multiplied by 0.1 at epochs 10 and 20
scheduler = torch.optim.lr_scheduler.MultiStepLR(
    optimizer,
    milestones=[10, 20],
    gamma=0.1,
)

In [None]:
# Ablation pre-study - change here
# Variant without a scheduler (needs a fit() that does not take `scheduler`):
# train = fit(net=net, criterion=criterion, optimizer=optimizer, train_loader=train_data, val_loader=val_data)
# NOTE: fit() has no return statement, so `train` ends up bound to None.
train = fit(
    net=net,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    train_loader=train_data,
    val_loader=val_data,
)