In [None]:
import os
# Notebook-level switches. Each one is only assigned when it is not already
# present in the kernel namespace, so a value set before this cell runs wins.

# VERBOSE_CONFIG gates the [MANIFEST]/[DATASET]/[CONFIG] prints in later cells.
if 'VERBOSE_CONFIG' not in globals():
    VERBOSE_CONFIG = True

# NOISE_COLOR falls back to the NOISE_COLOR environment variable, then to 'white'.
if 'NOISE_COLOR' not in globals():
    NOISE_COLOR = os.getenv('NOISE_COLOR', 'white')

In [None]:
import numpy as _np
import torch as _torch
import librosa as _librosa

# Memo of decoded noise clips keyed by (path, sample_rate), so each file is
# read and resampled at most once per process.
_NOISE_CACHE = {}

def _load_noise_1d(path: str, sr: int):
    """Return the mono noise clip at ``path`` resampled to ``sr`` as float32.

    The result is cached per ``(str(path), int(sr))``; later calls return the
    very same array object. A file that decodes to zero samples is replaced by
    a single zero so the returned array is never empty.
    """
    cache_key = (str(path), int(sr))
    cached = _NOISE_CACHE.get(cache_key)
    if cached is not None:
        return cached

    samples, _native_sr = _librosa.load(path, sr=sr, mono=True)
    if samples.size == 0:
        samples = _np.zeros(1, dtype=_np.float32)
    cached = samples.astype(_np.float32).copy()
    _NOISE_CACHE[cache_key] = cached
    return cached

def _to_tensor(x):
    """Convert ``x`` to a float32 tensor and report where it came from.

    Returns:
        ``(tensor, kind)`` where ``kind`` is ``"torch"`` for tensors,
        ``"numpy"`` for ndarrays (copied first, so the tensor never aliases
        the caller's array) and ``"other"`` for anything ``torch.tensor``
        can ingest.

    Raises:
        TypeError: if ``x`` cannot be converted.
    """
    if _torch.is_tensor(x):
        converted, kind = x.float(), "torch"
    elif isinstance(x, _np.ndarray):
        converted, kind = _torch.from_numpy(x.copy()).float(), "numpy"
    else:
        try:
            converted, kind = _torch.tensor(x).float(), "other"
        except Exception:
            raise TypeError(f"Unsupported type for add_specific_noise: {type(x)}")
    return converted, kind

def _to_output(x_tensor, kind):
    """Convert a result tensor back to the container type of the original input.

    ``kind`` is the tag produced by ``_to_tensor``: ``"numpy"`` and ``"other"``
    yield a NumPy array (detached, moved to CPU); ``"torch"`` and any
    unrecognised tag return the tensor unchanged.
    """
    wants_numpy = kind in ("numpy", "other")
    return x_tensor.detach().cpu().numpy() if wants_numpy else x_tensor

def _pick_segment_torch(noise_tensor: _torch.Tensor, length: int, gen: _torch.Generator | None = None):
    n = int(noise_tensor.numel())
    if n == 0:
        return _torch.zeros(length, dtype=_torch.float32)
    if n >= length:
        max_start = n - length
        start = int(_torch.randint(low=0, high=max_start+1, size=(1,), generator=gen).item())
    reps = (length + n - 1) // n
    tiled = noise_tensor.repeat(reps)[:length]
    return tiled.clone()

def _mix_to_snr_torch(clean: _torch.Tensor, noise: _torch.Tensor, snr_db: float):
    """Add ``noise`` to ``clean`` after scaling it to the requested SNR.

    Energies are summed in float64 (with a tiny epsilon so silent inputs do not
    divide by zero). The noise gain is chosen so that
    ``10 * log10(E_clean / E_scaled_noise) == snr_db``. The mix is returned as
    float32.
    """
    tiny = 1e-12
    sig = clean.to(_torch.float64)
    nse = noise.to(_torch.float64)
    sig_energy = sig.pow(2).sum() + tiny
    nse_energy = nse.pow(2).sum() + tiny
    # Desired noise-to-signal energy ratio on a linear scale.
    noise_over_signal = 10.0 ** (-float(snr_db) / 10.0)
    gain = _torch.sqrt(
        _torch.tensor(noise_over_signal, dtype=_torch.float64) * sig_energy / nse_energy
    )
    return (sig + gain * nse).float()

def add_specific_noise(x, noise_path: str, snr_db: float, target_sr: int = None, rng=None):
    """
    Add noise to ``x`` at a given signal-to-noise ratio.

    - 1D input (waveform): time-domain mixing with a segment of the noise file
      at ``noise_path``, loaded at ``target_sr``.
    - 2D input (features): fallback that mixes in Gaussian noise calibrated to
      the target SNR; ``noise_path`` is not read.
    - Any other rank: ``x`` is returned unchanged.

    Args:
        x: ``torch.Tensor``, ``numpy.ndarray`` or anything ``torch.tensor``
            accepts. The result comes back as a tensor for tensor input and as
            a NumPy array otherwise.
        noise_path: path of the noise recording (1D branch only).
        snr_db: target SNR in decibels.
        target_sr: sample rate used to load the noise file; required for 1D input.
        rng: optional CPU ``torch.Generator``. When given it drives both the
            noise-segment offset (1D) and the Gaussian draw (2D), making the
            augmentation reproducible. Any other value is ignored and the
            global torch RNG is used.

    Raises:
        AssertionError: 1D input without ``target_sr``.
        TypeError: ``x`` cannot be converted to a tensor.
    """
    x_t, kind = _to_tensor(x)

    # Only a CPU torch.Generator can drive the CPU samplers used below.
    gen = rng if (isinstance(rng, _torch.Generator) and rng.device.type == "cpu") else None

    if x_t.ndim == 1:
        assert target_sr is not None, "target_sr must be set for 1D waveform augmentation"
        noise_np = _load_noise_1d(noise_path, target_sr)
        noise_t = _torch.from_numpy(noise_np).float()
        seg = _pick_segment_torch(noise_t, x_t.numel(), gen=gen)
        # The cached noise lives on the CPU; move the segment next to the input
        # so non-CPU waveforms can be mixed without a device mismatch.
        seg = seg.to(x_t.device)
        # reshape (not view) so non-contiguous inputs are accepted too.
        out = _mix_to_snr_torch(x_t.reshape(-1), seg.reshape(-1), float(snr_db))
        return _to_output(out.view_as(x_t), kind)

    if x_t.ndim == 2:
        if gen is None:
            gauss = _torch.randn_like(x_t)
        else:
            gauss = _torch.randn(x_t.shape, generator=gen, dtype=x_t.dtype).to(x_t.device)
        # Same energy-based calibration as the waveform branch.
        out = _mix_to_snr_torch(x_t, gauss, float(snr_db))
        return _to_output(out, kind)

    # Unsupported rank: best effort, hand the input back untouched.
    return x

In [None]:
from pathlib import Path

# Filesystem roots. Each one keeps a value already present in the kernel
# namespace and otherwise falls back to the Kaggle default shown here.
DATA_ROOT = Path(globals().get("DATA_ROOT", "/kaggle/input/speech-commands"))
NOISE_ROOT = Path(globals().get("NOISE_ROOT", "/kaggle/input/speech-commands/_background_noise_"))
WORK_ROOT  = Path(globals().get("WORK_ROOT", "/kaggle/working"))
# Outputs default to an "outputs" folder under WORK_ROOT.
OUTPUT_ROOT = Path(globals().get("OUTPUT_ROOT", WORK_ROOT / "outputs"))
# Side effect at cell execution: the output directory is created if missing.
OUTPUT_ROOT.mkdir(parents=True, exist_ok=True)

def resolve_noise_path(root=NOISE_ROOT, name="white"):
    """Locate the noise recording for a noise colour under ``root``.

    Lookup order:
      1. the well-known file names for ``name`` ("white" / "pink");
      2. the alphabetically first ``*.wav`` in ``root`` whose file name
         contains ``name`` (case-insensitive);
      3. a conventional fallback file path, which may not exist on disk.

    Args:
        root: directory holding the noise recordings.
        name: noise colour; compared case-insensitively.

    Returns:
        POSIX-style path string. It always names a file inside ``root`` and
        never ``root`` itself, so callers that test ``os.path.exists`` on the
        result are not fooled by an existing directory.
    """
    name = str(name).lower()
    root = Path(root)
    candidates = {
        "white": ["white_noise.wav","white.wav","white-noise.wav"],
        "pink":  ["pink_noise.wav","pink.wav","pink-noise.wav"],
    }
    files = candidates.get(name, [])
    for fn in files:
        p = root / fn
        if p.is_file():
            return p.as_posix()

    try:
        # Sorted so the choice does not depend on directory listing order.
        matches = sorted(
            p for p in root.glob("*.wav")
            if p.is_file() and name in p.name.lower()
        )
    except OSError:
        # Unreadable directory: treat it as "nothing found" (best effort).
        matches = []
    if matches:
        return matches[0].as_posix()

    # Nothing on disk. Joining "" would yield the directory itself, so unknown
    # names fall back to a file name built from the colour instead.
    if files:
        fallback = files[0]
    else:
        fallback = f"{name}_noise.wav" if name else "noise.wav"
    return (root / fallback).as_posix()

# Resolved once at cell execution; the returned path is not guaranteed to exist
# (resolve_noise_path falls back to a conventional name when nothing is found).
WHITE_PATH = resolve_noise_path(NOISE_ROOT, "white")
PINK_PATH  = resolve_noise_path(NOISE_ROOT, "pink")


In [None]:
import os, random, math, time, json
import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torchaudio

from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score, f1_score

# COMMIT_MODE defaults to False unless the kernel namespace already defines it;
# later cells use it to suppress prints and progress bars.
if "COMMIT_MODE" not in globals():
    COMMIT_MODE = False


def set_seed(seed: int = 42, deterministic: bool = True):
    """Seed Python, NumPy and PyTorch (CPU and all CUDA devices) with ``seed``.

    ``deterministic`` also toggles cuDNN: when true, deterministic kernels are
    requested and autotuning (``benchmark``) is switched off; when false, the
    reverse.
    """
    for seeder in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
        seeder(seed)

    want_determinism = bool(deterministic)
    torch.backends.cudnn.deterministic = want_determinism
    torch.backends.cudnn.benchmark = not want_determinism

# Hardware selection: use the GPU when one is visible. Pinned host memory and
# non-blocking copies are only enabled together with CUDA.
USE_CUDA = torch.cuda.is_available()
device = torch.device("cuda") if USE_CUDA else torch.device("cpu")
PIN_MEMORY = NON_BLOCK = bool(USE_CUDA)

# Sample rates to run: de-duplicated, coerced to int and sorted ascending.
# A pre-existing SR_LIST in the namespace is honoured, otherwise 16 kHz only.
SR_LIST = sorted({int(rate) for rate in globals().get("SR_LIST", [16000])})
# The highest rate is the default; clips are fixed to SR samples.
SR = MAX_LENGTH = SR_LIST[-1]


def mfcc_params_for_sr(sr: int) -> dict:
    """Return STFT framing parameters for a sample rate.

    16 kHz and 8 kHz use fixed presets. Any other rate uses a 25 ms window and
    a 10 ms hop (rounded to whole samples), with ``n_fft`` the smallest power
    of two that is not below the window length (minimum 1).

    Returns:
        dict with keys ``"win_length"``, ``"hop_length"`` and ``"n_fft"``.
    """
    sr = int(sr)
    presets = {
        16000: (400, 160, 512),
        8000: (200, 80, 256),
    }
    if sr in presets:
        win_length, hop_length, n_fft = presets[sr]
    else:
        win_length = int(round(sr * 0.025))
        hop_length = int(round(sr * 0.010))
        # Next power of two >= win_length; windows of <= 1 sample map to 1.
        n_fft = 1 if win_length <= 1 else 1 << (win_length - 1).bit_length()
    return {"win_length": win_length, "hop_length": hop_length, "n_fft": n_fft}

# Framing parameters for the default sample rate.
# NOTE(review): `_rule` is not read anywhere in the code visible here.
_rule = mfcc_params_for_sr(SR)

# ---- Training schedule ----
BATCH_SIZE = 64
EPOCHS = 10
N_SPLITS = 5          # folds passed to the stratified k-fold split
SEEDS = [36, 38, 42]  # one full cross-validation run per seed

# Model family; build_model() accepts this string.
ARCH = "bilstm_transformer"

# ---- LSTM front-end sizes (read by build_model via globals()) ----
LSTM_HIDDEN  = 128
LSTM_LAYERS  = 2
LSTM_DROPOUT = 0.2

# ---- Noise augmentation ----
NOISE_PROB = 0.30     # probability that a training clip is augmented
SNR_RANGE = (5, 20)   # SNR in dB is drawn uniformly from this range

# Half of the visible CPU cores, but never fewer than two workers.
NUM_WORKERS = max(2, (os.cpu_count() or 2) // 2)

In [None]:
from pathlib import Path
import os, pandas as pd, json, hashlib

# Subset of command classes to train on; each must be a folder under DATA_ROOT.
WANTED_CLASSES = ["up","down","left","right"]
# File suffixes accepted by the scan (compared against the lower-cased name).
ALLOWED_EXTS = (".wav",)

root = Path(DATA_ROOT)
# Fail early if the dataset root is missing (message: "DATA_ROOT not found").
assert root.exists(), f"DATA_ROOT tidak ditemukan: {root}"

FORCE_RESCAN = False   # True: ignore the in-memory list and the cached manifest
VERBOSE = True         # together with VERBOSE_CONFIG, enables [MANIFEST] logs

manifests_dir = Path(OUTPUT_ROOT) / "manifests"
manifests_dir.mkdir(parents=True, exist_ok=True)

# The manifest file name embeds a hash of (resolved root, wanted classes,
# extensions), so changing any of them selects a different cache file.
sig = hashlib.md5(json.dumps({
    "root": str(root.resolve()),
    "wanted": sorted(WANTED_CLASSES),
    "exts": list(ALLOWED_EXTS),
}, sort_keys=True).encode()).hexdigest()
MANIFEST_CSV = manifests_dir / f"manifest_subset_{sig}.csv"

def _log_manifest(action: str, counts):
    """Print one ``[MANIFEST]`` line with the total and per-class sample counts.

    Silent unless both ``VERBOSE`` and ``VERBOSE_CONFIG`` are truthy.
    ``counts`` must provide ``.sum()`` and be convertible with ``dict()``.
    """
    if not VERBOSE or not VERBOSE_CONFIG:
        return
    print(f"[MANIFEST] {action}: total={int(counts.sum())} per_class={dict(counts)}")

def scan_subset_one_level():
    """List the audio files that sit directly inside each wanted class folder.

    Only one directory level is inspected (no recursion). Files are kept when
    their lower-cased name ends with one of ``ALLOWED_EXTS``.

    Returns:
        DataFrame with columns ``path`` and ``label``, ordered by class name
        and then by file path. The fixed order makes seeded splits built from
        this manifest reproducible regardless of how the filesystem lists
        directory entries.

    Raises:
        AssertionError: a class folder is missing, or no matching file exists.
    """
    missing = [c for c in WANTED_CLASSES if not (root / c).is_dir()]
    # Message: "Class folder not found".
    assert not missing, f"Folder kelas tidak ditemukan: {missing}"

    rows = []
    for cls in sorted(WANTED_CLASSES):
        # os.scandir yields entries in arbitrary order; collect and sort them.
        # The context manager closes the directory handle promptly.
        with os.scandir(root / cls) as entries:
            paths = sorted(
                ent.path for ent in entries
                if ent.is_file() and ent.name.lower().endswith(ALLOWED_EXTS)
            )
        rows.extend((str(Path(p)), cls) for p in paths)

    # Message: "No <exts> files for class subset <classes> in <root>".
    assert rows, f"Tidak ada file {ALLOWED_EXTS} untuk subset kelas {WANTED_CLASSES} di {root}"
    return pd.DataFrame(rows, columns=["path","label"])

# ---- Resolve the sample manifest: in-memory list -> cached CSV -> fresh scan ----
# `_df` is reset on every run. The earlier `'_df' not in locals()` probe was
# evaluated at module level, where a `_df` left over from a previous execution
# of this cell made FORCE_RESCAN=True (or a changed class list) silently reuse
# the old manifest instead of rescanning.
_df = None
reuse_ok = False

# 1) Reuse the list already held in the kernel if it covers exactly the wanted classes.
_prev_samples = globals().get("all_samples")
if (not FORCE_RESCAN) and isinstance(_prev_samples, list) and _prev_samples:
    _df_in = pd.DataFrame(_prev_samples, columns=["path","label"])
    cur_classes = sorted(_df_in["label"].unique().tolist())
    if sorted(WANTED_CLASSES) == cur_classes:
        _df = _df_in.copy()
        reuse_ok = True
        counts = _df["label"].value_counts().sort_index()
        _log_manifest("reuse in-memory", counts)

# 2) Otherwise try the cached CSV; rebuild it when its classes do not match.
if (_df is None) and (not FORCE_RESCAN) and MANIFEST_CSV.exists():
    _cached = pd.read_csv(MANIFEST_CSV)
    cur_classes = sorted(_cached["label"].unique().tolist())
    if sorted(WANTED_CLASSES) == cur_classes:
        _df = _cached
        counts = _df["label"].value_counts().sort_index()
        _log_manifest("reuse manifest", counts)
    else:
        _df = scan_subset_one_level()
        _df.to_csv(MANIFEST_CSV, index=False)
        counts = _df["label"].value_counts().sort_index()
        _log_manifest("refresh manifest", counts)

# 3) Nothing reusable (or a rescan was forced): scan the dataset and cache it.
if _df is None:
    _df = scan_subset_one_level()
    _df.to_csv(MANIFEST_CSV, index=False)
    counts = _df["label"].value_counts().sort_index()
    _log_manifest("scan subset", counts)

# itertuples(name=None) already yields plain (path, label) tuples.
all_samples = list(_df[["path","label"]].itertuples(index=False, name=None))
SELECTED_CLASSES = sorted(WANTED_CLASSES)
CLASS_TO_IDX = {c:i for i,c in enumerate(SELECTED_CLASSES)}

if VERBOSE_CONFIG:
    print(f"[DATASET] classes={len(SELECTED_CLASSES)} samples={len(all_samples)}")

In [None]:
import os
import torch
from torch.utils.data import DataLoader
from sklearn.model_selection import StratifiedKFold
import numpy as np

# Defaults consumed by build_kfold_loaders_generic(). Every name keeps a value
# already present in the kernel namespace and otherwise gets the fallback here.
# NOISE_PATH falls back to WHITE_PATH (or None when that is undefined too).
NOISE_PATH  = globals().get("NOISE_PATH", globals().get("WHITE_PATH", None))
NOISE_PROB  = globals().get("NOISE_PROB", 0.0)
SNR_RANGE   = globals().get("SNR_RANGE", (5, 20))
COMMIT_MODE = globals().get("COMMIT_MODE", False)
N_MFCC      = globals().get("N_MFCC", 40)
N_MELS      = globals().get("N_MELS", 40)


def build_kfold_loaders_generic(
    samples, class_to_idx,
    sr=None, n_splits=5, seed=42,
    batch_size=64, num_workers=2, pin_memory=True,
    noise_path=None,
    max_length=None,
    noise_prob=None, snr_range=None,
    augment_mode="file_noise",
    norm_mode="none",
    crop_mode="left"
):
    """Build train/validation DataLoaders for every fold of a stratified k-fold split.

    Args:
        samples: sequence of ``(path, class_name)`` pairs.
        class_to_idx: mapping from class name to integer label.
        sr: sample rate; defaults to the global ``SR`` (16000 if undefined).
        n_splits, seed: forwarded to ``StratifiedKFold`` (shuffled). ``seed``
            also derives the per-fold dataset seeds and shuffle generators.
        batch_size, num_workers, pin_memory: DataLoader settings; pinning is
            only enabled when CUDA is available.
        noise_path: noise file; defaults to the global ``NOISE_PATH``.
        max_length: clip length in samples; defaults to ``sr``.
        noise_prob, snr_range: default to the globals ``NOISE_PROB`` / ``SNR_RANGE``.
        augment_mode: ``"file_noise"`` enables noise on the training split;
            any other value disables augmentation.
        norm_mode, crop_mode: passed through to the datasets.

    Returns:
        List of dicts ``{"fold", "train_loader", "val_loader"}`` with folds
        numbered from 1. Validation datasets never receive augmentation.

    Raises:
        FileNotFoundError: file-noise augmentation requested but the noise
            file is missing.
    """
    sr = int(globals().get("SR", 16000) if sr is None else sr)
    max_length = sr if max_length is None else int(max_length)
    noise_prob = NOISE_PROB if noise_prob is None else noise_prob
    snr_range = SNR_RANGE if snr_range is None else snr_range

    stft_rule = mfcc_params_for_sr(sr)
    n_fft_rule      = stft_rule["n_fft"]
    win_length_rule = stft_rule["win_length"]
    hop_length_rule = stft_rule["hop_length"]

    use_file_noise = (augment_mode == "file_noise")
    effective_noise_path = NOISE_PATH if noise_path is None else noise_path
    if use_file_noise:
        if effective_noise_path is None or not os.path.exists(effective_noise_path):
            raise FileNotFoundError(
                f"[ERROR] Noise file tidak ditemukan!\n"
                f"  - noise_path: {effective_noise_path}\n"
                f"  - NOISE_COLOR: {globals().get('NOISE_COLOR', 'unknown')}\n"
            )
    aug_str = "file_noise" if use_file_noise else "none"

    # Integer labels drive the stratification.
    y_all = np.array([class_to_idx[label] for _, label in samples], dtype=np.int64)
    skf = StratifiedKFold(n_splits=int(n_splits), shuffle=True, random_state=int(seed))

    use_cuda = torch.cuda.is_available()
    pm = bool(pin_memory and use_cuda)

    worker_count = int(num_workers)
    dl_args = {
        "batch_size": int(batch_size),
        "num_workers": worker_count,
        "pin_memory": pm,
        "persistent_workers": worker_count > 0,
        "drop_last": False,
    }
    if worker_count > 0:
        # prefetch_factor is only passed when worker processes are used.
        dl_args["prefetch_factor"] = 4

    # Settings shared by the training and validation datasets of every fold.
    shared_ds_kwargs = dict(
        sr=sr, max_length=max_length,
        n_mfcc=N_MFCC, n_fft=n_fft_rule, hop_length=hop_length_rule, n_mels=N_MELS,
        snr_range=snr_range,
        norm_mode=norm_mode, crop_mode=crop_mode,
        return_path=False,
    )

    g_base = int(seed) * 1_000_003
    folds = []
    for fold_id, (tr_idx, va_idx) in enumerate(skf.split(samples, y_all), start=1):
        tr_s = [samples[i] for i in tr_idx]
        va_s = [samples[i] for i in va_idx]

        ds_tr = VoiceCommandDatasetWithNoise(
            tr_s, class_to_idx,
            noise_path=(effective_noise_path if use_file_noise else None),
            is_training=True,
            augment=aug_str, noise_prob=noise_prob,
            seed=g_base + fold_id,
            **shared_ds_kwargs,
        )
        ds_va = VoiceCommandDatasetWithNoise(
            va_s, class_to_idx,
            noise_path=None, is_training=False,
            augment="none", noise_prob=0.0,
            seed=g_base + 10_000 + fold_id,
            **shared_ds_kwargs,
        )

        # Dedicated CPU generator so the shuffle order depends only on (seed, fold).
        shuffle_gen = torch.Generator(device="cpu")
        shuffle_gen.manual_seed(g_base + fold_id)

        dl_tr = DataLoader(ds_tr, shuffle=True, generator=shuffle_gen, **dl_args)
        dl_va = DataLoader(ds_va, shuffle=False, **dl_args)

        if not COMMIT_MODE and VERBOSE_CONFIG:
            print(f"[SR={sr} | seed={seed} | fold={fold_id}] train={len(tr_s)} val={len(va_s)}"\
                  f" | n_fft={n_fft_rule} win={win_length_rule} hop={hop_length_rule} (workers={num_workers}, pin_memory={pm})"\
                  f" | augment={aug_str}")

        folds.append({"fold": fold_id, "train_loader": dl_tr, "val_loader": dl_va})

    return folds

In [None]:
# Pick the noise recording that matches NOISE_COLOR ("white" or "pink") and
# publish it as NOISE_PATH for the loader builder. Anything other than a
# resolvable "pink" falls back to white.
try:
    color = str(NOISE_COLOR).lower()
except Exception:
    color = "white"

_name = "pink" if (color == "pink" and 'PINK_PATH' in globals()) else "white"
_candidate = globals().get("PINK_PATH" if _name == "pink" else "WHITE_PATH")

# A candidate only counts when it is set and actually present on disk.
_usable = bool(_candidate) and os.path.exists(_candidate)
ACTIVE_NOISE_PATH = _candidate if _usable else None
ACTIVE_NOISE_NAME = _name if _usable else f"{_name} (missing)"

NOISE_PATH = ACTIVE_NOISE_PATH

In [None]:
# One-shot summary of the effective run configuration (only when verbose).
if VERBOSE_CONFIG:
    mfcc_all = {sr: mfcc_params_for_sr(int(sr)) for sr in SR_LIST}

    lines = [
        "[CONFIG]",
        f"ARCH={ARCH}",
        f"Device={device}",
        f"SR_LIST={SR_LIST}",
        "MFCC params per SR:",
    ]
    # One indented line of framing parameters per sample rate.
    for sr,val in mfcc_all.items():
        lines += [f"  SR={sr}: n_fft={val['n_fft']} win={val['win_length']} hop={val['hop_length']}"]
    lines += [
        f"Classes={len(SELECTED_CLASSES)} Samples={len(all_samples)}",
        f"Noise color={ACTIVE_NOISE_NAME} path={ACTIVE_NOISE_PATH if ACTIVE_NOISE_PATH else 'None'}",
        f"Noise prob={NOISE_PROB} SNR_RANGE={SNR_RANGE}",
        f"Batch size={BATCH_SIZE} Epochs={EPOCHS} KFold={N_SPLITS} Seeds={SEEDS}",
        f"LSTM hidden={LSTM_HIDDEN} layers={LSTM_LAYERS} dropout={LSTM_DROPOUT}",
    ]
    print(*lines, sep="\n")

In [None]:
class VoiceCommandDatasetWithNoise(Dataset):
    """Waveform dataset with optional file-noise augmentation for training.

    Each item is loaded with ``torchaudio.load``, averaged to mono, resampled
    to ``sr``, zero-padded or cropped to ``max_length`` samples and, for
    training datasets, randomly mixed with noise via ``add_specific_noise``.

    Args:
        samples: sequence of ``(path, class_name)`` pairs.
        class_to_idx: mapping from class name to integer label.
        noise_path: noise recording used when ``augment == "file_noise"``.
        is_training: augmentation is only ever applied when true.
        sr: target sample rate.
        max_length: clip length in samples (defaults to ``sr``).
        n_mfcc, n_fft, hop_length, n_mels: accepted for interface
            compatibility; not used by the code in this class.
        augment: ``"file_noise"`` or ``"none"``.
        noise_prob: probability of augmenting a clip.
        snr_range: ``(low, high)`` in dB for the uniform SNR draw.
        fixed_snr_db: when set, used instead of drawing from ``snr_range``.
        norm_mode, global_mean, global_std: stored only.
            NOTE(review): no normalisation is applied in this class.
        crop_mode: ``"left"`` keeps the start of long clips; any other value
            takes the centre.
        return_path: stored only.
            NOTE(review): ``__getitem__`` always returns ``(waveform, label)``.
        seed: seeds a private ``random.Random``; ``None`` uses the global
            ``random`` module.

    Raises:
        FileNotFoundError: training with file noise but ``noise_path`` is
            unset or missing.
    """

    def __init__(self,
                 samples, class_to_idx,
                 noise_path=None, is_training=True,
                 sr=SR, max_length=None,
                 n_mfcc=N_MFCC, n_fft=400, hop_length=160, n_mels=None,
                 augment="file_noise", noise_prob=0.0,
                 snr_range=(5,20), fixed_snr_db=None,
                 norm_mode="none", global_mean=None, global_std=None,
                 crop_mode="left", return_path=False, seed=None):
        super().__init__()
        self.samples, self.class_to_idx = samples, class_to_idx
        self.noise_path, self.is_training = noise_path, bool(is_training)
        self.sr = int(sr)
        self.max_length = int(max_length if max_length is not None else sr)

        self.augment      = str(augment)
        self.noise_prob   = float(noise_prob)
        self.snr_range    = snr_range
        self.fixed_snr_db = fixed_snr_db
        self.norm_mode    = str(norm_mode)
        self.global_mean  = (torch.tensor(global_mean, dtype=torch.float32)
                             if global_mean is not None else None)
        self.global_std   = (torch.tensor(global_std, dtype=torch.float32)
                             if global_std is not None else None)
        self.crop_mode    = str(crop_mode)
        self.return_path  = bool(return_path)

        # The seed is kept so each DataLoader worker can derive its own stream
        # (see _sync_worker_rng).
        self._seed = seed
        self._rng = random.Random(seed) if seed is not None else random
        self._rng_worker_id = None

        if self.is_training and self.augment == "file_noise":
            if not self.noise_path or not os.path.exists(self.noise_path):
                # Message: "noise_path is not valid".
                raise FileNotFoundError(f"noise_path tidak valid: {self.noise_path}")

    def __len__(self):
        return len(self.samples)

    def _sync_worker_rng(self):
        """Give each DataLoader worker process its own seeded RNG stream.

        Worker processes start from a copy of this dataset, so without this
        step they would all hold the same ``random.Random`` state and replay
        identical augmentation decisions. In the main process, and for
        unseeded datasets that use the global ``random`` module, nothing
        changes.
        """
        if self._seed is None:
            return
        info = torch.utils.data.get_worker_info()
        if info is None or info.id == self._rng_worker_id:
            return
        # A string seed is hashed deterministically and works for any seed type.
        self._rng = random.Random(f"{self._seed}:{info.id}")
        self._rng_worker_id = info.id

    def _crop(self, x_2d):
        """Zero-pad on the right or crop ``x_2d`` ([1, T]) to ``max_length`` samples."""
        T = x_2d.shape[1]
        if T < self.max_length:
            return F.pad(x_2d, (0, self.max_length - T))
        if T > self.max_length:
            start = 0 if self.crop_mode == "left" else max(0, (T - self.max_length)//2)
            return x_2d[:, start:start+self.max_length]
        return x_2d

    def _maybe_augment(self, x_2d):
        """With probability ``noise_prob``, mix file noise into a training clip."""
        if not (self.is_training and self.augment != "none"):
            return x_2d

        self._sync_worker_rng()
        if self._rng.random() >= self.noise_prob:
            return x_2d

        snr_db = float(self.fixed_snr_db) if self.fixed_snr_db is not None \
                 else self._rng.uniform(*self.snr_range)

        if self.augment == "file_noise" and self.noise_path:
            x_1d = x_2d.squeeze(0)  # [1, T] -> [T]
            x_aug = add_specific_noise(x_1d, self.noise_path, snr_db, target_sr=self.sr)
            return x_aug.unsqueeze(0)  # [T] -> [1, T]

        return x_2d

    def __getitem__(self, idx: int):
        """Return ``(waveform[T], label)`` for sample ``idx``."""
        path, cname = self.samples[idx]
        y = int(self.class_to_idx[cname])

        x_2d, sr0 = torchaudio.load(path)
        if x_2d.dim() == 1:
            x_2d = x_2d.unsqueeze(0)
        if x_2d.shape[0] > 1:
            # Multi-channel audio: average the channels to mono.
            x_2d = x_2d.mean(0, keepdim=True)

        if sr0 != self.sr:
            x_2d = torchaudio.functional.resample(x_2d, sr0, self.sr)

        # Length is fixed first, so noise is mixed over the padded clip.
        x_2d = self._crop(x_2d).to(torch.float32)
        x_2d = self._maybe_augment(x_2d)

        x_1d = x_2d.squeeze(0).contiguous()
        return (x_1d, torch.tensor(y, dtype=torch.long))

In [None]:
import math
import torch
import torch.nn as nn

class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding added to a ``[B, T, d_model]`` sequence.

    Args:
        d_model: embedding width; even and odd widths are both supported.
        max_len: number of positions precomputed; inputs must not be longer.
        dropout: dropout probability applied after the encoding is added.
    """

    def __init__(self, d_model: int, max_len: int = 4096, dropout: float = 0.0):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        pe = torch.zeros(max_len, d_model, dtype=torch.float32)
        position = torch.arange(0, max_len, dtype=torch.float32).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2, dtype=torch.float32) *
                             (-math.log(10000.0) / d_model))
        angles = position * div_term
        pe[:, 0::2] = torch.sin(angles)
        # An odd d_model has one fewer cosine column than sine columns, so the
        # angle table is trimmed to fit; for even widths the slice is a no-op.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        # Non-persistent: the table is rebuilt on construction, not stored in state_dict.
        self.register_buffer("pe", pe.unsqueeze(0), persistent=False)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Add the first ``T`` position vectors to ``x`` ([B, T, d_model]) and apply dropout."""
        T = x.size(1)
        pe = self.pe[:, :T, :].to(dtype=x.dtype, device=x.device)
        return self.dropout(x + pe)

class MFCC_Transformer(nn.Module):
    """Transformer-encoder classifier over MFCC frame sequences.

    Pipeline: linear projection to ``d_model`` -> positional encoding ->
    Transformer encoder -> mean over time -> LayerNorm + linear head.
    """

    def __init__(self,
                 n_mfcc: int,
                 num_classes: int,
                 d_model: int = 128,
                 nhead: int = 4,
                 num_layers: int = 2,
                 dim_feedforward: int = 256,
                 dropout: float = 0.3,
                 max_len: int = 4096):
        super().__init__()
        self.n_mfcc = int(n_mfcc)
        self.input_proj = nn.Linear(self.n_mfcc, d_model)
        self.posenc = PositionalEncoding(d_model=d_model, max_len=max_len, dropout=dropout)
        self.encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                d_model=d_model, nhead=nhead,
                dim_feedforward=dim_feedforward,
                dropout=dropout, batch_first=True,
            ),
            num_layers=num_layers,
        )
        self.head = nn.Sequential(
            nn.LayerNorm(d_model),
            nn.Linear(d_model, num_classes),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Classify a batch given as ``[B, T, n_mfcc]`` or ``[B, n_mfcc, T]``.

        The last axis is checked first, so when both axes equal ``n_mfcc`` the
        input is read as ``[B, T, n_mfcc]``.

        Raises:
            ValueError: input is not 3-D, or neither axis has size ``n_mfcc``.
        """
        if x.dim() != 3:
            raise ValueError(f"Expected 3D input, got {tuple(x.shape)}")

        n_feat = self.n_mfcc
        if x.shape[-1] != n_feat:
            if x.shape[1] != n_feat:
                raise ValueError(f"Input last dim must be n_mfcc={self.n_mfcc}; got {tuple(x.shape)}")
            # Channel-first layout: move features to the last axis.
            x = x.transpose(1, 2).contiguous()

        tokens = self.posenc(self.input_proj(x))
        pooled = self.encoder(tokens).mean(dim=1)   # average over time
        return self.head(pooled)

class BiLSTM_Transformer(nn.Module):
    """(Bi)LSTM front-end followed by a Transformer encoder and a linear head.

    Pipeline: LSTM over MFCC frames -> linear projection to ``d_model`` ->
    positional encoding -> Transformer encoder -> mean over time ->
    LayerNorm + linear head.
    """

    def __init__(self,
                 n_mfcc: int,
                 num_classes: int,
                 lstm_hidden: int = 128,
                 lstm_layers: int = 2,
                 bidirectional: bool = True,
                 lstm_dropout: float = 0.2,
                 d_model: int = 128,
                 nhead: int = 4,
                 num_layers: int = 2,
                 dim_feedforward: int = 256,
                 dropout: float = 0.3,
                 max_len: int = 4096):
        super().__init__()
        self.n_mfcc = int(n_mfcc)
        self.bidir = bool(bidirectional)

        # Inter-layer LSTM dropout is only passed for stacked LSTMs.
        inter_layer_dropout = lstm_dropout if lstm_layers > 1 else 0.0
        self.lstm = nn.LSTM(
            input_size=self.n_mfcc,
            hidden_size=lstm_hidden,
            num_layers=lstm_layers,
            batch_first=True,
            dropout=inter_layer_dropout,
            bidirectional=self.bidir,
        )

        # A bidirectional LSTM concatenates both directions.
        directions = 2 if self.bidir else 1
        self.proj = nn.Linear(lstm_hidden * directions, d_model)
        self.posenc = PositionalEncoding(d_model=d_model, max_len=max_len, dropout=dropout)
        self.encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                d_model=d_model, nhead=nhead,
                dim_feedforward=dim_feedforward,
                dropout=dropout, batch_first=True,
            ),
            num_layers=num_layers,
        )
        self.head = nn.Sequential(
            nn.LayerNorm(d_model),
            nn.Linear(d_model, num_classes),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Classify a batch given as ``[B, T, n_mfcc]`` or ``[B, n_mfcc, T]``.

        The last axis is checked first, so when both axes equal ``n_mfcc`` the
        input is read as ``[B, T, n_mfcc]``.

        Raises:
            ValueError: input is not 3-D, or neither axis has size ``n_mfcc``.
        """
        if x.dim() != 3:
            raise ValueError(f"Expected 3D input, got {tuple(x.shape)}")

        n_feat = self.n_mfcc
        if x.shape[-1] != n_feat:
            if x.shape[1] != n_feat:
                raise ValueError(f"Input last dim must be n_mfcc={self.n_mfcc}; got {tuple(x.shape)}")
            # Channel-first layout: move features to the last axis.
            x = x.transpose(1, 2).contiguous()

        recurrent, _state = self.lstm(x)
        tokens = self.posenc(self.proj(recurrent))
        pooled = self.encoder(tokens).mean(dim=1)   # average over time
        return self.head(pooled)

def build_model(arch: str, n_mfcc: int, num_classes: int) -> nn.Module:
    """Instantiate the classifier named by ``arch`` (case-insensitive).

    ``None`` or an empty string selects the plain transformer. The
    BiLSTM+Transformer variant reads its LSTM sizes from the globals
    ``LSTM_HIDDEN``, ``LSTM_LAYERS``, ``LSTM_BIDIR`` and ``LSTM_DROPOUT``
    (defaults 128 / 2 / True / 0.2).

    Raises:
        ValueError: ``arch`` is not a known architecture name.
    """
    arch = (arch or "transformer").lower()

    if arch in ("transformer", "mfcc_transformer"):
        return MFCC_Transformer(n_mfcc=n_mfcc, num_classes=num_classes)

    if arch not in ("bilstm_transformer", "bilstm+transformer", "lstm_transformer"):
        raise ValueError(f"Unknown ARCH: {arch}")

    settings = globals()
    return BiLSTM_Transformer(
        n_mfcc=n_mfcc, num_classes=num_classes,
        lstm_hidden=settings.get("LSTM_HIDDEN", 128),
        lstm_layers=settings.get("LSTM_LAYERS", 2),
        bidirectional=settings.get("LSTM_BIDIR", True),
        lstm_dropout=settings.get("LSTM_DROPOUT", 0.2),
    )

In [None]:
from contextlib import nullcontext
from torch.nn.utils import clip_grad_norm_

# Module-level MFCC transform for the default sample rate. It lives on
# `device` and is read as a global by _compute_feats_gpu(), so rebinding
# `mfcc` (as the training loop does per sample rate) changes the features
# that training and evaluation see.
_params = mfcc_params_for_sr(SR)
# NOTE(review): unlike the other prints, this one is not gated by VERBOSE_CONFIG / COMMIT_MODE.
print(f"[MFCC] SR={SR} -> n_fft={_params['n_fft']} win_length={_params['win_length']} hop_length={_params['hop_length']}")
mfcc = torchaudio.transforms.MFCC(
    sample_rate=SR, n_mfcc=N_MFCC,
    melkwargs={
        "n_mels": N_MELS,
        "n_fft": int(_params["n_fft"]),
        "hop_length": int(_params["hop_length"]),
        "win_length": int(_params["win_length"]),
        # Mel filter bank spans 0 Hz up to the Nyquist frequency.
        "center": True, "f_min": 0.0, "f_max": SR / 2
    }
).to(device)

def _compute_feats_gpu(wav: torch.Tensor) -> torch.Tensor:
    """Run the global ``mfcc`` transform on ``wav`` and swap axes 1 and 2.

    Presumably ``wav`` is ``[B, T]`` and ``mfcc`` yields ``[B, n_mfcc, frames]``,
    giving ``[B, frames, n_mfcc]`` — confirm against the transform in use.
    The result is made contiguous.
    """
    return mfcc(wav).transpose(1, 2).contiguous()

def train_one_epoch(model, loader, optimizer, criterion, scheduler=None, grad_clip_norm=None):
    """Train ``model`` for one pass over ``loader`` and return the mean loss.

    Mixed precision is configured through the notebook globals ``AMP_DTYPE``
    and ``SCALER``:
      * on CUDA with ``AMP_DTYPE`` set, the forward pass runs under autocast;
      * a gradient scaler is used only when ``SCALER`` is not ``None``
        (float16). bfloat16 needs no scaler, so it runs under autocast with a
        plain backward/step — the same precision ``evaluate`` uses.

    Args:
        model: network called on the features from ``_compute_feats_gpu``.
        loader: iterable of ``(waveform, label)`` batches.
        optimizer: optimizer stepped once per batch.
        criterion: loss called as ``criterion(logits, labels)``.
        scheduler: optional LR scheduler, stepped after every batch unless it
            is a ``ReduceLROnPlateau`` (that one is left to the caller).
        grad_clip_norm: optional max gradient norm; with a scaler, gradients
            are unscaled before clipping.

    Returns:
        Sample-weighted mean loss over the batches actually processed, or
        ``nan`` when the loader yields no samples.
    """
    model.train()

    amp_dtype = globals().get("AMP_DTYPE")
    scaler = globals().get("SCALER")
    on_cuda = (device.type == "cuda")
    # Autocast no longer depends on a scaler being present: gating it on
    # SCALER meant bfloat16 GPUs silently trained in full precision.
    use_autocast = on_cuda and (amp_dtype is not None or scaler is not None)
    use_scaler = use_autocast and (scaler is not None)

    def _amp_ctx():
        if use_autocast:
            return torch.autocast(device_type="cuda", dtype=amp_dtype)
        return nullcontext()

    step_per_batch = (
        scheduler is not None
        and not isinstance(scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau)
    )

    total_loss = 0.0
    n_seen = 0
    for wav, y in loader:
        wav = wav.to(device, non_blocking=NON_BLOCK).float()
        y   = y.to(device,   non_blocking=NON_BLOCK).long()

        optimizer.zero_grad(set_to_none=True)
        with _amp_ctx():
            feats  = _compute_feats_gpu(wav)
            logits = model(feats)
            loss   = criterion(logits, y)

        if use_scaler:
            scaler.scale(loss).backward()
            if grad_clip_norm is not None:
                # Clip true gradient magnitudes, not the scaled ones.
                scaler.unscale_(optimizer)
                clip_grad_norm_(model.parameters(), max_norm=float(grad_clip_norm))
            scaler.step(optimizer)
            scaler.update()
        else:
            loss.backward()
            if grad_clip_norm is not None:
                clip_grad_norm_(model.parameters(), max_norm=float(grad_clip_norm))
            optimizer.step()

        if step_per_batch:
            scheduler.step()

        batch_size = int(y.size(0))
        total_loss += loss.item() * batch_size
        n_seen += batch_size

    # Normalise by the samples actually seen, so drop_last / custom samplers
    # do not skew the mean and an empty loader does not divide by zero.
    return float(total_loss) / float(n_seen) if n_seen else float("nan")


@torch.no_grad()
def evaluate(model, loader, criterion=None):
    model.eval()
    all_true, all_pred = [], []
    total_loss = 0.0
    have_loss = criterion is not None

    use_amp = (device.type == "cuda")
    amp_ctx = (torch.autocast(device_type="cuda", dtype=AMP_DTYPE) if use_amp else nullcontext())

    for wav, y in loader:
        wav = wav.to(device, non_blocking=NON_BLOCK).float()
        y   = y.to(device,   non_blocking=NON_BLOCK).long()

        with amp_ctx:
            feats  = _compute_feats_gpu(wav)
            logits = model(feats)
            if have_loss:
                total_loss += criterion(logits, y).item() * y.size(0)

        pred = logits.argmax(dim=1)
        all_true.extend(y.tolist())
        all_pred.extend(pred.tolist())

    acc = accuracy_score(all_true, all_pred)
    f1  = f1_score(all_true, all_pred, average="macro")
    avg_loss = (float(total_loss) / float(len(loader.dataset))) if have_loss else None

    return float(acc), float(f1), avg_loss, np.array(all_true, dtype=np.int64), np.array(all_pred, dtype=np.int64)

In [None]:
# Commit/batch mode: silence progress bars and figure windows for unattended runs.
# NOTE(review): this assignment is unconditional, so it overrides any
# COMMIT_MODE value chosen in earlier cells.
COMMIT_MODE = True

import matplotlib
# Select the non-interactive Agg backend before pyplot is imported.
matplotlib.use("Agg")
import matplotlib.pyplot as plt
# Turn interactive plotting off.
plt.ioff()

from tqdm import tqdm as _tqdm

def tqdm(*args, **kwargs):
    """Drop-in ``tqdm`` wrapper that forces ``disable=True`` while ``COMMIT_MODE`` is on.

    All other arguments are forwarded unchanged to the real ``tqdm``.
    """
    forced = {"disable": True} if COMMIT_MODE else {}
    return _tqdm(*args, **{**kwargs, **forced})

import pandas as pd
# Keep DataFrame displays compact in the saved notebook output.
pd.set_option("display.max_rows", 20)
pd.set_option("display.max_columns", 20)
pd.set_option("display.max_colwidth", 120)

# In commit mode plt.show() becomes a no-op.
# NOTE(review): this patches the shared pyplot module, so every later cell in
# the kernel that calls plt.show() is affected.
if COMMIT_MODE:
    def _no_show(*args, **kwargs):
        pass
    plt.show = _no_show

In [None]:
import io, sys, contextlib, os, time, numpy as np, pandas as pd, shutil, json
from pathlib import Path
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt

# Re-normalise the sample-rate list (ints, unique, ascending) in case it was
# edited after the configuration cell ran.
SR_LIST = sorted(set(int(s) for s in SR_LIST))

# Architecture for this run; same value as assigned in the configuration cell.
ARCH = "bilstm_transformer"


def _outdirs_for(SR: int):
    """Create and return the output folders for one sample rate.

    The folders live under the notebook's ``WORK_ROOT`` (falling back to
    ``/kaggle/working`` when that global is undefined) instead of a
    hard-coded Kaggle path, so a relocated work directory is honoured.

    Args:
        SR: sample rate, used in the folder name ``kfold_outputs_sr<SR>``.

    Returns:
        ``(root, per_seed_and_fold_dir, summary_dir)`` as ``Path`` objects;
        both sub-directories exist on return.
    """
    base = Path(globals().get("WORK_ROOT", "/kaggle/working"))
    root = base / f"kfold_outputs_sr{SR}"
    d_fold = root / "per_seed_and_fold"
    d_sum  = root / "summary"
    for directory in (d_fold, d_sum):
        directory.mkdir(parents=True, exist_ok=True)
    return root, d_fold, d_sum

@contextlib.contextmanager
def mute_outputs(active: bool):
    """Context manager that swallows stdout and stderr while ``active`` is true.

    When active, both streams are redirected into one in-memory buffer that is
    discarded on exit; when inactive the body runs with its streams untouched.
    """
    with contextlib.ExitStack() as stack:
        if active:
            sink = io.StringIO()
            stack.enter_context(contextlib.redirect_stdout(sink))
            stack.enter_context(contextlib.redirect_stderr(sink))
        yield

# Fix the seed via set_seed (defined outside this cell) before creating any device state.
set_seed(0)

# Device selection. Pinned host memory and non-blocking copies are only
# switched on when CUDA is available.
USE_CUDA   = torch.cuda.is_available()
device     = torch.device("cuda" if USE_CUDA else "cpu")
PIN_MEMORY = NON_BLOCK = bool(USE_CUDA)
if not COMMIT_MODE:
    print("Device:", device)

# Mixed-precision configuration. Start from the CPU defaults (no autocast
# dtype, no gradient scaler) and override them when running on CUDA.
AMP_ENABLE = USE_CUDA
AMP_DTYPE = None
SCALER = None
if AMP_ENABLE:
    major_cc = torch.cuda.get_device_capability()[0]
    # bfloat16 when the major compute capability is >= 8, float16 otherwise.
    AMP_DTYPE = torch.float16 if major_cc < 8 else torch.bfloat16
    # A gradient scaler is only created for float16.
    if AMP_DTYPE is torch.float16:
        try:
            SCALER = torch.amp.GradScaler(device="cuda")
        except Exception:
            # Fallback to the torch.cuda.amp constructor if the call above fails.
            SCALER = torch.cuda.amp.GradScaler(enabled=(AMP_DTYPE is torch.float16))

# Accumulators filled by the sweep below.
results = []                 # one dict per (seed, SR, fold): best val accuracy + artifact paths
summary_rows = []            # one dict per (seed, SR): mean/std of best val accuracy over folds
efficiency_rows_global = []  # one dict per (seed, SR, fold): timing and throughput statistics

# Full experiment sweep: seeds x sample rates x cross-validation folds.
# For every fold a fresh model is trained for EPOCHS epochs, the best epoch (by
# validation accuracy) is checkpointed, and history/plots/ROC/PR artifacts are
# written next to the checkpoints.
# NOTE: while COMMIT_MODE is on, everything printed inside this block (including
# the [WARN] messages) is captured by mute_outputs and never displayed.
with mute_outputs(COMMIT_MODE):
    for seed in SEEDS:
        if not COMMIT_MODE:
            print("\n" + "="*70)
            print(f"Running SEED = {seed}")
            print("="*70)
        # Seeded once per seed; the sample-rate and fold loops below do not re-seed.
        set_seed(seed)

        for sr in SR_LIST:
            SR = int(sr)
            ROOT_OUT, OUT_FOLD, OUT_SUMM = _outdirs_for(SR)
            # Passed to the loaders as max_length (presumably one second of audio at SR -- confirm).
            MAX_LENGTH = SR

            # MFCC front-end rebuilt for each sample rate with SR-specific window sizes.
            # NOTE(review): `mfcc` is not referenced again in this cell; presumably
            # _compute_feats_gpu reads it as a global -- confirm before renaming.
            _params = mfcc_params_for_sr(SR)
            mfcc = torchaudio.transforms.MFCC(
                sample_rate=SR, n_mfcc=N_MFCC,
                melkwargs={
                    "n_mels": N_MELS,
                    "n_fft": int(_params["n_fft"]),
                    "hop_length": int(_params["hop_length"]),
                    "win_length": int(_params["win_length"]),
                    "center": True, "f_min": 0.0, "f_max": SR / 2
                }
            ).to(device)

            # Select active noise path from earlier cell
            # (ACTIVE_NOISE_PATH wins over NOISE_PATH; None if neither global exists).
            active_noise = globals().get("ACTIVE_NOISE_PATH", globals().get("NOISE_PATH", None))

            folds = build_kfold_loaders_generic(
                all_samples, CLASS_TO_IDX,
                n_splits=N_SPLITS, seed=seed,
                batch_size=BATCH_SIZE, num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY,
                noise_path=active_noise,
                max_length=MAX_LENGTH, noise_prob=NOISE_PROB, snr_range=SNR_RANGE,
                sr=SR, augment_mode="file_noise"
            )

            for fd in folds:
                fold_id     = fd["fold"]
                train_loader= fd["train_loader"]
                val_loader  = fd["val_loader"]

                # Per-run output folder and checkpoint paths.
                RUN_NAME = f"{ARCH}_seed{seed}_fold{fold_id}_sr{SR}"
                OUT_SUB  = str((ROOT_OUT / "per_seed_and_fold" / RUN_NAME).resolve())
                os.makedirs(OUT_SUB, exist_ok=True)
                BEST_CKPT = os.path.join(OUT_SUB, "best_model.pth")   # weights only
                BEST_FULL = os.path.join(OUT_SUB, "best_full.pt")     # weights + optimizer/scheduler + metadata
                LAST_CKPT = os.path.join(OUT_SUB, "last_model.pth")   # weights after the final epoch

                # Fresh model, optimizer and LR schedule for every fold.
                model = BiLSTM_Transformer(
                    n_mfcc=N_MFCC, num_classes=len(SELECTED_CLASSES),
                    lstm_hidden=LSTM_HIDDEN, lstm_layers=LSTM_LAYERS,
                    lstm_dropout=LSTM_DROPOUT
                ).to(device)
                # Sanity check: the model must live on CUDA exactly when CUDA is in use.
                assert next(model.parameters()).is_cuda == USE_CUDA, "Model belum di CUDA!"
                n_params = int(sum(p.numel() for p in model.parameters()))

                optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-4)
                # OneCycleLR is stepped once per batch, so it needs the total batch count.
                total_steps = max(1, EPOCHS * len(train_loader))
                scheduler = torch.optim.lr_scheduler.OneCycleLR(
                    optimizer, max_lr=1e-3, total_steps=total_steps
                )
                criterion = nn.CrossEntropyLoss().to(device)

                best_val_acc = -1.0
                best_true = best_pred = None
                current_best_ckpt_path = BEST_CKPT

                history = []

                for ep in range(1, EPOCHS+1):
                    model.train()
                    tr_loss_sum = 0.0
                    n_train = 0
                    ep_t0 = time.time()

                    for wav, y in train_loader:
                        wav = wav.to(device, non_blocking=NON_BLOCK).float()
                        y   = y.to(device, non_blocking=NON_BLOCK).long()
                        optimizer.zero_grad(set_to_none=True)

                        # Single forward path for both modes: autocast on CUDA,
                        # a do-nothing context otherwise.
                        amp_ctx = (torch.autocast(device_type='cuda', dtype=AMP_DTYPE)
                                   if AMP_ENABLE else contextlib.nullcontext())
                        with amp_ctx:
                            feats  = _compute_feats_gpu(wav)
                            logits = model(feats)
                            loss   = criterion(logits, y)

                        if AMP_ENABLE and SCALER is not None:
                            # Scaled backward/step when a gradient scaler is configured.
                            SCALER.scale(loss).backward()
                            SCALER.step(optimizer)
                            SCALER.update()
                        else:
                            loss.backward()
                            optimizer.step()

                        # NOTE(review): the scheduler advances on every batch, regardless
                        # of what SCALER.step(optimizer) did with the optimizer step.
                        if scheduler is not None:
                            scheduler.step()

                        # Sample-weighted running loss so the epoch mean is per-sample.
                        bs = y.size(0)
                        tr_loss_sum += loss.item() * bs
                        n_train += bs

                    tr_loss = tr_loss_sum / max(1, n_train)
                    val_acc, val_f1, val_loss, y_true, y_pred = evaluate(model, val_loader, criterion)

                    # Track the best epoch by validation accuracy and checkpoint it.
                    if val_acc > best_val_acc:
                        best_val_acc = float(val_acc)
                        best_true = y_true.copy()
                        best_pred = y_pred.copy()

                        torch.save(model.state_dict(), BEST_CKPT)
                        torch.save({
                            "epoch": ep,
                            "model_state": model.state_dict(),
                            "optimizer_state": optimizer.state_dict(),
                            "scheduler_state": scheduler.state_dict() if scheduler is not None else None,
                            "val_acc": best_val_acc,
                            "sr": SR,
                            "seed": seed,
                            "fold": fold_id,
                            "classes": list(SELECTED_CLASSES),
                            "class_to_idx": CLASS_TO_IDX,
                            "n_mfcc": N_MFCC,
                            "arch": ARCH,
                            "model_class": model.__class__.__name__,
                        }, BEST_FULL)
                        current_best_ckpt_path = BEST_CKPT

                    # Epoch time includes the validation pass and checkpoint writes above.
                    ep_time = time.time() - ep_t0
                    throughput = float(n_train) / ep_time if ep_time > 0 else 0.0
                    history.append({
                        "epoch": ep,
                        "train_loss": float(tr_loss),
                        "val_loss": float(val_loss),
                        "val_acc": float(val_acc),
                        "val_f1": float(val_f1),
                        "epoch_time_sec": float(ep_time),
                        "throughput_samples_per_sec": float(throughput),
                    })

                    # Progress line on the first, every fifth and the last epoch (verbose runs only).
                    if (ep % 5 == 0 or ep == 1 or ep == EPOCHS) and not COMMIT_MODE:
                        if USE_CUDA:
                            gpu_mb = torch.cuda.memory_allocated() / 1e6
                            print(f"[SR={SR} | GPU {gpu_mb:.1f} MB]", end=" ")
                        print(f"Seed {seed} | Fold {fold_id} | Epoch {ep:02d} "
                              f"| tr_loss={tr_loss:.4f} | va_loss={val_loss:.4f} "
                              f"| va_acc={val_acc:.4f} | va_f1={val_f1:.4f} | ep_time={ep_time:.2f}s | thr={throughput:.1f}/s")

                torch.save(model.state_dict(), LAST_CKPT)

                # Confusion matrix of the best epoch's validation predictions.
                cm = confusion_matrix(best_true, best_pred, labels=list(range(len(SELECTED_CLASSES))))
                cm_df = pd.DataFrame(cm, index=SELECTED_CLASSES, columns=SELECTED_CLASSES)
                cm_path = os.path.join(OUT_FOLD, f"cm_{ARCH}_seed{seed}_fold{fold_id}_sr{SR}.csv")
                cm_df.to_csv(cm_path)

                hist_df = pd.DataFrame(history)
                hist_csv = os.path.join(OUT_SUB, "history.csv")
                hist_df.to_csv(hist_csv, index=False)

                # Best-effort plots: a plotting failure must not abort the sweep.
                try:
                    plt.figure(figsize=(8,5))
                    plt.plot(hist_df["epoch"], hist_df["train_loss"], label="train_loss")
                    plt.plot(hist_df["epoch"], hist_df["val_loss"], label="val_loss")
                    plt.xlabel("Epoch"); plt.ylabel("Loss"); plt.title("Loss Curve"); plt.legend();
                    plt.tight_layout(); plt.savefig(os.path.join(OUT_SUB, "loss_curve.png")); plt.close()

                    plt.figure(figsize=(8,5))
                    plt.plot(hist_df["epoch"], hist_df["val_acc"], label="val_acc")
                    plt.plot(hist_df["epoch"], hist_df["val_f1"], label="val_f1")
                    plt.xlabel("Epoch"); plt.ylabel("Score"); plt.title("Validation Metrics"); plt.legend();
                    plt.tight_layout(); plt.savefig(os.path.join(OUT_SUB, "metrics_curve.png")); plt.close()

                    # Per-class error rate; classes with no validation samples become NaN
                    # instead of triggering a division by zero.
                    per_class_counts = cm_df.sum(axis=1).replace(0, np.nan)
                    correct = np.diag(cm)
                    recall = correct / per_class_counts.values
                    err_rate = 1.0 - recall
                    plt.figure(figsize=(10,5))
                    plt.bar(cm_df.index, err_rate)
                    plt.ylabel("Error rate (1 - recall)"); plt.title("Error per Class")
                    plt.xticks(rotation=45, ha='right')
                    plt.tight_layout(); plt.savefig(os.path.join(OUT_SUB, "error_per_class.png")); plt.close()

                    plt.figure(figsize=(6,5))
                    plt.imshow(cm, interpolation='nearest', cmap='Blues')
                    plt.title('Confusion Matrix'); plt.colorbar()
                    tick_marks = np.arange(len(SELECTED_CLASSES))
                    plt.xticks(tick_marks, SELECTED_CLASSES, rotation=45, ha='right')
                    plt.yticks(tick_marks, SELECTED_CLASSES)
                    # Labels are set before tight_layout so the layout accounts for them.
                    plt.ylabel('True label'); plt.xlabel('Predicted label')
                    plt.tight_layout()
                    plt.savefig(os.path.join(OUT_SUB, "confusion_matrix.png")); plt.close()
                except Exception as e:
                    # Close whatever figure the failed step left open so figures do not
                    # accumulate across seeds / sample rates / folds.
                    plt.close("all")
                    print(f"[WARN] Failed to plot curves/CM for seed={seed} fold={fold_id} SR={SR}: {e}")

                # ROC & PR curves (one-vs-rest) from the best checkpoint's validation probabilities.
                try:
                    from sklearn.preprocessing import label_binarize
                    from sklearn.metrics import roc_curve, auc, roc_auc_score, precision_recall_curve, average_precision_score

                    try:
                        state = torch.load(current_best_ckpt_path, map_location=device)
                        if isinstance(state, dict) and 'model_state' in state:
                            model.load_state_dict(state['model_state'])
                        else:
                            model.load_state_dict(state)
                    except Exception as load_err:
                        # Still best-effort, but no longer silent: without the reload the
                        # curves below come from the last-epoch weights.
                        print(f"[WARN] Could not reload best checkpoint for seed={seed} fold={fold_id} SR={SR}; "
                              f"ROC/PR curves use the last-epoch weights: {load_err}")
                    model.eval()

                    # Collect class probabilities over the validation set (no autocast here).
                    y_true_list, y_score_chunks = [], []
                    with torch.no_grad():
                        for wav, y in val_loader:
                            wav = wav.to(device, non_blocking=NON_BLOCK).float()
                            feats = _compute_feats_gpu(wav)
                            logits = model(feats)
                            probs = torch.softmax(logits, dim=1).detach().cpu().numpy()
                            y_score_chunks.append(probs)
                            y_true_list.extend(y.numpy().tolist())

                    y_true_np = np.array(y_true_list, dtype=np.int64)
                    y_score_np = np.vstack(y_score_chunks) if y_score_chunks else np.zeros((0, len(SELECTED_CLASSES)))
                    n_classes = len(SELECTED_CLASSES)
                    if y_score_np.shape[0] > 0:
                        y_bin = label_binarize(y_true_np, classes=list(range(n_classes)))
                        if n_classes == 2 and y_bin.shape[1] == 1:
                            # label_binarize returns a single column (class 1) for two-class
                            # problems; expand to one column per class so the per-class
                            # indexing below works.
                            y_bin = np.hstack([1 - y_bin, y_bin])
                        fpr, tpr, roc_auc = {}, {}, {}
                        for i in range(n_classes):
                            fpr[i], tpr[i], _ = roc_curve(y_bin[:, i], y_score_np[:, i])
                            roc_auc[i] = auc(fpr[i], tpr[i])
                        fpr['micro'], tpr['micro'], _ = roc_curve(y_bin.ravel(), y_score_np.ravel())
                        roc_auc['micro'] = auc(fpr['micro'], tpr['micro'])
                        roc_auc_macro = roc_auc_score(y_bin, y_score_np, average='macro', multi_class='ovr')

                        plt.figure(figsize=(8,6))
                        for i, name in enumerate(SELECTED_CLASSES):
                            plt.plot(fpr[i], tpr[i], label=f"{name} (AUC={roc_auc[i]:.3f})")
                        plt.plot([0,1],[0,1], 'k--', alpha=0.4)
                        plt.plot(fpr['micro'], tpr['micro'], linestyle='--', label=f"micro (AUC={roc_auc['micro']:.3f})")
                        plt.xlabel("False Positive Rate"); plt.ylabel("True Positive Rate"); plt.title("ROC Curves (OvR)")
                        plt.legend(); plt.tight_layout(); plt.savefig(os.path.join(OUT_SUB, "roc_curve.png")); plt.close()

                        auc_rows = ([{'class': SELECTED_CLASSES[i], 'roc_auc': float(roc_auc[i])} for i in range(n_classes)] +
                                    [{'class': 'micro', 'roc_auc': float(roc_auc['micro'])},
                                     {'class': 'macro', 'roc_auc': float(roc_auc_macro)}])
                        pd.DataFrame(auc_rows).to_csv(os.path.join(OUT_SUB, "roc_auc_summary.csv"), index=False)

                        precision, recall, ap = {}, {}, {}
                        for i in range(n_classes):
                            precision[i], recall[i], _ = precision_recall_curve(y_bin[:, i], y_score_np[:, i])
                            ap[i] = average_precision_score(y_bin[:, i], y_score_np[:, i])
                        precision['micro'], recall['micro'], _ = precision_recall_curve(y_bin.ravel(), y_score_np.ravel())
                        ap_micro = average_precision_score(y_bin, y_score_np, average='micro')
                        ap_macro = average_precision_score(y_bin, y_score_np, average='macro')

                        plt.figure(figsize=(8,6))
                        for i, name in enumerate(SELECTED_CLASSES):
                            plt.plot(recall[i], precision[i], label=f"{name} (AP={ap[i]:.3f})")
                        plt.plot(recall['micro'], precision['micro'], linestyle='--', label=f"micro (AP={ap_micro:.3f})")
                        plt.xlabel("Recall"); plt.ylabel("Precision"); plt.title("Precision-Recall Curves (OvR)")
                        plt.legend(); plt.tight_layout(); plt.savefig(os.path.join(OUT_SUB, "pr_curve.png")); plt.close()

                        ap_rows = ([{'class': SELECTED_CLASSES[i], 'average_precision': float(ap[i])} for i in range(n_classes)] +
                                   [{'class': 'micro', 'average_precision': float(ap_micro)},
                                    {'class': 'macro', 'average_precision': float(ap_macro)}])
                        pd.DataFrame(ap_rows).to_csv(os.path.join(OUT_SUB, "pr_ap_summary.csv"), index=False)
                except Exception as e:
                    # Same clean-up as for the curve plots: do not leak a half-drawn figure.
                    plt.close("all")
                    print(f"[WARN] Failed to compute PR/ROC curves for seed={seed} fold={fold_id} SR={SR}: {e}")

                # Timing summary for this run (zeros when no epoch was recorded).
                total_time = float(hist_df["epoch_time_sec"].sum()) if not hist_df.empty else 0.0
                avg_ep_time = float(hist_df["epoch_time_sec"].mean()) if not hist_df.empty else 0.0
                mean_thr = float(hist_df["throughput_samples_per_sec"].mean()) if not hist_df.empty else 0.0
                efficiency_rows_global.append({
                    "sr": SR,
                    "seed": seed,
                    "fold": fold_id,
                    "arch": ARCH,
                    "n_params": n_params,
                    "epochs": EPOCHS,
                    "total_time_sec": total_time,
                    "avg_epoch_time_sec": avg_ep_time,
                    "mean_throughput_samples_per_sec": mean_thr,
                })

                results.append({
                    "seed": seed,
                    "fold": fold_id,
                    "sr": int(SR),
                    "val_acc": float(best_val_acc),
                    "cm_path": cm_path,
                    "ckpt_path": current_best_ckpt_path,
                })

        # After all sample rates for this seed: aggregate best accuracies over folds.
        for sr0 in SR_LIST:
            accs = [r["val_acc"] for r in results if r["seed"] == seed and r["sr"] == int(sr0)]
            if accs:
                summary_rows.append({
                    "seed": seed,
                    "sr": int(sr0),
                    "acc_mean_over_folds": float(np.mean(accs)),
                    "acc_std_over_folds":  float(np.std(accs)),
                    "n_folds": N_SPLITS
                })

# Export the timing statistics gathered during training: for every sample rate,
# one CSV with the raw per-fold rows and one CSV aggregated per seed.
if efficiency_rows_global:
    eff_df = pd.DataFrame(efficiency_rows_global)
    for SR in sorted(set(eff_df["sr"].tolist())):
        ROOT_OUT, OUT_FOLD, OUT_SUMM = _outdirs_for(SR)
        eff_df_sr = eff_df.loc[eff_df["sr"] == int(SR)].copy()
        if eff_df_sr.empty:
            continue

        eff_df_sr.to_csv(OUT_SUMM / f"{ARCH}_speed_efficiency_per_fold_sr{SR}.csv", index=False)

        # Per-seed roll-up: summed wall time, mean epoch time, mean throughput,
        # and the number of folds that contributed.
        agg = eff_df_sr.groupby(["seed"], as_index=False).agg(
            total_time_sec=("total_time_sec", "sum"),
            avg_epoch_time_sec=("avg_epoch_time_sec", "mean"),
            mean_throughput_samples_per_sec=("mean_throughput_samples_per_sec", "mean"),
            n_folds=("fold", "count"),
        )
        agg.to_csv(OUT_SUMM / f"{ARCH}_speed_efficiency_per_seed_sr{SR}.csv", index=False)

# Completion message, shown only in verbose (non-commit) runs.
if not COMMIT_MODE:
    print("\nTraining selesai.")