In [None]:
# 把語音 wav 檔整理成可訓練的資料流程：
# 1) 讀取 CSV（包含 wav 檔名與 outcome 標籤）
# 2) 只保留術前資料（status = S）
# 3) 每個 wav 轉成兩種影像：Mel 與 STFT（PNG）
# 4) 用分層方式做 5-fold 切分，讓每折正負比例接近
#
# 輸入
# - /home/jovyan/114-1_HA/data/0604_final_REFCV.csv
#   欄位必須有 sound.files（wav 檔名）、outcome（0/1）
# - /home/jovyan/114-1_HA/data/a_slide/（wav 檔案資料夾）
#
# 輸出
# - Step1：manifest_S_only.csv（術前 S 的有效清單）
# - Step2：Mel / STFT PNG（存到 data/pic_v4/mel 與 data/pic_v4/stft）
# - Step3：splits（五折的 train/val index；分布會寫進 log）
# - Log：/home/jovyan/114-1_HA/log_v4/run_YYYYMMDD_HHMMSS.log
# =============================================================================


# =============================================================================
# Step0：全域設定 + 路徑 + logger + 小工具
# -----------------------------------------------------------------------------
# 在這一段把常用設定集中起來，把資料夾與輸出位置先準備好，
# 這樣後面每一步都能用同一套路徑與紀錄方式跑完。
# =============================================================================
import os, re, time, json, math
from pathlib import Path
import logging
import numpy as np
import pandas as pd


# -------------------------
# 專案根目錄放在這裡，後面所有路徑都從這裡往下接
# -------------------------
BASE_DIR = Path("/home/jovyan/114-1_HA")


# -------------------------
# 輸入與輸出路徑集中管理
# -------------------------
DATA_DIR      = BASE_DIR / "data"
CSV_PATH      = DATA_DIR / "0604_final_REFCV.csv"
AUDIO_DIR     = DATA_DIR / "a_slide"

PIC_DIR       = DATA_DIR / "pic_v4"
MEL_PIC_DIR   = PIC_DIR / "mel"
STFT_PIC_DIR  = PIC_DIR / "stft"

OUT_DIR       = DATA_DIR / "out"
CM_DIR        = OUT_DIR / "cm"
ROC_DIR       = OUT_DIR / "roc"
GRADCAM_DIR   = OUT_DIR / "gradcam"
GRADCAM_MEL_DIR   = GRADCAM_DIR / "mel"
GRADCAM_STFT_DIR  = GRADCAM_DIR / "stft"
GRADCAM_DUAL_DIR  = GRADCAM_DIR / "dual"
PROB_DIR      = OUT_DIR / "prob_margin"
CSV_OUT_DIR   = OUT_DIR / "csv"

LOG_DIR       = BASE_DIR / "log_v4"


# -------------------------
# 先把會用到的資料夾都建好
# -------------------------
for d in [
    MEL_PIC_DIR, STFT_PIC_DIR,
    CM_DIR, ROC_DIR,
    GRADCAM_MEL_DIR, GRADCAM_STFT_DIR, GRADCAM_DUAL_DIR,
    PROB_DIR, CSV_OUT_DIR,
    LOG_DIR
]:
    d.mkdir(parents=True, exist_ok=True)


# -------------------------
# 把 Step1 產出的清單固定成一個檔案，後面都直接讀它
# -------------------------
MANIFEST_PATH = DATA_DIR / "manifest_S_only.csv"


# -------------------------
# 常調的參數，之後要調整不會漏
# -------------------------
# 音訊讀取設定
SR = 16000
DURATION_SEC = 10.0
N_SAMPLES = int(SR * DURATION_SEC)

# 圖像輸入大小（後面 CNN
IMG_H, IMG_W = 224, 224

# 交叉驗證設定
N_FOLDS = 5
SEED = 42

# 影像 CNN 的訓練參數（如果 Step4 另外有一套，這裡就當預設）
EPOCHS = 20
BATCH_SIZE = 32
LR = 1e-3

# 只取術前資料
USE_STATUS = "S"

# 在每一折做 Grad-CAM 會取多少張
GRADCAM_N_SAMPLES_PER_FOLD = 4


# -------------------------
# 把 logger 做成同時輸出到螢幕與檔案
# -------------------------
RUN_TAG = time.strftime("%Y%m%d_%H%M%S")
LOG_PATH = LOG_DIR / f"run_{RUN_TAG}.log"

def build_logger(log_path: Path) -> logging.Logger:
    logger = logging.getLogger(f"voice_proj_{log_path.stem}")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    while logger.handlers:
        logger.handlers.pop()

    fmt = logging.Formatter(
        fmt="%(asctime)s | %(levelname)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S"
    )

    fh = logging.FileHandler(log_path, mode="w", encoding="utf-8")
    fh.setFormatter(fmt)
    logger.addHandler(fh)

    sh = logging.StreamHandler()
    sh.setFormatter(fmt)
    logger.addHandler(sh)

    return logger

logger = build_logger(LOG_PATH)

logger.info("==== Start run ====")
logger.info(f"BASE_DIR={BASE_DIR}")
logger.info(f"CSV_PATH={CSV_PATH}")
logger.info(f"AUDIO_DIR={AUDIO_DIR}")
logger.info(f"MEL_PIC_DIR={MEL_PIC_DIR}")
logger.info(f"STFT_PIC_DIR={STFT_PIC_DIR}")
logger.info(f"OUT_DIR={OUT_DIR}")
logger.info(f"LOG_PATH={LOG_PATH}")
logger.info(f"HP: SR={SR}, DURATION_SEC={DURATION_SEC}, IMG={IMG_H}x{IMG_W}, folds={N_FOLDS}")
logger.info(f"Use only status={USE_STATUS}")


# -------------------------
# 用檔名規則解析 wav 的資訊，例如 5_S1_3.wav
# 格式是 pid_statusvowel_clip.wav
# -------------------------
FNAME_RE = re.compile(
    r"^(?P<pid>\d+)_(?P<status>[SA])(?P<vowel>\d+)_(?P<clip>\d+)\.wav$",
    re.IGNORECASE
)

def parse_wav_name(fname: str):
    m = FNAME_RE.match(fname)
    if not m:
        return None
    return {
        "patient_id": int(m.group("pid")),
        "status": m.group("status").upper(),
        "vowel_id": int(m.group("vowel")),
        "clip_id": int(m.group("clip")),
    }


# -------------------------
#兩個小工具，後面做統計與 log 會一直用到
# -------------------------
def count_pos_neg(y_arr):
    y_arr = np.asarray(y_arr).astype(int)
    n_pos = int((y_arr == 1).sum())
    n_neg = int((y_arr == 0).sum())
    return n_pos, n_neg

def pos_rate(n_pos, n_neg):
    total = n_pos + n_neg
    return float(n_pos / total) if total > 0 else float("nan")

logger.info("Step0 ready.")




In [None]:

# =============================================================================
# Step1：讀 CSV -> 篩術前 S -> 建 manifest
# -----------------------------------------------------------------------------
# 在這一段把資料先洗乾淨，做成一份可直接用的清單：
# - outcome 只接受 0/1
# - 檔名要符合命名規則
# - 只保留術前 S
# - wav 檔必須存在
#
# 最後會輸出 manifest_S_only.csv，後面流程都以它為準
# =============================================================================
df = pd.read_csv(CSV_PATH)

need_cols = {"sound.files", "outcome"}
missing = need_cols - set(df.columns)
if missing:
    raise ValueError(f"CSV 缺欄位：{missing}；目前欄位={list(df.columns)}")

rows = []
bad_names = 0
missing_wav = 0
filtered_not_S = 0
bad_label = 0

for _, r in df.iterrows():
    wav_name = str(r["sound.files"]).strip()

    # 我只接受 outcome 是 0/1，其他值我直接跳過，避免標籤混亂
    try:
        label = int(r["outcome"])
    except Exception:
        bad_label += 1
        continue
    if label not in (0, 1):
        bad_label += 1
        continue

    info = parse_wav_name(wav_name)
    if info is None:
        bad_names += 1
        continue

    # 我只保留術前
    if info["status"] != USE_STATUS:
        filtered_not_S += 1
        continue

    # 我確認 wav 檔真的存在
    wav_path = AUDIO_DIR / wav_name
    if not wav_path.exists():
        missing_wav += 1
        continue

    rows.append({
        "wav_name": wav_name,
        "wav_path": str(wav_path),
        "patient_id": info["patient_id"],
        "status": info["status"],
        "vowel_id": info["vowel_id"],
        "clip_id": info["clip_id"],
        "label": label
    })

df_m = (
    pd.DataFrame(rows)
      .sort_values(["patient_id", "clip_id"])
      .reset_index(drop=True)
)

df_m.to_csv(MANIFEST_PATH, index=False, encoding="utf-8")

seg_pos, seg_neg = count_pos_neg(df_m["label"].values)
pat_dist = df_m.drop_duplicates("patient_id")["label"].value_counts().to_dict()

logger.info("======== [Step1] Manifest built ========")
logger.info(f"[Step1] CSV rows={len(df)}")
logger.info(f"[Step1] bad filename format={bad_names}")
logger.info(f"[Step1] bad label rows={bad_label} (non 0/1 or parse fail)")
logger.info(f"[Step1] filtered_not_S={filtered_not_S} (只保留 status={USE_STATUS})")
logger.info(f"[Step1] missing wav={missing_wav}")
logger.info(f"[Step1] manifest segments={len(df_m)} | pos(1)={seg_pos} neg(0)={seg_neg} pos_rate={pos_rate(seg_pos, seg_neg):.4f}")
logger.info(f"[Step1] unique patients={df_m['patient_id'].nunique()} | patient label dist={pat_dist}")
logger.info(f"[Step1] saved: {MANIFEST_PATH}")

df_m.head()




In [None]:
# Step2：WAV -> Mel / STFT PNG
# -----------------------------------------------------------------------------
# 這一段把每個 wav 轉成兩張 PNG：
# - Mel：比較像人耳的頻帶表示
# - STFT：更原始的頻譜能量
#
# 特別處理短音檔：
# - 會根據音檔長度調整 n_fft/hop，避免 n_fft 超過訊號長度
# - 把時間軸固定到 TARGET_FRAMES，太短就用最後一欄延展補齊
# - 用分位數決定顯示對比，避免整張圖變單色
# =============================================================================
import matplotlib.pyplot as plt

try:
    import librosa
except Exception as e:
    raise RuntimeError("缺少 librosa，請先安裝：pip install librosa") from e


# -------------------------
# 用這些門檻去過濾幾乎沒訊號的音檔
# -------------------------
SILENCE_STD_TH = 1e-6
SILENCE_RMS_TH = 1e-6

# 我用 dB 壓縮讓圖更穩定，TOP_DB 控制動態範圍
TOP_DB = 80

# 我準備一組保底的顯示範圍，避免對比太小導致整張單色
VMIN_DB = -60
VMAX_DB = 0


# -------------------------
# 這一段是短音檔常用的頻譜參數
# N_FFT 越大頻率解析越細，但短音檔越容易踩到長度不足
# HOP 越小時間解析越密，但 frame 數會更多
# N_MELS 決定 Mel 頻帶數量
# TARGET_FRAMES 決定最後輸出統一的時間寬度
# -------------------------
N_FFT = 128
HOP = 16
N_MELS = 64
TARGET_FRAMES = 64


def load_audio_raw(path: str, sr=SR):
    y, _ = librosa.load(path, sr=sr, mono=True)
    if y.size == 0:
        return y
    if not np.isfinite(y).all():
        return np.array([], dtype=np.float32)

    y = y.astype(np.float32)
    y = y - np.mean(y)
    peak = np.max(np.abs(y)) + 1e-9
    y = y / peak
    return y


def is_silent(y: np.ndarray, std_th=SILENCE_STD_TH, rms_th=SILENCE_RMS_TH):
    if y.size == 0:
        return True, "empty"
    if not np.isfinite(y).all():
        return True, "nan_or_inf"
    std = float(np.std(y))
    rms = float(np.sqrt(np.mean(y ** 2)))
    if std < std_th:
        return True, f"std<{std_th} (std={std:.2e})"
    if rms < rms_th:
        return True, f"rms<{rms_th} (rms={rms:.2e})"
    return False, "ok"


def fix_frames(mat_2d: np.ndarray, target_frames: int = TARGET_FRAMES):
    if mat_2d.ndim != 2:
        raise ValueError(f"fix_frames expects 2D matrix, got shape={mat_2d.shape}")
    T = mat_2d.shape[1]
    if T == target_frames:
        return mat_2d
    if T > target_frames:
        return mat_2d[:, :target_frames]

    pad = target_frames - T
    last_col = mat_2d[:, -1:]
    pad_block = np.repeat(last_col, repeats=pad, axis=1)
    return np.concatenate([mat_2d, pad_block], axis=1)


def _robust_vmin_vmax(db_mat, default_vmin=VMIN_DB, default_vmax=VMAX_DB):
    finite = db_mat[np.isfinite(db_mat)]
    if finite.size == 0:
        return default_vmin, default_vmax
    vmin = float(np.percentile(finite, 5))
    vmax = float(np.percentile(finite, 95))
    if vmax - vmin < 10:
        vmin = min(vmin, default_vmin)
        vmax = max(vmax, default_vmax)
    return vmin, vmax


def _adapt_nfft_hop(y_len: int):
    if y_len <= 0:
        return N_FFT, HOP
    n_fft = min(N_FFT, max(64, 2 ** int(np.floor(np.log2(y_len)))))
    n_fft = min(n_fft, y_len)
    hop = min(HOP, max(8, n_fft // 8))
    return int(n_fft), int(hop)


def save_mel_png(y, out_png: Path, sr=SR):
    if y.size == 0:
        return
    n_fft, hop = _adapt_nfft_hop(len(y))
    mel = librosa.feature.melspectrogram(
        y=y, sr=sr, n_fft=n_fft, hop_length=hop, n_mels=N_MELS, power=2.0
    )
    mel_db = librosa.power_to_db(mel, ref=1.0, top_db=TOP_DB)
    mel_db = fix_frames(mel_db, TARGET_FRAMES)
    vmin, vmax = _robust_vmin_vmax(mel_db)

    plt.figure(figsize=(4, 4), dpi=120)
    plt.axis("off")
    plt.imshow(mel_db, aspect="auto", origin="lower", vmin=vmin, vmax=vmax)
    plt.tight_layout(pad=0)
    plt.savefig(out_png, bbox_inches="tight", pad_inches=0)
    plt.close()


def save_stft_png(y, out_png: Path, sr=SR):
    if y.size == 0:
        return
    n_fft, hop = _adapt_nfft_hop(len(y))
    stft = librosa.stft(y, n_fft=n_fft, hop_length=hop, win_length=n_fft, center=False)
    mag = (np.abs(stft) ** 2).astype(np.float32)
    spec_db = librosa.power_to_db(mag, ref=1.0, top_db=TOP_DB)
    spec_db = fix_frames(spec_db, TARGET_FRAMES)
    vmin, vmax = _robust_vmin_vmax(spec_db)

    plt.figure(figsize=(4, 4), dpi=120)
    plt.axis("off")
    plt.imshow(spec_db, aspect="auto", origin="lower", vmin=vmin, vmax=vmax)
    plt.tight_layout(pad=0)
    plt.savefig(out_png, bbox_inches="tight", pad_inches=0)
    plt.close()


made = 0
skipped_exist = 0
skipped_silent = 0
skipped_error = 0

for _, r in df_m.iterrows():
    wav_name = str(r["wav_name"]).strip()
    wav_path = str(r["wav_path"]).strip()

    mel_png  = MEL_PIC_DIR  / f"{wav_name}.png"
    stft_png = STFT_PIC_DIR / f"{wav_name}.png"

    if mel_png.exists() and stft_png.exists():
        skipped_exist += 1
        continue

    try:
        y_raw = load_audio_raw(wav_path)
        silent, reason = is_silent(y_raw)
        if silent:
            skipped_silent += 1
            logger.info(f"[Step2] skip silent/invalid wav: {wav_name} | reason={reason}")
            continue

        save_mel_png(y_raw, mel_png)
        save_stft_png(y_raw, stft_png)
        made += 1

    except Exception as e:
        skipped_error += 1
        logger.info(f"[Step2] error on {wav_name}: {repr(e)}")

logger.info("======== [Step2] Images generated ========")
logger.info(f"[Step2] made={made}")
logger.info(f"[Step2] skipped(existing)={skipped_exist}")
logger.info(f"[Step2] skipped(silent/invalid)={skipped_silent}")
logger.info(f"[Step2] skipped(error)={skipped_error}")
logger.info(f"[Step2] mel_dir={MEL_PIC_DIR} | count={len(list(MEL_PIC_DIR.glob('*.png')))}")
logger.info(f"[Step2] stft_dir={STFT_PIC_DIR} | count={len(list(STFT_PIC_DIR.glob('*.png')))}")

step2_report = {
    "made": made,
    "skipped_exist": skipped_exist,
    "skipped_silent": skipped_silent,
    "skipped_error": skipped_error,
    "mel_count": len(list(MEL_PIC_DIR.glob("*.png"))),
    "stft_count": len(list(STFT_PIC_DIR.glob("*.png"))),
    "target_frames": TARGET_FRAMES,
    "n_fft": N_FFT,
    "hop": HOP,
    "n_mels": N_MELS
}
step2_report




In [None]:
# Step3：五折交叉驗證切分（Segment-level StratifiedKFold）
# -----------------------------------------------------------------------------
# 在這一段用分層切分，讓每一折的正負比例接近
# 每折病人重疊數印出來，讓我知道重疊程度
# =============================================================================
from sklearn.model_selection import StratifiedKFold


def log_fold_distribution(fold: int, train_idx: np.ndarray, val_idx: np.ndarray,
                          y_all: np.ndarray, df_meta: pd.DataFrame):
    tr_pos, tr_neg = count_pos_neg(y_all[train_idx])
    va_pos, va_neg = count_pos_neg(y_all[val_idx])

    logger.info(f"[Fold {fold}] SEG train: pos={tr_pos}, neg={tr_neg}, pos_rate={pos_rate(tr_pos, tr_neg):.4f}")
    logger.info(f"[Fold {fold}] SEG  val : pos={va_pos}, neg={va_neg}, pos_rate={pos_rate(va_pos, va_neg):.4f}")

    tr_pat_dist = (df_meta.iloc[train_idx].drop_duplicates("patient_id")["label"].value_counts().to_dict())
    va_pat_dist = (df_meta.iloc[val_idx].drop_duplicates("patient_id")["label"].value_counts().to_dict())

    tr_pat_pos = int(tr_pat_dist.get(1, 0)); tr_pat_neg = int(tr_pat_dist.get(0, 0))
    va_pat_pos = int(va_pat_dist.get(1, 0)); va_pat_neg = int(va_pat_dist.get(0, 0))

    logger.info(f"[Fold {fold}] PAT train: pos={tr_pat_pos}, neg={tr_pat_neg}, pos_rate={pos_rate(tr_pat_pos, tr_pat_neg):.4f} | dist={tr_pat_dist}")
    logger.info(f"[Fold {fold}] PAT  val : pos={va_pat_pos}, neg={va_pat_neg}, pos_rate={pos_rate(va_pat_pos, va_pat_neg):.4f} | dist={va_pat_dist}")


def make_5fold_cv_splits_segment_stratified(y: np.ndarray, df_meta: pd.DataFrame,
                                            n_folds: int = 5, seed: int = 42):
    y = np.asarray(y).astype(int)
    all_idx = np.arange(len(y), dtype=int)

    skf = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=seed)

    splits = []
    logger.info("======== [Step3] Start 5-fold CV split ========")

    for fold, (train_idx, val_idx) in enumerate(skf.split(all_idx, y), start=1):
        logger.info("------------------------------------------------------------")
        logger.info(f"[Fold {fold}] split created")

        inter = np.intersect1d(train_idx, val_idx)
        if len(inter) != 0:
            raise RuntimeError(f"train/val overlap detected in fold {fold}: n={len(inter)}")

        log_fold_distribution(fold, train_idx, val_idx, y, df_meta)

        tr_pats = set(df_meta.iloc[train_idx]["patient_id"].unique().tolist())
        va_pats = set(df_meta.iloc[val_idx]["patient_id"].unique().tolist())
        overlap_pats = tr_pats.intersection(va_pats)
        logger.info(f"[Fold {fold}] PAT overlap(train∩val)={len(overlap_pats)}")

        splits.append({
            "fold": fold,
            "train_idx": train_idx.astype(int),
            "val_idx": val_idx.astype(int),
        })

    logger.info("======== [Step3] CV split finished ========")
    return splits


y = df_m["label"].to_numpy()
splits = make_5fold_cv_splits_segment_stratified(
    y=y,
    df_meta=df_m,
    n_folds=N_FOLDS,
    seed=SEED
)

logger.info(f"[Step3] splits prepared: {len(splits)} folds")
logger.info(f"[Step3] keys example: {list(splits[0].keys())}")

len(splits), splits[0].keys()


In [None]:
# 用已經做好的頻譜圖（Mel + STFT）來訓練一個雙輸入 CNN 模型，並用 segment-level 的五折分層交叉驗證評估。
# 把每一折都跑出每個 epoch 的 Train/Val/Test 指標，並用驗證集的 AUROC_prob 來挑最佳 epoch。
#
# 處理的資料型態
# - 輸入資料：CSV 內的 sound.files 與 outcome（0/1）
# - 影像輸入：data/pic_v4/mel 與 data/pic_v4/stft 內的 png
# - 模型輸出：二分類機率 prob（0~1）
#
# 會產出的檔案
# - log：log_v4/run_v2_*.log
# - 每折每 epoch 指標：data/out/csv/fold**_epoch_metrics.csv
# - 每折最佳 epoch 指標：data/out/csv/fold**_best_metrics.csv
# - 五折彙總（含 mean/std）：data/out/csv/cv_best_summary.csv
# - 曲線圖：data/out/plots/fold**_(loss|acc|auroc).png
# - 混淆矩陣：data/out/cm/fold**_cm_best.png
# =============================================================================


In [None]:
import os, re, time, math
from pathlib import Path
import logging
import numpy as np
import pandas as pd


# =============================================================================
# Step0：把路徑、資料夾、logger、以及主要超參數集中在同一段
# -----------------------------------------------------------------------------
# =============================================================================

BASE_DIR = Path("/home/jovyan/114-1_HA")

DATA_DIR      = BASE_DIR / "data"
CSV_PATH      = DATA_DIR / "0604_final_REFCV.csv"
AUDIO_DIR     = DATA_DIR / "a_slide"     # 我這版不會用 wav 產圖，但仍保留檢查 wav 存在用
PIC_DIR       = DATA_DIR / "pic_v4"      # 我用的頻譜圖版本
MEL_PIC_DIR   = PIC_DIR / "mel"
STFT_PIC_DIR  = PIC_DIR / "stft"

# 所有輸出都統一丟到 out 底下，包含 csv、曲線圖、混淆矩陣
OUT_DIR       = DATA_DIR / "out"
CSV_OUT_DIR   = OUT_DIR / "csv"
PLOT_DIR      = OUT_DIR / "plots"
CM_DIR        = OUT_DIR / "cm"


LOG_DIR       = BASE_DIR / "log_v4"

# 先把會用到的資料夾都建起來，避免跑到一半才發現缺資料夾
for d in [MEL_PIC_DIR, STFT_PIC_DIR, OUT_DIR, CSV_OUT_DIR, PLOT_DIR, CM_DIR, LOG_DIR]:
    d.mkdir(parents=True, exist_ok=True)

# 把 Step1 的清單輸出固定成一個檔案，後面資料切分與訓練都以它為準
MANIFEST_PATH = DATA_DIR / "manifest_S_only.csv"

# 影像的輸入尺寸放在這裡，資料集讀圖時會用到
IMG_H, IMG_W = 224, 224

# 我用分層五折交叉驗證，seed 固定住讓切分可重現
N_FOLDS = 5
SEED = 42

# 我把 Step4 的主要訓練設定放在這裡
# warmup：先凍結 backbone，只訓練 head，讓分類頭先收斂
# finetune：再解凍 backbone 後段，讓特徵更貼近資料分布
EPOCHS_WARMUP   = 20
EPOCHS_FINETUNE = 60
BATCH_SIZE      = 32

# warmup 的學習率我會設高一點讓 head 收斂快
LR_WARMUP       = 3e-4

# finetune 我用很低的學習率避免大幅破壞預訓練權重
LR_FINETUNE     = 1e-5

# 用 L2 來壓制 head 權重，降低過擬合
WEIGHT_DECAY_L2 = 1e-4

# 我用 dropout 讓 head 不要太快記住訓練集細節
DROPOUT         = 0.35

# head 的容量，我用它控制分類頭的表達能力
HEAD_UNITS      = 64

# early stopping 的耐心值，我用它決定多久沒進步就停
PATIENCE        = 15

# 在每折的 train 裡再切一個內層 val，用來 early stop 與挑 best epoch
VAL_RATIO_IN_TRAIN = 0.15

# 在驗證集上掃 threshold，找到讓 F1 最好的 cut point
THR_SWEEP_STEPS = 201

RUN_TAG = time.strftime("%Y%m%d_%H%M%S")
LOG_PATH = LOG_DIR / f"run_v2_{RUN_TAG}.log"

def build_logger(log_path: Path) -> logging.Logger:
    logger = logging.getLogger(f"voice_proj_{log_path.stem}")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    while logger.handlers:
        logger.handlers.pop()

    fmt = logging.Formatter(
        fmt="%(asctime)s | %(levelname)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S"
    )

    fh = logging.FileHandler(log_path, mode="w", encoding="utf-8")
    fh.setFormatter(fmt)
    logger.addHandler(fh)

    sh = logging.StreamHandler()
    sh.setFormatter(fmt)
    logger.addHandler(sh)
    return logger

logger = build_logger(LOG_PATH)

def count_pos_neg(y_arr):
    y_arr = np.asarray(y_arr).astype(int)
    n_pos = int((y_arr == 1).sum())
    n_neg = int((y_arr == 0).sum())
    return n_pos, n_neg

def pos_rate(n_pos, n_neg):
    total = n_pos + n_neg
    return float(n_pos / total) if total > 0 else float("nan")

logger.info("==== Start run (v2) ====")
logger.info(f"BASE_DIR={BASE_DIR}")
logger.info(f"CSV_PATH={CSV_PATH}")
logger.info(f"AUDIO_DIR={AUDIO_DIR}")
logger.info(f"MEL_PIC_DIR={MEL_PIC_DIR}")
logger.info(f"STFT_PIC_DIR={STFT_PIC_DIR}")
logger.info(f"OUT_DIR={OUT_DIR}")
logger.info(f"LOG_PATH={LOG_PATH}")

logger.info("==== Outputs will be saved to ====")
logger.info(f"[LOG]   {LOG_PATH}")
logger.info(f"[CSV]   per-epoch metrics: {CSV_OUT_DIR}/fold**_epoch_metrics.csv")
logger.info(f"[CSV]   per-fold best:     {CSV_OUT_DIR}/fold**_best_metrics.csv")
logger.info(f"[CSV]   cv summary:        {CSV_OUT_DIR}/cv_best_summary.csv")
logger.info(f"[PLOT]  curves:           {PLOT_DIR}/fold**_(loss|acc|auroc).png")
logger.info(f"[CM]    confusion:        {CM_DIR}/fold**_cm_best.png")


In [None]:
# Step1：從 CSV 建一份乾淨的 manifest
# -----------------------------------------------------------------------------
# 這一步做三件事：
# 1) 檢查 outcome 只接受 0/1
# 2) 用檔名規則解析出 patient_id 等資訊
# 3) 只保留術前資料（status = S），並確認 wav 檔存在
# =============================================================================

USE_STATUS = "S"

FNAME_RE = re.compile(
    r"^(?P<pid>\d+)_(?P<status>[SA])(?P<vowel>\d+)_(?P<clip>\d+)\.wav$",
    re.IGNORECASE
)

def parse_wav_name(fname: str):
    m = FNAME_RE.match(fname)
    if not m:
        return None
    return {
        "patient_id": int(m.group("pid")),
        "status": m.group("status").upper(),
        "vowel_id": int(m.group("vowel")),
        "clip_id": int(m.group("clip")),
    }

df = pd.read_csv(CSV_PATH)
need_cols = {"sound.files", "outcome"}
missing = need_cols - set(df.columns)
if missing:
    raise ValueError(f"CSV 缺欄位：{missing}；目前欄位={list(df.columns)}")

rows = []
bad_names = 0
missing_wav = 0
filtered_not_S = 0
bad_label = 0

for _, r in df.iterrows():
    wav_name = str(r["sound.files"]).strip()

    # 我只允許 outcome 是 0 或 1，其他值我先排除，避免標籤污染
    try:
        label = int(r["outcome"])
    except Exception:
        bad_label += 1
        continue
    if label not in (0, 1):
        bad_label += 1
        continue

    info = parse_wav_name(wav_name)
    if info is None:
        bad_names += 1
        continue

    # 只拿術前 S，術後 A 先不進來
    if info["status"] != USE_STATUS:
        filtered_not_S += 1
        continue

    # 檢查 wav 是否存在，避免 CSV 指到不存在的檔
    wav_path = AUDIO_DIR / wav_name
    if not wav_path.exists():
        missing_wav += 1
        continue

    rows.append({
        "wav_name": wav_name,
        "wav_path": str(wav_path),
        "patient_id": info["patient_id"],
        "status": info["status"],
        "vowel_id": info["vowel_id"],
        "clip_id": info["clip_id"],
        "label": label
    })

df_m = (
    pd.DataFrame(rows)
      .sort_values(["patient_id", "clip_id"])
      .reset_index(drop=True)
)

df_m.to_csv(MANIFEST_PATH, index=False, encoding="utf-8")

seg_pos, seg_neg = count_pos_neg(df_m["label"].values)
pat_dist = df_m.drop_duplicates("patient_id")["label"].value_counts().to_dict()

logger.info("======== [Step1] Manifest built ========")
logger.info(f"[Step1] CSV rows={len(df)}")
logger.info(f"[Step1] bad filename format={bad_names}")
logger.info(f"[Step1] bad label rows={bad_label}")
logger.info(f"[Step1] filtered_not_S={filtered_not_S} (keep status={USE_STATUS})")
logger.info(f"[Step1] missing wav={missing_wav}")
logger.info(f"[Step1] manifest segments={len(df_m)} | pos(1)={seg_pos} neg(0)={seg_neg} pos_rate={pos_rate(seg_pos, seg_neg):.4f}")
logger.info(f"[Step1] unique patients={df_m['patient_id'].nunique()} | patient label dist={pat_dist}")
logger.info(f"[Step1] saved: {MANIFEST_PATH}")


In [None]:
# Step2：只做基本檢查，確認頻譜圖數量跟 manifest 對得上
# -----------------------------------------------------------------------------
# 這版不在這裡產圖，我假設 mel/stft png 已經在 pic_v4 裡準備好
# 如果數量對不上，後面讀圖很可能會直接報錯
# =============================================================================

mel_count = len(list(MEL_PIC_DIR.glob("*.png")))
stft_count = len(list(STFT_PIC_DIR.glob("*.png")))
logger.info("======== [Step2] PNG count check ========")
logger.info(f"[Step2] mel_count={mel_count}, stft_count={stft_count}, manifest={len(df_m)}")
if mel_count != len(df_m) or stft_count != len(df_m):
    logger.info("[Step2] WARNING: png count mismatch, training may fail if missing images")


# =============================================================================
# Step3：做 segment-level 的五折分層切分
# -----------------------------------------------------------------------------
# 目標是讓每一折 train/val 的正負比例接近
# =============================================================================

from sklearn.model_selection import StratifiedKFold, StratifiedShuffleSplit

def log_fold_distribution(fold: int, train_idx: np.ndarray, val_idx: np.ndarray,
                          y_all: np.ndarray, df_meta: pd.DataFrame):
    tr_pos, tr_neg = count_pos_neg(y_all[train_idx])
    va_pos, va_neg = count_pos_neg(y_all[val_idx])
    logger.info(f"[Fold {fold}] SEG train: pos={tr_pos}, neg={tr_neg}, pos_rate={pos_rate(tr_pos, tr_neg):.4f}")
    logger.info(f"[Fold {fold}] SEG  val : pos={va_pos}, neg={va_neg}, pos_rate={pos_rate(va_pos, va_neg):.4f}")

    tr_pat_dist = (df_meta.iloc[train_idx].drop_duplicates("patient_id")["label"].value_counts().to_dict())
    va_pat_dist = (df_meta.iloc[val_idx].drop_duplicates("patient_id")["label"].value_counts().to_dict())
    tr_pat_pos = int(tr_pat_dist.get(1, 0)); tr_pat_neg = int(tr_pat_dist.get(0, 0))
    va_pat_pos = int(va_pat_dist.get(1, 0)); va_pat_neg = int(va_pat_dist.get(0, 0))
    logger.info(f"[Fold {fold}] PAT train: pos={tr_pat_pos}, neg={tr_pat_neg}, pos_rate={pos_rate(tr_pat_pos, tr_pat_neg):.4f} | dist={tr_pat_dist}")
    logger.info(f"[Fold {fold}] PAT  val : pos={va_pat_pos}, neg={va_pat_neg}, pos_rate={pos_rate(va_pat_pos, va_pat_neg):.4f} | dist={va_pat_dist}")

def make_5fold_cv_splits_segment_stratified(y: np.ndarray, df_meta: pd.DataFrame,
                                            n_folds: int = 5, seed: int = 42):
    y = np.asarray(y).astype(int)
    all_idx = np.arange(len(y), dtype=int)
    skf = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=seed)

    splits = []
    logger.info("======== [Step3] Start 5-fold CV split (Segment-level StratifiedKFold) ========")

    for fold, (train_idx, val_idx) in enumerate(skf.split(all_idx, y), start=1):
        logger.info("------------------------------------------------------------")
        logger.info(f"[Fold {fold}] split created")

        inter = np.intersect1d(train_idx, val_idx)
        if len(inter) != 0:
            raise RuntimeError(f"train/val overlap detected in fold {fold}: n={len(inter)}")

        log_fold_distribution(fold, train_idx, val_idx, y, df_meta)

        tr_pats = set(df_meta.iloc[train_idx]["patient_id"].unique().tolist())
        va_pats = set(df_meta.iloc[val_idx]["patient_id"].unique().tolist())
        overlap_pats = tr_pats.intersection(va_pats)
        logger.info(f"[Fold {fold}] PAT overlap(train∩val)={len(overlap_pats)}")

        splits.append({"fold": fold, "train_idx": train_idx.astype(int), "val_idx": val_idx.astype(int)})

    logger.info("======== [Step3] CV split finished ========")
    return splits

y_all = df_m["label"].to_numpy()
splits = make_5fold_cv_splits_segment_stratified(y=y_all, df_meta=df_m, n_folds=N_FOLDS, seed=SEED)
logger.info(f"[Step3] splits prepared: {len(splits)} folds")


In [None]:
# Step4：我用雙輸入模型訓練並完整記錄每個 epoch 的 Train/Val/Test 指標
# -----------------------------------------------------------------------------
# 這一段做的事
# - 用兩張圖（mel + stft）當作兩個輸入
# - 兩個輸入共用同一個 EfficientNetB0 backbone
# - 把圖片讀成灰階再 tile 成 3 通道，避免模型去學顏色映射造成的假紋理
# - 用 class_weight 來處理類別不平衡，避免 batch 分布被硬扭成 50/50
# - 每個 epoch 結束時，用 val 掃 threshold 找 F1 最佳的 thr，再把同一個 thr 套用到 train/test
# - 我用 val_auroc_prob 來挑 best epoch，先把機率排序能力救起來，再談 F1
# =============================================================================

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
from sklearn.metrics import roc_auc_score, confusion_matrix
from tensorflow.keras.applications.efficientnet import EfficientNetB0, preprocess_input as effnet_preprocess


# -----------------------------------------------------------------------------
# 統一讀圖流程：png -> 灰階 -> resize -> 轉 float -> tile 三通道 -> EfficientNet 預處理
# -----------------------------------------------------------------------------
def _read_png_gray_as_3ch_preprocessed(path):
    img = tf.io.read_file(path)
    img = tf.image.decode_png(img, channels=1)
    img = tf.image.resize(img, [IMG_H, IMG_W])
    img = tf.cast(img, tf.float32)
    img3 = tf.tile(img, [1, 1, 3])
    img3 = effnet_preprocess(img3)
    return img3


# -----------------------------------------------------------------------------
# 只在訓練資料上做輕量增強，讓模型對亮度對比變化更穩
# -----------------------------------------------------------------------------
def _augment_img(img3):
    img3 = tf.image.random_brightness(img3, max_delta=0.08)
    img3 = tf.image.random_contrast(img3, lower=0.9, upper=1.1)
    return img3


# -----------------------------------------------------------------------------
# 把 dataset 製作包成一個函式，方便不同 split 直接重用
# training=True 時我會做增強，shuffle=True 時我會洗牌
# -----------------------------------------------------------------------------
def make_dataset(df_meta, idx, mel_dir, stft_dir, batch_size=32, training=False, shuffle=False, seed=42):
    wav_names = df_meta.iloc[idx]["wav_name"].astype(str).tolist()
    y = df_meta.iloc[idx]["label"].astype(int).to_numpy()

    mel_paths = [str(Path(mel_dir) / f"{w}.png") for w in wav_names]
    stft_paths = [str(Path(stft_dir) / f"{w}.png") for w in wav_names]

    ds = tf.data.Dataset.from_tensor_slices((mel_paths, stft_paths, y))

    def _map(mel_p, stft_p, label):
        mel = _read_png_gray_as_3ch_preprocessed(mel_p)
        stft = _read_png_gray_as_3ch_preprocessed(stft_p)
        if training:
            mel = _augment_img(mel)
            stft = _augment_img(stft)
        return ({"mel": mel, "stft": stft}, tf.cast(label, tf.float32))

    ds = ds.map(_map, num_parallel_calls=tf.data.AUTOTUNE)
    if shuffle:
        ds = ds.shuffle(buffer_size=len(idx), seed=seed, reshuffle_each_iteration=True)
    ds = ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return ds


# -----------------------------------------------------------------------------
# 建雙輸入模型：兩支輸入共用一個 EfficientNetB0 backbone
# 我用 GAP 把特徵變成向量，再串接，最後接一個小 head 做二分類
# -----------------------------------------------------------------------------
def build_dual_shared_effnet(l2=1e-4, dropout=0.35, head_units=64):
    mel_in  = keras.Input(shape=(IMG_H, IMG_W, 3), name="mel")
    stft_in = keras.Input(shape=(IMG_H, IMG_W, 3), name="stft")

    backbone = EfficientNetB0(include_top=False, weights="imagenet", input_shape=(IMG_H, IMG_W, 3))
    backbone.trainable = False

    z_mel  = backbone(mel_in, training=False)
    z_stft = backbone(stft_in, training=False)

    z_mel  = layers.GlobalAveragePooling2D()(z_mel)
    z_stft = layers.GlobalAveragePooling2D()(z_stft)

    z = layers.Concatenate()([z_mel, z_stft])
    z = layers.Dense(head_units, activation="relu", kernel_regularizer=keras.regularizers.l2(l2))(z)
    z = layers.Dropout(dropout)(z)
    out = layers.Dense(1, activation="sigmoid", name="prob")(z)

    model = keras.Model(inputs={"mel": mel_in, "stft": stft_in}, outputs=out, name="dual_effnet_shared")
    return model


# -----------------------------------------------------------------------------
# 用這個函式控制 backbone 的可訓練狀態
# warmup：整個 backbone 凍結
# finetune：只解凍最後一段層，並把 BatchNorm 固定住，通常比較穩
# -----------------------------------------------------------------------------
def set_backbone_trainable(model, trainable: bool, unfreeze_ratio: float = 0.2):
    backbone = None
    for layer in model.layers:
        if isinstance(layer, keras.Model) and layer.name.startswith("efficientnet"):
            backbone = layer
            break
    if backbone is None:
        for layer in model.layers:
            if isinstance(layer, keras.Model) and "efficientnet" in layer.name:
                backbone = layer
                break
    if backbone is None:
        raise RuntimeError("Cannot find EfficientNet backbone inside the model.")

    if not trainable:
        backbone.trainable = False
        return

    backbone.trainable = True
    layers_all = backbone.layers
    n = len(layers_all)
    cut = int((1.0 - float(unfreeze_ratio)) * n)

    for i, lyr in enumerate(layers_all):
        if i < cut:
            lyr.trainable = False
        else:
            if isinstance(lyr, layers.BatchNormalization):
                lyr.trainable = False
            else:
                lyr.trainable = True


# -----------------------------------------------------------------------------
# 在驗證集上掃 threshold，找到讓 F1 最大的 cut point
# -----------------------------------------------------------------------------
def sweep_best_threshold(y_true, y_prob, steps=201):
    y_true = np.asarray(y_true).astype(int)
    y_prob = np.asarray(y_prob).astype(float)

    thr_list = np.linspace(0.0, 1.0, steps)
    best = {"thr": 0.5, "f1": -1.0, "p": 0.0, "r": 0.0}
    eps = 1e-9

    for thr in thr_list:
        y_pred = (y_prob >= thr).astype(int)
        tp = int(((y_pred == 1) & (y_true == 1)).sum())
        fp = int(((y_pred == 1) & (y_true == 0)).sum())
        fn = int(((y_pred == 0) & (y_true == 1)).sum())

        p = tp / (tp + fp + eps)
        r = tp / (tp + fn + eps)
        f1 = 2 * p * r / (p + r + eps)

        if f1 > best["f1"]:
            best = {"thr": float(thr), "f1": float(f1), "p": float(p), "r": float(r)}
    return best["thr"], best["f1"], best["p"], best["r"]


# -----------------------------------------------------------------------------
# 把 model.predict 的輸出整理成 y_true 與 y_prob，方便後面統一算指標
# -----------------------------------------------------------------------------
def eval_probs(model, ds):
    y_true_all, y_prob_all = [], []
    for (x, y) in ds:
        prob = model.predict(x, verbose=0).reshape(-1)
        y_prob_all.append(prob)
        y_true_all.append(y.numpy().reshape(-1))
    y_true = np.concatenate(y_true_all).astype(int)
    y_prob = np.concatenate(y_prob_all).astype(float)
    return y_true, y_prob


# -----------------------------------------------------------------------------
# 用 best threshold 把機率轉成預測，再算 acc/precision/recall/f1/auroc 與混淆矩陣元素
# -----------------------------------------------------------------------------
def compute_metrics_from_probs(y_true, y_prob, thr):
    y_true = np.asarray(y_true).astype(int)
    y_prob = np.asarray(y_prob).astype(float)
    y_pred = (y_prob >= thr).astype(int)

    eps = 1e-9
    tp = int(((y_pred == 1) & (y_true == 1)).sum())
    tn = int(((y_pred == 0) & (y_true == 0)).sum())
    fp = int(((y_pred == 1) & (y_true == 0)).sum())
    fn = int(((y_pred == 0) & (y_true == 1)).sum())

    acc = (tp + tn) / (tp + tn + fp + fn + eps)
    precision = tp / (tp + fp + eps)
    recall = tp / (tp + fn + eps)
    f1 = 2 * precision * recall / (precision + recall + eps)

    try:
        auroc = float(roc_auc_score(y_true, y_prob))
    except Exception:
        auroc = float("nan")

    return {
        "acc_bt": float(acc),
        "precision_bt": float(precision),
        "recall_bt": float(recall),
        "f1_bt": float(f1),
        "auroc_prob": float(auroc),
        "thr": float(thr),
        "tp": tp, "tn": tn, "fp": fp, "fn": fn
    }


# -----------------------------------------------------------------------------
# 做一個 callback，每個 epoch 結束時都跑 train/val/test 的完整評估
# 會把當下的結果塞進 out_rows，最後用它輸出成 per-epoch CSV
# best epoch 的判斷我用 val_auroc_prob 最大化
# -----------------------------------------------------------------------------
class EpochFullEvalCallback(keras.callbacks.Callback):
    def __init__(self, fold, train_eval_ds, val_eval_ds, test_eval_ds, out_rows,
                 thr_steps=201, best_monitor="val_auroc_prob"):
        super().__init__()
        self.fold = fold
        self.train_eval_ds = train_eval_ds
        self.val_eval_ds = val_eval_ds
        self.test_eval_ds = test_eval_ds
        self.out_rows = out_rows
        self.thr_steps = thr_steps
        self.best_monitor = best_monitor
        self.best_seen = -1e9
        self.best_epoch = 0
        self.best_snapshot = None

    def on_epoch_end(self, epoch, logs=None):
        logs = logs or {}
        ep = int(epoch + 1)

        yv_true, yv_prob = eval_probs(self.model, self.val_eval_ds)
        best_thr, best_f1, best_p, best_r = sweep_best_threshold(yv_true, yv_prob, steps=self.thr_steps)

        yt_true, yt_prob = eval_probs(self.model, self.train_eval_ds)
        yte_true, yte_prob = eval_probs(self.model, self.test_eval_ds)

        m_train = compute_metrics_from_probs(yt_true, yt_prob, best_thr)
        m_val   = compute_metrics_from_probs(yv_true, yv_prob, best_thr)
        m_test  = compute_metrics_from_probs(yte_true, yte_prob, best_thr)

        row = {"fold": self.fold, "epoch": ep}

        for k, v in logs.items():
            try:
                row[k] = float(v)
            except Exception:
                pass

        row.update({
            "best_thr_on_val": float(best_thr),
            "val_best_f1": float(best_f1),
            "val_best_p": float(best_p),
            "val_best_r": float(best_r),
        })

        for prefix, m in [("train", m_train), ("val", m_val), ("test", m_test)]:
            for k, v in m.items():
                row[f"{prefix}_{k}"] = v

        self.out_rows.append(row)

        cur = row.get(self.best_monitor, None)
        if cur is not None and np.isfinite(cur) and cur > self.best_seen:
            self.best_seen = float(cur)
            self.best_epoch = ep
            self.best_snapshot = dict(row)

        print(
            f"[Fold {self.fold}][Ep {ep:03d}] "
            f"VAL(AUROC_prob={row.get('val_auroc_prob', float('nan')):.4f}, F1_bt={row.get('val_f1_bt', float('nan')):.4f}, thr={best_thr:.3f}) | "
            f"TEST(AUROC_prob={row.get('test_auroc_prob', float('nan')):.4f}, F1_bt={row.get('test_f1_bt', float('nan')):.4f})"
        )


# -----------------------------------------------------------------------------
# 曲線圖與混淆矩陣輸出包成函式，讓每折做完直接存檔
# -----------------------------------------------------------------------------
def _safe_float(x):
    try:
        return float(x)
    except Exception:
        return float("nan")

def plot_curve(df_ep: pd.DataFrame, fold: int, metric: str, out_png: Path):
    x = df_ep["epoch"].astype(int).to_numpy()
    y_tr = df_ep[metric].apply(_safe_float).to_numpy() if metric in df_ep.columns else None
    y_va = df_ep[f"val_{metric}"].apply(_safe_float).to_numpy() if f"val_{metric}" in df_ep.columns else None

    plt.figure()
    if y_tr is not None:
        plt.plot(x, y_tr, label=f"train_{metric}")
    if y_va is not None:
        plt.plot(x, y_va, label=f"val_{metric}")
    plt.title(f"Fold {fold} {metric}")
    plt.xlabel("epoch")
    plt.ylabel(metric)
    if metric in ("acc", "auroc"):
        plt.ylim(0.0, 1.0)
    plt.legend()
    plt.tight_layout()
    plt.savefig(out_png, dpi=140)
    plt.close()

def save_confusion_matrix_png(y_true: np.ndarray, y_prob: np.ndarray, thr: float, out_png: Path, fold: int):
    y_true = np.asarray(y_true).astype(int)
    y_pred = (np.asarray(y_prob) >= float(thr)).astype(int)
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])

    plt.figure()
    plt.imshow(cm, interpolation="nearest")
    plt.title(f"Fold {fold} Confusion Matrix (thr={thr:.3f})")
    plt.xlabel("Pred")
    plt.ylabel("True")
    for (i, j), v in np.ndenumerate(cm):
        plt.text(j, i, str(v), ha="center", va="center")
    plt.xticks([0, 1], ["0", "1"])
    plt.yticks([0, 1], ["0", "1"])
    plt.tight_layout()
    plt.savefig(out_png, dpi=160)
    plt.close()


# =============================================================================
# 開始跑五折
# -----------------------------------------------------------------------------
# 把 Step3 的 val_idx 當作外層測試集 test
# 然後再從 train_idx 裡切一個內層 val，用來 early stopping 與挑 best epoch
# =============================================================================

all_best_rows = []

for sp in splits:
    fold = sp["fold"]
    train_idx_full = sp["train_idx"]
    test_idx = sp["val_idx"]

    y_train_full = df_m.iloc[train_idx_full]["label"].astype(int).to_numpy()
    sss = StratifiedShuffleSplit(n_splits=1, test_size=VAL_RATIO_IN_TRAIN, random_state=SEED + fold)
    tr_sub, va_sub = next(sss.split(np.zeros(len(train_idx_full)), y_train_full))
    tr_idx = train_idx_full[tr_sub]
    val_idx = train_idx_full[va_sub]

    tr_pos, tr_neg = count_pos_neg(df_m.iloc[tr_idx]["label"].values)
    va_pos, va_neg = count_pos_neg(df_m.iloc[val_idx]["label"].values)
    te_pos, te_neg = count_pos_neg(df_m.iloc[test_idx]["label"].values)

    logger.info("------------------------------------------------------------")
    logger.info(f"[Step4][Fold {fold}] split for Step4 (train/val/test)")
    logger.info(f"[Fold {fold}] TRAIN seg: pos={tr_pos}, neg={tr_neg}, pos_rate={pos_rate(tr_pos, tr_neg):.4f}")
    logger.info(f"[Fold {fold}] VAL   seg: pos={va_pos}, neg={va_neg}, pos_rate={pos_rate(va_pos, va_neg):.4f}")
    logger.info(f"[Fold {fold}] TEST  seg: pos={te_pos}, neg={te_neg}, pos_rate={pos_rate(te_pos, te_neg):.4f}")

    train_ds = make_dataset(df_m, tr_idx,  MEL_PIC_DIR, STFT_PIC_DIR, batch_size=BATCH_SIZE,
                            training=True, shuffle=True, seed=SEED + fold)
    val_ds   = make_dataset(df_m, val_idx, MEL_PIC_DIR, STFT_PIC_DIR, batch_size=BATCH_SIZE,
                            training=False, shuffle=False)
    test_ds  = make_dataset(df_m, test_idx, MEL_PIC_DIR, STFT_PIC_DIR, batch_size=BATCH_SIZE,
                            training=False, shuffle=False)

    train_eval_ds = make_dataset(df_m, tr_idx,  MEL_PIC_DIR, STFT_PIC_DIR, batch_size=BATCH_SIZE,
                                 training=False, shuffle=False)

    # 我用 class_weight 來提升少數類的損失權重
    # 這份資料 pos 比較多，所以我會提高類別 0 的權重，逼模型不要一直偏向猜 1
    w0 = float(tr_pos / max(tr_neg, 1))
    w1 = 1.0
    class_weight = {0: w0, 1: w1}
    logger.info(f"[Fold {fold}] class_weight={class_weight}")

    tf.keras.backend.clear_session()
    model = build_dual_shared_effnet(l2=WEIGHT_DECAY_L2, dropout=DROPOUT, head_units=HEAD_UNITS)

    epoch_rows = []
    cb_full = EpochFullEvalCallback(
        fold=fold,
        train_eval_ds=train_eval_ds,
        val_eval_ds=val_ds,
        test_eval_ds=test_ds,
        out_rows=epoch_rows,
        thr_steps=THR_SWEEP_STEPS,
        best_monitor="val_auroc_prob"
    )

    cb_es = keras.callbacks.EarlyStopping(
        monitor="val_auroc", mode="max",
        patience=PATIENCE, restore_best_weights=True, verbose=1
    )

    # 我先做 warmup：backbone 全凍結，只訓練 head
    set_backbone_trainable(model, trainable=False)
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=LR_WARMUP),
        loss=keras.losses.BinaryCrossentropy(from_logits=False, label_smoothing=0.05),
        metrics=[
            keras.metrics.BinaryAccuracy(name="acc"),
            keras.metrics.AUC(name="auroc"),
            keras.metrics.Precision(name="precision"),
            keras.metrics.Recall(name="recall"),
        ]
    )
    logger.info(f"[Fold {fold}] Stage1 warmup start: epochs={EPOCHS_WARMUP}, LR={LR_WARMUP}, backbone frozen")

    model.fit(
        train_ds,
        validation_data=val_ds,
        epochs=EPOCHS_WARMUP,
        callbacks=[cb_full, cb_es],
        class_weight=class_weight,
        verbose=1
    )

    # 我再做 finetune：只解凍 backbone 最後一段，BatchNorm 固定不動
    set_backbone_trainable(model, trainable=True, unfreeze_ratio=0.2)
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=LR_FINETUNE),
        loss=keras.losses.BinaryCrossentropy(from_logits=False, label_smoothing=0.05),
        metrics=[
            keras.metrics.BinaryAccuracy(name="acc"),
            keras.metrics.AUC(name="auroc"),
            keras.metrics.Precision(name="precision"),
            keras.metrics.Recall(name="recall"),
        ]
    )
    logger.info(f"[Fold {fold}] Stage2 finetune start: epochs={EPOCHS_FINETUNE}, LR={LR_FINETUNE}, unfreeze_ratio=0.2")

    model.fit(
        train_ds,
        validation_data=val_ds,
        epochs=EPOCHS_WARMUP + EPOCHS_FINETUNE,
        initial_epoch=EPOCHS_WARMUP,
        callbacks=[cb_full, cb_es],
        class_weight=class_weight,
        verbose=1
    )

    df_ep = pd.DataFrame(epoch_rows)
    ep_csv = CSV_OUT_DIR / f"fold{fold:02d}_epoch_metrics.csv"
    df_ep.to_csv(ep_csv, index=False, encoding="utf-8")
    logger.info(f"[Fold {fold}] saved per-epoch metrics: {ep_csv}")

    # 我用 val_auroc_prob 最大的那個 epoch 當 best
    if cb_full.best_snapshot is None:
        best_idx = pd.to_numeric(df_ep["val_auroc_prob"], errors="coerce").idxmax()
        best_row = df_ep.loc[best_idx].to_dict()
        best_epoch = int(best_row.get("epoch", -1))
    else:
        best_row = dict(cb_full.best_snapshot)
        best_epoch = int(cb_full.best_epoch)

    best_row["best_by"] = "val_auroc_prob"
    best_row["best_epoch"] = best_epoch

    best_csv = CSV_OUT_DIR / f"fold{fold:02d}_best_metrics.csv"
    pd.DataFrame([best_row]).to_csv(best_csv, index=False, encoding="utf-8")
    logger.info(f"[Fold {fold}] saved best metrics: {best_csv}")

    # 把最佳權重存起來，方便之後做重現或 Grad-CAM
    w_path = OUT_DIR / f"fold{fold:02d}_best_by_valauroc_epoch{best_epoch:03d}.weights.h5"
    model.save_weights(w_path)
    logger.info(f"[Fold {fold}] saved weights: {w_path}")

    plot_curve(df_ep, fold, "loss",  PLOT_DIR / f"fold{fold:02d}_loss.png")
    plot_curve(df_ep, fold, "acc",   PLOT_DIR / f"fold{fold:02d}_acc.png")
    plot_curve(df_ep, fold, "auroc", PLOT_DIR / f"fold{fold:02d}_auroc.png")
    logger.info(f"[Fold {fold}] saved curves: {PLOT_DIR}/fold{fold:02d}_(loss|acc|auroc).png")

    # 用驗證集找到的 best threshold，回到 test 產混淆矩陣
    best_thr = float(best_row.get("best_thr_on_val", 0.5))
    model.load_weights(w_path)

    yte_true, yte_prob = eval_probs(model, test_ds)
    cm_png = CM_DIR / f"fold{fold:02d}_cm_best.png"
    save_confusion_matrix_png(yte_true, yte_prob, best_thr, cm_png, fold)
    logger.info(f"[Fold {fold}] saved confusion matrix: {cm_png}")

    all_best_rows.append(best_row)


# =============================================================================
# 五折彙總：把每折 best 指標合併，並算 mean/std
# =============================================================================

df_best = pd.DataFrame(all_best_rows)

num_cols = []
for c in df_best.columns:
    if c in ["fold", "epoch", "best_epoch"]:
        continue
    s = pd.to_numeric(df_best[c], errors="coerce")
    if s.notna().sum() > 0:
        num_cols.append(c)

mean_row = {"fold": "mean"}
std_row  = {"fold": "std"}
for c in num_cols:
    vals = pd.to_numeric(df_best[c], errors="coerce")
    mean_row[c] = float(np.nanmean(vals))
    std_row[c]  = float(np.nanstd(vals))

df_sum = pd.concat([df_best, pd.DataFrame([mean_row, std_row])], ignore_index=True)

summary_csv = CSV_OUT_DIR / "cv_best_summary.csv"
df_sum.to_csv(summary_csv, index=False, encoding="utf-8")
logger.info(f"[CV] saved summary: {summary_csv}")

print("\n========== CV BEST SUMMARY (tail) ==========")
print(df_sum.tail(7))
print("\n[Done] Outputs:")
print(f"- Log: {LOG_PATH}")
print(f"- Per-epoch CSV: {CSV_OUT_DIR}/fold**_epoch_metrics.csv")
print(f"- Per-fold best CSV: {CSV_OUT_DIR}/fold**_best_metrics.csv")
print(f"- CV summary CSV: {summary_csv}")
print(f"- Curves: {PLOT_DIR}/fold**_(loss|acc|auroc).png")
print(f"- Confusion Matrix: {CM_DIR}/fold**_cm_best.png")


In [45]:
import pandas as pd
import numpy as np

def summarize_fold_ratios(df_m, splits):
    y_all = df_m["label"].to_numpy().astype(int)

    rows = []
    for s in splits:
        fold = s["fold"]
        tr = s["train_idx"]
        va = s["val_idx"]

        tr_pos = int((y_all[tr] == 1).sum())
        tr_neg = int((y_all[tr] == 0).sum())
        tr_rate = tr_pos / (tr_pos + tr_neg) if (tr_pos + tr_neg) > 0 else np.nan

        va_pos = int((y_all[va] == 1).sum())
        va_neg = int((y_all[va] == 0).sum())
        va_rate = va_pos / (va_pos + va_neg) if (va_pos + va_neg) > 0 else np.nan

        rows.append({
            "fold": fold,
            "train_pos": tr_pos,
            "train_neg": tr_neg,
            "train_pos_rate": round(tr_rate, 4),
            "train_pos:neg": f"{tr_pos}:{tr_neg}",
            "val_pos": va_pos,
            "val_neg": va_neg,
            "val_pos_rate": round(va_rate, 4),
            "val_pos:neg": f"{va_pos}:{va_neg}",
        })

    out = pd.DataFrame(rows).sort_values("fold").reset_index(drop=True)
    return out

tbl = summarize_fold_ratios(df_m, splits)
tbl


Unnamed: 0,fold,train_pos,train_neg,train_pos_rate,train_pos:neg,val_pos,val_neg,val_pos_rate,val_pos:neg
0,1,200,130,0.6061,200:130,60,40,0.6,60:40
1,2,210,130,0.6176,210:130,50,40,0.5556,50:40
2,3,210,140,0.6,210:140,50,30,0.625,50:30
3,4,210,140,0.6,210:140,50,30,0.625,50:30
4,5,210,140,0.6,210:140,50,30,0.625,50:30


In [None]:
# explain_cam.py  (Keras 3 compatible / Restricted to 5 yellow weights)
# =============================================================================
# 只跑指定的 5 個 best_by_valauroc 權重檔，避免誤讀到其他版本的權重造成結果混亂
# Grad-CAM 分別對 mel / stft 產熱區，再做 ΔCAM 方便看兩個分支的注意力差異
# Keras 3 下 load_weights 不支援 by_name，權重必須完全對齊，模型結構因此刻意保持訓練同款
#
# 依賴：
# - /home/jovyan/114-1_HA/data/manifest_S_only.csv
# - /home/jovyan/114-1_HA/data/pic_v4/mel/*.png
# - /home/jovyan/114-1_HA/data/pic_v4/stft/*.png
# - /home/jovyan/114-1_HA/data/out/csv/fold**_best_metrics.csv
# - /home/jovyan/114-1_HA/data/out/fold**_best_by_valauroc_epochXXX.weights.h5
#
# 輸出：
# - /home/jovyan/114-1_HA/data/out/explain/cam/fold**/*__mel_cam.png
# - /home/jovyan/114-1_HA/data/out/explain/cam/fold**/*__stft_cam.png
# - /home/jovyan/114-1_HA/data/out/explain/cam/fold**/*__dcam.png
# - /home/jovyan/114-1_HA/data/out/explain/cam/cam_index.csv
# - /home/jovyan/114-1_HA/log_explain/explain_cam_*.log
# =============================================================================

import re, time, logging
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

from sklearn.metrics import confusion_matrix
from sklearn.model_selection import StratifiedKFold

from tensorflow.keras.applications.efficientnet import EfficientNetB0, preprocess_input as effnet_preprocess


# =============================================================================
# 0) 全域設定
# =============================================================================
BASE_DIR = Path("/home/jovyan/114-1_HA")
DATA_DIR = BASE_DIR / "data"

MANIFEST_PATH = DATA_DIR / "manifest_S_only.csv"
PIC_DIR       = DATA_DIR / "pic_v4"
MEL_DIR       = PIC_DIR / "mel"
STFT_DIR      = PIC_DIR / "stft"

OUT_DIR       = DATA_DIR / "out"
BEST_CSV_DIR  = OUT_DIR / "csv"

EXPL_DIR      = OUT_DIR / "explain" / "cam"
EXPL_DIR.mkdir(parents=True, exist_ok=True)

IMG_H, IMG_W = 224, 224

# 指定的 5 個權重檔名，檔名固定才能精準對到每折的最佳 epoch
ALLOWED_WEIGHTS = [
    "fold01_best_by_valauroc_epoch031.weights.h5",
    "fold02_best_by_valauroc_epoch079.weights.h5",
    "fold03_best_by_valauroc_epoch080.weights.h5",
    "fold04_best_by_valauroc_epoch080.weights.h5",
    "fold05_best_by_valauroc_epoch080.weights.h5",
]

# 每種 TP/TN/FP/FN 各抽幾張
# 常見現象：模型偏猜 1 時，TN/FN 可能幾乎消失，抽樣會直接抽不到
K_PER_CASE = 10

DEFAULT_THR = 0.5

# 抽不到某類時用機率排序補齊，避免整折沒有圖可看
# 小插曲：很多時候反而是這個補齊機制最先暴露校準問題
FALLBACK_PROB_RANKING = True

INFER_LOG_EVERY  = 25
RENDER_LOG_EVERY = 5

SEED = 42
rng = np.random.default_rng(SEED)


# =============================================================================
# logger（主控台 + 檔案）
# =============================================================================
LOG_DIR = BASE_DIR / "log_explain"
LOG_DIR.mkdir(parents=True, exist_ok=True)
RUN_TAG = time.strftime("%Y%m%d_%H%M%S")
LOG_PATH = LOG_DIR / f"explain_cam_{RUN_TAG}.log"

def build_logger(log_path: Path):
    logger = logging.getLogger(log_path.stem)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    while logger.handlers:
        logger.handlers.pop()

    fmt = logging.Formatter(
        "%(asctime)s | %(levelname)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S"
    )

    fh = logging.FileHandler(log_path, mode="w", encoding="utf-8")
    fh.setFormatter(fmt)
    logger.addHandler(fh)

    sh = logging.StreamHandler()
    sh.setFormatter(fmt)
    logger.addHandler(sh)
    return logger

logger = build_logger(LOG_PATH)
logger.info("==== Start explain_cam.py (restricted 5 weights / Keras3) ====")
logger.info(f"LOG_PATH={LOG_PATH}")
logger.info(f"MANIFEST_PATH={MANIFEST_PATH}")
logger.info(f"MEL_DIR={MEL_DIR}")
logger.info(f"STFT_DIR={STFT_DIR}")
logger.info(f"BEST_CSV_DIR={BEST_CSV_DIR}")
logger.info(f"EXPL_DIR={EXPL_DIR}")
logger.info("==== Allowed weights (ONLY these 5) ====")
for w in ALLOWED_WEIGHTS:
    logger.info(f"  - {OUT_DIR / w}")


# =============================================================================
# 1) 與訓練一致的讀圖（灰階 -> tile -> EfficientNet preprocess）
# =============================================================================
def read_png_gray_as_3ch_preprocessed(path: tf.Tensor) -> tf.Tensor:
    img = tf.io.read_file(path)
    img = tf.image.decode_png(img, channels=1)
    img = tf.image.resize(img, [IMG_H, IMG_W])
    img = tf.cast(img, tf.float32)          # 0..255
    img3 = tf.tile(img, [1, 1, 3])          # 1ch -> 3ch
    img3 = effnet_preprocess(img3)
    return img3

def make_single_input(mel_png: str, stft_png: str):
    mel  = read_png_gray_as_3ch_preprocessed(tf.constant(mel_png))
    stft = read_png_gray_as_3ch_preprocessed(tf.constant(stft_png))
    x = {"mel": tf.expand_dims(mel, 0), "stft": tf.expand_dims(stft, 0)}
    return x, mel, stft


# =============================================================================
# 2) 訓練同款模型（避免 weights 對不上）
# =============================================================================
def build_dual_shared_effnet_train_like(l2=1e-4, dropout=0.35, head_units=64):
    mel_in  = keras.Input(shape=(IMG_H, IMG_W, 3), name="mel")
    stft_in = keras.Input(shape=(IMG_H, IMG_W, 3), name="stft")

    backbone = EfficientNetB0(
        include_top=False,
        weights="imagenet",
        input_shape=(IMG_H, IMG_W, 3)
    )
    backbone.trainable = False

    z_mel  = backbone(mel_in, training=False)
    z_stft = backbone(stft_in, training=False)

    # 兩個 GAP 的 input 直接當 CAM 的 feature map
    # 小發現：拿 GAP 的 output 做 CAM 會完全變成常數向量，熱圖會像是在敷衍
    z_mel_gap  = layers.GlobalAveragePooling2D(name="mel_gap")(z_mel)
    z_stft_gap = layers.GlobalAveragePooling2D(name="stft_gap")(z_stft)

    z = layers.Concatenate(name="concat")([z_mel_gap, z_stft_gap])
    z = layers.Dense(
        head_units,
        activation="relu",
        kernel_regularizer=keras.regularizers.l2(l2),
        name="head_dense"
    )(z)
    z = layers.Dropout(dropout, name="head_dropout")(z)
    out = layers.Dense(1, activation="sigmoid", name="prob")(z)

    model = keras.Model(
        inputs={"mel": mel_in, "stft": stft_in},
        outputs=out,
        name="dual_effnet_shared"
    )
    return model

def set_backbone_trainable(model, trainable: bool, unfreeze_ratio: float = 0.2):
    # 結構對齊用途，推論階段其實凍結也能產 CAM
    backbone = None
    for layer in model.layers:
        if isinstance(layer, keras.Model) and ("efficientnet" in layer.name):
            backbone = layer
            break
    if backbone is None:
        raise RuntimeError("Cannot find EfficientNet backbone inside the model.")

    if not trainable:
        backbone.trainable = False
        return

    backbone.trainable = True
    layers_all = backbone.layers
    n = len(layers_all)
    cut = int((1.0 - float(unfreeze_ratio)) * n)

    for i, lyr in enumerate(layers_all):
        if i < cut:
            lyr.trainable = False
        else:
            if isinstance(lyr, layers.BatchNormalization):
                lyr.trainable = False
            else:
                lyr.trainable = True


# =============================================================================
# 3) CAM models（抓 mel_gap/stft_gap 的 input 當 feature map）
# =============================================================================
def build_cam_models(model: keras.Model):
    mel_feat  = model.get_layer("mel_gap").input
    stft_feat = model.get_layer("stft_gap").input
    prob = model.output

    cam_model_mel  = keras.Model(inputs=model.inputs, outputs=[mel_feat, prob],  name="cam_model_mel")
    cam_model_stft = keras.Model(inputs=model.inputs, outputs=[stft_feat, prob], name="cam_model_stft")

    logger.info(f"[CAM] mel_feat shape  = {mel_feat.shape}")
    logger.info(f"[CAM] stft_feat shape = {stft_feat.shape}")
    return cam_model_mel, cam_model_stft


# =============================================================================
# 4) Grad-CAM
# =============================================================================
def grad_cam(cam_model: keras.Model, x_dict: dict) -> (np.ndarray, float):
    # cam_model outputs: [feature_map, prob]
    # returns: cam(224,224) 0..1, prob float
    with tf.GradientTape() as tape:
        feat, prob = cam_model(x_dict, training=False)
        prob_scalar = prob[:, 0]
        tape.watch(feat)

    grads = tape.gradient(prob_scalar, feat)
    if grads is None:
        # 稀有但致命：通常是 feature map 與 prob 不在同一張圖
        raise RuntimeError("Gradients are None (graph disconnected).")

    weights = tf.reduce_mean(grads, axis=(1, 2))
    cam = tf.reduce_sum(feat * weights[:, None, None, :], axis=-1)
    cam = tf.nn.relu(cam)

    cam = cam[0]
    cam = cam / (tf.reduce_max(cam) + 1e-9)

    cam = tf.image.resize(cam[..., None], [IMG_H, IMG_W])
    cam = tf.squeeze(cam, axis=-1).numpy()
    return cam.astype(np.float32), float(prob_scalar.numpy()[0])

def overlay_cam_on_image(img3: tf.Tensor, cam01: np.ndarray, alpha=0.45):
    base = img3.numpy().astype(np.float32)
    base = base - base.min()
    base = base / (base.max() + 1e-9)

    plt.figure(figsize=(4.2, 4.2))
    plt.imshow(base)
    plt.imshow(cam01, alpha=alpha)
    plt.axis("off")
    return plt.gcf()


# =============================================================================
# 5) 資料與推論工具
# =============================================================================
def load_manifest():
    if not MANIFEST_PATH.exists():
        raise FileNotFoundError(f"manifest not found: {MANIFEST_PATH}")
    dfm = pd.read_csv(MANIFEST_PATH)
    need = {"wav_name", "patient_id", "label"}
    miss = need - set(dfm.columns)
    if miss:
        raise ValueError(f"manifest missing cols: {miss}, cols={list(dfm.columns)}")
    return dfm

def rebuild_fold_test_indices(dfm: pd.DataFrame):
    # Segment-level StratifiedKFold，shuffle=True + random_state=42
    y_all = dfm["label"].astype(int).to_numpy()
    all_idx = np.arange(len(y_all), dtype=int)
    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    fold_to_test = {}
    for f, (_, va) in enumerate(skf.split(all_idx, y_all), start=1):
        fold_to_test[f] = va.astype(int)
    return fold_to_test

def infer_probs_for_indices(model, dfm: pd.DataFrame, idx: np.ndarray):
    y_true = dfm.iloc[idx]["label"].astype(int).to_numpy()
    y_prob = np.zeros(len(idx), dtype=np.float32)

    n = len(idx)
    for i, ridx in enumerate(idx):
        if (i + 1) % INFER_LOG_EVERY == 0 or (i + 1) == n:
            logger.info(f"  [infer] {i+1}/{n} samples")

        wav = str(dfm.iloc[ridx]["wav_name"])
        mel_png  = MEL_DIR / f"{wav}.png"
        stft_png = STFT_DIR / f"{wav}.png"
        if (not mel_png.exists()) or (not stft_png.exists()):
            y_prob[i] = np.nan
            continue

        x, _, _ = make_single_input(str(mel_png), str(stft_png))
        p = model.predict(x, verbose=0).reshape(-1)[0]
        y_prob[i] = float(p)

    return y_true, y_prob


# =============================================================================
# 6) 抽樣策略
# =============================================================================
def pick_cases_by_cm(idx_all, y_true, y_pred, case_type: str, k: int):
    if case_type == "TP":
        mask = (y_true == 1) & (y_pred == 1)
    elif case_type == "TN":
        mask = (y_true == 0) & (y_pred == 0)
    elif case_type == "FP":
        mask = (y_true == 0) & (y_pred == 1)
    elif case_type == "FN":
        mask = (y_true == 1) & (y_pred == 0)
    else:
        raise ValueError(case_type)

    cand = idx_all[mask]
    if len(cand) == 0:
        return np.array([], dtype=int)
    if len(cand) <= k:
        return cand
    sel = rng.choice(cand, size=k, replace=False)
    return np.array(sel, dtype=int)

def pick_cases_by_prob_fallback(idx_all, y_true, y_prob, case_type: str, k: int):
    # 抽不到某類時的補齊策略：用機率排序抓極端案例
    # 小發現：TN-like 與 FN-like 常常會變成很相似的一群，代表模型幾乎只靠一個訊號在決策
    idx_all = np.asarray(idx_all).astype(int)
    y_true = np.asarray(y_true).astype(int)
    y_prob = np.asarray(y_prob).astype(float)

    if case_type in ("TN", "FP"):
        cand_mask = (y_true == 0)
    else:
        cand_mask = (y_true == 1)

    cand_idx = idx_all[cand_mask]
    cand_prob = y_prob[cand_mask]
    if len(cand_idx) == 0:
        return np.array([], dtype=int)

    if case_type in ("TN", "FN"):
        order = np.argsort(cand_prob)
    else:
        order = np.argsort(-cand_prob)

    sel = cand_idx[order[:min(k, len(order))]]
    return np.array(sel, dtype=int)


# =============================================================================
# 7) Main
# =============================================================================
def main():
    dfm = load_manifest()
    fold_to_test = rebuild_fold_test_indices(dfm)

    cam_index_rows = []

    for w_name in ALLOWED_WEIGHTS:
        w_path = OUT_DIR / w_name
        if not w_path.exists():
            raise FileNotFoundError(f"Allowed weight not found: {w_path}")

        m = re.match(r"fold(\d+)_best_by_valauroc_epoch(\d+)\.weights\.h5$", w_name)
        if not m:
            raise ValueError(f"Bad allowed weight filename format: {w_name}")

        fold = int(m.group(1))
        best_epoch_from_name = int(m.group(2))

        best_csv = BEST_CSV_DIR / f"fold{fold:02d}_best_metrics.csv"
        best_thr = DEFAULT_THR
        if best_csv.exists():
            d = pd.read_csv(best_csv).iloc[0].to_dict()
            try:
                best_thr = float(d.get("best_thr_on_val", DEFAULT_THR))
            except Exception:
                best_thr = DEFAULT_THR

        fold_out = EXPL_DIR / f"fold{fold:02d}"
        fold_out.mkdir(parents=True, exist_ok=True)

        logger.info("------------------------------------------------------------")
        logger.info(f"==== [Fold {fold:02d}] load weights (restricted): {w_path}")
        logger.info(f"[Fold {fold:02d}] epoch(from filename)={best_epoch_from_name}")
        logger.info(f"[Fold {fold:02d}] best_thr_on_val={best_thr:.3f}")
        logger.info(f"[Fold {fold:02d}] output_dir={fold_out}")

        tf.keras.backend.clear_session()
        model = build_dual_shared_effnet_train_like(l2=1e-4, dropout=0.35, head_units=64)
        set_backbone_trainable(model, trainable=True, unfreeze_ratio=0.2)
        model.load_weights(str(w_path))

        cam_model_mel, cam_model_stft = build_cam_models(model)

        test_idx = fold_to_test.get(fold, None)
        if test_idx is None:
            logger.info(f"[Fold {fold:02d}] test_idx not found, skip.")
            continue

        logger.info(f"[Fold {fold:02d}] infer test probs ... (n={len(test_idx)})")
        y_true, y_prob = infer_probs_for_indices(model, dfm, test_idx)

        ok = np.isfinite(y_prob)
        y_true_ok = y_true[ok]
        y_prob_ok = y_prob[ok]
        test_idx_ok = test_idx[ok]

        y_pred_ok = (y_prob_ok >= best_thr).astype(int)
        cm = confusion_matrix(y_true_ok, y_pred_ok, labels=[0, 1])
        logger.info(f"[Fold {fold:02d}] TEST cm=\n{cm}")

        cases = {}
        for ct in ["TP", "TN", "FP", "FN"]:
            sel = pick_cases_by_cm(test_idx_ok, y_true_ok, y_pred_ok, ct, K_PER_CASE)
            if len(sel) == 0 and FALLBACK_PROB_RANKING:
                sel = pick_cases_by_prob_fallback(test_idx_ok, y_true_ok, y_prob_ok, ct, K_PER_CASE)
                logger.info(f"[Fold {fold:02d}] {ct} cm-picked=0 -> fallback prob-ranking picked={len(sel)}")
            else:
                logger.info(f"[Fold {fold:02d}] {ct} picked: {len(sel)}")
            cases[ct] = sel

        for ct, sel_idx in cases.items():
            if len(sel_idx) == 0:
                continue
            logger.info(f"[Fold {fold:02d}] render {ct}: {len(sel_idx)} samples")

            for j, ridx in enumerate(sel_idx, start=1):
                if j % RENDER_LOG_EVERY == 0 or j == len(sel_idx):
                    logger.info(f"  [{ct}] {j}/{len(sel_idx)}")

                row = dfm.iloc[int(ridx)]
                wav = str(row["wav_name"])
                pid = int(row["patient_id"])
                y_t = int(row["label"])

                mel_png  = MEL_DIR / f"{wav}.png"
                stft_png = STFT_DIR / f"{wav}.png"
                if (not mel_png.exists()) or (not stft_png.exists()):
                    continue

                x, mel_img3, stft_img3 = make_single_input(str(mel_png), str(stft_png))

                cam_mel,  prob_mel  = grad_cam(cam_model_mel,  x)
                cam_stft, prob_stft = grad_cam(cam_model_stft, x)

                prob = prob_mel
                y_p = int(prob >= best_thr)

                dcam = cam_mel - cam_stft
                dcam = dcam - dcam.min()
                dcam = dcam / (dcam.max() + 1e-9)

                stem = (
                    f"fold{fold:02d}_{ct}_pid{pid}_"
                    f"{wav.replace('.wav','')}_yt{y_t}_yp{y_p}_"
                    f"p{prob:.3f}_thr{best_thr:.3f}"
                )

                fig = overlay_cam_on_image(mel_img3, cam_mel, alpha=0.45)
                out1 = fold_out / f"{stem}__mel_cam.png"
                fig.savefig(out1, dpi=180, bbox_inches="tight")
                plt.close(fig)

                fig = overlay_cam_on_image(stft_img3, cam_stft, alpha=0.45)
                out2 = fold_out / f"{stem}__stft_cam.png"
                fig.savefig(out2, dpi=180, bbox_inches="tight")
                plt.close(fig)

                plt.figure(figsize=(4.2, 4.2))
                plt.imshow(dcam)
                plt.title("ΔCAM (mel - stft)")
                plt.axis("off")
                out3 = fold_out / f"{stem}__dcam.png"
                plt.tight_layout()
                plt.savefig(out3, dpi=180, bbox_inches="tight")
                plt.close()

                cam_index_rows.append({
                    "fold": fold,
                    "case_type": ct,
                    "patient_id": pid,
                    "wav_name": wav,
                    "y_true": y_t,
                    "y_prob": float(prob),
                    "y_pred": int(y_p),
                    "thr": float(best_thr),
                    "weights_used": str(w_path),
                    "mel_cam_png": str(out1),
                    "stft_cam_png": str(out2),
                    "dcam_png": str(out3),
                })

        logger.info(f"[Fold {fold:02d}] done.")

    idx_csv = EXPL_DIR / "cam_index.csv"
    pd.DataFrame(cam_index_rows).to_csv(idx_csv, index=False, encoding="utf-8")
    logger.info("------------------------------------------------------------")
    logger.info(f"[Done] CAM index saved: {idx_csv}")
    logger.info(f"[Done] CAM images saved under: {EXPL_DIR}/fold**/")
    logger.info("==== End explain_cam.py ====")

if __name__ == "__main__":
    main()


In [None]:
# explain_cam.py（Grad-CAM + ΔCAM）流程圖
# =============================================================================
# 用 Graphviz 把 explain_cam.py 的處理步驟畫成流程圖，方便放報告或論文
# 輸入：manifest、Mel/STFT 圖、指定的五個權重、每折最佳門檻 CSV
# 輸出：流程圖 PNG，固定存到 data/out/explain/cam/
# 小觀察：流程看起來很長，但真正吃時間的是 test 推論與逐張疊圖輸出
# =============================================================================

from graphviz import Digraph
from pathlib import Path


def draw_explain_cam_flow_cn():
    # 輸出位置固定，避免到處散落
    out_dir = Path("/home/jovyan/114-1_HA/data/out/explain/cam")
    out_dir.mkdir(parents=True, exist_ok=True)
    out_png = out_dir / "流程圖_explain_cam_GradCAM_ΔCAM.png"

    g = Digraph("流程圖_explain_cam", format="png")
    g.attr(rankdir="LR", splines="ortho", nodesep="0.35", ranksep="0.55")
    g.attr("node", shape="box", style="rounded", fontsize="11")

    # ========== 輸入 ==========
    with g.subgraph(name="cluster_in") as c:
        c.attr(label="輸入資料與依賴檔案", style="rounded")
        c.node(
            "IN1",
            "清單資料 manifest_S_only.csv\n欄位 wav_name, patient_id, label\n用途 連到圖檔與真實標籤"
        )
        c.node(
            "IN2",
            "影像資料\nMel PNG pic_v4/mel/*.png\nSTFT PNG pic_v4/stft/*.png\n用途 作為雙輸入推論與疊圖底圖"
        )
        c.node(
            "IN3",
            "權重白名單 五個\nfold**_best_by_valauroc_epochXXX.weights.h5\n用途 限縮只看最關鍵的模型版本"
        )
        c.node(
            "IN4",
            "每折最佳門檻 fold**_best_metrics.csv\n欄位 best_thr_on_val\n用途 推論後切 TP TN FP FN\n備用 找不到就用 0.5"
        )

    # ========== 每折處理 ==========
    with g.subgraph(name="cluster_loop") as c:
        c.attr(label="每折處理流程 只跑白名單五個權重", style="rounded")

        c.node("S1", "解析權重檔名\n抽出 fold 與 epoch\n順便檢查格式是否正確")
        c.node(
            "S2",
            "重建切分 對齊訓練\nStratifiedKFold 5 折 shuffle seed=42\n本折 test set 使用 val_idx"
        )
        c.node(
            "S3",
            "建立模型 與訓練一致\n雙輸入 shared EfficientNetB0\nmel_gap stft_gap concat head prob"
        )
        c.node("S4", "載入本折權重\nKeras 3 直接 load_weights\n避免 by_name")
        c.node(
            "S5",
            "建立 CAM 子模型\n輸出 feature_map 與 prob\nmel_feat 取 mel_gap.input\nstft_feat 取 stft_gap.input\n小提醒 這樣才不會出現 grads=None"
        )
        c.node(
            "S6",
            "test 推論\n逐筆讀 mel stft 圖\n輸出 prob\n不經意發現 這段最容易卡在缺圖或讀檔慢"
        )
        c.node(
            "S7",
            "門檻分類\nthr 來自 best_thr_on_val\n沒有就用 0.5\n計算 TP TN FP FN 混淆矩陣"
        )
        c.node(
            "S8",
            "抽樣\n每類抽 K_PER_CASE 張\n抽不到就用機率排序補齊\n用途 每折仍能產出完整案例組合"
        )
        c.node(
            "S9",
            "逐樣本產圖\nGrad-CAM mel\nGrad-CAM stft\nΔCAM = mel - stft\n正規化後輸出 overlay"
        )
        c.node(
            "S10",
            "記錄索引\n存檔路徑 標籤 機率 門檻 權重\n最後彙整成 cam_index.csv"
        )

    # ========== 輸出 ==========
    with g.subgraph(name="cluster_out") as c:
        c.attr(label="輸出產物", style="rounded")
        c.node(
            "OUT1",
            "本折 CAM 圖檔\n__mel_cam.png\n__stft_cam.png\n__dcam.png\n位置 explain/cam/fold**/"
        )
        c.node("OUT2", "索引表 cam_index.csv\n每張圖的 meta 與路徑")
        c.node("OUT3", "執行紀錄 log_explain/explain_cam_*.log\n用途 回溯抽樣與缺圖狀況")

    # ========== 連線 ==========
    g.edge("IN3", "S1")
    g.edge("IN1", "S2")
    g.edge("S1", "S3")
    g.edge("S3", "S4")
    g.edge("S4", "S5")
    g.edge("IN2", "S6")
    g.edge("S2", "S6")
    g.edge("S6", "S7")
    g.edge("IN4", "S7")
    g.edge("S7", "S8")
    g.edge("S8", "S9")
    g.edge("S9", "S10")

    g.edge("S9", "OUT1")
    g.edge("S10", "OUT2")
    g.edge("S1", "OUT3")

    g.render(filename=str(out_png).replace(".png", ""), cleanup=True)
    print(f"已輸出流程圖：{out_png}")


if __name__ == "__main__":
    draw_explain_cam_flow_cn()


In [None]:
# pip install graphviz
from graphviz import Digraph

def draw_explain_feature_flow(out_png="flow_explain_feature.png"):
    g = Digraph("explain_feature_flow", format="png")
    g.attr(rankdir="LR", splines="ortho", nodesep="0.35", ranksep="0.5")
    g.attr("node", shape="box", style="rounded", fontsize="11")

    with g.subgraph(name="cluster_inputs") as c:
        c.attr(label="Inputs / Artifacts", style="rounded")
        c.node("M", "manifest_S_only.csv\n(wav_name, patient_id, label)")
        c.node("I", "Mel PNGs + STFT PNGs\n(pic_v4/mel, pic_v4/stft)")
        c.node("W", "Allowed weights ×5\n(fold**_best_by_valauroc_epochXXX.weights.h5)")

    with g.subgraph(name="cluster_fold") as c:
        c.attr(label="Per-Fold Loop", style="rounded")
        c.node("F0", "Rebuild fold test indices\nStratifiedKFold(5, shuffle=True, seed=42)")
        c.node("F1", "Build model (same as training)\nDual-input + shared EfficientNetB0\nGAP → concat → head → prob")
        c.node("F2", "Load weights for fold")
        c.node("F3", "Sample N test examples\nN_SAMPLES_PER_FOLD (random)")
        c.node("F4", "Load images + predict base prob\np_i = model(mel_i, stft_i)")
        c.node("F5", "Compute mean images\nmel_mean, stft_mean (baseline replacement)")
        c.node("F6", "Branch contribution\nmel_off: replace mel_i → mel_mean\nstft_off: replace stft_i → stft_mean\nΔp = p_i - p_replaced")
        c.node("F7", "Occlusion importance: Frequency bands\nSplit H into N_FREQ_BANDS\nFor each band: occlude band → Δp mean")
        c.node("F8", "Occlusion importance: Time segments\nSplit W into N_TIME_SEGS\nFor each seg: occlude seg → Δp mean")
        c.node("F9", "Save CSVs per fold\nfreq_importance.csv\ntime_importance.csv\nbranch_contrib.csv")
        c.node("F10", "Append fold summary\nmel_branch_mean, stft_branch_mean")

    with g.subgraph(name="cluster_outputs") as c:
        c.attr(label="Outputs", style="rounded")
        c.node("O1", "fold##_freq_importance.csv\n(mel, stft)")
        c.node("O2", "fold##_time_importance.csv\n(mel, stft)")
        c.node("O3", "fold##_branch_contrib.csv\n(mel_off, stft_off)")
        c.node("O4", "feature_summary_allfolds.csv\n(branch means across folds)")

    g.edge("M", "F0")
    g.edge("W", "F1")
    g.edge("F0", "F3")
    g.edge("F1", "F2")
    g.edge("F2", "F4")
    g.edge("I", "F4")
    g.edge("F4", "F5")
    g.edge("F5", "F6")
    g.edge("F4", "F7")
    g.edge("F4", "F8")
    g.edge("F6", "F9")
    g.edge("F7", "F9")
    g.edge("F8", "F9")
    g.edge("F9", "F10")
    g.edge("F9", "O1")
    g.edge("F9", "O2")
    g.edge("F9", "O3")
    g.edge("F10", "O4")

    g.render(filename=out_png.replace(".png", ""), cleanup=True)
    print(f"[Saved] {out_png}")

if __name__ == "__main__":
    draw_explain_feature_flow()

In [None]:
# 列出 fold01 下所有 CAM 圖檔
# 目標：找出模型輸出機率最接近門檻值的樣本
# 越接近門檻的樣本，通常越能反映模型猶豫區域

import re
from pathlib import Path

# 使用當時驗證集選出的最佳門檻
thr = 0.315

# CAM 圖檔所在資料夾
d = Path("/home/jovyan/114-1_HA/data/out/explain/cam/fold01")

# 從檔名中擷取機率 p 與門檻 thr
# 檔名結尾只處理 mel_cam / stft_cam / dcam 三種
pat = re.compile(
    r"_p(?P<p>\d+\.\d+)_thr(?P<thr>\d+\.\d+).*_(mel_cam|stft_cam|dcam)\.png$"
)

items = []

# 掃描所有 png，僅保留符合命名規則者
for f in d.glob("*.png"):
    m = pat.search(f.name)
    if not m:
        continue

    # 從檔名取出模型預測機率
    p = float(m.group("p"))

    # 儲存與門檻的距離，方便排序
    items.append((abs(p - thr), p, f.name))

# 依 |p - thr| 由小到大排序
# 排序前幾名通常是 decision boundary 附近的關鍵樣本
items.sort()

# 取最接近門檻的前 10 張
items[:10]


In [None]:
!pip install graphviz

In [None]:
from pathlib import Path
p = Path("/home/jovyan/114-1_HA/data/pic_v3/mel")
print("exists:", (p/"1_S1_1.png").exists())
print("similar:", sorted([x.name for x in p.glob("1_S1_1.*")])[:10])

In [None]:
class_weight只用在 train loss
Checkpoint：監控 val_auroc