In [None]:
import os
import wandb

import numpy as np 
import pandas as pd
from tqdm import tqdm
import matplotlib.pyplot as plt

import torch
from torch import nn, optim
import tensorflow as tf
from torchvision import models
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler

from PIL import Image
import albumentations as A
from albumentations.pytorch import ToTensorV2

wandb.login(key='')

In [None]:
class CheXpertDataset(Dataset):
    '''
    Custom dataset for CheXpert. Returns a tuple of (PIL.Image, torch.Tensor (float32)).
    Args:
        data (pd.Dataframe): dataset with image path as indexes and columns as labels, all values are 0 (negative) or 1 (postivive)
        root_dir (str): root directory of dataset folder
        mode ('train', 'val'): mode for different augmentation
        transforms (albumentations): augmentation techniques to use
    '''
    def __init__(self, data, root_dir, mode='train', transforms=None):
        self.data = data.to_numpy()
        self.labels = torch.tensor(data.values.astype(np.float32))
        self.root_dir = root_dir
        self.img_paths = [os.path.join(root_dir, img_path) for img_path in data.index]
        self.transform = transforms.get(mode)
    
    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_path = self.img_paths[idx]
        image = Image.open(img_path).convert('RGB')
        label = self.labels[idx]

        if self.transform:
            image = self.transform(image=np.array(image))['image']

        return (image, label)
    
def get_weighted_sampler(data):
    '''
    Custom Sampler for weighted sampling to deal with unbalanced labels. The class weights are the inverse of the count of positives of a label.
        Args:
            data (pd.Dataframe): dataset with image path as indexes and columns as labels, all values are 0 (negative) or 1 (postivive)
            batch_size (int): number of indices to use at a time 
    '''
    class_weights = (1/data.sum()).values
    weights = data.dot(class_weights)
    weighted_sampler = WeightedRandomSampler(torch.tensor(weights.values, dtype=torch.float), len(weights), replacement=True)
    return weighted_sampler
'''
Augmentations:
    - Scale 5% with p = 50%
    - Rotate 20° OR shear 5 pixels with p = 50%
    - Translate 5% with p = 50%
    - Resize to 224x224
    - Normalize with mean = 0.506 and std = 0.287, more details in data_preprocessing.ipynb
    - Convert to torch.Tensor
'''
transform = {
    'train': A.Compose([
        A.Affine(scale=(0.95, 1.05), p=0.5),
        A.OneOf([A.Affine(rotate=(-20, 20), p=0.5), A.Affine(shear=(-5, 5), p=0.5)], p=0.5),
        A.Affine(translate_percent=(-0.05, 0.05), p=0.5),
        A.Resize(224, 224),
        A.Normalize([0.506, 0.506, 0.506], [0.287, 0.287, 0.287]),
        ToTensorV2()
    ]),
    'val': A.Compose([
        A.Resize(224, 224),
        A.Normalize([0.506, 0.506, 0.506], [0.287, 0.287, 0.287]),
        ToTensorV2()
    ]),
}

In [None]:
test = pd.read_csv('/kaggle/input/chexpertclean/u1_test.csv',index_col=0)
train = pd.read_csv('/kaggle/input/chexpertclean/u1_train.csv', index_col=0)
val = pd.read_csv('/kaggle/input/chexpertclean/u1_val.csv', index_col=0)

In [None]:
train.index =  train.index.str.replace('CheXpert-v1.0-small', 'chexpert')
test.index = test.index.str.replace('CheXpert-v1.0-small', 'chexpert')
val.index = val.index.str.replace('CheXpert-v1.0-small', 'chexpert')

Confirm no data leakage

In [None]:
print(np.intersect1d(train.index.to_numpy(), test.index.to_numpy()))
print(np.intersect1d(val.index.to_numpy(), test.index.to_numpy()))
print(np.intersect1d(val.index.to_numpy(), train.index.to_numpy()))

Configuration for Bayes Hyperparameter Tuning using Wandb

In [None]:
sweep_config = {
    "method": "bayes",
    "metric": {
        "name": "val_mAUC",
        "goal": "maximize"
    },
    "parameters": {
        "learning_rate": {
            "values": [1e-3, 1e-4, 1e-5]
        },
        "weight_decay": {
            "values": [1e-3, 1e-4, 1e-5]
        },
        "patience": {
            "values": [1, 3]
        }#,
        #"drop_rate": {
        #    "values": [0, 0.25]
        #}
    }
}
sweep_id = wandb.sweep(sweep_config, project="deep_learning")

In [None]:
batch_size = 32

train_dataset = CheXpertDataset(
    data=train,
    root_dir='/kaggle/input/', 
    mode='train',
    transforms = transform
    )

train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    sampler = get_weighted_sampler(train),
    num_workers = 4,
    pin_memory=True
    )

val_dataset = CheXpertDataset(
    val, '/kaggle/input/',
    mode='val',
    transforms = transform)

val_loader = DataLoader(
    val_dataset,
    batch_size = batch_size,
    num_workers = 4,
    pin_memory=True)

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def train(config=None):
    with wandb.init(config=config):
        config = wandb.config
        
        model = models.resnet152(num_classes=14).to(device)
        loss_function = nn.BCEWithLogitsLoss()
        optimizer = optim.AdamW(model.parameters(), lr=config.learning_rate, weight_decay=config.weight_decay)
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=config.patience)
        mAUC = tf.keras.metrics.AUC(multi_label=True, from_logits=True)
        
        best_val_mAUC = 0
        epochs_without_improvement = 0
        max_epochs = 50
        val_patience = 5
        for epoch in range(max_epochs):
            
            model.train()
            epoch_loss = 0.0
            for inputs, labels in tqdm((train_loader), desc=f"Epoch {epoch+1}/{max_epochs}", unit="batch"):
                # Compute prediction and loss
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = loss_function(outputs, labels)
        
                loss.backward()
                optimizer.step()
                optimizer.zero_grad()
        
                # Accumulate loss
                epoch_loss += loss.item()
        
            # Compute and print average loss for the epoch
            avg_loss = epoch_loss / len(train_loader)
            scheduler.step(avg_loss)
            print(f"Epoch {epoch+1}/{max_epochs}, Loss: {avg_loss:.4f}")
        
            model.eval()
            mAUC.reset_state()
        
            with torch.no_grad():
                for inputs, labels in tqdm(val_loader, desc="Testing", unit="batch"):
                    inputs, labels = inputs.to(device), labels.to(device)
            
                    # Get the logits from the model
                    outputs = model(inputs)
            

            
                    mAUC.update_state(labels.cpu().numpy(), outputs.cpu().numpy())
        
            val_mAUC = mAUC.result().numpy()
            print(f'Mean AUROC: {val_mAUC:.4f}')
        
            wandb.log({"epoch": epoch, "train_loss": avg_loss, "val_mAUC": val_mAUC})
            
            if val_mAUC > best_val_mAUC:
                best_val_mAUC = val_mAUC
                torch.save(model.state_dict(), "ResNetScratch-U1.pth")
                epochs_without_improvement = 0
            else:
                epochs_without_improvement += 1
                
            # Early stopping
            if epochs_without_improvement >= val_patience:
                print("Early stopping triggered!")
                break


In [None]:
wandb.agent(sweep_id, train, count=5)

In [None]:
# Function to calculate and plot AUROC for multi-label classification
def plot_auroc(model, dataloader, num_classes, device):
    model.eval()
    
    true_labels = []
    predicted_probs = []
    
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            
            # Get raw model output (logits)
            outputs = model(inputs)
            
            # Apply sigmoid to get probabilities for each label
            probs = torch.sigmoid(outputs)
            
            true_labels.append(labels.cpu().numpy())
            predicted_probs.append(probs.cpu().numpy())
    
    # Convert to numpy arrays
    true_labels = np.vstack(true_labels)
    predicted_probs = np.vstack(predicted_probs)

    # Calculate AUROC for each label
    fpr = {}
    tpr = {}
    roc_auc = {}

    # Iterate over each label and compute the ROC curve and AUROC
    for i in range(num_classes):
        fpr[i], tpr[i], _ = roc_curve(true_labels[:, i], predicted_probs[:, i])
        roc_auc[i] = roc_auc_score(true_labels[:, i], predicted_probs[:, i])

    # Plot the ROC curve for each label
    plt.figure(figsize=(10, 8))
    for i in range(num_classes):
        plt.plot(fpr[i], tpr[i], label=f'Class {i+1} (AUROC = {roc_auc[i]:.2f})')
    
    # Plot the diagonal (random guess line)
    plt.plot([0, 1], [0, 1], 'k--', label='Random guess (AUROC = 0.5)')
    
    # Add labels and legend
    plt.title('Multi-label (ROC) Curve')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.legend(loc='lower right')
    plt.show()
    
    # Return mean AUROC
    mean_auroc = np.mean(list(roc_auc.values()))
    print(f'Mean AUROC across all classes: {mean_auroc:.2f}')
    return mean_auroc

#model.to(device)
#plot_auroc(model, val_loader, num_classes=14, device='cuda')