In [3]:
import pandas as pd

cassava_df = pd.read_csv('/home/alumno/Desktop/datos/Computer Vision/cassava/cassava_split.csv')

In [4]:
IMAGE_ROOT_DIR = './cassava/images/'

In [10]:
from torchvision import transforms
from torchvision.transforms import InterpolationMode
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import lightning.pytorch as pl
from torchvision import transforms 
import lightning.pytorch as pl
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler

# --- 1. CLASE CASSAVADATASET ---
class CassavaDataset(Dataset):
    """
    Dataset personalizado para el conjunto de datos de Cassava.
    """
    def __init__(self, df: pd.DataFrame, root_dir: str, transform=None):
        self.df = df
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        img_name = row['image_id']
        label = int(row['label']) 
        
        img_path = os.path.join(self.root_dir, img_name)
        # Cargamos la imagen y aseguramos el formato RGB
        image = Image.open(img_path).convert('RGB') 
        
        if self.transform:
            image = self.transform(image)
        
        return image, torch.tensor(label, dtype=torch.long)

class CassavaDataModule(pl.LightningDataModule):
    def __init__(self, data_df: pd.DataFrame, image_dir: str, batch_size: int = 64):
        super().__init__()
        
        self.data_df = data_df
        self.image_dir = image_dir
        self.batch_size = batch_size
        
        # Now we transform to match the data the model was originally trained on
        self.transform = transforms.Compose([
             transforms.Resize(256),#, interpolation=InterpolationMode.BILINEAR),              
             transforms.CenterCrop(224),          
             transforms.ToTensor(),
             transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) 
        ])


        
        

    def setup(self, stage: str):
        # 1. Filtrar el DataFrame basado en la columna 'set'
        train_df = self.data_df[self.data_df['set'] == 'train'].reset_index(drop=True)
        val_df = self.data_df[self.data_df['set'] == 'val'].reset_index(drop=True)
        test_df = self.data_df[self.data_df['set'] == 'test'].reset_index(drop=True)
        
        # 2. Creación de los objetos Dataset
        self.train_ds = CassavaDataset(train_df, self.image_dir, transform=self.transform)
        self.val_ds = CassavaDataset(val_df, self.image_dir, transform=self.transform)
        self.test_ds = CassavaDataset(test_df, self.image_dir, transform=self.transform)

        print(f"Dataset sizes: Train={len(self.train_ds)}, Val={len(self.val_ds)}, Test={len(self.test_ds)}")

    # Métodos para crear los DataLoaders
    def train_dataloader(self):
        # Extract labels from the dataset dataframe
        labels = self.train_ds.df['label'].values
    
        # Compute class frequencies
        class_sample_count = np.array([len(np.where(labels == t)[0]) for t in np.unique(labels)])
        # Inverse of frequency as weights
        weights = 1. / class_sample_count
        # Assign weight to each sample
        samples_weight = np.array([weights[t] for t in labels])
    
        # Define sampler
        sampler = WeightedRandomSampler(
            weights=torch.DoubleTensor(samples_weight),
            num_samples=len(samples_weight),
            replacement=True
        )
    
        return DataLoader(
            self.train_ds,
            batch_size=self.batch_size,
            sampler=sampler,
            num_workers=2
        )

    def val_dataloader(self):
        return DataLoader(self.val_ds, batch_size=self.batch_size, shuffle=False, num_workers = 2)

    def test_dataloader(self):
        return DataLoader(self.test_ds, batch_size=self.batch_size, shuffle=False, num_workers = 2)

In [12]:
class LightningModule(pl.LightningModule):
    def __init__(self, model, lr=1e-3, wd=0., discriminative_lr=None):
        super().__init__()
        self.model = model
        self.lr = lr
        self.wd = wd
        self.discriminative_lr = discriminative_lr # (lr, lr_mult)

        self.training_step_outputs = defaultdict(float)
        self.validation_step_outputs = defaultdict(float)

    def get_layer_wise_lr(self, lr, lr_mult):

        # Save layer names
        layer_names = []
        for idx, (name, param) in enumerate(self.model.named_parameters()):
            layer_names.append(name)
            print(f'{idx}: {name}')
        
        # Reverse layers
        layer_names.reverse()
        
        # placeholder
        parameters      = []
        prev_group_name = layer_names[0].split('.')[0]
        
        # store params & learning rates
        for idx, name in enumerate(layer_names):
            
            # parameter group name
            cur_group_name = name.split('.')[0]
            
            # update learning rate
            if cur_group_name != prev_group_name:
                lr *= lr_mult
            prev_group_name = cur_group_name
            
            # display info
            print('Using discriminative learning rates')
            print(f'{idx}: lr = {lr:.6f}, {name}')
            
            # append layer parameters
            parameters += [{'params': [p for n, p in model.named_parameters() if n == name and p.requires_grad],
                            'lr':     lr}]

        return parameters
        

    def forward(self, x):
        return self.model(x)

    def configure_optimizers(self):
        
        return torch.optim.Adam(
            self.parameters() if self.discriminative_lr == None else self.get_layer_wise_lr(*self.discriminative_lr), 
            lr=self.lr, weight_decay=self.wd)

    def training_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.cross_entropy(logits, y)
        self.log('train_loss', loss, on_step=False, on_epoch=True)
        
        self.training_step_outputs['loss'] += loss.detach().cpu()
        self.training_step_outputs['steps'] += 1
            
        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.cross_entropy(logits, y)
        self.log('val_loss', loss, on_step=False, on_epoch=True)

        self.validation_step_outputs['loss'] += loss.detach().cpu()
        self.validation_step_outputs['steps'] += 1
        
        return loss

    def on_train_epoch_end(self): 
        avg_loss = self.training_step_outputs['loss'] / self.training_step_outputs['steps']
        print(f"Average training loss for epoch {self.current_epoch}: {avg_loss.item():.4f}")
        self.training_step_outputs.clear() 

    def on_validation_epoch_end(self):
        avg_loss = self.validation_step_outputs['loss'] / self.validation_step_outputs['steps']
        print(f"Average validation loss for epoch {self.current_epoch}: {avg_loss.item():.4f}")
        self.validation_step_outputs.clear()

In [14]:
from torchvision import models
from torchvision.models import ResNet18_Weights
from torch import nn
from collections import defaultdict

data_module = CassavaDataModule(cassava_df, IMAGE_ROOT_DIR)
data_module.setup('fit')

ckpt_path = "/home/alumno/Desktop/datos/Computer Vision/lab 1/cassava_resnet18_tl_ft_disc_lr/version_0/checkpoints/best_valid_loss.ckpt"

model = models.resnet18(weights=ResNet18_Weights.IMAGENET1K_V1)

num_in_feat = model.fc.in_features
model.fc = nn.Linear(num_in_feat, 5)

# Load the best model. Note that we need to include all the LightningMNISTClassifier's __init__ parameters
model = LightningModule.load_from_checkpoint(ckpt_path, model=model)
model.cuda(0) # Move to GPU

# Move to GPU if needed
model.eval()


Dataset sizes: Train=14977, Val=3210, Test=3210


LightningModule(
  (model): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (1): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, tra

In [16]:
from sklearn.metrics import accuracy_score, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import os

y_trues, y_hats = [], []

for (x, y_true) in iter(data_module.test_dataloader()):

    with torch.no_grad():
        y_hat_logits = model(x.cuda())
        y_hat = nn.Softmax(1)(y_hat_logits).argmax(1)
        y_hat = y_hat.detach().cpu()

    y_hats.append(y_hat)
    y_trues.append(y_true)

y_trues = torch.cat(y_trues)
y_hats = torch.cat(y_hats)

print(f'Acc.: {accuracy_score(y_trues, y_hats):.4f}')

class_names = {
    0: "Cassava Bacterial Blight (CBB)",
    1: "Cassava Brown Streak Disease (CBSD)",
    2: "Cassava Green Mottle (CGM)",
    3: "Cassava Mosaic Disease (CMD)",
    4: "Healthy"
}
class_labels = list(class_names.values())
cm = confusion_matrix(y_trues, y_hats, labels=torch.arange(5))

plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", cbar=False, xticklabels=class_labels, yticklabels=class_labels)
plt.title("Confusion Matrix")
plt.xlabel("Predicted Labels")
plt.ylabel("True Labels")
plt.savefig("Confusion_matrix_4", dpi=400)
plt.show()

FileNotFoundError: Caught FileNotFoundError in DataLoader worker process 0.
Original Traceback (most recent call last):
  File "/home/alumno/py313ml/.venv/lib/python3.13/site-packages/torch/utils/data/_utils/worker.py", line 349, in _worker_loop
    data = fetcher.fetch(index)  # type: ignore[possibly-undefined]
  File "/home/alumno/py313ml/.venv/lib/python3.13/site-packages/torch/utils/data/_utils/fetch.py", line 52, in fetch
    data = [self.dataset[idx] for idx in possibly_batched_index]
            ~~~~~~~~~~~~^^^^^
  File "/tmp/ipykernel_357319/918004032.py", line 30, in __getitem__
    image = Image.open(img_path).convert('RGB')
            ~~~~~~~~~~^^^^^^^^^^
  File "/home/alumno/py313ml/.venv/lib/python3.13/site-packages/PIL/Image.py", line 3513, in open
    fp = builtins.open(filename, "rb")
FileNotFoundError: [Errno 2] No such file or directory: './cassava/images/3481863039.jpg'


In [6]:
from sklearn.metrics import roc_curve, auc
from sklearn.preprocessing import label_binarize
import matplotlib.pyplot as plt
import torch.nn.functional as F
import numpy as np

# Assuming y_trues and y_probs are already collected as in your previous code
n_classes = len(class_labels)

# Binarize true labels
y_true_onehot = F.one_hot(y_trues, num_classes=n_classes).numpy()
y_probs_np = y_probs.numpy()

# Plot ROC curves for each class
plt.figure(figsize=(10, 8))
for i in range(n_classes):
    fpr, tpr, _ = roc_curve(y_true_onehot[:, i], y_probs_np[:, i])
    roc_auc = auc(fpr, tpr)
    plt.plot(fpr, tpr, lw=2, label=f'Class {class_labels[i]} (AUC = {roc_auc:.2f})')

plt.plot([0, 1], [0, 1], 'k--', lw=2)  # Diagonal line
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Multi-class ROC Curves')
plt.legend(loc="lower right")
plt.grid(True)
plt.savefig("ROC_curves.png", dpi=400)
plt.show()


NameError: name 'class_labels' is not defined