In [3]:
#Standard Libraries
import os
import random
from typing import Dict, List

#Third-Party Libraries
import numpy as np
import pandas as pd

#PyTorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
import seaborn as sns

#Evaluation
from sklearn.metrics import roc_auc_score
from sklearn.metrics import confusion_matrix,  precision_score, recall_score, f1_score
from sklearn.metrics import classification_report

# Load train, validation, test dataset
import pickle

In [2]:
# Read the embedding/label splits out of the NumPy archive.
# NOTE: the pickle-loading cell further down assigns these same six names again.
data = np.load("mimic_embed_data.npz")
train_embeddings, train_labels = data["train_embeddings"], data["train_labels"]
valid_embeddings, valid_labels = data["valid_embeddings"], data["valid_labels"]
test_embeddings, test_labels = data["test_embeddings"], data["test_labels"]

In [4]:
# Helper for reading a pickled object from disk
def load_pickle(path):
    """Open ``path`` in binary mode and return the unpickled object.

    NOTE: ``pickle.load`` can execute arbitrary code contained in the file,
    so only use this on files from a trusted source.
    """
    with open(path, mode='rb') as stream:
        payload = pickle.load(stream)
    return payload

# Names of the 13 pathology labels.
# NOTE(review): this ordering differs from the alphabetical `label_names`
# list used by the training/evaluation cells — confirm which order the label
# columns actually follow.
pathologies = [
    "Enlarged Cardiomediastinum",
    "Cardiomegaly",
    "Lung Opacity",
    "Lung Lesion",
    "Edema",
    "Consolidation",
    "Pneumonia",
    "Atelectasis",
    "Pneumothorax",
    "Pleural Effusion",
    "Pleural Other",
    "Fracture",
    "Support Devices",
]

# Label cleaning: missing (NaN) entries become 0
def fill_nan_labels_with_zero(y):
    """Return a copy of ``y`` in which every NaN has been replaced by 0.0.

    Delegates to ``np.nan_to_num``, so infinite values are also mapped to
    large finite numbers (NumPy's default behaviour for that function).
    """
    return np.nan_to_num(y, nan=0.0)

# # Replace nan labels
# y_train_clean = fill_nan_labels_with_zero(y_train)
# y_valid_clean = fill_nan_labels_with_zero(y_valid)
# y_test_clean = fill_nan_labels_with_zero(y_test)


# Raw train / validation / test splits, read from the pickled files.
# Each pair is (embeddings, labels); files are read in the order listed.
train_embeddings, train_labels = (
    load_pickle("../data/X_train_50d.pkl"),
    load_pickle("../data/y_train.pkl"),
)
valid_embeddings, valid_labels = (
    load_pickle("../data/X_valid_50d.pkl"),
    load_pickle("../data/y_valid.pkl"),
)
test_embeddings, test_labels = (
    load_pickle("../data/X_test_50d.pkl"),
    load_pickle("../data/y_test.pkl"),
)

In [5]:
class SingleLabelDataset(Dataset):
    """Dataset pairing each embedding row with one scalar label.

    Both inputs are copied into ``float32`` tensors at construction time.
    Indexing yields a dict with keys ``"embedding"`` and ``"lab"``.
    """

    def __init__(self, embeddings, labels):
        as_float = torch.float32
        self.embeddings = torch.tensor(embeddings, dtype=as_float)
        self.labels = torch.tensor(labels, dtype=as_float)

    def __len__(self):
        # Number of samples == number of embedding rows.
        return len(self.embeddings)

    def __getitem__(self, idx):
        feature_vec = self.embeddings[idx]
        # Add a leading axis so a scalar label comes back with shape (1,).
        target = torch.unsqueeze(self.labels[idx], dim=0)
        return {"embedding": feature_vec, "lab": target}

In [6]:
def get_single_label_data(embeddings, labels, label_idx):
    """Select the samples whose label in column ``label_idx`` is not NaN.

    Returns a tuple ``(embeddings_subset, label_column_subset)`` restricted
    to the rows where that label is present.
    """
    column = labels[:, label_idx]
    observed = np.logical_not(np.isnan(column))
    return embeddings[observed], column[observed]

In [7]:
class TabTransformer(nn.Module):
    """Transformer-encoder classifier applied to a single feature vector.

    The input is linearly projected to ``hidden_dim``, layer-normalised, fed
    through ``nlayers`` encoder layers as a length-1 sequence, and mapped by a
    final linear layer to ``output_dim`` raw logits (no sigmoid is applied).
    """

    def __init__(self, input_dim=50, hidden_dim=32, output_dim=1, nhead=8, nlayers=4, dropout=0.1):
        super().__init__()

        # Input projection followed by normalisation.
        self.input_proj = nn.Linear(in_features=input_dim, out_features=hidden_dim)
        self.norm = nn.LayerNorm(hidden_dim)

        # Stack of identical encoder layers.
        layer_template = self._build_encoder_layer(hidden_dim, nhead, dropout)
        self.encoder = nn.TransformerEncoder(layer_template, num_layers=nlayers)

        # Linear head producing the logits.
        self.classifier = nn.Linear(in_features=hidden_dim, out_features=output_dim)

    def _build_encoder_layer(self, hidden_dim, nhead, dropout):
        """Create one encoder layer; the feed-forward width is 8x ``hidden_dim``."""
        layer_kwargs = dict(
            d_model=hidden_dim,
            nhead=nhead,
            dim_feedforward=hidden_dim * 8,
            dropout=dropout,
            activation='gelu',
            batch_first=True,
        )
        return nn.TransformerEncoderLayer(**layer_kwargs)

    def forward(self, x):
        """Return logits for ``x``; a sequence axis is inserted at dim 1 and removed again."""
        projected = self.norm(self.input_proj(x))
        encoded = self.encoder(projected.unsqueeze(1)).squeeze(1)
        return self.classifier(encoded)

In [8]:
class MaskedAsymmetricLoss(nn.Module):
    """Binary loss with separate focusing exponents for positives and negatives.

    For probabilities ``p = sigmoid(logits)`` the per-element loss is::

        -[ y * (1 - p)**gamma_pos * log(p) + (1 - y) * p**gamma_neg * log(1 - p) ]

    The result is the mask-weighted mean of that quantity.

    Args:
        gamma_pos: exponent applied to ``(1 - p)`` on the positive term.
        gamma_neg: exponent applied to ``p`` on the negative term.
    """

    def __init__(self, gamma_pos=0, gamma_neg=4):
        super().__init__()
        self.gamma_pos = gamma_pos
        self.gamma_neg = gamma_neg

    def forward(self, logits, labels, mask):
        """Return the masked mean loss as a 0-d tensor.

        Args:
            logits: raw model outputs.
            labels: targets, broadcastable against ``logits``.
            mask: weights (1 = keep, 0 = ignore), broadcastable against ``logits``.

        If the mask sums to zero the result is 0 rather than NaN.
        """
        probs = torch.sigmoid(logits)
        # The 1e-8 inside the logs guards against log(0).
        pos_loss = labels * torch.pow(1 - probs, self.gamma_pos) * torch.log(probs + 1e-8)
        neg_loss = (1 - labels) * torch.pow(probs, self.gamma_neg) * torch.log(1 - probs + 1e-8)
        loss = -(pos_loss + neg_loss)

        masked_total = (loss * mask).sum()
        # Clamp the denominator so a fully masked batch yields 0 instead of 0/0 = NaN.
        denom = torch.clamp(mask.sum().to(masked_total.dtype), min=1e-8)
        return masked_total / denom

In [9]:
def train_single_label_model(train_loader, val_loader, label_name, save_path, gamma_pos, gamma_neg,
                             num_epochs=15, lr=1e-5):
    """Train a fresh ``TabTransformer`` for one label and save its weights.

    Args:
        train_loader: iterable of batches with keys ``"embedding"`` and ``"lab"``.
        val_loader: accepted for interface compatibility; not read by this function.
        label_name: label name, used only in log messages.
        save_path: destination passed to ``torch.save`` for the model ``state_dict``.
        gamma_pos: positive focusing exponent for ``MaskedAsymmetricLoss``.
        gamma_neg: negative focusing exponent for ``MaskedAsymmetricLoss``.
        num_epochs: number of passes over ``train_loader`` (default 15).
        lr: Adam learning rate (default 1e-5).

    Returns:
        The trained model (left in training mode, on the selected device).
    """
    print(f" Training with gamma_pos={gamma_pos}, gamma_neg={gamma_neg}")
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = TabTransformer().to(device)
    criterion = MaskedAsymmetricLoss(gamma_pos=gamma_pos, gamma_neg=gamma_neg)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0.0
        num_batches = 0

        for batch in train_loader:
            x = batch["embedding"].to(device)
            y = batch["lab"].to(device).float().view(-1, 1)  # ensure shape (batch_size, 1)
            mask = torch.ones_like(y)  # every sample here has an observed label

            logits = model(x)
            loss = criterion(logits, y, mask)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            num_batches += 1

        # Count batches explicitly so an empty loader does not divide by zero.
        avg_loss = total_loss / max(num_batches, 1)
        print(f"[{label_name}] Epoch {epoch+1} - Training loss: {avg_loss:.4f}")

    torch.save(model.state_dict(), save_path)
    print(f" Model for {label_name} saved to {save_path}")

    return model


In [10]:

from sklearn.metrics import average_precision_score

def grid_search_asl_for_label(label_index, label_name, gamma_pos_list, gamma_neg_list):
    """Grid-search the loss exponents for one label using validation Average Precision.

    For every ``(gamma_pos, gamma_neg)`` pair a fresh ``TabTransformer`` is
    trained for 8 epochs on the module-level training split (samples with a
    NaN for this label are dropped) and scored on the validation split.

    Args:
        label_index: column index of the label in the label arrays.
        label_name: label name, used only in log messages.
        gamma_pos_list: candidate values for ``gamma_pos``.
        gamma_neg_list: candidate values for ``gamma_neg``.

    Returns:
        ``(gamma_pos, gamma_neg)`` of the best-scoring pair. Ties keep the
        earliest pair; if no pair yields a valid score the first pair is returned.

    Raises:
        ValueError: if either candidate list is empty.
    """
    print(f"\n Grid search for label: {label_name}")

    if len(gamma_pos_list) == 0 or len(gamma_neg_list) == 0:
        raise ValueError("gamma_pos_list and gamma_neg_list must both be non-empty")

    # Resolve the device here instead of relying on a global set by another cell.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Get training and validation data for the specific label
    train_X, train_y = get_single_label_data(train_embeddings, train_labels, label_index)
    val_X, val_y = get_single_label_data(valid_embeddings, valid_labels, label_index)

    train_ds = SingleLabelDataset(train_X, train_y)
    val_ds = SingleLabelDataset(val_X, val_y)
    train_loader = DataLoader(train_ds, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_ds, batch_size=32, shuffle=False)

    best_ap = float("nan")
    best_config = None

    for gamma_pos in gamma_pos_list:
        for gamma_neg in gamma_neg_list:
            print(f"→ gamma_pos={gamma_pos}, gamma_neg={gamma_neg}")

            # Initialize model
            model = TabTransformer().to(device)
            criterion = MaskedAsymmetricLoss(gamma_pos=gamma_pos, gamma_neg=gamma_neg)
            optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)

            # Train for a few epochs to evaluate config
            for epoch in range(8):
                model.train()
                for batch in train_loader:
                    x = batch["embedding"].to(device)
                    y = batch["lab"].to(device).float().view(-1, 1)
                    mask = torch.ones_like(y)
                    logits = model(x)
                    loss = criterion(logits, y, mask)
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()

            # Evaluate on validation set using Average Precision
            model.eval()
            all_probs = []
            all_labels = []
            with torch.no_grad():
                for batch in val_loader:
                    x = batch["embedding"].to(device)
                    # reshape(-1) keeps a 1-D tensor even for a batch of size 1;
                    # squeeze() would give a 0-d tensor that list.extend cannot iterate.
                    y = batch["lab"].to(device).float().reshape(-1)
                    probs = torch.sigmoid(model(x)).reshape(-1)
                    all_probs.extend(probs.cpu().numpy())
                    all_labels.extend(y.cpu().numpy())

            ap = average_precision_score(all_labels, all_probs)
            print(f"    → AP={ap:.4f}")

            # Always remember the first pair as a fallback; afterwards only a
            # strictly better, non-NaN score replaces the current best.
            is_better = (not np.isnan(ap)) and (np.isnan(best_ap) or ap > best_ap)
            if best_config is None or is_better:
                best_ap = ap
                best_config = (gamma_pos, gamma_neg)

    print(f" Best config for {label_name}: gamma_pos={best_config[0]}, gamma_neg={best_config[1]} with AP={best_ap:.4f}")
    return best_config

In [11]:
# Label names in alphabetical order; the position in this list is used below
# as the column index into the label arrays.
# NOTE(review): assumes the label columns are stored in this alphabetical
# order — confirm (the `pathologies` list defined earlier uses another order).
label_names = [
    'Atelectasis', 'Cardiomegaly', 'Consolidation', 'Edema',
    'Enlarged Cardiomediastinum', 'Fracture', 'Lung Lesion',
    'Lung Opacity', 'Pleural Effusion', 'Pleural Other',
    'Pneumonia', 'Pneumothorax', 'Support Devices'
]

# Candidate loss exponents explored for every label (2 x 4 = 8 combinations).
gamma_pos_list = [0.0, 0.5]
gamma_neg_list = [3.0, 4.0, 5.0, 6.0]

# One dict per label with the selected exponents; written to CSV at the end.
search_results = []

# Compute device: GPU when available, otherwise CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# For each label: grid-search the exponents, then train and save a final model.
for i, label_name in enumerate(label_names):
    print(f"\n Running ASL grid search for label: {label_name}")

    best_gamma_pos, best_gamma_neg = grid_search_asl_for_label(
        i, label_name, gamma_pos_list, gamma_neg_list
    )

    # Train final model with best gamma combination
    # (rows whose label i is NaN are dropped from both splits)
    train_X, train_y = get_single_label_data(train_embeddings, train_labels, i)
    val_X, val_y = get_single_label_data(valid_embeddings, valid_labels, i)

    train_ds = SingleLabelDataset(train_X, train_y)
    val_ds = SingleLabelDataset(val_X, val_y)

    train_loader = DataLoader(train_ds, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_ds, batch_size=32, shuffle=False)

    # Checkpoint file name; the evaluation cells load the same pattern.
    model_path = f"transformer_label_{i}_{label_name}.pt"
    model = train_single_label_model(train_loader, val_loader, label_name, model_path,
                                    gamma_pos=best_gamma_pos, gamma_neg=best_gamma_neg)

    # Save best config for this label
    search_results.append({
        "label": label_name,
        "gamma_pos": best_gamma_pos,
        "gamma_neg": best_gamma_neg
    })

# Save all best gamma configurations to CSV
# (pandas is already imported in the first cell; this re-import is redundant)
import pandas as pd
pd.DataFrame(search_results).to_csv("best_gamma_config_per_label.csv", index=False)
print("\n All best gamma configs saved to best_gamma_config_per_label.csv")



 Running ASL grid search for label: Atelectasis

 Grid search for label: Atelectasis
→ gamma_pos=0.0, gamma_neg=3.0
    → AP=0.5290
→ gamma_pos=0.0, gamma_neg=4.0
    → AP=0.5324
→ gamma_pos=0.0, gamma_neg=5.0
    → AP=0.5375
→ gamma_pos=0.0, gamma_neg=6.0
    → AP=0.5407
→ gamma_pos=0.5, gamma_neg=3.0
    → AP=0.5162
→ gamma_pos=0.5, gamma_neg=4.0
    → AP=0.5328
→ gamma_pos=0.5, gamma_neg=5.0
    → AP=0.5453
→ gamma_pos=0.5, gamma_neg=6.0
    → AP=0.5225
 Best config for Atelectasis: gamma_pos=0.5, gamma_neg=5.0 with AP=0.5453
 Training with gamma_pos=0.5, gamma_neg=5.0
[Atelectasis] Epoch 1 - Training loss: 0.0661
[Atelectasis] Epoch 2 - Training loss: 0.0588


KeyboardInterrupt: 

In [None]:
def evaluate_model(model, dataloader, device):
    """Compute ROC-AUC of ``model`` over ``dataloader``.

    Args:
        model: module mapping a batch of embeddings to one logit per sample.
        dataloader: iterable of batches with keys ``"embedding"`` and ``"lab"``.
        device: device the batches are moved to before the forward pass.

    Returns:
        The ROC-AUC as a float, or NaN when ``roc_auc_score`` raises
        ``ValueError`` (for example when only one class is present).
    """
    model.eval()
    all_probs = []
    all_targets = []

    with torch.no_grad():
        for batch in dataloader:
            x = batch["embedding"].to(device)
            # reshape(-1) instead of squeeze(): squeeze() turns a batch of size 1
            # into a 0-d tensor, and list.extend() cannot iterate a 0-d array.
            y = batch["lab"].to(device).reshape(-1)  # shape: (batch_size,)

            logits = model(x).reshape(-1)  # shape: (batch_size,)
            probs = torch.sigmoid(logits)

            all_probs.extend(probs.cpu().numpy())
            all_targets.extend(y.cpu().numpy())

    try:
        auc = roc_auc_score(all_targets, all_probs)
    except ValueError:
        # Deliberate best-effort: report NaN instead of aborting the label loop.
        auc = float('nan')

    return auc

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Report the test ROC-AUC of every saved per-label model.
for i, label_name in enumerate(label_names):
    print(f"\n Testing model for: {label_name}")

    # 1. Prepare test data for the current label
    test_X, test_y = get_single_label_data(test_embeddings, test_labels, i)
    test_ds = SingleLabelDataset(test_X, test_y)
    test_loader = DataLoader(test_ds, batch_size=32, shuffle=False)

    # 2. Rebuild the model architecture and load saved weights.
    #    map_location lets a checkpoint saved on GPU load on a CPU-only machine.
    #    The layer sizes are read from the checkpoint itself, so the rebuilt model
    #    always matches what was trained (the training cell uses TabTransformer()
    #    defaults, not the previously hard-coded 1376/128).
    state_dict = torch.load(f"transformer_label_{i}_{label_name}.pt", map_location=device)
    hidden_dim, input_dim = state_dict["input_proj.weight"].shape
    model = TabTransformer(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=1)
    model.load_state_dict(state_dict)
    model = model.to(device)

    # 3. Evaluate on the test set
    test_auc = evaluate_model(model, test_loader, device)
    print(f"[{label_name}] Test AUC: {test_auc:.4f}")

In [None]:
def evaluate_full_report(model, dataloader, device):
    """Return scikit-learn's text classification report for ``model``.

    Predictions are the sigmoid of the model output thresholded at 0.5.
    NOTE: a later cell defines a function with this same name that returns a
    ``(report_dict, accuracy)`` tuple; whichever cell ran last is in effect.
    """
    model.eval()
    truths, decisions = [], []

    with torch.no_grad():
        for batch in dataloader:
            features = batch["embedding"].to(device)
            targets = batch["lab"].to(device)

            # Probability strictly above 0.5 -> class 1.0, otherwise 0.0.
            hard_preds = torch.sigmoid(model(features)).gt(0.5).float()

            truths.extend(targets.cpu().numpy())
            decisions.extend(hard_preds.cpu().numpy())

    report_text = classification_report(truths, decisions, digits=2)
    return report_text

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Print a classification report on the test split for every per-label model.
for i, label_name in enumerate(label_names):
    print("=" * 30)
    print(f"🧪 Test Evaluation Report for: {label_name}")
    print("=" * 30)

    # 1. Load test data for current label
    test_X, test_y = get_single_label_data(test_embeddings, test_labels, i)
    test_ds = SingleLabelDataset(test_X, test_y)
    test_loader = DataLoader(test_ds, batch_size=32, shuffle=False)

    # 2. Load corresponding model.
    #    NOTE(review): this cell reads "..._best.pt" files, whereas the training
    #    cell in this notebook saves "transformer_label_{i}_{label_name}.pt" —
    #    confirm that the "_best" checkpoints exist and are the intended ones.
    #    map_location lets a GPU-saved checkpoint load on CPU; layer sizes are
    #    taken from the checkpoint so the rebuilt model matches the saved weights.
    state_dict = torch.load(f"transformer_label_{i}_{label_name}_best.pt", map_location=device)
    hidden_dim, input_dim = state_dict["input_proj.weight"].shape
    model = TabTransformer(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=1)
    model.load_state_dict(state_dict)
    model = model.to(device)

    # 3. Run classification report on test set
    report = evaluate_full_report(model, test_loader, device)
    print(report)

In [None]:
from sklearn.metrics import classification_report
import pandas as pd
import numpy as np

def safe_get(report, class_label, metric):
    """Safely extract one metric for a class or an average row of a report dict.

    Args:
        report: mapping such as ``classification_report(..., output_dict=True)``.
        class_label: a numeric class (e.g. 0 or 1) or a row name such as
            ``'macro avg'``. For numeric classes several key spellings are
            tried: the value itself, ``str(value)`` and the one-decimal float
            form (``"1.0"``).
        metric: metric name inside the row, e.g. ``'precision'``.

    Returns:
        The metric value, or ``np.nan`` when the row or metric is missing or
        the row is not a dict (e.g. the scalar ``'accuracy'`` entry).
    """
    if isinstance(class_label, (int, float)):
        # Try the three possible numeric key spellings
        keys_to_try = [class_label, str(class_label), f"{float(class_label):.1f}"]
    else:
        # String rows such as 'macro avg' are looked up directly
        keys_to_try = [class_label]

    for key in keys_to_try:
        if key not in report:
            continue
        entry = report[key]
        # Scalar rows (like 'accuracy') hold no per-metric values; skipping them
        # avoids the TypeError that `metric in <float>` would raise.
        if isinstance(entry, dict) and metric in entry:
            return entry[metric]
    return np.nan


def evaluate_full_report(model, dataloader, device):
    """Return ``(report_dict, accuracy)`` for ``model`` on ``dataloader``.

    ``report_dict`` is scikit-learn's classification report in dict form and
    ``accuracy`` is the fraction of predictions equal to the ground truth.
    Predictions are the sigmoid of the model output thresholded at 0.5.
    NOTE: this replaces the earlier same-named function that returned text only.
    """
    model.eval()
    truths, decisions = [], []

    with torch.no_grad():
        for batch in dataloader:
            features = batch["embedding"].to(device)
            targets = batch["lab"].to(device)

            # Probability strictly above 0.5 -> class 1.0, otherwise 0.0.
            hard_preds = torch.sigmoid(model(features)).gt(0.5).float()

            truths.extend(targets.cpu().numpy())
            decisions.extend(hard_preds.cpu().numpy())

    # Share of element-wise matches between labels and predictions.
    hit_rate = (np.array(truths) == np.array(decisions)).mean()

    report = classification_report(truths, decisions, digits=4, output_dict=True)
    return report, hit_rate

# === Collect the reports for all labels ===
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
report_dict = {}

for i, label_name in enumerate(label_names):
    print("=" * 30)
    print(f"🧪 Test Evaluation Report for: {label_name}")
    print("=" * 30)

    # 1. Load test data for current label
    test_X, test_y = get_single_label_data(test_embeddings, test_labels, i)
    test_ds = SingleLabelDataset(test_X, test_y)
    test_loader = DataLoader(test_ds, batch_size=32, shuffle=False)

    # 2. Load corresponding model.
    #    map_location lets a GPU-saved checkpoint load on CPU; layer sizes are
    #    read from the checkpoint so the rebuilt model matches what was trained
    #    (the training cell uses TabTransformer() defaults, not 1376/128).
    state_dict = torch.load(f"transformer_label_{i}_{label_name}.pt", map_location=device)
    hidden_dim, input_dim = state_dict["input_proj.weight"].shape
    model = TabTransformer(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=1)
    model.load_state_dict(state_dict)
    model = model.to(device)

    # 3. Get classification report + accuracy
    report, acc = evaluate_full_report(model, test_loader, device)

    report_dict[label_name] = {
        # Class 0
        'precision_0': safe_get(report, 0, 'precision'),
        'recall_0': safe_get(report, 0, 'recall'),
        'f1-score_0': safe_get(report, 0, 'f1-score'),

        # Class 1
        'precision_1': safe_get(report, 1, 'precision'),
        'recall_1': safe_get(report, 1, 'recall'),
        'f1-score_1': safe_get(report, 1, 'f1-score'),

        # Macro avg
        'precision': safe_get(report, 'macro avg', 'precision'),
        'recall': safe_get(report, 'macro avg', 'recall'),
        'f1-score': safe_get(report, 'macro avg', 'f1-score'),
        'support': safe_get(report, 'macro avg', 'support'),

        # Accuracy
        'accuracy': acc
    }

# === Save as a DataFrame (one column per label, one row per metric) ===
df = pd.DataFrame(report_dict)
df.index.name = "Metric"
df = df.round(4)
df.to_csv("test_metrics_per_label4.csv")
print("\n✅ Saved report with accuracy to: test_metrics_per_label4.csv")


🧪 Test Evaluation Report for: Atelectasis
🧪 Test Evaluation Report for: Cardiomegaly
🧪 Test Evaluation Report for: Consolidation
🧪 Test Evaluation Report for: Edema
🧪 Test Evaluation Report for: Enlarged Cardiomediastinum
🧪 Test Evaluation Report for: Fracture
🧪 Test Evaluation Report for: Lung Lesion
🧪 Test Evaluation Report for: Lung Opacity
🧪 Test Evaluation Report for: Pleural Effusion
🧪 Test Evaluation Report for: Pleural Other
🧪 Test Evaluation Report for: Pneumonia
🧪 Test Evaluation Report for: Pneumothorax
🧪 Test Evaluation Report for: Support Devices

✅ Saved report with accuracy to: test_metrics_per_label4.csv


In [None]:
from sklearn.metrics import roc_auc_score, average_precision_score

def evaluate_auc_ap(model, dataloader, device):
    """Compute ROC-AUC and Average Precision of ``model`` over ``dataloader``.

    Args:
        model: module mapping a batch of embeddings to one logit per sample.
        dataloader: iterable of batches with keys ``"embedding"`` and ``"lab"``.
        device: device the batches are moved to before the forward pass.

    Returns:
        ``(auc, ap)``; either value is NaN when the corresponding scikit-learn
        function raises ``ValueError``.
    """
    model.eval()
    all_probs = []
    all_targets = []

    with torch.no_grad():
        for batch in dataloader:
            x = batch["embedding"].to(device)
            # reshape(-1) instead of squeeze(): squeeze() turns a batch of size 1
            # into a 0-d tensor, and list.extend() cannot iterate a 0-d array.
            y = batch["lab"].to(device).reshape(-1)

            logits = model(x).reshape(-1)
            probs = torch.sigmoid(logits)

            all_probs.extend(probs.cpu().numpy())
            all_targets.extend(y.cpu().numpy())

    # Deliberate best-effort: a metric that cannot be computed is reported as
    # NaN so the per-label loop keeps going.
    try:
        auc = roc_auc_score(all_targets, all_probs)
    except ValueError:
        auc = float('nan')

    try:
        ap = average_precision_score(all_targets, all_probs)
    except ValueError:
        ap = float('nan')

    return auc, ap

# ========================
# 🧪 Evaluate on Test Set
# ========================
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

for i, label_name in enumerate(label_names):
    print("=" * 40)
    print(f"🧪 Test Metrics for: {label_name}")
    print("=" * 40)

    # 1. Load test data
    test_X, test_y = get_single_label_data(test_embeddings, test_labels, i)
    test_ds = SingleLabelDataset(test_X, test_y)
    test_loader = DataLoader(test_ds, batch_size=32, shuffle=False)

    # 2. Load model.
    #    map_location lets a GPU-saved checkpoint load on CPU; layer sizes are
    #    read from the checkpoint so the rebuilt model matches what was trained
    #    (the training cell uses TabTransformer() defaults, not 1376/128).
    state_dict = torch.load(f"transformer_label_{i}_{label_name}.pt", map_location=device)
    hidden_dim, input_dim = state_dict["input_proj.weight"].shape
    model = TabTransformer(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=1)
    model.load_state_dict(state_dict)
    model = model.to(device)

    # 3. Evaluate
    auc, ap = evaluate_auc_ap(model, test_loader, device)
    print(f"AUC Score:              {auc:.4f}")
    print(f"Average Precision (AP): {ap:.4f}\n")


🧪 Test Metrics for: Atelectasis
AUC Score:              0.9010
Average Precision (AP): 0.5798

🧪 Test Metrics for: Cardiomegaly
AUC Score:              0.9141
Average Precision (AP): 0.5808

🧪 Test Metrics for: Consolidation
AUC Score:              0.9047
Average Precision (AP): 0.3551

🧪 Test Metrics for: Edema
AUC Score:              0.9660
Average Precision (AP): 0.5419

🧪 Test Metrics for: Enlarged Cardiomediastinum
AUC Score:              0.7954
Average Precision (AP): 0.1204

🧪 Test Metrics for: Fracture
AUC Score:              0.7926
Average Precision (AP): 0.1352

🧪 Test Metrics for: Lung Lesion
AUC Score:              0.8927
Average Precision (AP): 0.3622

🧪 Test Metrics for: Lung Opacity
AUC Score:              0.8656
Average Precision (AP): 0.5682

🧪 Test Metrics for: Pleural Effusion
AUC Score:              0.9473
Average Precision (AP): 0.6958

🧪 Test Metrics for: Pleural Other
AUC Score:              0.9122
Average Precision (AP): 0.2117

🧪 Test Metrics for: Pneumonia
AUC

In [None]:
# ✅ Evaluate multi-label classification models on the test set
# ✅ Strategy: Per-label evaluation by skipping NaNs individually (Option 2)

from sklearn.metrics import accuracy_score, average_precision_score
import numpy as np
import torch
from torch.utils.data import DataLoader

# === Step 1: Predict each label individually, skipping NaNs ===
# all_true[k] / all_pred[k] hold the flat ground truth and predicted
# probabilities for label k, restricted to samples where that label is not NaN.
all_true = []
all_pred = []

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

for i, label_name in enumerate(label_names):
    print(f"Evaluating label: {label_name}")

    # Select non-NaN samples for this label
    mask = ~np.isnan(test_labels[:, i])
    X = test_embeddings[mask]
    y = test_labels[mask, i]

    test_ds = SingleLabelDataset(X, y)
    test_loader = DataLoader(test_ds, batch_size=32, shuffle=False)

    # Load model.
    # map_location lets a GPU-saved checkpoint load on CPU; layer sizes are
    # read from the checkpoint so the rebuilt model matches what was trained
    # (the training cell uses TabTransformer() defaults, not 1376/128).
    state_dict = torch.load(f"transformer_label_{i}_{label_name}.pt", map_location=device)
    hidden_dim, input_dim = state_dict["input_proj.weight"].shape
    model = TabTransformer(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=1)
    model.load_state_dict(state_dict)
    model = model.to(device)
    model.eval()

    preds = []
    true_vals = []

    with torch.no_grad():
        for batch in test_loader:
            x = batch["embedding"].to(device)
            y_batch = batch["lab"].to(device)

            logits = model(x)
            probs = torch.sigmoid(logits)
            preds.extend(probs.cpu().numpy())  # Use raw probabilities for PR-AUC
            true_vals.extend(y_batch.cpu().numpy())

    all_pred.append(np.array(preds).flatten())
    all_true.append(np.array(true_vals).flatten())

# === Step 2: Compute metrics ===
# Micro: pool every (sample, label) decision; macro: average the per-label accuracies.
micro_acc = accuracy_score(
    np.concatenate([t.round() for t in all_true]),
    np.concatenate([p > 0.5 for p in all_pred])
)
macro_acc = np.mean([
    accuracy_score(all_true[i].round(), (all_pred[i] > 0.5).astype(int)) for i in range(len(label_names))
])

# === Step 3: Compute PR-AUC per label ===
pr_auc_per_label = []
for i in range(len(label_names)):
    if len(np.unique(all_true[i])) > 1:
        auc = average_precision_score(all_true[i], all_pred[i])
        pr_auc_per_label.append(auc)
    else:
        pr_auc_per_label.append(np.nan)  # Cannot compute AUC with one class only

# === Step 4: Print results ===
print("\n✅ Evaluation on Test Set using per-label skipping of NaNs:")
print(f"Micro Accuracy (label-wise overall):     {micro_acc:.4f}")
print(f"Macro Accuracy (avg per label):          {macro_acc:.4f}")

print("\n✅ Per-Label PR-AUC (Precision-Recall AUC):")
for i, label_name in enumerate(label_names):
    auc = pr_auc_per_label[i]
    print(f"{label_name:<25} → PR-AUC: {auc:.4f}" if not np.isnan(auc) else f"{label_name:<25} → PR-AUC: N/A")

# nanmean ignores the labels for which PR-AUC could not be computed.
macro_pr_auc = np.nanmean(pr_auc_per_label)
print(f"\nMacro PR-AUC (avg across labels): {macro_pr_auc:.4f}")

print("\n✅ Label-wise Distribution After Skipping NaNs Individually (per-label view):\n")

# Class counts per label among the samples where that label is observed.
for i, label_name in enumerate(label_names):
    label_vals = test_labels[:, i]
    valid_mask = ~np.isnan(label_vals)
    label_clean = label_vals[valid_mask]

    count_0 = np.sum(label_clean == 0)
    count_1 = np.sum(label_clean == 1)
    print(f"{label_name:<25} → 0: {count_0:<5} | 1: {count_1:<5} | total: {len(label_clean):<5}")