> **Note:** This notebook contains only code.  
> All outputs, plots, and data-derived tensors from the Sleep Heart Health Study (SHHS) dataset have been removed  
> to comply with the NSRR data use agreement.  
> To reproduce results, please obtain the dataset from [https://sleepdata.org/datasets/shhs](https://sleepdata.org/datasets/shhs).

In [None]:
import os, sys, platform, subprocess, time, warnings, logging, pathlib
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import seaborn as sns
from tf_keras_vis.utils.model_modifiers import ReplaceToLinear
from tf_keras_vis.utils.scores import CategoricalScore
from tf_keras_vis.gradcam import Gradcam, GradcamPlusPlus
from tf_keras_vis.saliency import Saliency
from tf_explain.core.integrated_gradients import IntegratedGradients
import shap
import lime.lime_tabular as lime_tab

tf.get_logger().setLevel("ERROR")
warnings.filterwarnings("ignore", category=UserWarning, module="shap")
logging.getLogger("shap").setLevel(logging.ERROR)

np.random.seed(99)

from datetime import datetime
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Libraries imported.")


In [None]:
def run(cmd):
    try:
        return subprocess.check_output(cmd, shell=True, text=True).strip()
    except Exception:
        return "N/A"

# -- CPU-Bezeichnung --
cpu_model = run("lscpu | grep -m1 'Model name' | cut -d ':' -f2- | xargs")
if cpu_model == "N/A":
    cpu_model = run("grep -m1 'model name' /proc/cpuinfo | cut -d ':' -f2- | xargs")

info = {
    "Timestamp"        : datetime.now().isoformat(timespec="seconds"),
    "Host"             : platform.node(),
    "OS"               : f"{platform.system()} {platform.release()}",
    "Python"           : platform.python_version(),
    "TensorFlow"       : tf.__version__,
    "TensorFlow Build" : tf.sysconfig.get_build_info().get("build_type","N/A"),
    "CUDA Built w/"    : tf.sysconfig.get_build_info().get("cuda_version","N/A"),
    "cuDNN Built w/"   : tf.sysconfig.get_build_info().get("cudnn_version","N/A"),
    "Num GPUs visible" : len(tf.config.experimental.list_physical_devices('GPU')),
    "GPU Name(s)"      : run("nvidia-smi --query-gpu=name --format=csv,noheader"),
    "GPU Driver"       : run("nvidia-smi --query-gpu=driver_version --format=csv,noheader | head -1"),
    "CUDA Runtime"     : run("nvcc --version | grep release | awk '{print $6}'"),
    "cuDNN Runtime"    : run("grep -oP 'CUDNN_MAJOR\\s*=\\s*\\K[0-9]+' /usr/include/cudnn_version.h 2>/dev/null") + "." +
                          run("grep -oP 'CUDNN_MINOR\\s*=\\s*\\K[0-9]+' /usr/include/cudnn_version.h 2>/dev/null"),
    "CPU Model"        : cpu_model,
    "CPU (logical)"    : os.cpu_count(),
    "CPU (physical)"   : os.cpu_count()//2 if os.cpu_count() else "N/A",
    "RAM (free/total)" : run("free -h | awk '/Mem:/ {print $3\"/\"$2}'")
}

print("\n".join(f"{k:<16}: {v}" for k,v in info.items()))


In [None]:
#  SHHS-1  4-Channel-Preprocessing
import os, gc, re, pathlib, concurrent.futures as cf
from itertools import islice, groupby
from tqdm import tqdm

import numpy as np
import mne
from pyedflib import EdfReader   # Fallback
from lxml import etree

WIN_SEC, STEP_SEC, MIN_APNEA_S = 60, 30, 10          # Sliding-Window Settings
ORDER = ["SAO2", "HR", "THOR RES", "ABDO RES"]       # order in X
ALIAS = {                                            # possible channel-synonyms
    "SAO2":     ["SAO2", "SPO2", "SATS"],
    "HR":       ["H.R.", "HR", "PR", "PULSERATE"],
    "THOR RES": ["THOR RES", "THOR", "CHEST", "THOR RIP"],
    "ABDO RES": ["ABDO RES", "ABD", "ABD RIP", "ABDOMEN"],
}

DATA_DIR = pathlib.Path("~/MSI-HTWG/Seminar/data/nsrr/shhs/polysomnography").expanduser()
EDF_DIR  = DATA_DIR / "edfs/shhs1"
ANN_DIR  = DATA_DIR / "annotations-events-nsrr/shhs1"

OUT_DIR  = pathlib.Path("~/MSI-HTWG/Seminar/blocks_4signale_1Hz_SHHS1").expanduser()
OUT_DIR.mkdir(exist_ok=True)

subj_id = lambda p: p.name.split("-")[1].split(".")[0]

def _norm(s: str) -> str:
    """unifies channel names (capitatlization, special characters)"""
    return re.sub(r'[\W_]+', '', s.upper())

def find_ch(raw, keys):
    """find first available channel from list of aliases"""
    up = {_norm(c): c for c in raw.ch_names}
    for k in keys:
        n = _norm(k)
        if n in up:
            return up[n]

def apnea_events(xml_path):
    """returns (start, duration) of all relevant apnea events"""
    ok = {"obstructive apnea", "central apnea", "mixed apnea", "hypopnea"}
    for ev in etree.parse(str(xml_path)).findall(".//ScoredEvent"):
        label = f"{ev.findtext('EventType','').lower()}|" \
                f"{ev.findtext('EventConcept','').lower()}"
        if any(k in label for k in ok):
            yield float(ev.findtext("Start")), float(ev.findtext("Duration"))

def contig(a, n):
    """True, if >= n consecutive 1s in Binary vector"""
    return any(len(list(g)) >= n for k, g in groupby(a) if k)

# Fallback-Loader (pyEDFlib)
def load_with_pyedflib(edf_path, alias_list):
    """reads EDF via pyEDFlib, resamples to 1 Hz"""
    with EdfReader(str(edf_path)) as f:
        lbl_up = {lbl.upper(): i for i, lbl in enumerate(f.getSignalLabels())}
        idxs   = [next((lbl_up.get(k.upper()) for k in ks if k.upper() in lbl_up), None)
                  for ks in alias_list]
        if None in idxs:
            return None

        sfs    = f.getSampleFrequencies()
        dur    = int(f.file_duration)
        fs_max = int(max(sfs[i] for i in idxs))      # highest frequency
        data   = []
        for i in idxs:
            sig = f.readSignal(i).astype(np.float32)
            if len(sig) < fs_max * dur:              # fill up missing samples
                sig = np.pad(sig, (0, fs_max * dur - len(sig)), mode='edge')
            data.append(sig)

        X_raw = np.stack(data, axis=1)               # (T_raw,4)
        X_1hz = X_raw.reshape(-1, fs_max, 4).mean(1) # (T,4) @1 Hz
        return X_1hz

def process_one(edf_path):
    sid   = subj_id(edf_path)
    out_p = OUT_DIR / f"block_{sid}.npz"
    if out_p.exists():
        return "skip"

    # MNE
    try:
        raw = mne.io.read_raw_edf(edf_path, preload=False, verbose='ERROR')
        chs = [find_ch(raw, ALIAS[c]) for c in ORDER]
        if None in chs:
            return "missing"
        raw.pick(chs).load_data()
        raw.resample(1, npad="auto")
        X = raw.get_data().T.astype(np.float32)      # (T,4)
    except AssertionError:
        # pyEDFlib
        X = load_with_pyedflib(edf_path, [ALIAS[c] for c in ORDER])
        if X is None:
            return "missing"
    except Exception:
        return "corrupt"

    # Annotations
    ann_file = ANN_DIR / f"shhs1-{sid}-nsrr.xml"
    if not ann_file.exists():
        return "no_xml"

    y_sec = np.zeros(len(X), np.int8)
    for s, d in apnea_events(ann_file):
        y_sec[int(s): int(min(s + d, len(X)))] = 1

    # Sliding window blocks
    X_blk, y_blk = [], []
    for t0 in range(0, len(X) - WIN_SEC + 1, STEP_SEC):
        seg = X[t0:t0 + WIN_SEC]
        seg = (seg - seg.mean(0)) / (seg.std(0) + 1e-8)
        X_blk.append(seg.astype(np.float16))
        y_blk.append(int(contig(y_sec[t0:t0 + WIN_SEC], MIN_APNEA_S)))

    if X_blk:
        np.savez_compressed(out_p,
            X=np.stack(X_blk),
            y=np.asarray(y_blk, np.int8),
            subj_id=np.asarray([sid] * len(y_blk), '<U10'))
    gc.collect()
    return "done"

all_edfs  = sorted(EDF_DIR.glob("shhs1-*.edf"))
done_ids  = {p.stem.split('_')[1] for p in OUT_DIR.glob("block_*.npz")}
todo_edfs = [p for p in all_edfs if subj_id(p) not in done_ids][:100]  # max. 100 new

print(f"Already loaded: {len(done_ids):>4} Blocks")
print(f"TODO: {len(todo_edfs):>4} EDFs\n")

N_WORKER = min(4, os.cpu_count() or 1)
stats = {k: 0 for k in ["done", "skip", "missing", "no_xml", "corrupt"]}

def chunked(it, n):
    it = iter(it)
    while (chunk := list(islice(it, n))):
        yield chunk

for bi, batch in enumerate(chunked(todo_edfs, 50), 1):
    print(f"Batch {bi} – {len(batch)} Dateien")
    with cf.ProcessPoolExecutor(max_workers=N_WORKER) as pool:
        for res in tqdm(pool.map(process_one, batch), total=len(batch)):
            stats[res] += 1

print("\Done .npz-Blöcke in:", OUT_DIR)
print("Summary:", stats)


In [None]:
# Load model & SHHS-1 Data
import pathlib, numpy as np, tensorflow as tf

# Global Metrics
ALPHA   = 0.10      # highest 10 % of points
K_STEPS = 60        # number of steps for Deletion-/Insertion-AUC

# load model
MODEL_DIR = pathlib.Path("apnea_cnn_saved")
model = tf.keras.models.load_model(MODEL_DIR / "net.keras")

# collect shhs1 blocks
BLK_DIR   = pathlib.Path("~/MSI-HTWG/Seminar/blocks_4signale_1Hz_SHHS1").expanduser()
blk_files = sorted(BLK_DIR.glob("block_2*.npz"))
assert blk_files, f"No SHHS-1-Blocks in {BLK_DIR}!"

X_list, y_list = [], []
for f in blk_files:
    d = np.load(f)
    X_list.append(d["X"].astype("float32"))
    y_list.append(d["y"])
X = np.concatenate(X_list, axis=0)    # (N, 60, 4)
y = np.concatenate(y_list, axis=0)    # (N,)

print("Windows :", X.shape, "| Apnea rate:", y.mean().round(3))

N_SAMPLES_VIS = 3
rng           = np.random.default_rng()

idx_apnea   = int(rng.choice(np.where(y == 1)[0]))

other_idx   = rng.choice(
    np.setdiff1d(np.arange(len(X)), [idx_apnea]),
    size=N_SAMPLES_VIS - 1,
    replace=False
)

samples_vis = [idx_apnea] + other_idx.tolist()

print("Heatmap-Examples (idx):", samples_vis,
      "| y_true:", y[samples_vis])


N_EVAL_SAMPLES = 100
rng       = np.random.default_rng(42)
idx_eval  = rng.choice(len(X), size=N_EVAL_SAMPLES, replace=False)
X_eval    = X[idx_eval]
print("Benchmark-samples:", X_eval.shape)

def random_attrib(shape: tuple[int, ...]) -> np.ndarray:
    return np.random.randn(*shape).astype("float32")


In [None]:
# Grad-CAM
import matplotlib.pyplot as plt
from tf_keras_vis.gradcam import Gradcam
from tf_keras_vis.utils.model_modifiers import ReplaceToLinear
import tensorflow as tf

tf.keras.backend.mean = tf.math.reduce_mean

# find last conv1d-layer
conv_layers  = [l for l in model.layers if isinstance(l, tf.keras.layers.Conv1D)]
target_layer = conv_layers[-1]
print("nutze Layer:", target_layer.name)

gradcam = Gradcam(model,
                  model_modifier=ReplaceToLinear(),
                  clone=True)

# score-function
score_fn = lambda out: out[:, 0]      # apnea score

heatmaps = gradcam(score_fn,
                   X[samples_vis],
                   penultimate_layer=target_layer.name,
                   seek_penultimate_conv_layer=False)

for idx, hm in zip(samples_vis, heatmaps):
    plt.figure(figsize=(10, 2))
    plt.title(f"Grad-CAM idx={idx} y_true={y[idx]}", fontsize=13)
    plt.imshow(hm[np.newaxis, :], cmap='viridis',
               aspect='auto', extent=[0, 60, 0, 1])
    plt.xlabel("Second in 60-s-window");  plt.yticks([])
    plt.colorbar(fraction=0.03, pad=0.01)
    plt.tight_layout();  plt.show()


In [None]:
# Integrated Gradients
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

def integrated_gradients(model,
                         inputs,          # (N, 60, 4)
                         baseline=None,
                         steps=64,
                         alpha_batch=128):

    x = tf.convert_to_tensor(inputs, tf.float32)
    N = tf.shape(x)[0]

    if baseline is None:
        baseline = tf.zeros_like(x)
    else:
        baseline = tf.convert_to_tensor(baseline, tf.float32)

    alphas = tf.linspace(0., 1., steps + 1)  # (steps+1,)
    ig_accum = tf.zeros_like(x)

    for a0 in range(0, steps + 1, alpha_batch):
        a1 = min(a0 + alpha_batch, steps + 1)
        a = alphas[a0:a1]                       # (b,)
        b = a1 - a0

        a = a[:, None, None, None]              # (b,1,1,1)
        inter = baseline[None] + a * (x - baseline)[None]   # (b,N,60,4)
        inter = tf.reshape(inter, (b * N, 60, 4))           # (b·N,60,4)

        with tf.GradientTape() as tape:
            tape.watch(inter)
            preds = model(inter, training=False)[:, 0]      # (b·N,)
        grads = tape.gradient(preds, inter)                 # (b·N,60,4)

        grads = tf.reshape(grads, (b, N, 60, 4))            # (b,N,60,4)
        ig_accum += tf.reduce_mean(grads, axis=0)           # (N,60,4)

    ig = (x - baseline) * ig_accum                         # (N,60,4)
    return ig.numpy()

baseline = np.zeros_like(X[samples_vis])
igrads   = integrated_gradients(model,
                                X[samples_vis],
                                baseline=baseline,
                                steps=64)     # (3,60,4)

for idx, ig_arr in zip(samples_vis, igrads):
    plt.figure(figsize=(10, 2))
    ŷ = model.predict(X[idx:idx+1], verbose=0)[0, 0]
    plt.title(f"Integrated Gradients  |  idx={idx}   ŷ={ŷ:.3f}", fontsize=13)
    plt.plot(ig_arr.mean(-1), lw=2)
    plt.axhline(0, color='k', lw=.8)
    plt.xlim(0, 59)
    plt.xlabel("Second in 60-s-window")
    plt.ylabel("Attribution (Σ channels)")
    plt.tight_layout()
    plt.show()


In [None]:
# Deep SHAP
import matplotlib.pyplot as plt
import numpy as np

# Explainer for demo
bg   = X[np.random.choice(len(X), 100, replace=False)]
shx  = shap.DeepExplainer(model, bg)
raw  = shx.shap_values(X[samples_vis])[0]   # (3,60,4,4) oder (60,4,4)

if raw.ndim == 4:
    maps = raw.mean(-1)                     # (3,60,4)
else:
    maps = raw.mean(-1)[None, ...]          # (1,60,4)

for idx, hm in zip(samples_vis, maps):
    # hm.shape == (60,4)
    hm1 = hm.mean(-1)[None, :]              # (1,60)

    v = np.max(np.abs(hm1))
    plt.figure(figsize=(10,2))
    plt.title(f"SHAP idx={idx} y_true={y[idx]}", fontsize=13)
    plt.imshow(hm1, cmap='coolwarm',
               aspect='auto',
               extent=[0,60,0,1],
               vmin=-v, vmax= v)
    plt.xlabel("Seconds in 60-s-window")
    plt.yticks([])
    plt.colorbar(label="Attribution", fraction=0.03, pad=0.01)
    plt.tight_layout()
    plt.show()


In [None]:
# LIME
import matplotlib.pyplot as plt

# build explainler
lime_exp = lime_tab.LimeTabularExplainer(
    training_data         = X.reshape(len(X), -1),
    mode                  = 'classification',
    class_names           = ['NoApnea', 'Apnea'],
    discretize_continuous = False)

def _model_predict(flat):
    batch = flat.reshape(-1, 60, 4)
    p1 = model.predict(batch, verbose=0)
    return np.hstack([1 - p1, p1])

# run explainer
lime_maps = []
for s in X[samples_vis]:
    exp = lime_exp.explain_instance(s.flatten(),
                                   predict_fn=_model_predict,
                                   num_features=240)
    w = np.zeros(240)
    for idx_feat, val in exp.local_exp[1]:
        w[idx_feat] = val
    lime_maps.append(w.reshape(60,4))
lime_maps = np.stack(lime_maps)    # (3,60,4)

for idx, hm in zip(samples_vis, lime_maps):
    plt.figure(figsize=(10,2))
    plt.title(f"LIME  idx={idx} y_true={y[idx]}")
    plt.imshow(hm.mean(-1)[None,:], cmap='viridis',
               aspect='auto', extent=[0,60,0,1])
    plt.xlabel("Second in 60-s-window"); plt.yticks([])
    plt.colorbar(fraction=0.03, pad=0.01)
    plt.tight_layout(); plt.show()


In [None]:
import numpy as np, shap, lime.lime_tabular as lime_tab, tensorflow as tf
from tf_keras_vis.gradcam import Gradcam
from tf_keras_vis.utils.model_modifiers import ReplaceToLinear
from tf_explain.core.integrated_gradients import IntegratedGradients

# Helper
def _ensure_60_4(arr, n_chan=4):
    arr = np.asarray(arr, dtype="float32")

    # remove trailing singleton-axe
    if arr.ndim == 4 and arr.shape[-1] == 1:      # (N,60,4,1)
        arr = arr[..., 0]                         # zu (N,60,4)
    if arr.ndim == 3 and arr.shape[-1] == 1:      # (60,4,1)
        arr = arr[..., 0]                         # zu (60,4)

    # (1,N,60,4) zu (N,60,4)
    if arr.ndim == 4 and arr.shape[0] == 1 and arr.shape[2:] == (60, n_chan):
        arr = arr[0]
    # (N,60) zu (N,60,4)
    if arr.ndim == 2 and arr.shape[1] == 60:
        arr = np.repeat(arr[..., None], n_chan, axis=-1)

    # (60,4) → (1,60,4)
    if arr.ndim == 2 and arr.shape == (60, n_chan):
        arr = arr[None, ...]

    # (N,4,60)  zu (N,60,4)
    if arr.ndim == 3 and arr.shape[1:] == (n_chan, 60):
        arr = arr.swapaxes(1, 2)

    assert arr.ndim == 3 and arr.shape[2] == n_chan, f"Shape‐Fehler: {arr.shape}"
    return arr

# IG (tf-explain)
ig_exp = IntegratedGradients()

def map_igrads(x, steps=64, batch_size=32):
    attrs = []
    for i in range(0, len(x), batch_size):
        xb = x[i:i+batch_size]
        ig_raw = ig_exp.get_integrated_gradients(
            interpolated_images=IntegratedGradients.generate_interpolations(xb, steps),
            model=model,
            class_index=0,
            n_steps=steps,
        )                       # (batch, 60, 4)
        attrs.append(ig_raw)
    ig = np.concatenate(attrs, axis=0)        # (N, 60, 4)
    return _ensure_60_4(ig)

# Grad-CAM
pen_layer = next(l for l in reversed(model.layers)
                 if isinstance(l, tf.keras.layers.Conv1D))
gradcam = Gradcam(model, model_modifier=ReplaceToLinear(), clone=True)
score_fn = lambda y: y[:, 0]

def map_gradcam(x):
    hm = gradcam(score_fn, x,
                 penultimate_layer=pen_layer.name,
                 seek_penultimate_conv_layer=False)   # (N,60)
    return _ensure_60_4(hm)

# SHAP
bg   = X[np.random.choice(len(X), 100, replace=False)]
shxp = shap.DeepExplainer(model, bg)

def map_shap(x):
    sv = shxp.shap_values(x)[0]          # evtl. (N,60,4)
    return _ensure_60_4(sv)

# LIME
lime_exp = lime_tab.LimeTabularExplainer(
    training_data=X.reshape(len(X), -1),
    mode='classification',
    class_names=['NoApnea', 'Apnea'],
    discretize_continuous=False)

def _model_predict(flat):
    batch = flat.reshape(-1, 60, 4)
    p1 = model.predict(batch, verbose=0)
    return np.hstack([1 - p1, p1])

def map_lime(x):
    outs = []
    for s in x:
        exp = lime_exp.explain_instance(
            s.flatten(), predict_fn=_model_predict, num_features=240)
        w = np.zeros(240)
        for idx, val in exp.local_exp[1]:
            w[idx] = val
        outs.append(w.reshape(60, 4))
    return _ensure_60_4(np.stack(outs))

methods = {
    "Grad-CAM": map_gradcam,
    "IntGrad" : map_igrads,
    "SHAP"    : map_shap,
    "LIME"    : map_lime,
}


In [None]:
from tf_explain.core.integrated_gradients import IntegratedGradients
ig_explain = IntegratedGradients()

def map_igrads(x, steps=64):
    # generate interpolations (N*(steps+1), 60, 4)
    inter = IntegratedGradients.generate_interpolations(np.array(x), steps)

    # raw IG-Score
    ig_raw = IntegratedGradients.get_integrated_gradients(
        interpolated_images  = inter,
        model                = model,
        class_index          = 0,
        n_steps              = steps,
    )                                   # (N, 60, 4)

    return _ensure_60_4(ig_raw)


In [None]:
A_ig  = map_igrads(X_eval[:5])
A_cam = map_gradcam(X_eval[:5])
print("IG :", A_ig.shape, A_ig.min(), A_ig.max())
print("CAM:", A_cam.shape, A_cam.min(), A_cam.max())


In [None]:
def _focus_mask_order(attrib):
    """returns sorted indexes (important to unimportant) and threshold"""
    a = np.abs(attrib).mean(0)          # (60,4)
    flat = np.argsort(-a, axis=None)    # decending
    n_focus = int(ALPHA * flat.size)    # Top 10 percent
    return flat[:n_focus], n_focus

def deletion_auc(attrib, k_steps=K_STEPS):
    flat, n_focus = _focus_mask_order(attrib)
    masks = np.ones_like(X_eval, bool)
    auc, prev = 0., model.predict(X_eval, verbose=0).mean()
    step = max(1, n_focus // k_steps)
    for k in range(0, n_focus, step):
        blk = flat[k:k+step]
        masks.reshape(-1)[blk] = False
        conf = model.predict(X_eval * masks, verbose=0).mean()
        auc += (prev + conf) / 2
        prev = conf
    return auc / (n_focus // step)

def insertion_auc(attrib, k_steps=K_STEPS):
    flat, n_focus = _focus_mask_order(attrib)
    masks = np.zeros_like(X_eval, bool)
    auc, prev = 0., model.predict(np.zeros_like(X_eval), verbose=0).mean()
    step = max(1, n_focus // k_steps)
    for k in range(0, n_focus, step):
        blk = flat[k:k+step]
        masks.reshape(-1)[blk] = True
        conf = model.predict(X_eval * masks, verbose=0).mean()
        auc += (prev + conf) / 2
        prev = conf
    return auc / (n_focus // step)

def sparsity_entropy(attrib):
    # attrib: (N,60,4)
    p = np.abs(attrib).sum((1,2))
    p = p / p.sum()
    return -(p * np.log(p + 1e-9)).sum()

def latency_ms(fn):
    t0 = time.time()
    fn(X_eval)
    return (time.time() - t0) * 1000



In [None]:
K_STEPS  = 60
MASK_VAL = -10.0 

In [None]:
# Auto-Benchmark Full Run (4 Methods, Shape-Fix)
import numpy as np, time, math
from tqdm import tqdm

rng = np.random.default_rng(42)

K_STEPS   = 60
MASK_VAL  = -10.0
N_EVAL    = 100          # Windows per iterations
N_REPEAT  = 3            # Iterations

# Eval-Subset
yh_full  = model.predict(X, verbose=0)[:, 0]
mid_idx = np.where((yh_full > .2) & (yh_full < .8))[0]

# Shape & Mask helper
to_3d  = lambda A: A[:, :, None] if A.ndim == 2 else A        # (B,60) → (B,60,1)
def align_maps(A, Xb):
    A = to_3d(A)
    if A.shape[0] != Xb.shape[0]:
        A = np.repeat(A, Xb.shape[0], axis=0)
    return A
def mask_ts(x, t_idx):
    x = x.copy()
    x[:, t_idx, :] = MASK_VAL
    return x

# Deletion-AUC
def deletion_auc(A, Xb):
    A = align_maps(A, Xb)
    B, T, C = Xb.shape
    order = np.argsort(A.reshape(B, -1), axis=1)[:, ::-1]
    step  = max(1, order.shape[1] // K_STEPS)
    aucs  = []
    for b in range(B):
        x_mod = Xb[b:b+1].copy()
        base  = model.predict(x_mod, verbose=0)[0, 0] + 1e-8
        scores = [1.0]
        for i in range(0, order.shape[1], step):
            tids = order[b, i:i+step] // C
            x_mod = mask_ts(x_mod, tids)
            scores.append(model.predict(x_mod, verbose=0)[0, 0] / base)
        aucs.append(np.trapz(scores, dx=1/K_STEPS))
    return np.mean(aucs)

# Insertion-AUC
def insertion_auc(A, Xb):
    A = align_maps(A, Xb)
    B, T, C = Xb.shape
    order = np.argsort(A.reshape(B, -1), axis=1)[:, ::-1]
    step  = max(1, order.shape[1] // K_STEPS)
    aucs  = []
    for b in range(B):
        x_mod = np.full_like(Xb[b:b+1], MASK_VAL)
        base  = model.predict(x_mod, verbose=0)[0, 0]
        scores = [0.0]
        for i in range(0, order.shape[1], step):
            tids = order[b, i:i+step] // C
            x_mod[:, tids, :] = Xb[b:b+1, tids, :]
            scores.append((model.predict(x_mod, verbose=0)[0, 0] - base) /
                          (1 - base + 1e-8))
        aucs.append(np.trapz(scores, dx=1/K_STEPS))
    return np.mean(aucs)

# normalized Sparsity (0 … 1)
def sparsity_norm(A):
    p = np.abs(A).ravel()
    p /= p.sum() + 1e-8
    H = -(p * np.log2(p + 1e-12)).sum()
    return (math.log2(A.size) - H) / math.log2(A.size)

# Latency
def lat_ms(f, X1):
    f(X1)
    t0 = time.perf_counter_ns()
    _  = f(X1)
    return (time.perf_counter_ns() - t0) / 1_000_000

all_methods = {
    "Grad-CAM": methods["Grad-CAM"],
    "IntGrad" : methods["IntGrad"],
    "SHAP"    : methods["SHAP"],
    "LIME"    : methods["LIME"],
    "Random"  : lambda x: rng.standard_normal(x.shape)
}

results = []
for name, func in all_methods.items():
    del_s, ins_s, spa_s, lat_s = [], [], [], []
    for _ in tqdm(range(N_REPEAT), desc=f"{name} runs"):
        idx   = rng.choice(mid_idx, size=N_EVAL, replace=False)
        X_ev  = X[idx]
        A_map = func(X_ev)

        del_s.append(deletion_auc(A_map, X_ev))
        ins_s.append(insertion_auc(A_map, X_ev))
        spa_s.append(sparsity_norm(A_map))
        lat_s.append(0.0 if name == "Random" else lat_ms(func, X_ev[:1]))
    results.append(dict(
        method   = name,
        del_auc  = np.mean(del_s),
        ins_auc  = np.mean(ins_s),
        sparsity = np.mean(spa_s),
        lat_ms   = np.mean(lat_s)
    ))

print(f"\n=== Benchmark ({N_REPEAT}x{N_EVAL} Windows | {K_STEPS} Steps) ===")
for r in results:
    print(f"{r['method']:<8}  Del↓ {r['del_auc']:.3f}  "
          f"Ins↑ {r['ins_auc']:.3f}  "
          f"Spar↑ {r['sparsity']:.3f}  "
          f"Lat {r['lat_ms']:.0f} ms")
