In [1]:
import os
from pathlib import Path
import re
import random
from sklearn.model_selection import KFold
from PIL import Image
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
import json
from tqdm import tqdm
import numpy as np
import cv2
import matplotlib.pyplot as plt
import torch
from models.unet import UNet
import torch.nn as nn
import torch.optim as optim
import pandas as pd

In [2]:
def draw_contours(contours, shape=(256, 256)):
    """Rasterize a collection of contours into a filled single-channel mask.

    Args:
        contours: Sequence of contours. Each contour is converted with
            ``np.array(contour, dtype=np.int32)`` and drawn filled via
            ``cv2.drawContours``. An empty sequence yields an all-zero mask.
        shape: ``(height, width)`` of the output mask. Defaults to the
            256x256 size that used to be hard-coded.

    Returns:
        ``np.uint8`` array of the requested shape; contour interiors are
        drawn with the value 255, everything else stays 0.
    """
    # Start from an all-black canvas.
    binary_mask = np.zeros(shape, dtype=np.uint8)

    # One loop covers the empty, single and multi-contour cases alike.
    for contour in contours:
        points = np.array(contour, dtype=np.int32)
        cv2.drawContours(binary_mask, [points], -1, (255, 255, 255), thickness=cv2.FILLED)

    return binary_mask
    

def _last_frame_contours(entries, camera, frame, label):
    """Return the contours stored for ``camera``/``frame`` in the last entry of ``entries``."""
    contours = None
    found = False
    # Every entry is visited and the last one wins, matching the original
    # loop-and-overwrite behaviour (including its KeyError/IndexError on
    # any malformed entry).
    for per_camera in entries.values():
        contours = per_camera[f'subimage_{camera}'][frame]
        found = True
    if not found:
        raise ValueError(f"No mask entries found for '{label}'")
    return contours


def organize_masks(dataset_masks, data, camera, frame):
    """Build the human and robot masks for one image.

    Args:
        dataset_masks: Mapping that holds one entry under ``data`` (human
            contours) and one under ``f"{data}_robot"`` (robot contours).
            Each entry is a dict whose values map ``f"subimage_{camera}"``
            to a per-frame sequence of contour lists.
        data: Recording key used to look up both entries.
        camera: Camera index used to build the ``subimage_{camera}`` key.
        frame: Index of the frame inside the per-camera sequence.

    Returns:
        Tuple ``(human_mask, robot_mask)`` as produced by ``draw_contours``.

    Raises:
        KeyError: If the recording or the camera key is missing.
        IndexError: If ``frame`` is out of range for the camera.
        ValueError: If either entry is empty (previously this surfaced as
            an ``UnboundLocalError``).
    """
    dict_human = dataset_masks[f'{data}']
    dict_robot = dataset_masks[f'{data}_robot']

    contours_human = _last_frame_contours(dict_human, camera, frame, f'{data}')
    contours_robot = _last_frame_contours(dict_robot, camera, frame, f'{data}_robot')

    return draw_contours(contours_human), draw_contours(contours_robot)
        
        
def transform_masks(mask_human, mask_robot, mask_mode=None):
    """Convert the human and robot masks into a training target tensor.

    Any pixel value greater than zero counts as foreground.

    Args:
        mask_human: 2-D array marking human pixels.
        mask_robot: 2-D array with the same shape marking robot pixels.
        mask_mode: ``"entropy"`` returns class indices; any other value
            returns one channel per class.

    Returns:
        * ``mask_mode == "entropy"``: ``torch.long`` tensor of shape
          ``(1, H, W)`` with 0 = background, 1 = human, 2 = robot. Where
          both masks overlap the robot label wins.
        * otherwise: ``torch.float32`` tensor of shape ``(3, H, W)`` with
          channels (background, human, robot). Overlapping pixels stay set
          in both the human and robot channels; the background channel is
          0 there.
    """
    human = (np.asarray(mask_human) > 0).astype(np.int64)
    robot = (np.asarray(mask_robot) > 0).astype(np.int64)

    if mask_mode == "entropy":
        # Distinct class indices: previously np.maximum collapsed human and
        # robot into the same label 1, so class 2 never appeared in targets.
        labels = np.where(robot > 0, 2, human)
        # Add a leading channel dimension.
        return torch.tensor(labels, dtype=torch.long).unsqueeze(0)

    height, width = human.shape
    one_hot = np.zeros((3, height, width), dtype=np.float32)
    # 1 - max(...) keeps the background in {0, 1}; 1 - (human + robot)
    # produced -1 wherever the two masks overlapped.
    one_hot[0] = 1 - np.maximum(human, robot)
    one_hot[1] = human
    one_hot[2] = robot
    return torch.tensor(one_hot, dtype=torch.float32)


# Define a custom dataset class
class CustomImageDataset(Dataset):
    """Dataset that pairs each image file with a mask tensor built from stored contours.

    The image file name (without extension) must have the form
    ``<sub>_<act>_<rout>_<frame>_<camera>``; the first three fields form the
    key used to look up contours in ``masks``.
    """

    def __init__(self, image_paths, masks, transform=None, mask_mode=None):
        """Store the image paths, the contour lookup and the optional transform / mask mode."""
        self.image_paths, self.masks = image_paths, masks
        self.transform, self.mask_mode = transform, mask_mode

    def __len__(self):
        """Number of images in the dataset."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, mask_tensor)`` for the image at position ``idx``."""
        img_path = self.image_paths[idx]

        # File name up to the first dot carries the mask lookup fields.
        stem = os.path.basename(img_path).split(".")[0]
        data, frame, camera = self.__get_mask_info__(stem)

        # The masks are built before the image is opened.
        human, robot = organize_masks(self.masks, data, int(camera), int(frame))
        mask_tensor = transform_masks(human, robot, self.mask_mode)

        image = Image.open(img_path).convert("RGB")
        if not self.transform:
            return image, mask_tensor
        return self.transform(image), mask_tensor

    def __get_mask_info__(self, strx):
        """Split ``sub_act_rout_frame_camera`` into ``("sub_act_rout", frame, camera)``."""
        sub, act, rout, frame, camera = strx.split("_")
        return "_".join((sub, act, rout)), frame, camera

In [3]:
def extrair_numero_regex(texto):
    """Return the first run of digits found in ``texto`` as a string, or ``None`` if there is none."""
    primeiro = re.search(r'\d+', texto)
    return primeiro.group(0) if primeiro is not None else None


def process_json_namefile(strx):
    """Normalize a mask JSON file name into a ``"<n0>_<n1>_<n2>"`` lookup key.

    The name is split on ``"_"``; the first number found in each of the
    first three pieces is converted to ``int`` (dropping leading zeros) and
    the three values are joined with underscores. ``"_robot"`` is appended
    when the substring ``"robot"`` appears anywhere in the name.
    """
    pieces = strx.split("_")
    numbers = (
        int(extrair_numero_regex(pieces[0])),
        int(extrair_numero_regex(pieces[1])),
        int(extrair_numero_regex(pieces[2])),
    )
    suffix = "_robot" if "robot" in strx else ""
    return "_".join(str(number) for number in numbers) + suffix

    
def load_dataset_masks(pasta):
    """Load every ``.json`` file in ``pasta`` into a single dictionary.

    Args:
        pasta: Directory to scan (not recursive).

    Returns:
        Dict mapping ``process_json_namefile(<file name>)`` to the parsed
        JSON content of that file.
    """
    # Keep only the JSON files found directly inside the folder.
    json_names = [name for name in os.listdir(pasta) if name.endswith('.json')]

    loaded = {}
    for json_name in tqdm(json_names):
        with open(os.path.join(pasta, json_name), 'r', encoding='utf-8') as handle:
            content = json.load(handle)
        # The normalized file name is the dictionary key.
        key = process_json_namefile(os.path.basename(json_name))
        loaded[key] = content

    return loaded


def load_image_paths(directory):
    """Return the paths of all ``*.jpg`` files directly inside ``directory``.

    The result is sorted so the order does not depend on the filesystem's
    directory-listing order; downstream grouping and the seeded K-fold
    split are then reproducible across machines.

    Args:
        directory: Folder to scan (not recursive).

    Returns:
        Sorted list of path strings.
    """
    return sorted(str(img) for img in Path(directory).glob('*.jpg'))


def group_images_by_prefix(image_paths):
    """Group image paths by their ``SUBJECT_ACTIVITY_ROUTINE`` prefix.

    Only files whose base name is exactly
    ``<digits>_<digits>_<digits>_<digits>_<digits>.jpg`` are kept; this is
    the same five-field layout the dataset class unpacks. Other files are
    skipped.

    Args:
        image_paths: Iterable of image path strings.

    Returns:
        List of groups (lists of paths), in order of first appearance of
        each prefix; paths inside a group keep their input order.
    """
    # The dot is escaped and the whole name must match, so names such as
    # "1_2_3_4_5xjpg" or "copy_1_2_3_4_5.jpg" are no longer accepted.
    pattern = re.compile(r'(\d+)_(\d+)_(\d+)_\d+_\d+\.jpg')
    grouped = {}
    for img_path in image_paths:
        match = pattern.fullmatch(os.path.basename(img_path))
        if match is None:
            continue
        grouped.setdefault("_".join(match.groups()), []).append(img_path)
    return list(grouped.values())


def create_dataloaders(image_groups, dataset_masks, n_splits=5, batch_size=32, transform=None, mask_mode=None):
    """Build one ``(train_loader, val_loader)`` pair per cross-validation fold.

    The split is made over *groups* of images (not individual images) with
    a shuffled ``KFold`` using ``random_state=42``. The training loader
    shuffles; the validation loader does not.

    Returns:
        List of ``(train_loader, val_loader)`` tuples, one per fold.
    """
    splitter = KFold(n_splits=n_splits, shuffle=True, random_state=42)

    def build_loader(group_indices, shuffle):
        # Flatten the selected groups into one list of image paths.
        paths = [img for i in group_indices for img in image_groups[i]]
        dataset = CustomImageDataset(paths, dataset_masks, transform, mask_mode)
        return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

    return [
        (build_loader(train_index, True), build_loader(val_index, False))
        for train_index, val_index in splitter.split(image_groups)
    ]


def show_masks(mask):
    """Display ``mask`` in grayscale with a title and a reference colorbar."""
    rendered = plt.imshow(mask, cmap='gray')  # gray colormap for a single-channel image
    plt.title('Imagem de um canal')
    plt.colorbar(rendered)  # colorbar tied to the image just drawn
    plt.show()

In [4]:
def load_checkpoint(model, optimizer, checkpoint_path):
    """Restore model and optimizer state from ``checkpoint_path`` if it exists.

    Args:
        model: Module whose ``load_state_dict`` receives
            ``checkpoint['model_state_dict']``.
        optimizer: Optimizer whose ``load_state_dict`` receives
            ``checkpoint['optimizer_state_dict']``.
        checkpoint_path: File written by ``torch.save``.

    Returns:
        Tuple ``(epoch, fold, best_val_loss)`` read from the checkpoint;
        ``best_val_loss`` falls back to ``inf`` when the key is absent.
        When the file does not exist, ``(0, 0, inf)`` is returned and
        nothing is loaded.
    """
    if not os.path.isfile(checkpoint_path):
        print(f"No checkpoint found at: {checkpoint_path}")
        return 0, 0, float('inf')

    print(f"Loading checkpoint: {checkpoint_path}")
    # map_location="cpu" lets a checkpoint saved on a GPU be opened on a
    # machine without one; load_state_dict then copies the values onto
    # whatever device the model/optimizer parameters live on.
    checkpoint = torch.load(checkpoint_path, map_location="cpu")
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    return checkpoint['epoch'], checkpoint['fold'], checkpoint.get('best_val_loss', float('inf'))
    

# Checkpoint saving
def save_checkpoint(model, optimizer, epoch, fold, is_best, checkpoint_dir, model_type):
    """Write the current training state to ``checkpoint_dir``.

    A file named ``{model_type}_epoch{epoch}_fold{fold}.pth`` is always
    written. When ``is_best`` is true the same state is also written to
    ``best_{model_type}.pth``.

    Args:
        model: Module whose ``state_dict()`` is stored.
        optimizer: Optimizer whose ``state_dict()`` is stored.
        epoch: Epoch value stored under ``'epoch'`` and used in the file name.
        fold: Fold value stored under ``'fold'`` and used in the file name.
        is_best: Whether to also write the ``best_*`` file.
        checkpoint_dir: Target directory; created if it does not exist.
        model_type: Prefix used in both file names.
    """
    # torch.save does not create missing directories, so a fresh run with a
    # non-existent checkpoint folder used to fail on the first save.
    if checkpoint_dir:
        os.makedirs(checkpoint_dir, exist_ok=True)

    state = {
        'epoch': epoch,
        'fold': fold,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
    }

    torch.save(state, os.path.join(checkpoint_dir, f"{model_type}_epoch{epoch}_fold{fold}.pth"))
    if is_best:
        torch.save(state, os.path.join(checkpoint_dir, f"best_{model_type}.pth"))
        

# Metric computation
def calculate_metrics(outputs, targets):
    """Compute Dice and IoU per class for one batch of segmentation outputs.

    Predictions are ``outputs.argmax(dim=1)``. Counts are accumulated over
    every pixel of the batch. A class that appears in neither the
    prediction nor the target scores 1.0 for both metrics.

    Args:
        outputs: Tensor of per-class scores with the class dimension at
            index 1, e.g. ``(B, C, H, W)``.
        targets: Class-index tensor of shape ``(B, H, W)`` or
            ``(B, 1, H, W)``; a ``(B, C, H, W)`` per-class tensor is
            reduced with ``argmax`` over dimension 1.

    Returns:
        Tuple ``(mean_dice, mean_iou, dice_per_class, iou_per_class)``:
        two floats and two lists of floats with one entry per class.
    """
    with torch.no_grad():
        num_classes = outputs.shape[1]
        preds = outputs.argmax(dim=1)

        if targets.dim() == outputs.dim():
            # (B, 1, H, W) holds class indices; (B, C, H, W) holds one channel per class.
            labels = targets[:, 0] if targets.shape[1] == 1 else targets.argmax(dim=1)
        else:
            labels = targets
        labels = labels.long()

        dice_per_class = []
        iou_per_class = []
        for cls in range(num_classes):
            pred_c = preds == cls
            true_c = labels == cls
            intersection = (pred_c & true_c).sum().item()
            total = pred_c.sum().item() + true_c.sum().item()
            union = total - intersection
            dice_per_class.append(2.0 * intersection / total if total else 1.0)
            iou_per_class.append(intersection / union if union else 1.0)

    mean_dice = sum(dice_per_class) / num_classes if num_classes else 0.0
    mean_iou = sum(iou_per_class) / num_classes if num_classes else 0.0
    return mean_dice, mean_iou, dice_per_class, iou_per_class


# Metric persistence (CSV)
def save_metrics_to_csv(metrics, loss, phase, checkpoint_dir, fold):
    """Append one batch-metrics table to the per-fold CSV report.

    Args:
        metrics: Sequence of ``(mean_dice, mean_iou, dice_per_class,
            iou_per_class)`` rows.
        loss: Value written to the ``loss`` column of every row.
        phase: Value written to the ``phase`` column of every row.
        checkpoint_dir: Directory holding the report.
        fold: Zero-based fold index; the file is ``fold{fold+1}_report.csv``.

    The header is written only when the file does not exist yet; later
    calls append rows without a header.
    """
    report = pd.DataFrame(
        metrics, columns=["mean_dice", "mean_iou", "dice_per_class", "iou_per_class"]
    ).assign(loss=loss, phase=phase)

    target = os.path.join(checkpoint_dir, f"fold{fold+1}_report.csv")
    already_exists = os.path.isfile(target)
    report.to_csv(
        target,
        mode='a' if already_exists else 'w',
        header=not already_exists,
        index=False,
    )
        

# Training and validation loop
def train_and_validate(model, dataloaders, num_epochs, criterion, optimizer, checkpoint_dir):
    """Train and validate ``model`` on every cross-validation fold.

    For each fold the model and optimizer are first reset to the state they
    had when this function was called, so folds are trained independently
    and a fold's validation images have not been used for training in an
    earlier fold. If ``fold{n}.pth`` exists in ``checkpoint_dir`` the fold
    resumes from it; that file is rewritten after every completed epoch.
    A fold whose resume file already records ``num_epochs`` finished epochs
    is skipped.

    Args:
        model: Network to train; batches are moved to the device of its
            first parameter.
        dataloaders: Sequence of ``(train_loader, val_loader)`` pairs.
        num_epochs: Number of epochs per fold.
        criterion: Loss called as ``criterion(outputs, masks.squeeze(1))``.
        optimizer: Optimizer bound to ``model``'s parameters.
        checkpoint_dir: Directory for checkpoints and CSV reports; created
            if missing.
    """
    import copy  # local import so this block does not depend on the import cell

    if checkpoint_dir:
        os.makedirs(checkpoint_dir, exist_ok=True)

    # Batches must live on the same device as the model weights.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    # Snapshot of the untrained state, restored at the start of each fold.
    initial_model_state = copy.deepcopy(model.state_dict())
    initial_optimizer_state = copy.deepcopy(optimizer.state_dict())

    for fold, (train_loader, val_loader) in enumerate(dataloaders):
        print(f"Fold {fold+1}/{len(dataloaders)}")

        model.load_state_dict(initial_model_state)
        optimizer.load_state_dict(copy.deepcopy(initial_optimizer_state))

        resume_path = os.path.join(checkpoint_dir, f"fold{fold+1}.pth")
        start_epoch, _, best_val_loss = load_checkpoint(model, optimizer, resume_path)

        for epoch in range(start_epoch, num_epochs):
            # ---- Training phase ----
            model.train()
            running_loss = 0.0
            train_metrics = []

            train_pbar = tqdm(train_loader, desc=f"Fold {fold+1}, Epoch {epoch+1}/{num_epochs}", leave=False)
            for images, masks in train_pbar:
                images = images.to(device)
                masks = masks.to(device).long()

                optimizer.zero_grad()
                outputs = model(images)
                loss = criterion(outputs, masks.squeeze(1))
                loss.backward()
                optimizer.step()
                # Weight by batch size so the epoch loss is a per-sample mean.
                running_loss += loss.item() * images.size(0)
                train_pbar.set_postfix({"loss": loss.item()})

                mean_dice, mean_iou, dice_per_class, iou_per_class = calculate_metrics(outputs, masks)
                train_metrics.append((mean_dice, mean_iou, dice_per_class, iou_per_class))

            epoch_loss = running_loss / len(train_loader.dataset)
            save_checkpoint(model, optimizer, epoch, fold, False, checkpoint_dir, "train")

            # ---- Validation phase ----
            model.eval()
            val_loss = 0.0
            val_metrics = []
            with torch.no_grad():
                val_pbar = tqdm(val_loader, desc=f"Fold {fold+1}, Epoch {epoch+1}/{num_epochs} [Validation]", leave=False)
                for images, masks in val_pbar:
                    images = images.to(device)
                    masks = masks.to(device).long()

                    outputs = model(images)
                    loss = criterion(outputs, masks.squeeze(1))
                    val_loss += loss.item() * images.size(0)
                    val_pbar.set_postfix({"loss": loss.item()})

                    mean_dice, mean_iou, dice_per_class, iou_per_class = calculate_metrics(outputs, masks)
                    val_metrics.append((mean_dice, mean_iou, dice_per_class, iou_per_class))

            val_loss /= len(val_loader.dataset)

            # Track the best validation loss of the current fold.
            # NOTE(review): the "best" file name carries no fold number, so
            # each fold's best model overwrites the previous fold's.
            is_best = val_loss < best_val_loss
            if is_best:
                best_val_loss = val_loss
            save_checkpoint(model, optimizer, epoch, fold, is_best, checkpoint_dir, "val")

            # Resume file read by load_checkpoint at the top of the fold.
            # 'epoch' stores the NEXT epoch to run because the loaded value
            # is used directly as the start of range(start_epoch, num_epochs).
            torch.save({
                'epoch': epoch + 1,
                'fold': fold,
                'best_val_loss': best_val_loss,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
            }, resume_path)

            # Per-batch metrics and the epoch losses go to the fold's CSV report.
            save_metrics_to_csv(train_metrics, epoch_loss, "train", checkpoint_dir, fold)
            save_metrics_to_csv(val_metrics, val_loss, "val", checkpoint_dir, fold)

In [5]:
# Seed every random number generator used by the notebook with the same value.
SEED = 42
for _seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed):
    _seed_fn(SEED)

In [6]:
# Data and output locations. Each one can be overridden with an environment
# variable so the notebook runs on machines other than the original author's;
# the defaults are the original hard-coded values.
image_directory = os.environ.get("IMAGE_DIRECTORY", "C:/Users/iagor/Documents/git/data-definer/out/")
mask_directory = os.environ.get("MASK_DIRECTORY", "C:/Users/iagor/Documents/git/human-segmentation-sam/out/")
checkpoint_dir = os.environ.get("CHECKPOINT_DIR", "checkpoints/")

# DataLoader batch size and number of cross-validation folds.
batch_size = 4
n_splits = 5

# Image preprocessing: resize to 256x256, then convert to a tensor.
transform = transforms.Compose([
        transforms.Resize((256, 256)),
        transforms.ToTensor(),
    ])

In [7]:
# Load every contour JSON found in mask_directory into one dictionary.
dataset_masks = load_dataset_masks(mask_directory)
# "entropy" selects the single-channel integer mask produced by transform_masks.
mask_mode = "entropy"

 25%|██▍       | 53/216 [00:15<00:21,  7.47it/s]

In [None]:
# Hyperparameters: epochs per fold and the Adam learning rate.
num_epochs = 10
learning_rate = 0.001

# Model, loss, optimizer: a UNet built with 3 input channels and 3 output
# classes, trained with cross-entropy loss and Adam.
model = UNet(n_channels=3, n_classes=3, bilinear=True)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
# Collect the image files and group them by their subject_activity_routine prefix.
image_paths = load_image_paths(image_directory)
image_groups = group_images_by_prefix(image_paths)
# One (train, validation) loader pair per fold; the split is made over groups.
dataloaders = create_dataloaders(image_groups, dataset_masks, n_splits, batch_size, transform, mask_mode) 
# Run training and validation on every fold.
train_and_validate(model, dataloaders, num_epochs, criterion, optimizer, checkpoint_dir)

Fold 1/5
No checkpoint found at: checkpoints/fold1.pth


Fold 1, Epoch 1/10:   0%|          | 0/56 [00:00<?, ?it/s]