In [2]:
import os
from google.colab import drive

drive.mount('/content/drive')
images_dir = '/content/drive/MyDrive/Colab Notebooks/JULE/data/images_224x224'
output_dir = '/content/drive/MyDrive/Colab Notebooks/JULE/data/output_multilabel'
os.makedirs(output_dir, exist_ok=True)


Mounted at /content/drive


In [3]:
!pip install torch torchvision matplotlib pandas scikit-learn

import time
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import transforms, models
from sklearn.metrics import hamming_loss, f1_score
import pandas as pd
import matplotlib.pyplot as plt



In [4]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("Device:", device)

Device: cuda


In [5]:
class FeatureExtractor(nn.Module):
    def __init__(self, pretrained=True):
        super(FeatureExtractor, self).__init__()
        resnet = models.resnet50(pretrained=pretrained)
        self.features = nn.Sequential(*list(resnet.children())[:-1])

    def forward(self, x):
        return self.features(x).view(x.size(0), -1)

In [6]:
class DEC(nn.Module):
    def __init__(self, feature_extractor, n_labels=14, alpha=1.0):
        super(DEC, self).__init__()
        self.feature_extractor = feature_extractor
        self.n_labels = n_labels
        self.alpha = alpha

        # Determinación de la dimensión de las características
        dummy_input = torch.zeros(1, 3, 224, 224).to(device)
        with torch.no_grad():
            output = self.feature_extractor(dummy_input)
            feature_dim = output.shape[1]

        # Inicialización de los centroides
        self.centroids = nn.Parameter(torch.randn(n_labels, feature_dim).to(device))

    def forward(self, x):
        features = self.feature_extractor(x)  # Extraer características
        distances = torch.cdist(features, self.centroids)  # Calcular distancias a los centroides
        probabilities = torch.sigmoid(-distances)  # Convertir a probabilidades
        return probabilities

    def loss(self, probabilities, targets):
        bce_loss = nn.BCELoss()  # Pérdida binaria para cada etiqueta
        return bce_loss(probabilities, targets)



In [7]:

def train_dec_multilabel(dataloader, model, optimizer, n_epochs=20, output_dir="output"):
    metrics = []
    start_time = time.time()

    model.train()
    for epoch in range(n_epochs):
        total_loss = 0
        total_hamming = 0
        total_f1_macro = 0
        total_f1_micro = 0
        total_samples = 0

        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            probabilities = model(inputs)
            loss = model.loss(probabilities, labels)
            loss.backward()
            optimizer.step()

            predictions = (probabilities > 0.5).float()  # Umbral para predicciones
            hamming = hamming_loss(labels.cpu().numpy(), predictions.cpu().numpy())
            f1_macro = f1_score(labels.cpu().numpy(), predictions.cpu().numpy(), average="macro")
            f1_micro = f1_score(labels.cpu().numpy(), predictions.cpu().numpy(), average="micro")

            total_loss += loss.item()
            total_hamming += hamming * len(inputs)
            total_f1_macro += f1_macro * len(inputs)
            total_f1_micro += f1_micro * len(inputs)
            total_samples += len(inputs)

        # Calcular métricas promedio
        epoch_loss = total_loss / len(dataloader)
        epoch_hamming = total_hamming / total_samples
        epoch_f1_macro = total_f1_macro / total_samples
        epoch_f1_micro = total_f1_micro / total_samples

        metrics.append({
            "Epoch": epoch + 1,
            "Loss": epoch_loss,
            "Hamming Loss": epoch_hamming,
            "F1 Macro": epoch_f1_macro,
            "F1 Micro": epoch_f1_micro
        })

        print(f"Epoch {epoch + 1}/{n_epochs}, Loss: {epoch_loss:.4f}, "
              f"Hamming Loss: {epoch_hamming:.4f}, F1 Macro: {epoch_f1_macro:.4f}, "
              f"F1 Micro: {epoch_f1_micro:.4f}")

    # Guardar métricas y modelo
    metrics_file = os.path.join(output_dir, "metrics.csv")
    pd.DataFrame(metrics).to_csv(metrics_file, index=False)

    model_file = os.path.join(output_dir, "dec_model_multilabel.pth")
    torch.save(model.state_dict(), model_file)

    print(f"Entrenamiento completado en {time.time() - start_time:.2f} segundos. "
          f"Modelo y métricas guardados en {output_dir}.")


In [8]:
def evaluate_dec_multilabel(model, dataloader):
    model.eval()
    total_hamming = 0
    total_f1_macro = 0
    total_f1_micro = 0
    total_samples = 0

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            probabilities = model(inputs)
            predictions = (probabilities > 0.5).float()

            hamming = hamming_loss(labels.cpu().numpy(), predictions.cpu().numpy())
            f1_macro = f1_score(labels.cpu().numpy(), predictions.cpu().numpy(), average="macro")
            f1_micro = f1_score(labels.cpu().numpy(), predictions.cpu().numpy(), average="micro")

            total_hamming += hamming * len(inputs)
            total_f1_macro += f1_macro * len(inputs)
            total_f1_micro += f1_micro * len(inputs)
            total_samples += len(inputs)

    return {
        "Hamming Loss": total_hamming / total_samples,
        "F1 Macro": total_f1_macro / total_samples,
        "F1 Micro": total_f1_micro / total_samples
    }





In [9]:
# Paso 7: Visualizar las Métricas
def plot_metrics(metrics_file):
    metrics_df = pd.read_csv(metrics_file)

    plt.figure(figsize=(10, 6))
    plt.plot(metrics_df["Epoch"], metrics_df["Loss"], label="Loss")
    plt.plot(metrics_df["Epoch"], metrics_df["Hamming Loss"], label="Hamming Loss")
    plt.plot(metrics_df["Epoch"], metrics_df["F1 Macro"], label="F1 Macro")
    plt.plot(metrics_df["Epoch"], metrics_df["F1 Micro"], label="F1 Micro")
    plt.xlabel("Epoch")
    plt.ylabel("Metric Value")
    plt.legend()
    plt.title("Training Metrics")
    plt.show()


In [10]:
from torch.utils.data import DataLoader, random_split
from torchvision.datasets import ImageFolder
from torchvision import transforms

# Transformaciones para las imágenes
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5],std =[0.5])
])
dataset = ImageFolder(root=images_dir, transform=transform)

# Dividir el conjunto de datos en subconjuntos aleatorios
train_size = int(0.8 * len(dataset))  # 80% para entrenamiento
test_size = len(dataset) - train_size  # 20% para prueba
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# Crear DataLoaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

print(f"Tamaño del conjunto de entrenamiento: {len(train_dataset)}")
print(f"Tamaño del conjunto de prueba: {len(test_dataset)}")

In [None]:
train_dataset = ChestXray8Dataset(
    img_dir=images_dir,
    metadata_file=metadata_dir,
    split_file=train_list_path,
    mode='train',  # Training mode
    transform=transform
)
test_dataset = ChestXray8Dataset(
    img_dir=images_dir,
    metadata_file=metadata_dir,
    split_file=test_list_path,
    mode='test',  # Training mode
    transform=transform
)

In [None]:
train_dataset = ChestXray8Dataset(
    img_dir=images_dir,
    metadata_file=metadata_dir,
    split_file=train_list_path,
    mode='train',  # Training mode
    transform=transform
)
test_dataset = ChestXray8Dataset(
    img_dir=images_dir,
    metadata_file=metadata_dir,
    split_file=test_list_path,
    mode='test',  # Training mode
    transform=transform
)
train_loader =  DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [None]:

feature_extractor = FeatureExtractor(pretrained=True).to(device)
dec_model = DEC(feature_extractor, n_labels=14).to(device)
optimizer = torch.optim.Adam(dec_model.parameters(), lr=1e-4)

# Entrenamiento
train_dec_multilabel(train_loader, dec_model, optimizer, n_epochs=20, output_dir=output_dir)

# Evaluación
metrics = evaluate_dec_multilabel(dec_model, test_loader)
print("Metrics on Test Set:", metrics)

# Graficar métricas
plot_metrics(os.path.join(output_dir, "metrics.csv"))

In [None]:
train_dataset = ChestXray8Dataset(
    img_dir=images_dir,
    metadata_file=metadata_dir,
    split_file=train_list_path,
    mode='train',  # Training mode
    transform=transform
)
test_dataset = ChestXray8Dataset(
    img_dir=images_dir,
    metadata_file=metadata_dir,
    split_file=test_list_path,
    mode='test',  # Training mode
    transform=transform
)
train_loader =  DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [None]:
from torchvision import models
model = models.resnet50(pretrained=True)
num_classes = 14  # Número de etiquetas en tu dataset
model.fc = nn.Sequential(
    nn.Linear(model.fc.in_features, num_classes),
    nn.Sigmoid()  # Usamos sigmoid porque es una tarea multietiqueta
)

criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

num_epochs = 10


for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for images, labels in dataloader:
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(dataloader)}')



RuntimeError: Input type (torch.cuda.FloatTensor) and weight type (torch.FloatTensor) should be the same