In [None]:
import os
import numpy as np
import pandas as pd
from PIL import Image
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.metrics import roc_auc_score
from scipy.optimize import differential_evolution
import torch
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
import timm

# ------------------------
# 1. Define Constants
# ------------------------
# Label vocabulary for the multi-label task. The order matters: it is passed to
# MultiLabelBinarizer(classes=CLASSES) below and zipped against the per-class
# AUROC list when results are printed, so index i always refers to CLASSES[i].
CLASSES = [
    "No Finding", "Atelectasis", "Cardiomegaly", "Effusion", "Infiltration",
    "Mass", "Nodule", "Pneumonia", "Pneumothorax", "Consolidation",
    "Edema", "Emphysema", "Fibrosis", "Pleural_Thickening", "Hernia"
]

# ------------------------
# 2. Load and Preprocess Data
# ------------------------
# Read the label table and split the "|"-separated "Finding Labels" string of
# each row into a Python list of finding names.
df = pd.read_csv("/student/csc490_project/shared/labels.csv")
df["label_list"] = df["Finding Labels"].apply(lambda x: x.split("|"))

# Multi-hot encode the findings using the CLASSES vocabulary and store one
# label vector per row in the "labels" column.
mlb = MultiLabelBinarizer(classes=CLASSES)
labels_array = mlb.fit_transform(df["label_list"])
df["labels"] = list(labels_array)

# Split data by unique patients to avoid data leakage: all images of a patient
# land in exactly one partition. The shuffle is driven by the global NumPy RNG
# seeded with 42, so the partition is reproducible only while this exact
# seed/shuffle sequence is kept.
unique_patients = df["Patient ID"].unique()
np.random.seed(42)
np.random.shuffle(unique_patients)

# 70% train / 10% validation / 20% test, measured in patients (not images).
train_end = int(0.7 * len(unique_patients))
val_end = int(0.8 * len(unique_patients))
train_patients = unique_patients[:train_end]
val_patients = unique_patients[train_end:val_end]
test_patients = unique_patients[val_end:]
# NOTE(review): train_patients and val_patients are not used anywhere in the
# visible code. The ensemble weights later in this script are fitted and
# scored on the test split itself, so the reported AUROC is likely optimistic;
# consider fitting the weights on the validation patients instead.
test_df = df[df["Patient ID"].isin(test_patients)].reset_index(drop=True)

# ------------------------
# 3. Dataset Class
# ------------------------
class ChestXrayDataset(Dataset):
    """
    PyTorch Dataset that yields (image, multi-hot label vector) pairs.

    Each row of ``df`` must provide an "Image Index" column (the image file
    name, relative to ``root_dir``) and a "labels" column (the per-class
    label vector for that image).

    Args:
        df (pd.DataFrame): DataFrame containing image filenames and labels.
        root_dir (str): Root directory where images are stored.
        transform (callable, optional): Optional transform to be applied on a sample.
    """
    def __init__(self, df, root_dir, transform=None):
        self.df = df
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        """Return the number of rows (images) in the backing DataFrame."""
        return len(self.df)

    def __getitem__(self, idx):
        """
        Load the sample at positional index ``idx``.

        Returns:
            tuple: (image, labels) where ``image`` is the single-channel ("L")
            PIL image, or whatever ``transform`` returns for it, and ``labels``
            is a float tensor built from the row's "labels" entry.
        """
        # Fetch the row once instead of materializing it per column access.
        row = self.df.iloc[idx]
        img_path = os.path.join(self.root_dir, row["Image Index"])

        # The context manager closes the underlying file deterministically;
        # convert() returns an independent, fully loaded image.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert("L")

        labels = torch.tensor(row["labels"], dtype=torch.float)

        # Compare against None explicitly so that a transform object which
        # happens to evaluate as falsy is still applied.
        if self.transform is not None:
            image = self.transform(image)
        return image, labels

# ------------------------
# 4. Define Transforms and DataLoader
# ------------------------
# Inference-time preprocessing: replicate the grayscale channel to three
# channels, resize to 224x224, convert to a tensor and normalize per channel
# (the mean/std values match the standard ImageNet statistics).
transform = transforms.Compose(
    [
        transforms.Grayscale(num_output_channels=3),
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

# Test-split dataset and loader; shuffle stays off so prediction rows keep the
# same order as test_df.
img_dir = "/student/csc490_project/shared/preprocessed_images/preprocessed_images"
test_dataset = ChestXrayDataset(df=test_df, root_dir=img_dir, transform=transform)
test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=16,
    shuffle=False,
    num_workers=4,
)

# ------------------------
# 5. Load Models
# ------------------------
# Prefer the GPU when one is available; everything below also works on CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Ensemble members: short name -> timm architecture identifier. The short name
# is also the key used to locate each checkpoint file below.
_ARCHITECTURES = {
    'maxvit': 'maxvit_rmlp_base_rw_224.sw_in12k_ft_in1k',
    'densenet': 'densenet121',
    'coatnet': 'coatnet_2_rw_224.sw_in12k_ft_in1k',
    'swin': 'swin_large_patch4_window7_224',
    'convnext': 'convnext_large.fb_in22k',
    'vgg19': 'vgg19.tv_in1k',
}

# Build every network with one output per entry of CLASSES. pretrained=False
# because the weights come from the project checkpoints loaded right after.
models = {
    name: timm.create_model(arch, pretrained=False, num_classes=len(CLASSES))
    for name, arch in _ARCHITECTURES.items()
}

# Load the saved weights for each model and switch it to inference mode.
for name, model in models.items():
    model_path = f"/student/csc490_project/shared/new_split_models/no_augment_{name}_model.pth"
    # map_location remaps tensors saved from a CUDA device onto the device
    # selected above; without it, loading a GPU-saved checkpoint fails on a
    # CPU-only machine even though `device` already fell back to "cpu".
    # NOTE(security): torch.load unpickles the file, so only load checkpoints
    # from trusted locations (consider weights_only=True where supported).
    state_dict = torch.load(model_path, map_location=device)
    model.load_state_dict(state_dict)
    model.to(device)
    model.eval()

# ------------------------
# 6. Collect Model Predictions on Test Set
# ------------------------
def collect_predictions(loader):
    """
    Run inference using all models on the given data and collect predictions.

    Every model in the module-level ``models`` dict is applied to each batch;
    the raw outputs are passed through a sigmoid so the stored values are
    per-class probabilities.

    Args:
        loader (DataLoader): PyTorch DataLoader yielding (images, labels) batches.

    Returns:
        dict: Model name -> np.ndarray of sigmoid outputs for all batches,
            concatenated along the first axis in loader order.
        np.ndarray: Ground truth labels, concatenated the same way.

    Raises:
        ValueError: If the loader yields no batches.
    """
    all_preds = {name: [] for name in models}
    all_labels = []
    with torch.no_grad():
        for images, labels in loader:
            images = images.to(device)
            for name, model in models.items():
                probabilities = torch.sigmoid(model(images))
                all_preds[name].append(probabilities.cpu().numpy())
            # Labels are only needed on the host, so they are not copied to
            # the device and back for every batch.
            all_labels.append(labels.cpu().numpy())

    if not all_labels:
        # np.concatenate on an empty list fails with an unhelpful message.
        raise ValueError("loader yielded no batches; there is nothing to predict on")

    all_preds = {k: np.concatenate(v) for k, v in all_preds.items()}
    all_labels = np.concatenate(all_labels)
    return all_preds, all_labels

# Run every loaded model over the test loader once and cache the outputs
# (dict: model name -> prediction array) together with the ground-truth labels,
# so the ensemble search below never has to touch the networks again.
test_preds, test_labels = collect_predictions(test_loader)

# ------------------------
# 7. Weighted AUROC Calculation
# ------------------------
def weighted_ensemble_auroc(weights, preds_list, labels):
    """
    Calculate the negative mean AUROC score for a weighted ensemble of model predictions.

    Negative weights are clipped to zero and the remainder is rescaled to sum
    to (approximately) one before the predictions are blended. The caller's
    ``weights`` object is never modified.

    Args:
        weights (list or np.ndarray): Weights for each model (any numeric dtype).
        preds_list (list): List of model prediction arrays, one per weight,
            each indexed as [sample, class].
        labels (np.ndarray): Ground truth label matrix indexed as [sample, class].

    Returns:
        float: Negative mean of the per-class AUROC values (for use in minimization).
    """
    # Convert to float first: with integer input (e.g. [1, 1]) an in-place
    # true division would raise a casting error.
    weights = np.maximum(np.asarray(weights, dtype=float), 0.0)
    # The small epsilon keeps an all-zero weight vector from dividing by zero.
    weights = weights / (np.sum(weights) + 1e-8)
    ensemble_preds = sum(w * p for w, p in zip(weights, preds_list))
    return -np.mean([
        roc_auc_score(labels[:, i], ensemble_preds[:, i]) for i in range(labels.shape[1])
    ])

# ------------------------
# 8. DE Optimization for Subset of Models
# ------------------------
def optimize_weights_subset(test_preds, test_labels, model_subset,
                            maxiter=50, tol=1e-5, seed=None):
    """
    Optimize ensemble weights using Differential Evolution for a selected subset of models.

    Each weight is searched in [0, 1] with ``weighted_ensemble_auroc`` as the
    objective; the best weights are then normalized to sum to one and used to
    build the final blended predictions.

    Args:
        test_preds (dict): Dictionary of model predictions keyed by model name.
        test_labels (np.ndarray): Ground truth labels indexed as [sample, class].
        model_subset (list): Subset of model names to be included in the ensemble.
        maxiter (int, optional): Maximum number of DE generations. Defaults to 50.
        tol (float, optional): Relative convergence tolerance for DE. Defaults to 1e-5.
        seed (int, optional): Seed forwarded to ``differential_evolution`` for a
            self-contained, repeatable search. When None (default) the argument
            is not passed and SciPy's default RNG handling applies.

    Returns:
        tuple: (optimal weights, mean AUROC, final predictions, list of per-class AUROCs)
    """
    preds_list = [test_preds[name] for name in model_subset]
    bounds = [(0, 1)] * len(preds_list)

    de_options = {"maxiter": maxiter, "tol": tol}
    if seed is not None:
        de_options["seed"] = seed
    result = differential_evolution(
        weighted_ensemble_auroc, bounds, args=(preds_list, test_labels), **de_options
    )

    # Normalize the same way the objective does (clip, then rescale). If the
    # optimizer returned an all-zero vector, fall back to uniform weights
    # instead of dividing by zero and producing NaN predictions.
    raw_weights = np.maximum(result.x, 0)
    weight_total = np.sum(raw_weights)
    if weight_total > 0:
        best_weights = raw_weights / weight_total
    else:
        best_weights = np.full(len(preds_list), 1.0 / len(preds_list))

    final_preds = sum(w * p for w, p in zip(best_weights, preds_list))

    per_class_aurocs = [
        roc_auc_score(test_labels[:, i], final_preds[:, i]) for i in range(test_labels.shape[1])
    ]
    final_score = np.mean(per_class_aurocs)

    return best_weights, final_score, final_preds, per_class_aurocs

# ------------------------
# 9. Greedy Forward Model Selection
# ------------------------
def greedy_forward_selection(test_preds, test_labels, min_improvement=0.0):
    """
    Perform greedy forward selection of models to maximize mean AUROC.

    Starting from an empty ensemble, each round tries adding every remaining
    model, re-optimizes the weights with ``optimize_weights_subset`` and keeps
    the single best addition. Selection stops as soon as the best addition no
    longer beats the current score by more than ``min_improvement``, or when
    no models remain.

    Args:
        test_preds (dict): Dictionary of model predictions keyed by model name.
        test_labels (np.ndarray): Ground truth labels.
        min_improvement (float, optional): Gain in mean AUROC a candidate must
            exceed to be added. The default of 0.0 accepts any strict improvement.

    Returns:
        tuple: (list of selected model names, best AUROC score)
    """
    # A list (not a set) keeps the dict's insertion order, so candidates are
    # evaluated, and ties broken, in the same order on every run.
    remaining_models = list(test_preds)
    selected_models = []
    best_score = -np.inf

    while remaining_models:
        best_model = None
        best_score_candidate = -np.inf

        for candidate in remaining_models:
            current_combo = selected_models + [candidate]
            _, score, _, _ = optimize_weights_subset(test_preds, test_labels, current_combo)
            if score > best_score_candidate:
                best_score_candidate = score
                best_model = candidate

        if best_score_candidate > best_score + min_improvement:
            selected_models.append(best_model)
            remaining_models.remove(best_model)
            best_score = best_score_candidate
            print(f"Added: {best_model} | AUROC: {best_score:.4f}")
        else:
            break

    return selected_models, best_score

# ------------------------
# 10. Execute Greedy + DE + AUROC Display
# ------------------------
# Pick the ensemble members greedily; each accepted model is printed as it is
# added. greedy_de_score is the score of the last accepted combination and is
# not used again below.
selected_models, greedy_de_score = greedy_forward_selection(test_preds, test_labels)

# Re-run the weight optimization on the chosen members to obtain the weights,
# blended predictions and per-class AUROCs that are reported below.
# NOTE(review): this is a fresh optimizer run, so final_score is not guaranteed
# to equal greedy_de_score exactly. Both selection and weighting use the test
# split, so the printed numbers are not an unbiased estimate.
final_weights, final_score, final_preds, per_class_aurocs = optimize_weights_subset(test_preds, test_labels, selected_models)

# Report: members, their normalized weights, the mean AUROC and one AUROC per
# class (per_class_aurocs is in CLASSES order).
print("\nGreedy-DE Ensemble:")
print(f"Models used: {selected_models}")
print("Optimized Weights:")
for name, weight in zip(selected_models, final_weights):
    print(f"{name}: {weight:.4f}")
print(f"\nFinal Mean AUROC: {final_score:.4f}")
print("\nPer-Class AUROC:")
for cls, auc in zip(CLASSES, per_class_aurocs):
    print(f"{cls}: {auc:.4f}")


Added: maxvit | AUROC: 0.8385
Added: convnext | AUROC: 0.8500
Added: densenet | AUROC: 0.8541
Added: coatnet | AUROC: 0.8559
Added: swin | AUROC: 0.8565
Added: vgg19 | AUROC: 0.8565

Greedy-DE Ensemble:
Models used: ['maxvit', 'convnext', 'densenet', 'coatnet', 'swin', 'vgg19']
Optimized Weights:
maxvit: 0.2707
convnext: 0.2114
densenet: 0.1911
coatnet: 0.1496
swin: 0.1412
vgg19: 0.0360

Final Mean AUROC: 0.8565

Per-Class AUROC:
No Finding: 0.8014
Atelectasis: 0.8355
Cardiomegaly: 0.9160
Effusion: 0.8951
Infiltration: 0.7368
Mass: 0.8821
Nodule: 0.8096
Pneumonia: 0.7877
Pneumothorax: 0.8953
Consolidation: 0.8191
Edema: 0.9156
Emphysema: 0.9444
Fibrosis: 0.8495
Pleural_Thickening: 0.8425
Hernia: 0.9172
