In [None]:
# ASR Commands (mini_speech_commands) – training + evaluation notebook
# Mirrors the end-to-end style of sentiment_embeddings/colab_training.ipynb.

from __future__ import annotations

import json
import math
import shutil
import subprocess
import sys
from collections import Counter
from dataclasses import dataclass
from pathlib import Path

import numpy as np
import pandas as pd

import torch


def find_repo_root(start: Path) -> Path:
    """Return the nearest directory at or above ``start`` that looks like the repo root.

    A directory qualifies when it contains both a ``data_ingestion`` and a
    ``utils`` entry. The search begins at the resolved ``start`` and walks up
    through its parents; when nothing qualifies, the resolved ``start`` itself
    is returned.
    """
    origin = start.resolve()
    markers = ("data_ingestion", "utils")
    candidates = (origin, *origin.parents)
    return next(
        (folder for folder in candidates if all((folder / name).exists() for name in markers)),
        origin,
    )


# Locate the repository root starting from the current working directory and
# make sure it is on sys.path so the project's top-level packages import below.
ROOT = find_repo_root(Path())
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

# Project-local import; kept after the sys.path adjustment above on purpose.
from utils.paths import CACHE_PATH  # noqa: E402

# Cache locations for this pipeline, all nested under CACHE_PATH / "asr_commands".
CACHE = CACHE_PATH
PIPELINE_CACHE = CACHE / "asr_commands"
RAW_DIR = PIPELINE_CACHE / "raw"
# Notebook artifacts go under <ROOT>/outputs/asr_commands (created if missing).
OUTPUTS_DIR = ROOT / "outputs" / "asr_commands"
OUTPUTS_DIR.mkdir(parents=True, exist_ok=True)

# Use CUDA when torch reports it as available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
device

In [None]:
# Ensure dataset is present in cache (runs ingestion if needed)
mini_root = RAW_DIR / "mini_speech_commands"
if not mini_root.exists():
    print("mini_speech_commands not found in cache; running ingestion...")
    # Run the ingestion script by absolute path, from the repo root, with the
    # kernel's own interpreter. ROOT can be a parent of the working directory,
    # so a path relative to the working directory would not be found there, and
    # a bare `python` may belong to a different environment than the kernel.
    ingest = subprocess.run(
        [sys.executable, str(ROOT / "data_ingestion" / "asr_commands" / "run.py")],
        cwd=ROOT,
        capture_output=True,
        text=True,
    )
    print(ingest.stdout)
    if ingest.returncode != 0:
        # Surface the script's own error output instead of only failing the
        # existence check below.
        raise RuntimeError(
            f"Ingestion failed with exit code {ingest.returncode}:\n{ingest.stderr}"
        )

assert mini_root.exists(), f"Missing: {mini_root}"

# Prefer the label list written next to the cache; otherwise use the class
# sub-directory names found under the dataset root.
labels_path = PIPELINE_CACHE / "labels.json"
if labels_path.exists():
    labels = json.loads(labels_path.read_text(encoding="utf-8"))["labels"]
else:
    # Fallback: infer from directories
    labels = sorted(p.name for p in mini_root.iterdir() if p.is_dir())

labels

In [None]:
# Collect the WAV files of every label into one (path, label) table.
# Files are sorted per label so the row order is stable between runs.
all_rows = [
    {"path": str(wav_path), "label": lbl}
    for lbl in labels
    for wav_path in sorted((mini_root / lbl).glob("*.wav"))
]

# Explicit columns keep the frame well-formed even when nothing was found, so
# the check below fails with a clear message instead of a KeyError on "label".
df = pd.DataFrame(all_rows, columns=["path", "label"])
assert not df.empty, f"No .wav files found under {mini_root}"
df["label"].value_counts()

In [None]:
# Deterministic split: per label roughly 80/10/10 (train/val/test).
# Train and val sizes are floored; whatever remains goes to test.
rng = np.random.default_rng(42)
splits = {"train": [], "val": [], "test": []}

for lbl in labels:
    # Shuffle a private copy of this label's row ids. Index.to_numpy() does not
    # promise a copy by default, so an in-place shuffle of its result could
    # write into the index's own buffer.
    idx = df.loc[df["label"] == lbl].index.to_numpy(copy=True)
    rng.shuffle(idx)
    n = len(idx)
    n_train = int(0.8 * n)
    n_val = int(0.1 * n)
    train_idx = idx[:n_train]
    val_idx = idx[n_train : n_train + n_val]
    test_idx = idx[n_train + n_val :]
    splits["train"].extend(df.loc[train_idx].to_dict(orient="records"))
    splits["val"].extend(df.loc[val_idx].to_dict(orient="records"))
    splits["test"].extend(df.loc[test_idx].to_dict(orient="records"))

{k: len(v) for k, v in splits.items()}

In [None]:
# (Optional) persist splits for reuse (matches project pipeline idea)
preprocessing_dir = OUTPUTS_DIR / "preprocessing"
preprocessing_dir.mkdir(parents=True, exist_ok=True)

# Store the label order alongside the per-split records in one JSON document.
splits_path = preprocessing_dir / "splits.json"
splits_payload = {"labels": labels, "splits": splits}
serialized_splits = json.dumps(splits_payload, indent=2)
splits_path.write_text(serialized_splits, encoding="utf-8")

splits_path

In [None]:
# Audio feature pipeline (torchaudio log-mel) + Dataset
try:
    import torchaudio
except Exception as e:
    # Re-raise with an actionable message; `from e` marks the original import
    # failure as the direct cause so its traceback is kept in the chain.
    raise RuntimeError(
        "torchaudio is required for this notebook. Install it in your env (uv/pip) and restart kernel.\n"
        f"Original import error: {e}"
    ) from e


# Two-way lookup between label names and integer class ids; ids follow the
# position of each label in `labels`.
label_to_id = dict(zip(labels, range(len(labels))))
id_to_label = {class_id: name for name, class_id in label_to_id.items()}


@dataclass(frozen=True)
class Example:
    """One labelled audio clip: the file's location and its integer class id."""

    # Filesystem location of the audio file.
    path: Path
    # Integer class id of the clip's label.
    label_id: int


def make_examples(items: list[dict]):
    """Convert split records into ``Example`` objects.

    Each record must provide a ``"path"`` and a ``"label"`` key; the label is
    translated to its class id through ``label_to_id``. Input order is kept.
    """
    return [
        Example(path=Path(entry["path"]), label_id=label_to_id[entry["label"]])
        for entry in items
    ]


# Build one list of Example objects per split, in train/val/test order.
train_examples, val_examples, test_examples = (
    make_examples(splits[split_name]) for split_name in ("train", "val", "test")
)

len(train_examples), len(val_examples), len(test_examples)

In [None]:
class SpeechCommandsDataset(torch.utils.data.Dataset):
    """Map-style dataset that turns audio files into normalized log-mel spectrograms.

    Every item is a ``(spec, label_id)`` pair. The clip is loaded, mixed down to
    one channel, resampled to ``sample_rate``, zero-padded or trimmed to
    ``clip_seconds``, converted to a dB-scaled mel spectrogram and standardized
    with its own mean and standard deviation.

    Args:
        examples: Clips to serve; each needs a ``path`` and a ``label_id``.
        sample_rate: Target sampling rate in Hz.
        clip_seconds: Fixed clip length in seconds after padding/trimming.
        n_mels: Number of mel bins.
        n_fft: FFT size passed to the mel spectrogram transform.
        hop_length: Hop length (in samples) passed to the mel spectrogram transform.

    Raises:
        ValueError: If ``sample_rate`` is not positive or the clip length works
            out to zero samples.
    """

    def __init__(
        self,
        *,
        examples: list[Example],
        sample_rate: int = 16000,
        clip_seconds: float = 1.0,
        n_mels: int = 64,
        n_fft: int = 1024,
        hop_length: int = 320,
    ):
        # Validate first: a zero-length clip would otherwise only fail later,
        # inside the spectrogram transform, with a far less helpful error.
        num_samples = int(sample_rate * clip_seconds)
        if sample_rate <= 0 or num_samples <= 0:
            raise ValueError(
                "sample_rate and clip_seconds must yield at least one sample; "
                f"got sample_rate={sample_rate}, clip_seconds={clip_seconds}"
            )

        self.examples = examples
        self.sample_rate = sample_rate
        self.num_samples = num_samples
        # Keep the feature configuration on the instance so it can be inspected
        # (for example when writing checkpoint metadata).
        self.clip_seconds = clip_seconds
        self.n_mels = n_mels
        self.n_fft = n_fft
        self.hop_length = hop_length

        # One resampler per source rate, created lazily on first use.
        self._resamplers: dict[int, torchaudio.transforms.Resample] = {}
        self.melspec = torchaudio.transforms.MelSpectrogram(
            sample_rate=sample_rate,
            n_fft=n_fft,
            hop_length=hop_length,
            n_mels=n_mels,
        )
        self.to_db = torchaudio.transforms.AmplitudeToDB(stype="power")

    def __len__(self):
        """Return the number of clips."""
        return len(self.examples)

    def _resample(self, wav: torch.Tensor, orig_sr: int) -> torch.Tensor:
        """Resample ``wav`` from ``orig_sr`` to the target rate (no-op when equal)."""
        if orig_sr == self.sample_rate:
            return wav
        if orig_sr not in self._resamplers:
            self._resamplers[orig_sr] = torchaudio.transforms.Resample(orig_freq=orig_sr, new_freq=self.sample_rate)
        return self._resamplers[orig_sr](wav)

    def _pad_or_trim(self, wav: torch.Tensor) -> torch.Tensor:
        """Force the last dimension to ``num_samples``: cut the tail or right-pad with zeros."""
        # wav: (1, T)
        T = int(wav.shape[-1])
        if T == self.num_samples:
            return wav
        if T > self.num_samples:
            return wav[..., : self.num_samples]
        pad = self.num_samples - T
        return torch.nn.functional.pad(wav, (0, pad))

    def __getitem__(self, idx: int):
        """Load clip ``idx`` and return ``(normalized log-mel spectrogram, label_id)``."""
        ex = self.examples[idx]
        wav, sr = torchaudio.load(ex.path)
        # Mix multi-channel audio down to a single channel.
        if wav.shape[0] > 1:
            wav = wav.mean(dim=0, keepdim=True)
        wav = self._resample(wav, sr)
        wav = self._pad_or_trim(wav)

        spec = self.melspec(wav)          # (1, n_mels, time)
        spec = self.to_db(spec)
        # Per-clip standardization; the epsilon guards against a zero std.
        spec = (spec - spec.mean()) / (spec.std() + 1e-6)
        return spec, ex.label_id


# One dataset per split, each built with the default feature settings.
train_ds, val_ds, test_ds = (
    SpeechCommandsDataset(examples=split_examples)
    for split_examples in (train_examples, val_examples, test_examples)
)

# Peek at the first training item: spectrogram shape, class id, label name.
x0, y0 = train_ds[0]
x0.shape, y0, id_to_label[y0]

In [None]:
# Simple CNN baseline (like a compact "training stage")
class KwsCnn(torch.nn.Module):
    """Small convolutional classifier over single-channel spectrogram inputs.

    Three 3x3 convolution + ReLU stages (16, 32, 64 channels), with 2x2 max
    pooling after the first two, followed by global average pooling and a
    linear layer producing ``n_classes`` logits.
    """

    def __init__(self, n_classes: int):
        super().__init__()
        nn = torch.nn
        # Layers are created in the same order they are applied, so positions
        # inside the Sequential (and therefore state-dict keys) stay fixed.
        feature_layers = [
            nn.Conv2d(1, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(16, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
        ]
        head_layers = [
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
            nn.Linear(64, n_classes),
        ]
        self.net = nn.Sequential(*feature_layers, *head_layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return class logits for a batch of spectrograms."""
        logits = self.net(x)
        return logits


def collate(batch):
    """Merge ``(spectrogram, label_id)`` items into a batch.

    Returns the spectrograms stacked along a new leading dimension and the
    label ids as a ``torch.long`` tensor, both in input order.
    """
    specs = [sample[0] for sample in batch]
    targets = [sample[1] for sample in batch]
    return torch.stack(specs, dim=0), torch.tensor(targets, dtype=torch.long)


# Seed torch's global RNG before anything random happens, so that weight
# initialization and the training loader's shuffling repeat across runs.
# (Some GPU kernels can still introduce small run-to-run differences.)
torch.manual_seed(42)

# Only the training loader shuffles; val/test keep a fixed order.
train_loader = torch.utils.data.DataLoader(train_ds, batch_size=64, shuffle=True, collate_fn=collate)
val_loader = torch.utils.data.DataLoader(val_ds, batch_size=128, shuffle=False, collate_fn=collate)
test_loader = torch.utils.data.DataLoader(test_ds, batch_size=128, shuffle=False, collate_fn=collate)

# One output logit per label.
model = KwsCnn(n_classes=len(labels)).to(device)
opt = torch.optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-4)
loss_fn = torch.nn.CrossEntropyLoss()

model

In [None]:
# Training loop (tracks best validation accuracy)
def accuracy_from_logits(logits: torch.Tensor, y: torch.Tensor) -> float:
    """Return the fraction of rows whose highest-scoring class (last dim) equals ``y``."""
    hits = logits.argmax(dim=-1).eq(y)
    return float(hits.float().mean().item())


epochs = 8
best_val_acc = -math.inf
history = []

best_dir = OUTPUTS_DIR / "best"
best_dir.mkdir(parents=True, exist_ok=True)
ckpt_path = best_dir / "kws_cnn.pt"
labels_out_path = best_dir / "labels.json"
labels_out_path.write_text(json.dumps({"labels": labels}, indent=2), encoding="utf-8")


def _run_epoch(loader, *, train: bool) -> tuple[float, float]:
    """Run one pass over ``loader`` and return ``(mean_loss, accuracy)``.

    Both values are per-sample averages: each batch contributes in proportion
    to its size, so a smaller final batch does not skew the epoch metrics.
    With ``train=True`` the model is put in training mode and the optimizer is
    stepped after every batch; otherwise the model is in eval mode and no
    gradients are tracked.

    Raises:
        ValueError: If the loader yields no samples.
    """
    model.train(train)
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    with torch.set_grad_enabled(train):
        for x, y in loader:
            x = x.to(device)
            y = y.to(device)
            if train:
                opt.zero_grad(set_to_none=True)
            logits = model(x)
            loss = loss_fn(logits, y)
            if train:
                loss.backward()
                opt.step()
            batch_size = int(y.numel())
            # loss_fn averages over the batch, so scale back to a per-sample sum.
            loss_sum += float(loss.item()) * batch_size
            n_correct += int((torch.argmax(logits.detach(), dim=-1) == y).sum().item())
            n_seen += batch_size
    if n_seen == 0:
        # Without this, an empty split would produce NaN metrics and the
        # best-checkpoint comparison below would never save anything.
        raise ValueError("Cannot compute epoch metrics: the data loader yielded no samples")
    return loss_sum / n_seen, n_correct / n_seen


for epoch in range(1, epochs + 1):
    train_loss, train_acc = _run_epoch(train_loader, train=True)
    val_loss, val_acc = _run_epoch(val_loader, train=False)

    rec = {
        "epoch": epoch,
        "train_loss": float(train_loss),
        "train_acc": float(train_acc),
        "val_loss": float(val_loss),
        "val_acc": float(val_acc),
    }
    history.append(rec)
    print(rec)

    # Keep only the weights of the epoch with the best validation accuracy.
    if rec["val_acc"] > best_val_acc:
        best_val_acc = rec["val_acc"]
        torch.save(
            {
                "state_dict": model.state_dict(),
                "labels": labels,
                # Feature settings recorded with the weights; these mirror the
                # defaults the datasets in this notebook are built with.
                "sample_rate": 16000,
                "clip_seconds": 1.0,
                "n_mels": 64,
            },
            ckpt_path,
        )

train_metrics_path = OUTPUTS_DIR / "training" / "metrics.json"
train_metrics_path.parent.mkdir(parents=True, exist_ok=True)
train_metrics_path.write_text(json.dumps({"pipeline": "asr_commands", "history": history}, indent=2), encoding="utf-8")

ckpt_path, train_metrics_path

In [None]:
# Load best checkpoint and evaluate on the test split
ckpt = torch.load(ckpt_path, map_location="cpu")
model.load_state_dict(ckpt["state_dict"])
model.to(device)
model.eval()

# Gather targets and predictions for the whole test split first, then derive
# the counts from them.
y_true, y_pred = [], []

with torch.no_grad():
    for x, y in test_loader:
        x = x.to(device)
        y = y.to(device)
        logits = model(x)
        preds = logits.argmax(dim=-1)
        y_true.extend(y.cpu().tolist())
        y_pred.extend(preds.cpu().tolist())

total = len(y_true)
correct = sum(int(target == guess) for target, guess in zip(y_true, y_pred))

# Report 0.0 rather than dividing by zero when the test split is empty.
test_acc = correct / total if total else 0.0
test_acc

In [None]:
# Confusion matrix + metrics export
# Rows are true classes, columns are predicted classes.
n_classes = len(labels)
cm = np.zeros((n_classes, n_classes), dtype=np.int64)
pair_counts = Counter(zip(y_true, y_pred))
for (true_id, pred_id), count in pair_counts.items():
    cm[int(true_id), int(pred_id)] += count

eval_metrics = dict(
    pipeline="asr_commands",
    num_test=int(total),
    accuracy=float(test_acc),
    labels=labels,
    confusion_matrix=cm.tolist(),
)

eval_dir = OUTPUTS_DIR / "evaluation"
eval_dir.mkdir(parents=True, exist_ok=True)
eval_metrics_path = eval_dir / "metrics.json"
serialized_metrics = json.dumps(eval_metrics, indent=2)
eval_metrics_path.write_text(serialized_metrics, encoding="utf-8")

eval_metrics_path

In [None]:
# (Optional) Zip outputs for sharing (e.g., Colab -> download)
# Built with the standard library and anchored at ROOT, so the archive always
# contains the directory this notebook actually wrote to (OUTPUTS_DIR), whatever
# the working directory is, and no external `zip` binary is needed.
# The archive is written to the current working directory and its entries keep
# the path of OUTPUTS_DIR relative to ROOT.
archive_path = shutil.make_archive(
    str(Path.cwd() / "asr_commands_outputs"),
    "zip",
    root_dir=str(ROOT),
    base_dir=str(OUTPUTS_DIR.relative_to(ROOT)),
)
archive_path