In [None]:
import os
import numpy as np
import torch
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.metrics import f1_score, precision_score
import matplotlib.pyplot as plt
from torch.amp import GradScaler, autocast
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
# Prefer the GPU when CUDA is available; otherwise run everything on the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
class EarlyStopper:
    """Early-stopping helper meant to be called once per epoch.

    Keeps the lowest validation loss seen so far, writes the model's
    ``state_dict`` to ``save_path`` every time that minimum improves, and
    counts epochs whose loss is worse than the minimum by more than
    ``min_delta``. A loss that lies within ``min_delta`` of the minimum
    neither resets nor advances the counter.

    Args:
        patience (int): Number of counted "bad" epochs after which
            ``early_stop`` returns True.
        min_delta (float): Tolerance above the best loss before an epoch
            counts as bad.
        save_path (str): File the best ``state_dict`` is written to.
    """

    def __init__(self, patience=1, min_delta=0, save_path="./best_model_with_label.pt"):
        self.patience = patience
        self.min_delta = min_delta
        self.save_path = save_path
        self.counter = 0
        self.min_validation_loss = float('inf')

    def early_stop(self, validation_loss, model):
        """Return True when training should stop, False otherwise.

        Args:
            validation_loss (float): Validation loss of the epoch just finished.
            model (torch.nn.Module): Model whose weights are checkpointed
                when the loss improves.

        Returns:
            bool: True once ``patience`` bad epochs have been counted since
            the last improvement.
        """
        if validation_loss < self.min_validation_loss:
            # New best: remember it, reset the counter and checkpoint the weights.
            self.min_validation_loss = validation_loss
            self.counter = 0
            torch.save(model.state_dict(), self.save_path)
            return False

        # NaN compares False against everything, so without this check a
        # diverged run would never advance the counter and never stop.
        # NaN is the only value that is not equal to itself.
        is_nan = validation_loss != validation_loss
        if is_nan or validation_loss > (self.min_validation_loss + self.min_delta):
            self.counter += 1
            if self.counter >= self.patience:
                return True
        return False
def plot_history(history: dict, plot_list=None, scale="linear"):
    """Plot one or more recorded series against the epoch number.

    Args:
        history (dict): Mapping that contains an ``"epoch"`` entry plus one
            entry per series to plot.
        plot_list (iterable of str, optional): Keys of ``history`` to draw.
            ``None`` (the default) draws nothing.
        scale (str): Y-axis scale passed to ``plt.yscale``.
    """
    # A None default replaces the former mutable ``[]`` default.
    series_names = list(plot_list) if plot_list is not None else []

    plt.figure(figsize=(14, 7))
    plt.xlabel("Epoch")
    for name in series_names:
        plt.plot(history["epoch"], history[name], label=name)
    plt.yscale(scale)
    if series_names:
        # Only build a legend when there is something to label.
        plt.legend(fontsize=30)
    plt.show()
def calculate_iou(y_true, y_pred, threshold=0.5):
    """Mean intersection-over-union between binarised predictions and targets.

    Both tensors are reduced over dimensions 1, 2 and 3, so they must be
    4-D (presumably batch, channel, height, width - confirm against caller).

    Args:
        y_true (torch.Tensor): Ground-truth mask.
        y_pred (torch.Tensor): Model predictions, compared against ``threshold``.
        threshold (float): Predictions strictly above this value become 1,
            all others 0.

    Returns:
        float: IoU averaged over the first dimension.
    """
    eps = 1e-6  # keeps the ratio defined when both masks are empty
    reduce_dims = (1, 2, 3)

    binary_pred = (y_pred > threshold).float()
    overlap = torch.sum(y_true * binary_pred, dim=reduce_dims)
    combined = torch.sum(y_true, dim=reduce_dims) + torch.sum(binary_pred, dim=reduce_dims)
    per_sample_iou = (overlap + eps) / (combined - overlap + eps)
    return per_sample_iou.mean().item()
def calculate_dice(y_true, y_pred, threshold=0.5):
    """Mean Dice coefficient between binarised predictions and targets.

    Both tensors are reduced over dimensions 1, 2 and 3, so they must be
    4-D (presumably batch, channel, height, width - confirm against caller).

    Args:
        y_true (torch.Tensor): Ground-truth mask.
        y_pred (torch.Tensor): Model predictions, compared against ``threshold``.
        threshold (float): Predictions strictly above this value become 1,
            all others 0.

    Returns:
        float: Dice coefficient averaged over the first dimension.
    """
    eps = 1e-6  # keeps the ratio defined when both masks are empty
    reduce_dims = (1, 2, 3)

    binary_pred = (y_pred > threshold).float()
    overlap = torch.sum(y_true * binary_pred, dim=reduce_dims)
    numerator = 2 * overlap + eps
    denominator = torch.sum(y_true, dim=reduce_dims) + torch.sum(binary_pred, dim=reduce_dims) + eps
    return (numerator / denominator).mean().item()
def train_model(model,
                train_dataloader=None,
                val_dataloader=None,
                criterion=None,
                lr=None,
                optimizer=None,
                epochs=None,
                early_stopper=None):
    """Train ``model`` and validate it after every epoch.

    Mixed precision (autocast + gradient scaling) is used when ``DEVICE`` is
    a CUDA device and switched off otherwise.

    Args:
        model (torch.nn.Module): Network to train. Batches are moved to
            ``DEVICE``; the model itself is not moved here.
        train_dataloader: Iterable of batches; item 0 is the input, item 1 the target.
        val_dataloader: Iterable of validation batches with the same layout.
        criterion: Loss callable invoked as ``criterion(outputs, targets)``
            on the raw model outputs.
        lr (float): Learning rate handed to the optimizer.
        optimizer: Optimizer class or factory, called as
            ``optimizer(model.parameters(), lr=lr)``.
        epochs (int): Maximum number of epochs.
        early_stopper: Optional object with ``early_stop(val_loss, model)``
            and ``save_path``. When given, training may stop early and the
            checkpoint at ``save_path`` is loaded back into ``model`` at the
            end. When ``None``, all epochs run and the model keeps its final
            weights.

    Returns:
        dict: Per-epoch lists under the keys ``"loss"``, ``"val_loss"``,
        ``"epoch"``, ``"avg_iou"`` and ``"avg_dice"``.
    """
    history = {"loss": [], "val_loss": [], "epoch": [], "avg_iou": [], "avg_dice": []}
    optimizer = optimizer(model.parameters(), lr=lr)

    # Only enable AMP on CUDA; on CPU this avoids warnings and keeps full precision.
    use_amp = DEVICE.type == "cuda"
    scaler = GradScaler('cuda', enabled=use_amp)

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0

        for data in train_dataloader:
            inputs, targets = data[0].to(DEVICE), data[1].to(DEVICE)
            optimizer.zero_grad()

            # Forward pass and loss under autocast (mixed precision).
            with autocast('cuda', enabled=use_amp):
                outputs = model(inputs)
                loss = criterion(outputs, targets)

            # Scale the loss before backprop, then step through the scaler.
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            running_loss += loss.item()

        avg_loss = running_loss / len(train_dataloader)
        history["loss"].append(avg_loss)

        # Validation
        model.eval()
        running_vloss = 0.0
        iou_scores = []
        dice_scores = []
        with torch.no_grad():
            for vdata in val_dataloader:
                vinputs, vtargets = vdata[0].to(DEVICE), vdata[1].to(DEVICE)

                with autocast('cuda', enabled=use_amp):
                    voutputs = model(vinputs)
                    vloss = criterion(voutputs, vtargets)

                running_vloss += vloss.item()

                # The metrics work on probabilities; compute the sigmoid once.
                probabilities = torch.sigmoid(voutputs)
                iou_scores.append(calculate_iou(vtargets, probabilities))
                dice_scores.append(calculate_dice(vtargets, probabilities))

        avg_vloss = running_vloss / len(val_dataloader)
        avg_iou = sum(iou_scores) / len(iou_scores)
        avg_dice = sum(dice_scores) / len(dice_scores)

        history["val_loss"].append(avg_vloss)
        history["epoch"].append(epoch + 1)
        history["avg_iou"].append(avg_iou)
        history["avg_dice"].append(avg_dice)

        # Log before the early-stopping check so the last epoch is reported too.
        print(
            f"Epoch {epoch + 1}: IoU = {avg_iou:.4f}, Dice = {avg_dice:.4f} | "
            f"LOSS train {avg_loss} valid {avg_vloss}"
        )

        if early_stopper is not None and early_stopper.early_stop(avg_vloss, model):
            break

    # Restore the best checkpoint written by the early stopper, if one was used.
    if early_stopper is not None:
        model.load_state_dict(torch.load(early_stopper.save_path, weights_only=True))

    return history

def load_dataset(root_dir):
    """Load every image and label map found one level below ``root_dir``.

    Each sub-folder of ``root_dir`` (e.g. kos01, kos02, ...) is scanned in
    sorted order. ``.jpg`` files are treated as images and ``.bmp`` files as
    label maps; files whose name ends in ``aug_0`` ... ``aug_6`` (plus
    ``_label`` for labels) are collected separately as augmented data.
    All files are converted to 8-bit grayscale NumPy arrays.

    Args:
        root_dir (str): Path of the root folder containing the sub-folders.

    Returns:
        tuple: ``(images, labels, images_augmented, labels_augmented)``,
        four lists of NumPy arrays.
    """
    aug_image_suffixes = ("aug_0.jpg", "aug_1.jpg", "aug_2.jpg", "aug_3.jpg", "aug_4.jpg", "aug_5.jpg",
                          "aug_6.jpg")
    aug_label_suffixes = ("aug_0_label.bmp", "aug_1_label.bmp", "aug_2_label.bmp", "aug_3_label.bmp",
                          "aug_4_label.bmp", "aug_5_label.bmp", "aug_6_label.bmp")

    def read_grayscale(path):
        # Open the file, force single-channel grayscale and return it as an array.
        return np.array(Image.open(path).convert("L"))

    images, labels = [], []
    images_augmented, labels_augmented = [], []

    for folder in sorted(os.listdir(root_dir)):
        folder_path = os.path.join(root_dir, folder)
        if not os.path.isdir(folder_path):
            continue  # only sub-folders hold data

        for file in sorted(os.listdir(folder_path)):
            # Choose the destination list from the file extension and suffix.
            if file.endswith(".jpg"):
                destination = images_augmented if file.endswith(aug_image_suffixes) else images
            elif file.endswith(".bmp"):
                destination = labels_augmented if file.endswith(aug_label_suffixes) else labels
            else:
                continue  # neither an image nor a label map

            destination.append(read_grayscale(os.path.join(folder_path, file)))

    return images, labels, images_augmented, labels_augmented

In [None]:
from torchvision import transforms

# Root folder of the dataset. The DEFECT_DATA_DIR environment variable overrides
# the original hard-coded location so the notebook can run on other machines.
root_dir = os.environ.get(
    "DEFECT_DATA_DIR",
    r"C:\Users\jerij\Carpetas\Clases Formaciones Curro\Master\2Semestre\Proyecto\Data\Imagenes_defectos",
)
images, labels, images_augmented, labels_augmented = load_dataset(root_dir)

# Print dataset information
print(f"Número de imágenes cargadas: {len(images)}")
print(f"Número de etiquetas cargadas: {len(labels)}")
print(f"Número de imágenes augmented cargadas: {len(images_augmented)}")
print(f"Número de etiquetas augmented cargadas: {len(labels_augmented)}")

# Images and labels are paired purely by position, so the lists must line up.
if len(images) != len(labels) or len(images_augmented) != len(labels_augmented):
    raise ValueError(
        "Image/label count mismatch: "
        f"{len(images)} images vs {len(labels)} labels, "
        f"{len(images_augmented)} augmented images vs {len(labels_augmented)} augmented labels."
    )

# Scale pixel values from [0, 255] to [0, 1]
images = [img / 255.0 for img in images]
labels = [label / 255.0 for label in labels]
images_augmented = [img / 255.0 for img in images_augmented]
labels_augmented = [label / 255.0 for label in labels_augmented]

# Split into train (70%) and a held-out part (30%)
train_images, test_images, train_labels, test_labels = train_test_split(
    images, labels, test_size=0.3, random_state=42
)

# Split the held-out part evenly into validation and final test sets
val_images, test_images, val_labels, test_labels = train_test_split(
    test_images, test_labels, test_size=0.5, random_state=42
)

# Augmented data goes into the training set only
train_images = train_images + images_augmented
train_labels = train_labels + labels_augmented

# Image transformations
transform = transforms.Compose([
    transforms.Resize((1024, 512)),
    transforms.ToTensor(),  # convert to tensor
])

# Label transformations: nearest-neighbour resizing keeps the masks binary;
# the default bilinear interpolation would blend 0/1 values along defect edges.
label_transform = transforms.Compose([
    transforms.Resize((1024, 512), interpolation=transforms.InterpolationMode.NEAREST),
    transforms.ToTensor(),  # convert to tensor
])

In [None]:
class CustomDataset(Dataset):
    """Dataset pairing in-memory image arrays with their label arrays.

    Args:
        images: Indexable collection of NumPy arrays (one per image).
        labels: Indexable collection of NumPy arrays (one per label map),
            aligned with ``images`` by position.
        transform: Optional callable applied to each image.
        label_transform: Optional callable applied to each label map.
    """

    def __init__(self, images, labels, transform=None, label_transform=None):
        self.images, self.labels = images, labels
        self.transform, self.label_transform = transform, label_transform

    def __len__(self):
        """Number of samples, taken from the image collection."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair at ``idx``.

        Both arrays are first wrapped as PIL images; each is then passed
        through its transform when one was supplied.
        """
        pil_image = Image.fromarray(self.images[idx])
        pil_label = Image.fromarray(self.labels[idx])

        sample = self.transform(pil_image) if self.transform else pil_image
        target = self.label_transform(pil_label) if self.label_transform else pil_label
        return sample, target

# Build one dataset per split; all three share the same image and label transforms.
train_dataset = CustomDataset(train_images, train_labels,
                              transform=transform, label_transform=label_transform)
val_dataset = CustomDataset(val_images, val_labels,
                            transform=transform, label_transform=label_transform)
test_dataset = CustomDataset(test_images, test_labels,
                             transform=transform, label_transform=label_transform)

class UNet(nn.Module):
    """U-Net style encoder/decoder with four down- and four up-sampling stages.

    The encoder halves the spatial size four times with 2x2 max pooling and
    the decoder doubles it back with stride-2 transposed convolutions,
    concatenating the matching encoder output at every stage. Because of the
    four poolings, input height and width need to be divisible by 16 for the
    concatenations to line up.

    Args:
        in_channels (int): Channels of the input tensor.
        out_channels (int): Channels of the output map produced by the final
            1x1 convolution (no activation is applied to it).
    """

    def __init__(self, in_channels=1, out_channels=1):
        super().__init__()
        # Encoder
        self.enc1 = self.conv_block(in_channels, 64)
        self.enc2 = self.conv_block(64, 128)
        self.enc3 = self.conv_block(128, 256)
        self.enc4 = self.conv_block(256, 512)

        # Deepest stage
        self.bottleneck = self.conv_block(512, 1024)

        # Decoder: each conv block receives the upsampled features
        # concatenated with the skip connection, hence twice the channels.
        self.up4 = self.upconv(1024, 512)
        self.dec4 = self.conv_block(1024, 512)
        self.up3 = self.upconv(512, 256)
        self.dec3 = self.conv_block(512, 256)
        self.up2 = self.upconv(256, 128)
        self.dec2 = self.conv_block(256, 128)
        self.up1 = self.upconv(128, 64)
        self.dec1 = self.conv_block(128, 64)

        # 1x1 projection to the requested number of output channels
        self.final = nn.Conv2d(64, out_channels, kernel_size=1)

    def conv_block(self, in_channels, out_channels):
        """Two 3x3 same-padding convolutions, each followed by an in-place ReLU."""
        layers = [
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
        ]
        return nn.Sequential(*layers)

    def upconv(self, in_channels, out_channels):
        """Transposed convolution (kernel 2, stride 2) that doubles height and width."""
        return nn.ConvTranspose2d(in_channels, out_channels, kernel_size=2, stride=2)

    def forward(self, x):
        """Run the encoder, bottleneck and decoder and return the final feature map."""
        # Contracting path; every stage output is kept as a skip connection.
        skip1 = self.enc1(x)
        skip2 = self.enc2(F.max_pool2d(skip1, 2))
        skip3 = self.enc3(F.max_pool2d(skip2, 2))
        skip4 = self.enc4(F.max_pool2d(skip3, 2))

        features = self.bottleneck(F.max_pool2d(skip4, 2))

        # Expanding path: upsample, concatenate the skip along channels, convolve.
        features = self.dec4(torch.cat((self.up4(features), skip4), dim=1))
        features = self.dec3(torch.cat((self.up3(features), skip3), dim=1))
        features = self.dec2(torch.cat((self.up2(features), skip2), dim=1))
        features = self.dec1(torch.cat((self.up1(features), skip1), dim=1))

        return self.final(features)

# Instantiate the model with one input channel and one output channel.
model = UNet(in_channels=1, out_channels=1)

In [None]:
from torch.utils.data import WeightedRandomSampler

# Weight of the positive class in the loss: ratio of background to defect pixels.
num_positivos = sum(np.sum(label > 0) for label in labels)
num_negativos = sum(np.sum(label == 0) for label in labels)
if num_positivos == 0:
    raise ValueError("No positive (defect) pixels found in `labels`; cannot compute pos_weight.")
pos_weight = torch.tensor([num_negativos / num_positivos], dtype=torch.float32).to(DEVICE)

# BCEWithLogitsLoss works on raw logits, which is what autocast requires.
criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
optimizer = optim.Adam
learning_rate = 1e-5

# Sampler weights. The sampler draws whole images, so the weights are based on
# how many training images do / do not contain a defect. Using pixel counts here
# would make defect-free images almost never appear in a batch.
has_defect = [bool(label.sum() != 0) for label in train_labels]
num_defect_images = sum(has_defect)
num_clean_images = len(has_defect) - num_defect_images
class_counts = [num_clean_images, num_defect_images]
# clamp avoids a division by zero when one of the two groups is empty
class_weights = 1.0 / torch.tensor(class_counts, dtype=torch.float).clamp(min=1)
sample_weights = [class_weights[1].item() if flag else class_weights[0].item() for flag in has_defect]

sampler = WeightedRandomSampler(sample_weights, len(sample_weights))
batch_size = 4
# Training batches come from the sampler; validation and test keep dataset order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=sampler)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

model = model.to(DEVICE)

early_stopper = EarlyStopper(patience=3, min_delta=0.00005)
history = train_model(model,
                      train_dataloader=train_loader,
                      val_dataloader=val_loader,
                      criterion=criterion,
                      lr=learning_rate,
                      optimizer=optimizer,
                      epochs=25,
                      early_stopper=early_stopper)

In [None]:
# Training and validation loss per epoch.
plot_history(history, ["loss", "val_loss"])

In [None]:
def find_optimal_threshold(model, val_dataloader, thresholds=np.linspace(0.01, 0.9, 50)):
    """Find the binarisation threshold with the highest mean Dice coefficient.

    The model is run once per validation batch and every candidate threshold
    is scored on those same predictions. The score of a threshold is the
    average of its per-batch Dice values.

    Args:
        model (torch.nn.Module): Trained model; its output is passed through a sigmoid.
        val_dataloader: Iterable yielding ``(inputs, targets)`` batches.
        thresholds: Candidate thresholds to evaluate.

    Returns:
        The best threshold found, or 0.5 if no candidate reaches a mean
        Dice above 0.
    """
    best_threshold = 0.5
    best_dice = 0.0

    thresholds = list(thresholds)
    dice_totals = [0.0] * len(thresholds)
    num_batches = 0

    model.eval()
    with torch.no_grad():
        for vinputs, vtargets in val_dataloader:
            vinputs, vtargets = vinputs.to(DEVICE), vtargets.to(DEVICE)
            # The forward pass does not depend on the threshold, so do it once per batch.
            voutputs = torch.sigmoid(model(vinputs))
            num_batches += 1
            for idx, threshold in enumerate(thresholds):
                dice_totals[idx] += calculate_dice(vtargets, voutputs, threshold)

    for threshold, dice_total in zip(thresholds, dice_totals):
        avg_dice = dice_total / num_batches
        if avg_dice > best_dice:
            best_dice = avg_dice
            best_threshold = threshold

    print(f"Optimal threshold: {best_threshold} with Dice: {best_dice:.4f}")
    return best_threshold
# Select the binarisation threshold on the validation split.
optimal_threshold = find_optimal_threshold(model, val_dataloader=val_loader)

In [None]:
def visualize_predictions(model, data_loader, num_samples=3, threshold=0.01):
    """Show input image, ground truth and binarised prediction side by side.

    One row is drawn per batch, using the first sample of that batch, for at
    most ``num_samples`` batches.

    Args:
        model (torch.nn.Module): Trained model; its output is passed through a sigmoid.
        data_loader (DataLoader): Yields ``(image, label)`` batches.
        num_samples (int): Number of rows (batches) to display.
        threshold (float): Probabilities strictly above this value are shown as 1.
    """
    model.eval()
    # squeeze=False keeps ``axes`` two-dimensional even when num_samples == 1.
    fig, axes = plt.subplots(num_samples, 3, figsize=(15, 5 * num_samples), squeeze=False)

    rows_drawn = 0
    for i, (image, label) in enumerate(data_loader):
        if i >= num_samples:
            break

        with torch.no_grad():
            pred = torch.sigmoid(model(image.to(DEVICE))).cpu()  # logits -> probabilities
            pred = (pred > threshold).float()  # binarise

        # First sample of the batch as NumPy arrays
        image_np = image[0].squeeze().cpu().numpy()
        label_np = label[0].squeeze().cpu().numpy()
        pred_np = pred[0].squeeze().numpy()

        axes[i, 0].imshow(image_np, cmap='gray')
        axes[i, 0].set_title('Input Image')
        axes[i, 1].imshow(label_np, cmap='gray')
        axes[i, 1].set_title('Ground Truth')
        axes[i, 2].imshow(pred_np, cmap='gray')
        axes[i, 2].set_title('Prediction')
        rows_drawn = i + 1

    # Hide rows left empty when the loader has fewer batches than num_samples.
    for row in axes[rows_drawn:]:
        for ax in row:
            ax.axis('off')

    plt.tight_layout()
    plt.show()
    
# Use the threshold selected on the validation set rather than a hand-copied constant,
# so the figure stays consistent when the model is retrained.
visualize_predictions(model, val_loader, num_samples=10, threshold=optimal_threshold)