In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import pathlib, os, subprocess, json, datetime, shutil

REPO_URL   = "https://github.com/SzymonSmagowski/DeepLearningCourse.git"
BRANCH     = "main"
DATA_IN_DRIVE = "data/speech_commands_v0.01"

ROOT_DRIVE   = pathlib.Path("/content/drive/MyDrive")
DATA_SRC     = ROOT_DRIVE / DATA_IN_DRIVE
DATA_DST     = pathlib.Path("/content/data")                        # where the script expects it
REPO_DIR     = pathlib.Path("/content") / pathlib.Path(REPO_URL).stem
LOGS_DRIVE   = ROOT_DRIVE / "speech_runs"      # all logs live here

print("Repo dir :", REPO_DIR)
print("Data src :", DATA_SRC)
print("Data dst :", DATA_DST)
print("Logs dir :", LOGS_DRIVE)

if not REPO_DIR.exists():
    !git clone -b "$BRANCH" "$REPO_URL" "$REPO_DIR"
else:
    %cd $REPO_DIR
    !git pull origin "$BRANCH"
    %cd -

Repo dir : /content/DeepLearningCourse
Data src : /content/drive/MyDrive/data/speech_commands_v0.01
Data dst : /content/data
Logs dir : /content/drive/MyDrive/speech_runs
Cloning into '/content/DeepLearningCourse'...
remote: Enumerating objects: 1330, done.[K
remote: Counting objects: 100% (1330/1330), done.[K
remote: Compressing objects: 100% (1197/1197), done.[K
remote: Total 1330 (delta 162), reused 1290 (delta 129), pack-reused 0 (from 0)[K
Receiving objects: 100% (1330/1330), 28.64 MiB | 16.34 MiB/s, done.
Resolving deltas: 100% (162/162), done.


In [3]:
# -----------------------------------------------------------
# 📦  Get Speech-Commands into /content/data   (idempotent)
#
#   1. If it’s already in /content            →  nothing to do
#   2. Else if Drive has an extracted folder  →  symlink (or copy)
#   3. Else if Drive only has *.tar.gz        →  copy tar, untar locally
# -----------------------------------------------------------
import tarfile, time, shutil, os
from pathlib import Path

# --- paths --------------------------------------------------
ROOT_DRIVE  = Path("/content/drive/MyDrive/data")           # 🔁 adjust if needed
DATA_NAME   = "speech_commands_v0.01"                  # folder inside tar
DATA_DST    = Path("/content/data")                    # where scripts look
DATA_SRC    = ROOT_DRIVE / DATA_NAME                   # extracted in Drive
DATA_TAR    = ROOT_DRIVE / f"{DATA_NAME}.tar.gz"       # compressed in Drive
LOCAL_TAR   = Path("/content") / DATA_TAR.name         # temp copy

DATA_DST.mkdir(parents=True, exist_ok=True)            # ensures /content/data

# full path once extracted in Colab
LOCAL_DATA = DATA_DST / DATA_NAME

# --- logic --------------------------------------------------
if LOCAL_DATA.exists():
    print(f"✓ dataset already present at {LOCAL_DATA}")

elif DATA_SRC.exists():                                # extracted on Drive
    try:
        LOCAL_DATA.symlink_to(DATA_SRC, target_is_directory=True)
        print(f"🔗  Symlinked {DATA_SRC} → {LOCAL_DATA}")
    except Exception as e:
        print(f"Symlink failed ({e.__class__.__name__}); copying …")
        t0 = time.time()
        shutil.copytree(DATA_SRC, LOCAL_DATA, dirs_exist_ok=True)
        print(f"✓ copied in {time.time()-t0:.1f}s")

elif DATA_TAR.exists():                                # only tar on Drive
    print("📦  Found tarball in Drive — copying locally …")
    if not LOCAL_TAR.exists():
        shutil.copy2(DATA_TAR, LOCAL_TAR)
        sz = LOCAL_TAR.stat().st_size / 1_048_576
        print(f"   → {sz:.1f} MB copied")

    print("🗜️   Extracting …")
    t0 = time.time()
    with tarfile.open(LOCAL_TAR, "r:gz") as tf:
        tf.extractall(path=DATA_DST)
    print(f"✓ extracted in {time.time()-t0:.1f}s → {LOCAL_DATA}")

    LOCAL_TAR.unlink()                # optional: keep workspace tidy

else:
    raise FileNotFoundError(
        "Dataset not found!\n"
        f"Looked for either:\n  • {DATA_SRC}\n  • {DATA_TAR}"
    )

📦  Found tarball in Drive — copying locally …
   → 1420.1 MB copied
🗜️   Extracting …
✓ extracted in 29.8s → /content/data/speech_commands_v0.01


In [4]:
!pip install -q torch torchaudio librosa soundfile scikit-learn tqdm matplotlib pandas seaborn

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m363.4/363.4 MB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.8/13.8 MB[0m [31m123.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.6/24.6 MB[0m [31m98.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m883.7/883.7 kB[0m [31m65.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m664.8/664.8 MB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m211.5/211.5 MB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.3/56.3 MB[0m [31m42.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m127.9/127.9 MB[0m [31m19.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [5]:
import copy, json, time, re
from datetime import datetime
from pathlib import Path

import numpy as np
import torch
from sklearn.metrics import confusion_matrix

from DeepLearningCourse.project_2.helpers.model_runner_utils import (
    set_seed, get_device, train_epoch, validate, test_model
)
from DeepLearningCourse.project_2.helpers.speech_datasets import (
    get_task1_dataloaders, get_task2_dataloaders,
    get_task3_dataloaders, get_task4_dataloaders
)
import DeepLearningCourse.project_2.helpers.models as models

In [6]:
CFG_T2 = dict(n_mfcc=40, d_model=64,  num_heads=2,  num_layers=2,
              kernel_size=31, expansion_factor=2, dropout=0.1, pool='mean',
              optimizer='adamw', learning_rate=5e-4, weight_decay=0.01,
              epochs=50, scheduler='cosine', scheduler_step_size=10, scheduler_gamma=0.5)

CFG_T3 = dict(n_mfcc=40, d_model=192, num_heads=6,  num_layers=6,
              kernel_size=15, expansion_factor=4, dropout=0.1, pool='mean',
              optimizer='adamw', learning_rate=5e-4, weight_decay=0.01,
              epochs=50, scheduler='cosine', scheduler_step_size=10, scheduler_gamma=0.5)

CFG_T4 = dict(n_mfcc=40, d_model=256, num_heads=8,  num_layers=6,
              kernel_size=15, expansion_factor=4, dropout=0.1, pool='mean',
              optimizer='adamw', learning_rate=5e-4, weight_decay=0.01,
              epochs=50, scheduler='cosine', scheduler_step_size=10, scheduler_gamma=0.5)


In [16]:
def to_idx_name(id2lbl_raw):
    """
    Return *idx→name* dict regardless of original layout.

    • If keys are ints and values are str  → return as-is.
    • If keys are str  and values are int → invert.
    • If both keys & values are int       → map {0: 'class0', 1:'class1', ...}.
      (We fabricate names 'class0', 'class1', … so keyword look-ups will fail
       and the numeric fallback kicks in.)
    """
    k0 = next(iter(id2lbl_raw))
    if isinstance(k0, int) and isinstance(id2lbl_raw[k0], str):
        return id2lbl_raw                           # shape A
    if isinstance(k0, str) and isinstance(id2lbl_raw[k0], int):
        return {v: k for k, v in id2lbl_raw.items()}  # shape B
    # shape C
    return {k: f"class{k}" for k in id2lbl_raw}

import re

def find_label_index(idx2name: dict, *keywords, assume_zero_negative=True):
    """
    idx2name is the *normalised* int→str mapping.

    First try regex match (case-insensitive) on each keyword.
    If nothing matches and names look like 'class0', 'class1', …
    we fall back to smallest / largest index depending on the keyword intent.
    """
    for kw in keywords:
        pat = re.compile(kw, re.I)
        for idx, name in idx2name.items():
            if pat.search(name):
                return idx

    # fallback when names are uninstructive
    ints = sorted(idx2name)
    if assume_zero_negative:
        if any(kw.lower() in ("silence", "background", "unknown") for kw in keywords):
            return ints[0]
        return ints[-1]
    else:
        if any(kw.lower() in ("silence", "background", "unknown") for kw in keywords):
            return ints[-1]
        return ints[0]

In [17]:
def build_model(cfg, n_classes):
    return models.ConformerClassifier(
        n_classes=n_classes,
        n_mfcc=cfg['n_mfcc'],
        d_model=cfg['d_model'],
        num_layers=cfg['num_layers'],
        num_heads=cfg['num_heads'],
        kernel_size=cfg['kernel_size'],
        expansion_factor=cfg['expansion_factor'],
        dropout=cfg['dropout'],
        pool=cfg['pool'],
    )

def build_optim_sched(model, cfg):
    opt = torch.optim.AdamW(model.parameters(),
                            lr=cfg['learning_rate'],
                            weight_decay=cfg['weight_decay'])
    sch = torch.optim.lr_scheduler.CosineAnnealingLR(
        opt, T_max=cfg['epochs'], eta_min=1e-6)
    return opt, sch

def train_specialist(cfg, loaders, name, early_stop=5):
    tr, val, te, id2lbl = loaders
    model = build_model(cfg, len(id2lbl)).to(DEVICE)
    crit  = torch.nn.CrossEntropyLoss()
    opt, sch = build_optim_sched(model, cfg)

    best_val, patience, best_w = 0.0, 0, None
    for ep in range(1, cfg['epochs']+1):
        tl, ta = train_epoch(model, DEVICE, tr, crit, opt)
        vl, va = validate(model, DEVICE, val, crit)
        sch.step()
        if va > best_val:
            best_val, best_w, patience = va, copy.deepcopy(model.state_dict()), 0
        else:
            patience += 1
            if patience >= early_stop: break
        print(f"{name:6s}  [{ep:02d}]  train {ta:.3%} / val {va:.3%}")
    model.load_state_dict(best_w)
    te_acc, te_cm = test_model(model, DEVICE, te, id2lbl)
    print(f"{name} test_acc = {te_acc:.3%}")
    return model, te_acc, te_cm, id2lbl

def chain_predict(m2, m3, m4, loader,
                  raw2, raw3, raw4, raw1):
    # ---------- normalise every mapping ----------
    id2l2 = to_idx_name(raw2)
    id2l3 = to_idx_name(raw3)
    id2l4 = to_idx_name(raw4)
    id2l1 = to_idx_name(raw1)

    # ---------- locate key class indices ----------
    idx_sil = find_label_index(id2l2, "silence", "background")
    idx_cmd = [i for i in id2l2 if i != idx_sil][0]

    idx_known = find_label_index(id2l3, "known", "command")
    idx_unknown = [i for i in id2l3 if i != idx_known][0]

    lbl_sil = find_label_index(id2l1, "silence", "background")
    lbl_un  = find_label_index(id2l1, "unknown")

    name_to_t1idx = {name: idx for idx, name in id2l1.items()}

    # --------------- forward pass ---------------
    preds, tgts = [], []
    with torch.no_grad():
        for x, y in loader:
            x = x.to(DEVICE)
            # stage-1
            c1 = torch.softmax(m2(x),1).argmax(1)
            mask_sil = (c1 == idx_sil)
            mask_cmd = ~mask_sil
            p = torch.full_like(y, lbl_sil)
            # stage-2
            if mask_cmd.any():
                x2 = x[mask_cmd]
                c2 = torch.softmax(m3(x2),1).argmax(1)
                mask_known = (c2 == idx_known)
                mask_unk   = ~mask_known
                sel = mask_cmd.nonzero(as_tuple=True)[0]
                p[sel[mask_unk]] = lbl_un
                # stage-3
                if mask_known.any():
                    x3 = x2[mask_known]
                    c3 = torch.softmax(m4(x3),1).argmax(1)
                    final = torch.tensor(
                        [name_to_t1idx[id2l4[int(i)]] for i in c3])
                    p[sel[mask_known]] = final
            preds.append(p.cpu())
            tgts.append(y)
    preds = torch.cat(preds)
    tgts  = torch.cat(tgts)
    acc = (preds == tgts).float().mean().item()
    cm  = confusion_matrix(tgts, preds, labels=list(id2l1.keys()))
    return acc, cm

In [8]:
known10 = ['yes','no','up','down','left','right','on','off','stop','go']

DATA_DIR = '/content/data'
BATCH_SIZE = 256
SEED = 0

load_t1 = get_task1_dataloaders(data_dir=DATA_DIR, known_commands=known10, batch_size=BATCH_SIZE, seed=SEED)
load_t2 = get_task2_dataloaders(data_dir=DATA_DIR, batch_size=BATCH_SIZE, seed=SEED)
load_t3 = get_task3_dataloaders(data_dir=DATA_DIR, known_commands=known10, batch_size=BATCH_SIZE, seed=SEED)
load_t4 = get_task4_dataloaders(data_dir=DATA_DIR, known_commands=known10, batch_size=BATCH_SIZE, seed=SEED)

loading train:   0%|          | 0/51088 [00:00<?, ?it/s]

loading val:   0%|          | 0/6798 [00:00<?, ?it/s]

loading test:   0%|          | 0/6835 [00:00<?, ?it/s]

loading train:   0%|          | 0/51088 [00:00<?, ?it/s]

loading val:   0%|          | 0/6798 [00:00<?, ?it/s]

loading test:   0%|          | 0/6835 [00:00<?, ?it/s]

loading train:   0%|          | 0/51088 [00:00<?, ?it/s]

loading val:   0%|          | 0/6798 [00:00<?, ?it/s]

loading test:   0%|          | 0/6835 [00:00<?, ?it/s]

loading train:   0%|          | 0/18538 [00:00<?, ?it/s]

loading val:   0%|          | 0/2577 [00:00<?, ?it/s]

loading test:   0%|          | 0/2567 [00:00<?, ?it/s]

In [None]:
def get_device():
    """Set up device (MPS for Mac, CUDA for NVIDIA, or CPU)"""
    if torch.backends.mps.is_available() and torch.backends.mps.is_built():
        device = torch.device("mps")
        print("Using MPS (Apple Silicon GPU)")
    elif torch.cuda.is_available():
        device = torch.device("cuda")
        print("Using CUDA GPU")
    else:
        device = torch.device("cpu")
        print("Using CPU")
    return device

In [10]:
DEVICE = get_device()
m2, acc2, cm2, id2l2 = train_specialist(CFG_T2, load_t2, "task2")
m3, acc3, cm3, id2l3 = train_specialist(CFG_T3, load_t3, "task3")
m4, acc4, cm4, id2l4 = train_specialist(CFG_T4, load_t4, "task4")
print("individual test accuracies:", acc2, acc3, acc4)

Using CUDA GPU
task2   [01]  train 72.636% / val 94.713%
task2   [02]  train 96.163% / val 99.404%
task2   [03]  train 99.794% / val 100.000%
task2   [04]  train 99.817% / val 99.696%
task2   [05]  train 99.909% / val 98.263%
task2   [06]  train 99.886% / val 99.746%
task2   [07]  train 100.000% / val 99.442%
task2 test_acc = 99.773%
task3   [01]  train 76.527% / val 89.644%
task3   [02]  train 93.629% / val 92.101%
task3   [03]  train 96.014% / val 95.175%
task3   [04]  train 96.882% / val 94.940%
task3   [05]  train 97.343% / val 96.072%
task3   [06]  train 98.020% / val 95.558%
task3   [07]  train 98.220% / val 96.175%
task3   [08]  train 98.438% / val 95.690%
task3   [09]  train 98.643% / val 96.249%
task3   [10]  train 98.829% / val 96.558%
task3   [11]  train 98.918% / val 96.881%
task3   [12]  train 98.951% / val 96.720%
task3   [13]  train 99.078% / val 96.470%
task3   [14]  train 99.037% / val 96.705%
task3   [15]  train 99.261% / val 95.587%
task3   [16]  train 99.172% / val 

In [18]:
_, _, test_t1, id2l1 = load_t1
chain_acc, chain_cm  = chain_predict(
    m2, m3, m4, test_t1,
    id2l2, id2l3, id2l4, id2l1)

print(f"Composite accuracy on Task-1 test-set: {chain_acc:.3%}")

Composite accuracy on Task-1 test-set: 96.333%


In [19]:
RESULTS_DIR = pathlib.Path("/content/drive/MyDrive/speech_runs")
RESULTS_DIR.mkdir(parents=True, exist_ok=True)

stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
out_json = RESULTS_DIR / f"chain_result_{stamp}.json"
with out_json.open("w") as f:
    json.dump({
        "specialists": [
            {"task": 2, "config": CFG_T2, "test_acc": acc2},
            {"task": 3, "config": CFG_T3, "test_acc": acc3},
            {"task": 4, "config": CFG_T4, "test_acc": acc4},
        ],
        "chain_test_acc": chain_acc,
        "chain_confusion_matrix": chain_cm.tolist(),
    }, f, indent=2)
print("results saved to", out_json)

results saved to /content/drive/MyDrive/speech_runs/chain_result_20250505_234217.json
