
# HAM10000 — Failure Analysis of Medical AI (Single Notebook, Full Code)

This notebook provides a **complete, runnable pipeline** for failure analysis on the **HAM10000** dermoscopy dataset only (no toy data). It implements Tasks A–E from your assignment (taxonomy, stress tests, uncertainty & calibration, mitigations, and case studies) **without generating a PDF report**.
All outputs are saved under `outputs/<run_id>/...` for easy inclusion in your final submission. fileciteturn0file0



## Environment (install locally)

```bash
python -m venv .venv && source .venv/bin/activate
pip install --upgrade pip
pip install torch torchvision numpy pandas scikit-learn Pillow matplotlib pyyaml tqdm opencv-python reportlab
```
> If you're on CPU-only, install CPU wheels for torch/torchvision from PyPI.


In [1]:

import os, json, math, random, time, io, sys
from datetime import datetime
from typing import List, Tuple, Dict, Any

import numpy as np
import pandas as pd
from PIL import Image, ImageOps, ImageFilter, ImageEnhance

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import matplotlib.pyplot as plt

# ---- Utilities ----
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "3"

def set_seed(seed: int = 42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# def get_device(kind: str = "auto"):
#     if kind == "auto":
#         return torch.device("cuda" if torch.cuda.is_available() else "cpu")
#     return torch.device(kind)

def now_uid(prefix="run"):
    return f"{prefix}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

def ensure_dir(path: str):
    os.makedirs(path, exist_ok=True)

def save_json(obj, path: str):
    with open(path, "w") as f:
        json.dump(obj, f, indent=2)

def load_json(path: str):
    with open(path, "r") as f:
        return json.load(f)

def log_versions(out_dir: str):
    info = {"python": sys.version}
    try:
        import numpy, torch, torchvision, pandas
        info.update({
            "numpy": numpy.__version__,
            "torch": torch.__version__,
            "torchvision": getattr(torchvision, "__version__", "N/A"),
            "pandas": pandas.__version__,
        })
    except Exception:
        pass
    save_json(info, os.path.join(out_dir, "versions.json"))


In [None]:

# ==== Configuration (EDIT THESE) ====
AUTHOR   = "Ufaq Khan"     # used only for file naming, not for any report
DATA_ROOT = "./CV8502/A1/dataset/HAM10000"  # only used if DATASET == "ham10000"
IMG_SIZE = 224
SEED     = 42
DEVICE   = "cuda"

# Training
EPOCHS       = 10   # keep within ≤10–15 as per assignment guidance
BATCH_SIZE   = 32
LR           = 1e-4
WEIGHT_DECAY = 1e-4
NUM_WORKERS  = 4

# Model
MODEL_NAME = "densenet121"  # "densenet121" (preferred) or "tiny_cnn"

# Uncertainty
MC_DROPOUT_ITERS = 30
TTA_ITERS        = 16
USE_TTA          = True

# Experiment bookkeeping
EXPERIMENT = "ham10000_failure_analysis"
OUT_BASE   = "outputs"
RUN_ID     = now_uid(EXPERIMENT)
OUT_DIR    = os.path.join(OUT_BASE, RUN_ID)
ensure_dir(OUT_DIR)

# Class names (7-class multi-class)
CLASS_NAMES = ["akiec","bcc","bkl","df","mel","nv","vasc"]
NUM_CLASSES = len(CLASS_NAMES)

print("RUN_ID:", RUN_ID)
print("OUT_DIR:", OUT_DIR)


RUN_ID: ham10000_failure_analysis_20251101_163319
OUT_DIR: outputs/ham10000_failure_analysis_20251101_163319



## Dataset — HAM10000 only (no toy data)

- Requires `HAM10000_metadata.csv` in `DATA_ROOT` and the image files (e.g., `HAM10000_images_part_1`, `HAM10000_images_part_2`).
- The loader creates a stratified split file `split_ham.csv` on first run.


In [3]:

class HAM10000(Dataset):
    # Real HAM10000 loader (7-class multi-class). Expects metadata and images.
    # Splits are stratified by 'dx' and cached to split_ham.csv under DATA_ROOT.
    def __init__(self, data_root, split="train", img_size=224, seed=42):
        super().__init__()
        self.data_root = data_root
        self.img_size = img_size
        self.class_names = CLASS_NAMES

        meta_csv = os.path.join(data_root, "HAM10000_metadata.csv")
        assert os.path.exists(meta_csv), f"Missing metadata CSV at {meta_csv}"
        df = pd.read_csv(meta_csv)

        # Create / load split file
        split_csv = os.path.join(data_root, "split_ham.csv")
        if not os.path.exists(split_csv):
            df["split"] = None
            for c in CLASS_NAMES:
                dfc = df[df["dx"]==c].sample(frac=1.0, random_state=seed)  # deterministic shuffle
                n = len(dfc)
                n_train = int(0.7*n); n_val = int(0.15*n)
                idx = dfc.index.tolist()
                tr, va, te = idx[:n_train], idx[n_train:n_train+n_val], idx[n_train+n_val:]
                df.loc[tr, "split"] = "train"
                df.loc[va, "split"] = "val"
                df.loc[te, "split"] = "test"
            df.to_csv(split_csv, index=False)
        else:
            df = pd.read_csv(split_csv)

        self.df = df[df["split"]==split].reset_index(drop=True)

        # Build image path map
        folders = [os.path.join(data_root, d) for d in os.listdir(data_root) if os.path.isdir(os.path.join(data_root,d))]
        img_map = {}
        for folder in folders:
            for name in os.listdir(folder):
                if name.lower().endswith((".jpg",".jpeg",".png")):
                    img_map[os.path.splitext(name)[0]] = os.path.join(folder, name)
        self.paths = []
        self.labels = []
        self.meta = []

        for _, row in self.df.iterrows():
            img_id = row["image_id"]
            p = img_map.get(img_id, None)
            if p is None:
                cand = os.path.join(data_root, f"{img_id}.jpg")
                if os.path.exists(cand):
                    p = cand
            assert p is not None, f"Could not find image for {img_id}"
            self.paths.append(p)
            self.labels.append(CLASS_NAMES.index(row["dx"]))
            self.meta.append({"width": None, "height": None, "brightness": None})  # filled in __getitem__

    def __len__(self): return len(self.paths)
    def _augment(self, img):
        if random.random() < 0.5:
            img = ImageOps.mirror(img)
        img = ImageEnhance.Brightness(img).enhance(0.9 + 0.2*random.random())
        img = ImageEnhance.Contrast(img).enhance(0.9 + 0.2*random.random())
        return img

    def __getitem__(self, idx):
        p = self.paths[idx]
        img = Image.open(p).convert("RGB")
        W, H = img.size
        if self.df.loc[idx, "split"]=="train":
            img = self._augment(img)
        img = img.resize((self.img_size, self.img_size), resample=Image.BILINEAR)
        arr = np.asarray(ImageOps.autocontrast(img), dtype=np.float32) / 255.0
        arr = arr.transpose(2,0,1)  # CHW
        bright = float(arr.mean())
        self.meta[idx] = {"width": W, "height": H, "brightness": bright}
        x = torch.from_numpy(arr).float()
        y = int(self.labels[idx])
        return x, y, self.meta[idx]

def build_datasets():
    tr = HAM10000(DATA_ROOT, split="train", img_size=IMG_SIZE, seed=SEED)
    va = HAM10000(DATA_ROOT, split="val",   img_size=IMG_SIZE, seed=SEED)
    te = HAM10000(DATA_ROOT, split="test",  img_size=IMG_SIZE, seed=SEED)
    return tr, va, te



## Models

- **TinyCNN** (fast) and **DenseNet‑121** (preferred if torchvision is available).

In [4]:

class TinyCNN(nn.Module):
    def __init__(self, in_channels=3, num_classes=7):
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(in_channels, 32, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
            nn.Dropout(p=0.25),
            nn.Conv2d(64, 128, 3, padding=1), nn.ReLU(),
            nn.AdaptiveAvgPool2d(1),
        )
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(128, 128), nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(128, num_classes)
        )
    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x

def build_model(name: str, num_classes: int, in_channels: int = 3):
    name = (name or "tiny_cnn").lower()
    if name == "densenet121":
        try:
            import torchvision.models as M
            try:
                m = M.densenet121(weights=M.DenseNet121_Weights.DEFAULT)
            except Exception:
                m = M.densenet121(weights=None)
            in_features = m.classifier.in_features
            m.classifier = nn.Linear(in_features, num_classes)
            return m
        except Exception as e:
            print("torchvision not available or failed — falling back to TinyCNN.", e)
            return TinyCNN(in_channels=in_channels, num_classes=num_classes)
    return TinyCNN(in_channels=in_channels, num_classes=num_classes)

def enable_mc_dropout(model: nn.Module):
    for m in model.modules():
        if isinstance(m, (nn.Dropout, nn.Dropout2d, nn.AlphaDropout)):
            m.train()



## Training

Uses CrossEntropyLoss and AdamW. Saves best weights to `model.best.pt` in the current run folder.


In [5]:

def train_model(model, loader_tr, loader_va, device, epochs=10, lr=1e-4, wd=1e-4, out_dir="outputs"):
    model = model.to(device)
    opt = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=wd)
    criterion = nn.CrossEntropyLoss()
    best_val = float("inf")
    hist = {"epoch":[], "train_loss":[], "val_loss":[]}

    for ep in range(1, epochs+1):
        model.train()
        losses = []
        for xb, yb, _ in loader_tr:
            xb = xb.to(device)
            yb = yb.to(device)
            opt.zero_grad()
            logits = model(xb)
            loss = criterion(logits, yb)
            loss.backward()
            opt.step()
            losses.append(loss.item())
        tr_loss = float(np.mean(losses)) if losses else float("nan")

        model.eval()
        vloss = []
        with torch.no_grad():
            for xb, yb, _ in loader_va:
                xb = xb.to(device); yb = yb.to(device)
                logits = model(xb)
                vloss.append(criterion(logits, yb).item())
        va_loss = float(np.mean(vloss)) if vloss else float("nan")

        hist["epoch"].append(ep); hist["train_loss"].append(tr_loss); hist["val_loss"].append(va_loss)
        torch.save({"model": model.state_dict()}, os.path.join(out_dir, "model.pt"))
        if va_loss < best_val:
            best_val = va_loss
            torch.save({"model": model.state_dict()}, os.path.join(out_dir, "model.best.pt"))
        print(f"Epoch {ep}/{epochs}   train_loss={tr_loss:.4f}   val_loss={va_loss:.4f}")

    save_json(hist, os.path.join(out_dir, "history.json"))
    return model



## Metrics (macro)
- AUROC, AUPRC (one-vs-rest)
- Sensitivity@95% specificity (one-vs-rest)
- F1 (macro)
- ECE & Brier (for calibration)


In [6]:

from sklearn.metrics import roc_auc_score, average_precision_score, f1_score, confusion_matrix

def one_hot(y_idx, K):
    y = np.zeros((len(y_idx), K), dtype=np.float32)
    y[np.arange(len(y_idx)), y_idx] = 1.0
    return y

def eval_probs(model, loader, device):
    model.eval()
    ys = []; ps = []
    with torch.no_grad():
        for xb, yb, _ in loader:
            xb = xb.to(device)
            logits = model(xb)
            prob = torch.softmax(logits, dim=1).cpu().numpy()
            ys.extend(yb.numpy().tolist())
            ps.append(prob)
    y_idx = np.array(ys, dtype=int)
    p = np.vstack(ps) if len(ps)>0 else np.zeros((0, NUM_CLASSES), dtype=float)
    return y_idx, p

def auroc_macro(y_idx, p):
    y = one_hot(y_idx, p.shape[1])
    res = []
    for k in range(p.shape[1]):
        try:
            res.append(roc_auc_score(y[:,k], p[:,k]))
        except Exception:
            res.append(np.nan)
    return float(np.nanmean(res))

def auprc_macro(y_idx, p):
    y = one_hot(y_idx, p.shape[1])
    res = []
    for k in range(p.shape[1]):
        try:
            res.append(average_precision_score(y[:,k], p[:,k]))
        except Exception:
            res.append(np.nan)
    return float(np.nanmean(res))

def sensitivity_at_spec_macro(y_idx, p, target_spec=0.95):
    y = one_hot(y_idx, p.shape[1])
    K = p.shape[1]
    sens = []
    for k in range(K):
        yt = y[:,k].astype(int)
        yp = p[:,k]
        thr = np.linspace(0,1,2001)
        best = np.nan
        for t in thr:
            yhat = (yp >= t).astype(int)
            tn, fp, fn, tp = confusion_matrix(yt, yhat, labels=[0,1]).ravel()
            spec = tn / (tn + fp + 1e-12)
            if spec >= target_spec:
                s = tp / (tp + fn + 1e-12)
                best = max(s, best) if not np.isnan(best) else s
        sens.append(best)
    return float(np.nanmean(s))

def f1_macro(y_idx, p):
    yhat = np.argmax(p, axis=1)
    return float(f1_score(y_idx, yhat, average="macro"))

def brier_score_multiclass(y_idx, p):
    y = one_hot(y_idx, p.shape[1])
    return float(np.mean((p - y)**2))

def expected_calibration_error(y_idx, p, n_bins=10):
    yhat = np.argmax(p, axis=1)
    conf = np.max(p, axis=1)
    corr = (yhat == y_idx).astype(float)
    bins = np.linspace(0.0, 1.0, n_bins+1)
    ece = 0.0
    for i in range(n_bins):
        m = (conf >= bins[i]) & (conf < bins[i+1])
        if np.any(m):
            acc = float(np.mean(corr[m]))
            c = float(np.mean(conf[m]))
            ece += (np.sum(m)/len(conf)) * abs(acc - c)
    return float(ece)

def metrics_table(y_idx, p):
    return {
        "AUROC": auroc_macro(y_idx, p),
        "AUPRC": auprc_macro(y_idx, p),
        "Sens@95Spec": sensitivity_at_spec_macro(y_idx, p, 0.95),
        "F1_macro": f1_macro(y_idx, p),
    }



## Stress tests (corruptions & domain shifts) — 3 severities each

- Corruptions: gaussian noise, gaussian blur, JPEG, brightness/contrast.  
- Shifts: color cast, downsample→upsample.


In [7]:

def _to_pil_from_tensor(xb_chw):
    arr = (xb_chw.transpose(1,2,0) * 255.0).clip(0,255).astype(np.uint8)  # CHW -> HWC
    return Image.fromarray(arr, mode="RGB")

def _to_tensor_from_pil(img):
    arr = np.asarray(img, dtype=np.float32)/255.0
    arr = arr.transpose(2,0,1)
    return torch.from_numpy(arr).float()

def corruption(img: Image.Image, ctype: str, severity: int):
    severity = int(np.clip(severity, 1, 3))
    if ctype == "gaussian_noise":
        arr = np.asarray(img).astype(np.float32)
        sigma = {1:5, 2:12, 3:25}[severity]
        noise = np.random.normal(0, sigma, arr.shape)
        out = np.clip(arr + noise, 0, 255).astype(np.uint8)
        return Image.fromarray(out)
    elif ctype == "gaussian_blur":
        r = {1:1, 2:2, 3:3}[severity]
        return img.filter(ImageFilter.GaussianBlur(radius=float(r)))
    elif ctype == "jpeg":
        q = {1:60, 2:35, 3:20}[severity]
        buf = io.BytesIO(); img.save(buf, format="JPEG", quality=q); buf.seek(0)
        return Image.open(buf).convert("RGB")
    elif ctype == "brightness_contrast":
        b = {1:0.9, 2:0.75, 3:0.6}[severity]
        c = {1:1.1, 2:1.3, 3:1.5}[severity]
        return ImageEnhance.Contrast(ImageEnhance.Brightness(img).enhance(b)).enhance(c)
    else:
        return img

def domain_shift(img: Image.Image, stype: str):
    if stype == "color_cast":
        arr = np.asarray(img).astype(np.float32)
        scales = np.array([1.1, 0.9, 0.95])
        out = np.clip(arr * scales, 0, 255).astype(np.uint8)
        return Image.fromarray(out)
    elif stype == "down_up_sample":
        s = 0.5
        small = img.resize((int(img.width*s), int(img.height*s)), resample=Image.BILINEAR)
        back = small.resize((img.width, img.height), resample=Image.BILINEAR)
        return back
    else:
        return img

def eval_with_transform(model, dataset, device, transform_fn):
    ys = []; ps = []
    model.eval()
    with torch.no_grad():
        for i in range(len(dataset)):
            x, y, meta = dataset[i]
            pil = _to_pil_from_tensor(x.numpy())
            pil2 = transform_fn(pil)
            x2 = _to_tensor_from_pil(pil2).unsqueeze(0).to(device)
            prob = torch.softmax(model(x2), dim=1).cpu().numpy()
            ys.append(y)
            ps.append(prob[0])
    y_idx = np.array(ys, dtype=int)
    p = np.vstack(ps) if len(ps)>0 else np.zeros((0, NUM_CLASSES))
    return y_idx, p

def image_size_quartile_list(metas):
    sizes = [m.get("width", IMG_SIZE) * m.get("height", IMG_SIZE) if (m.get("width") and m.get("height")) else IMG_SIZE*IMG_SIZE for m in metas]
    qs = np.quantile(sizes, [0.25,0.5,0.75]) if len(sizes)>0 else [0,0,0]
    qlab = []
    for s in sizes:
        if s <= qs[0]: qlab.append("Q1")
        elif s <= qs[1]: qlab.append("Q2")
        elif s <= qs[2]: qlab.append("Q3")
        else: qlab.append("Q4")
    return qlab

def brightness_quartile_list(metas):
    br = [m.get("brightness", 0.5) for m in metas]
    qs = np.quantile(br, [0.25,0.5,0.75]) if len(br)>0 else [0,0,0]
    qlab = []
    for v in br:
        if v <= qs[0]: qlab.append("Q1")
        elif v <= qs[1]: qlab.append("Q2")
        elif v <= qs[2]: qlab.append("Q3")
        else: qlab.append("Q4")
    return qlab



## Evaluation runner

Computes clean metrics, corruptions/shifts (with Δ vs clean), and subgroup slices (size & brightness quartiles).


In [8]:

def evaluate_all(model, ds_test, device, out_dir):
    eval_dir = os.path.join(out_dir, "eval"); ensure_dir(eval_dir)

    # Clean
    loader_te = DataLoader(ds_test, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)
    y_clean, p_clean = eval_probs(model, loader_te, device)
    res_clean = metrics_table(y_clean, p_clean)
    np.save(os.path.join(eval_dir, "y_clean.npy"), y_clean)
    np.save(os.path.join(eval_dir, "p_clean.npy"), p_clean)
    save_json(res_clean, os.path.join(eval_dir, "metrics_clean.json"))

    # Corruptions
    corr_types = ["gaussian_noise","gaussian_blur","jpeg","brightness_contrast"]
    severities = [1,2,3]
    corr_results = {}
    for ct in corr_types:
        for sv in severities:
            y_c, p_c = eval_with_transform(model, ds_test, device, lambda im, c=ct, s=sv: corruption(im, c, s))
            res_c = metrics_table(y_c, p_c)
            corr_results[f"{ct}_s{sv}"] = res_c
            np.save(os.path.join(eval_dir, f"y_{ct}_s{sv}.npy"), y_c)
            np.save(os.path.join(eval_dir, f"p_{ct}_s{sv}.npy"), p_c)
    save_json(corr_results, os.path.join(eval_dir, "metrics_corruptions.json"))

    # Domain shifts
    shifts = ["color_cast","down_up_sample"]
    shift_results = {}
    for st in shifts:
        y_s, p_s = eval_with_transform(model, ds_test, device, lambda im, s=st: domain_shift(im, s))
        res_s = metrics_table(y_s, p_s)
        shift_results[st] = res_s
        np.save(os.path.join(eval_dir, f"y_shift_{st}.npy"), y_s)
        np.save(os.path.join(eval_dir, f"p_shift_{st}.npy"), p_s)
    save_json(shift_results, os.path.join(eval_dir, "metrics_shifts.json"))

    # Subgroup slices: size & brightness quartiles
    metas = []
    ys = []; ps = []
    model.eval()
    with torch.no_grad():
        for i in range(len(ds_test)):
            x, y, m = ds_test[i]
            metas.append(m)
            ys.append(y)
            prob = torch.softmax(model(x.unsqueeze(0).to(device)), dim=1).cpu().numpy()[0]
            ps.append(prob)
    y_all = np.array(ys, dtype=int); p_all = np.vstack(ps)

    size_q = image_size_quartile_list(metas)
    bright_q = brightness_quartile_list(metas)

    def idx_where(lst, label):
        return [i for i,v in enumerate(lst) if v==label]

    slices = {}
    for q in ["Q1","Q2","Q3","Q4"]:
        idx = idx_where(size_q, q)
        if idx:
            slices[f"ImageSize_{q}"] = metrics_table(y_all[idx], p_all[idx])
        idx = idx_where(bright_q, q)
        if idx:
            slices[f"Brightness_{q}"] = metrics_table(y_all[idx], p_all[idx])
    save_json(slices, os.path.join(eval_dir, "metrics_slices.json"))

    # Deltas vs clean
    def delta(res):
        return {k: float(res.get(k, np.nan) - res_clean.get(k, np.nan)) for k in res_clean.keys()}
    corr_delta = {k: delta(v) for k,v in corr_results.items()}
    shift_delta = {k: delta(v) for k,v in shift_results.items()}
    slice_delta = {k: delta(v) for k,v in slices.items()}
    save_json(corr_delta, os.path.join(eval_dir, "delta_corruptions.json"))
    save_json(shift_delta, os.path.join(eval_dir, "delta_shifts.json"))
    save_json(slice_delta, os.path.join(eval_dir, "delta_slices.json"))

    print("Evaluation complete. Results under", eval_dir)
    return {
        "clean": res_clean, "corruptions": corr_results, "shifts": shift_results, "slices": slices
    }



## Uncertainty & calibration

- Baseline, Temperature Scaling, MC‑Dropout, and optional TTA.  
- Saves reliability diagrams and risk–coverage curves (each plot is a separate figure).


In [9]:

class TemperatureScalerCE(nn.Module):
    def __init__(self):
        super().__init__()
        self.logT = nn.Parameter(torch.zeros(1))
    def forward(self, logits):
        T = torch.exp(self.logT)
        return logits / T

def fit_temperature_ce(model, loader_val, device):
    model.eval()
    ts = TemperatureScalerCE().to(device)
    opt = torch.optim.LBFGS(ts.parameters(), lr=0.01, max_iter=50)
    criterion = nn.CrossEntropyLoss()

    xb_cached = []
    yb_cached = []
    with torch.no_grad():
        for xb, yb, _ in loader_val:
            xb_cached.append(xb.to(device))
            yb_cached.append(yb.to(device))
    xb_cached = torch.cat(xb_cached, dim=0)
    yb_cached = torch.cat(yb_cached, dim=0)

    def closure():
        opt.zero_grad()
        with torch.no_grad():
            logits = model(xb_cached)
        loss = criterion(ts(logits), yb_cached)
        loss.backward()
        return loss
    opt.step(closure)
    return ts

def predict_proba(model, loader, device, scaler=None, mc_dropout=False, iters=1, tta=False):
    model.eval()
    if mc_dropout:
        enable_mc_dropout(model)
    ys = []; ps = []
    with torch.no_grad():
        for xb, yb, _ in loader:
            xb = xb.to(device)
            if tta and iters>1:
                preds = []
                for i in range(iters):
                    if i % 2 == 1:
                        xb2 = torch.flip(xb, dims=[-1])
                    else:
                        xb2 = xb
                    logits = model(xb2)
                    if scaler is not None: logits = scaler(logits)
                    preds.append(torch.softmax(logits, dim=1).cpu().numpy())
                prob = np.mean(preds, axis=0)
            elif mc_dropout and iters>1:
                preds = []
                for _ in range(iters):
                    logits = model(xb)
                    if scaler is not None: logits = scaler(logits)
                    preds.append(torch.softmax(logits, dim=1).cpu().numpy())
                prob = np.mean(preds, axis=0)
            else:
                logits = model(xb)
                if scaler is not None: logits = scaler(logits)
                prob = torch.softmax(logits, dim=1).cpu().numpy()
            ys.extend(yb.numpy().tolist())
            ps.append(prob)
    y_idx = np.array(ys, dtype=int)
    p = np.vstack(ps) if len(ps)>0 else np.zeros((0, NUM_CLASSES))
    return y_idx, p

def reliability_diagram(y_idx, p, out_png, n_bins=10):
    conf = np.max(p, axis=1)
    yhat = np.argmax(p, axis=1)
    corr = (yhat == y_idx).astype(float)
    bins = np.linspace(0.0, 1.0, n_bins+1)
    xs, ys, ws = [], [], []
    for i in range(n_bins):
        m = (conf >= bins[i]) & (conf < bins[i+1])
        if np.any(m):
            xs.append(float(np.mean(conf[m])))
            ys.append(float(np.mean(corr[m])))
            ws.append(int(np.sum(m)))
    plt.figure()
    plt.plot([0,1],[0,1],'--')
    plt.scatter(xs, ys, s=np.array(ws))
    plt.xlabel("Confidence"); plt.ylabel("Accuracy"); plt.title("Reliability diagram")
    plt.savefig(out_png, bbox_inches="tight"); plt.close()

def risk_coverage_curve(y_idx, p, out_png):
    conf = np.max(p, axis=1)
    yhat = np.argmax(p, axis=1)
    corr = (yhat == y_idx).astype(float)
    order = np.argsort(-conf)
    cov = []; risk = []
    for k in range(1, len(order)+1):
        idx = order[:k]
        cov.append(k/len(order))
        risk.append(1.0 - float(np.mean(corr[idx])))
    plt.figure()
    plt.plot(cov, risk)
    plt.xlabel("Coverage"); plt.ylabel("Risk (1-acc)"); plt.title("Risk–Coverage")
    plt.savefig(out_png, bbox_inches="tight"); plt.close()

def run_calibration_suite(model, ds_val, ds_test, device, out_dir):
    eval_dir = os.path.join(out_dir, "eval"); ensure_dir(eval_dir)
    loader_val = DataLoader(ds_val, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)
    loader_te  = DataLoader(ds_test, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)

    # baseline
    y_b, p_b = predict_proba(model, loader_te, device, scaler=None, mc_dropout=False, iters=1, tta=False)
    # temp scaling
    ts = fit_temperature_ce(model, loader_val, device)
    y_t, p_t = predict_proba(model, loader_te, device, scaler=ts, mc_dropout=False, iters=1, tta=False)
    # MC-Dropout
    y_m, p_m = predict_proba(model, loader_te, device, scaler=None, mc_dropout=True, iters=MC_DROPOUT_ITERS, tta=False)
    # TTA (optional)
    if USE_TTA:
        y_a, p_a = predict_proba(model, loader_te, device, scaler=None, mc_dropout=False, iters=TTA_ITERS, tta=True)
    else:
        y_a, p_a = y_b, None

    stats = {}
    for name, (yy, pp) in {
        "baseline": (y_b, p_b),
        "temp_scaling": (y_t, p_t),
        "mc_dropout": (y_m, p_m),
        **({"tta": (y_a, p_a)} if p_a is not None else {})
    }.items():
        ece = expected_calibration_error(yy, pp, n_bins=10)
        brier = brier_score_multiclass(yy, pp)
        stats[name] = {"ECE": float(ece), "Brier": float(brier)}
        reliability_diagram(yy, pp, os.path.join(out_dir, f"calib_reldiag_{name}.png"))
        risk_coverage_curve(yy, pp, os.path.join(out_dir, f"risk_coverage_{name}.png"))

    save_json(stats, os.path.join(eval_dir, "calibration_stats.json"))
    print("Calibration suite complete.")
    return stats



## Case studies (2 failures) + Grad‑CAM

Selects two highest‑confidence errors and saves Grad‑CAM overlays to `outputs/<run_id>/case_studies/`.


In [10]:

def find_last_conv_layer(model):
    last = None
    for m in model.modules():
        if isinstance(m, nn.Conv2d):
            last = m
    return last

def grad_cam(model, x_tensor_bchw, target_layer=None):
    model.eval()
    if target_layer is None:
        target_layer = find_last_conv_layer(model)
    activations = {}
    gradients = {}

    def fwd_hook(m, inp, out): activations["a"] = out.detach()
    def bwd_hook(m, gin, gout): gradients["g"] = gout[0].detach()
    h1 = target_layer.register_forward_hook(fwd_hook)
    h2 = target_layer.register_full_backward_hook(bwd_hook)

    x = x_tensor_bchw.requires_grad_(True)
    logits = model(x)
    probs = torch.softmax(logits, dim=1)
    top = probs.max(dim=1).indices
    sel = logits[range(logits.shape[0]), top]
    sel.sum().backward()

    A = activations["a"]            # [B,C,H,W]
    dA = gradients["g"]             # [B,C,H,W]
    weights = dA.mean(dim=(2,3), keepdim=True)
    cam = (weights * A).sum(dim=1, keepdim=True)
    cam = torch.relu(cam)
    cam -= cam.amin(dim=(2,3), keepdim=True)
    cam /= (cam.amax(dim=(2,3), keepdim=True) + 1e-6)

    h1.remove(); h2.remove()
    return cam, probs

def _to_pil_from_tensor(xb_chw):
    arr = (xb_chw.transpose(1,2,0) * 255.0).clip(0,255).astype(np.uint8)  # CHW -> HWC
    return Image.fromarray(arr, mode="RGB")

def overlay_cam_grayscale(pil_img, cam_01):
    base = pil_img.convert("L").convert("RGBA")
    cam_uint8 = (cam_01 * 255).astype(np.uint8)
    ov = Image.fromarray(cam_uint8, mode="L").resize(base.size, resample=Image.BILINEAR)
    white = Image.new("RGBA", base.size, (255,255,255,255))
    blended = Image.composite(white, base, ov)
    return blended.convert("RGB")

def build_case_studies(model, ds_test, device, out_dir, num_cases=2):
    ensure_dir(os.path.join(out_dir, "case_studies"))
    loader = DataLoader(ds_test, batch_size=32, shuffle=False, num_workers=NUM_WORKERS)
    y_idx, p = eval_probs(model, loader, device)
    yhat = np.argmax(p, axis=1)
    conf = np.max(p, axis=1)
    wrong = np.where(yhat != y_idx)[0]
    if len(wrong) == 0:
        wrong = np.arange(min(2, len(y_idx)))

    order = wrong[np.argsort(-conf[wrong])]
    cases = []
    for k,i in enumerate(order[:num_cases]):
        x, y, meta = ds_test[i]
        pil = _to_pil_from_tensor(x.numpy())
        cam, probs = grad_cam(model, x.unsqueeze(0).to(device))
        hm = cam[0,0].cpu().numpy()
        over = overlay_cam_grayscale(pil, hm)

        fig_path = os.path.join(out_dir, "case_studies", f"case_{k+1}.png")
        over.save(fig_path)

        y_prob = {CLASS_NAMES[j]: float(p[i,j]) for j in range(NUM_CLASSES)}
        case = {
            "image": fig_path,
            "y_true": CLASS_NAMES[int(y)],
            "y_pred": CLASS_NAMES[int(yhat[i])],
            "confidence": float(conf[i]),
            "y_prob": y_prob,
            "diagnosis": "Salient region mislocalized; consider temperature scaling + selective prediction." if k==0 else "Contrast/scale-induced error; consider domain-adaptive preprocessing."
        }
        cases.append(case)

    with open(os.path.join(out_dir, "case_studies", "cases.json"), "w") as f:
        json.dump(cases, f, indent=2)
    print(f"Saved {len(cases)} case studies to", os.path.join(out_dir, "case_studies"))
    return cases



## Run the full pipeline (HAM10000 only)

**Steps:** datasets → model → train → evaluate → calibration → case studies → reproducibility logs.


In [11]:

set_seed(SEED)
device = DEVICE
print("Device:", device)

# 1) Datasets & loaders
ds_tr, ds_va, ds_te = build_datasets()
loader_tr = DataLoader(ds_tr, batch_size=BATCH_SIZE, shuffle=True,  num_workers=NUM_WORKERS, pin_memory=True)
loader_va = DataLoader(ds_va, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS, pin_memory=True)

# 2) Model
model = build_model(MODEL_NAME, num_classes=NUM_CLASSES, in_channels=3)
print("Model:", model.__class__.__name__)

# 3) Train
train_model(model, loader_tr, loader_va, device, epochs=EPOCHS, lr=LR, wd=WEIGHT_DECAY, out_dir=OUT_DIR)

# Load best weights
state = torch.load(os.path.join(OUT_DIR, "model.best.pt"), map_location=device)
model.load_state_dict(state["model"])

# 4) Evaluate (clean + stress + slices)
results = evaluate_all(model, ds_te, device, OUT_DIR)

# 5) Calibration & Uncertainty
calib_stats = run_calibration_suite(model, ds_va, ds_te, device, OUT_DIR)

# 6) Case studies
cases = build_case_studies(model, ds_te, device, OUT_DIR, num_cases=2)

# 7) Reproducibility logs
log_versions(OUT_DIR)
with open(os.path.join(OUT_DIR, "COMMANDS.txt"), "a") as f:
    f.write(f"RUN_ID={RUN_ID}\n")
    f.write(f"AUTHOR={AUTHOR}\n")
    f.write(f"DATA_ROOT={DATA_ROOT}\n")
    f.write(f"MODEL_NAME={MODEL_NAME}\n")
    f.write(f"EPOCHS={EPOCHS}\n")
    f.write(f"BATCH_SIZE={BATCH_SIZE}\n")
    f.write(f"LR={LR}\n")
    f.write(f"IMG_SIZE={IMG_SIZE}\n")
print("Wrote versions.json and COMMANDS.txt")


Device: cuda
Model: DenseNet
Epoch 1/10   train_loss=0.8001   val_loss=0.5650
Epoch 2/10   train_loss=0.4576   val_loss=0.4770
Epoch 3/10   train_loss=0.3004   val_loss=0.5429
Epoch 4/10   train_loss=0.2178   val_loss=0.4880
Epoch 5/10   train_loss=0.1416   val_loss=0.5955
Epoch 6/10   train_loss=0.1142   val_loss=0.6026
Epoch 7/10   train_loss=0.0919   val_loss=0.5735
Epoch 8/10   train_loss=0.0718   val_loss=0.6014
Epoch 9/10   train_loss=0.0681   val_loss=0.5564
Epoch 10/10   train_loss=0.0492   val_loss=0.5225
Evaluation complete. Results under outputs/ham10000_failure_analysis_20251101_163319/eval
Calibration suite complete.
Saved 2 case studies to outputs/ham10000_failure_analysis_20251101_163319/case_studies
Wrote versions.json and COMMANDS.txt
