In [None]:
from pathlib import Path
import zipfile, os
from google.colab import files

# Ask the user for the dataset archive through the Colab upload widget.
uploaded = files.upload()   # <-- pick your dataset.zip here

# The first uploaded file name is treated as the archive; it is unpacked into
# ./dataset relative to the current working directory.
zip_path = list(uploaded)[0]
extract_dir = Path("dataset")

with zipfile.ZipFile(zip_path, mode='r') as archive:
    archive.extractall(path=extract_dir)

print("Extracted dataset to:", extract_dir)

# Report how many .wav clips sit directly inside each extracted sub-directory.
class_dirs = (entry for entry in extract_dir.iterdir() if entry.is_dir())
for class_dir in class_dirs:
    wav_count = sum(1 for _ in class_dir.glob("*.wav"))
    print(class_dir.name, wav_count, "files")


Saving dataset.zip to dataset.zip
Extracted dataset to: dataset
dataset 0 files


In [None]:
import shutil

# The archive unpacks as dataset/dataset/<class>; hoist every entry one level
# up so the class folders live directly under dataset/.
outer = Path("dataset")
inner = outer / "dataset"

if inner.exists():
    nested_entries = list(inner.iterdir())
    for entry in nested_entries:
        shutil.move(str(entry), outer)
    # The wrapper directory is empty at this point and can be dropped.
    inner.rmdir()

flattened = list(outer.iterdir())
print("Fixed structure:", flattened)


Fixed structure: [PosixPath('dataset/fan_off'), PosixPath('dataset/noise'), PosixPath('dataset/lights_off'), PosixPath('dataset/wakeword'), PosixPath('dataset/fan_on'), PosixPath('dataset/lights_on')]


In [None]:
import numpy as np, wave

# --- Feature-extraction configuration -------------------------------------
SAMPLE_RATE = 16_000                     # expected WAV sample rate (Hz)
WIN_LENGTH = int(SAMPLE_RATE * 0.025)    # analysis window: 25 ms in samples
HOP_LENGTH = int(SAMPLE_RATE * 0.010)    # frame step: 10 ms in samples
N_FFT = 512                              # FFT size used for each frame
N_MELS = 40                              # number of mel filters
N_MFCC = 13                              # number of cepstral coefficients kept

def load_wav_mono_16k(path: Path) -> np.ndarray:
    """Read a mono, 16-bit PCM WAV file recorded at SAMPLE_RATE.

    Args:
        path: Location of the WAV file.

    Returns:
        1-D float32 array with the samples scaled by 1/32768.

    Raises:
        AssertionError: If the sample rate is not SAMPLE_RATE, the file is not
            mono, or the samples are not 16 bits wide.
    """
    with wave.open(str(path), 'rb') as wf:
        assert wf.getframerate() == SAMPLE_RATE, f"Expected {SAMPLE_RATE}, got {wf.getframerate()}"
        assert wf.getnchannels() == 1, "Audio must be mono"
        # The raw bytes are reinterpreted as int16 below; any other sample
        # width would decode to garbage without raising, so reject it here.
        assert wf.getsampwidth() == 2, f"Expected 16-bit PCM, got {8 * wf.getsampwidth()}-bit"
        raw = wf.readframes(wf.getnframes())
        data = np.frombuffer(raw, dtype=np.int16).astype(np.float32) / 32768.0
        return data

# MFCC helpers (same as before)
def hz_to_mel(f):
    """Map a frequency in Hz (scalar or array) to mels: 2595 * log10(1 + f/700)."""
    ratio = 1 + f/700
    return 2595 * np.log10(ratio)
def mel_to_hz(m):
    """Map a mel value (scalar or array) back to Hz: 700 * (10**(m/2595) - 1)."""
    power = 10**(m/2595)
    return 700 * (power - 1)

def mel_filterbank(sr, n_fft, n_mels, fmin=20, fmax=None):
    """Build a bank of triangular mel filters over the one-sided FFT bins.

    Args:
        sr: Sample rate in Hz.
        n_fft: FFT size; the bank has n_fft//2 + 1 columns.
        n_mels: Number of filters (rows).
        fmin: Lowest band edge in Hz.
        fmax: Highest band edge in Hz; sr/2 when None.

    Returns:
        Array of shape (n_mels, n_fft//2 + 1); each row rises linearly from 0
        at its left edge bin to 1 at its centre bin and falls back towards 0
        at its right edge bin.
    """
    if fmax is None:
        fmax = sr/2
    n_bins = n_fft//2 + 1

    # n_mels + 2 edge points equally spaced on the mel scale, mapped to bins.
    mel_points = np.linspace(hz_to_mel(fmin), hz_to_mel(fmax), n_mels+2)
    edge_hz = mel_to_hz(mel_points)
    edge_bins = np.floor(n_bins * edge_hz / (sr/2)).astype(int)

    bank = np.zeros((n_mels, n_bins))
    triples = zip(edge_bins[:-2], edge_bins[1:-1], edge_bins[2:])
    for row, (left, centre, right) in enumerate(triples):
        # Rising slope; the range is empty when left == centre, so the
        # division never sees a zero denominator.
        for k in range(left, centre):
            bank[row, k] = (k - left) / (centre - left)
        # Falling slope, starting at 1 on the centre bin.
        for k in range(centre, right):
            bank[row, k] = (right - k) / (right - centre)
    return bank

def dct_matrix(M, N):
    """Return the first M rows of the orthonormal DCT-II basis of length N.

    Row m, column k holds sqrt(2/N) * cos(pi * (k + 0.5) * m / N); row 0 is
    additionally scaled by 1/sqrt(2).
    """
    cols = np.arange(N)
    rows = np.arange(M).reshape(-1, 1)
    basis = np.sqrt(2.0/N) * np.cos(np.pi*(cols+0.5)*rows/N)
    basis[0] *= 1/np.sqrt(2)
    return basis

# Precompute the projection matrices once; mfcc_from_signal reuses them.
MEL_FB = mel_filterbank(sr=SAMPLE_RATE, n_fft=N_FFT, n_mels=N_MELS)
DCT = dct_matrix(M=N_MFCC, N=N_MELS)

def framing(y, win_length, hop_length):
    """Slice a 1-D signal into overlapping frames.

    A signal shorter than win_length yields a single zero-padded frame.
    Otherwise frames start every hop_length samples and trailing samples that
    do not fill a whole window are dropped.

    Returns:
        Array of shape (num_frames, win_length) (np.zeros default dtype).
    """
    total = len(y)

    # Short input: one frame, padded with zeros on the right.
    if total < win_length:
        padded = np.zeros((1, win_length))
        padded[0, :total] = y[:total]
        return padded

    # Every frame below lies fully inside the signal, so no padding is needed.
    count = 1 + (total - win_length)//hop_length
    out = np.zeros((count, win_length))
    for idx in range(count):
        begin = idx*hop_length
        out[idx] = y[begin:begin+win_length]
    return out

def mfcc_from_signal(y):
    """Compute MFCC features for a 1-D audio signal.

    Steps: pre-emphasis (coefficient 0.97), framing with WIN_LENGTH and
    HOP_LENGTH, np.hanning window, magnitude spectrum of an N_FFT-point real
    FFT, projection onto MEL_FB, natural log floored at 1e-10, projection
    onto DCT.

    Returns:
        float32 array with one row per frame and one column per DCT row.
    """
    # First sample is kept as is; every later one has 0.97 * previous removed.
    emphasized = np.append(y[0], y[1:] - 0.97*y[:-1])

    windowed = framing(emphasized, WIN_LENGTH, HOP_LENGTH)
    windowed *= np.hanning(WIN_LENGTH)

    magnitude = np.abs(np.fft.rfft(windowed, n=N_FFT))
    mel_energy = magnitude.dot(MEL_FB.T)
    # Floor before the log so silent bands do not produce -inf.
    log_mel = np.log(np.maximum(mel_energy, 1e-10))
    cepstrum = log_mel.dot(DCT.T)
    return cepstrum.astype(np.float32)

# Build database: class name -> list of MFCC matrices, one per WAV clip.
from collections import defaultdict

DATASET_DIR = Path("dataset")

# Directory listing order is filesystem-dependent. Sorting both the class
# folders and the clips keeps clip indices (and therefore the medoid indices
# selected from them) stable across runs and machines.
class_to_files = defaultdict(list)
for cls_dir in sorted(DATASET_DIR.iterdir()):
    if not cls_dir.is_dir():
        continue
    wav_paths = sorted(cls_dir.glob("*.wav"))
    if not wav_paths:
        # Only classes with at least one clip are registered; say so instead
        # of dropping the folder silently.
        print(f"Warning: no .wav files in {cls_dir}; class skipped")
        continue
    class_to_files[cls_dir.name].extend(wav_paths)

# The loop variable is deliberately not called `files`: that name is bound to
# the google.colab `files` module and must stay usable by later cells.
mfcc_db = {}
for cls, wav_paths in class_to_files.items():
    mfcc_db[cls] = [mfcc_from_signal(load_wav_mono_16k(p)) for p in wav_paths]

print("Classes loaded:", {k: len(v) for k, v in mfcc_db.items()})


Classes loaded: {'lights_off': 60, 'wakeword': 60, 'lights_on': 60}


In [None]:
def dtw_distance_band(A, B, band=None):
    """Dynamic-time-warping distance between two feature sequences.

    The local cost is the Euclidean distance between frames; allowed steps are
    vertical, horizontal and diagonal, and the accumulated cost is returned
    without normalisation.

    Args:
        A: 2-D array of shape (T1, F), one row per frame.
        B: 2-D array of shape (T2, F).
        band: Optional half-width of the band around the diagonal
            (|i - j| <= band). None means no constraint. A band narrower than
            |T1 - T2| can never reach the end cell, so it is widened to that
            difference instead of returning the "unreachable" sentinel.

    Returns:
        Accumulated cost as a Python float. When one sequence is empty and the
        other is not, no path exists and the sentinel 1e30 is returned.
    """
    T1, _ = A.shape
    T2, _ = B.shape
    INF = 1e30  # finite sentinel so downstream differences never become NaN

    if band is not None:
        band = max(int(band), abs(T1 - T2))

    # float64 accumulator: long paths sum many costs and float32 loses digits.
    D = np.full((T1+1, T2+1), INF, dtype=np.float64)
    D[0, 0] = 0.0
    for i in range(1, T1+1):
        if band is None:
            j_lo, j_hi = 1, T2
        else:
            j_lo, j_hi = max(1, i - band), min(T2, i + band)
        if j_lo > j_hi:
            continue
        # Local costs of frame i against every in-band frame of B, in one call.
        row_cost = np.linalg.norm(B[j_lo-1:j_hi] - A[i-1], axis=1)
        for j in range(j_lo, j_hi+1):
            D[i, j] = row_cost[j - j_lo] + min(D[i-1, j], D[i, j-1], D[i-1, j-1])
    return float(D[T1, T2])

def select_k_medoids_dtw(mfcc_list, k=10, band=10, seed=1337):
    """Greedily pick up to k representative sequences (medoids) under DTW.

    The first medoid is the sequence with the smallest mean DTW distance to
    all others. Each further medoid is the candidate that most reduces the
    summed distance from every sequence to its nearest medoid.

    Args:
        mfcc_list: List of 2-D feature arrays.
        k: Number of medoids wanted; capped at len(mfcc_list).
        band: Band half-width forwarded to dtw_distance_band.
        seed: Unused; kept for signature compatibility. The selection is
            deterministic.

    Returns:
        (medoids, D): the sorted list of selected indices and the symmetric
        float32 (n, n) matrix of pairwise DTW distances. An empty input or
        k <= 0 gives an empty medoid list.
    """
    n = len(mfcc_list)
    D = np.zeros((n, n), dtype=np.float32)
    k = min(k, n)
    if k <= 0:
        return [], D

    for i in range(n):
        for j in range(i+1, n):
            D[i, j] = D[j, i] = dtw_distance_band(mfcc_list[i], mfcc_list[j], band=band)

    # Work in float64 so sums over many distances stay accurate.
    dist = D.astype(np.float64)
    medoids = [int(np.argmin(dist.mean(axis=1)))]
    # nearest[i] = distance from sequence i to its closest medoid so far.
    nearest = dist[:, medoids[0]].copy()

    while len(medoids) < k:
        # gains[c] = total reduction of `nearest` if candidate c were added.
        gains = np.maximum(nearest[:, None] - dist, 0.0).sum(axis=0)
        gains[medoids] = -np.inf  # already-chosen medoids are not candidates
        best_idx = int(np.argmax(gains))  # first maximum wins ties
        medoids.append(best_idx)
        nearest = np.minimum(nearest, dist[:, best_idx])

    medoids.sort()
    return medoids, D


In [None]:
KEEP_PER_CLASS = 10   # medoids requested per class
BAND = 10             # band half-width handed to the DTW routine

# For each class, keep only the indices of its medoid clips.
selected_idxs_by_class = {}
for cls, mfcc_list in mfcc_db.items():
    medoid_idxs = select_k_medoids_dtw(mfcc_list, k=KEEP_PER_CLASS, band=BAND)[0]
    selected_idxs_by_class[cls] = medoid_idxs
    print(f"{cls}: medoids {medoid_idxs}")


lights_off: medoids [0, 2, 4, 14, 15, 24, 25, 30, 31, 40]
wakeword: medoids [0, 11, 18, 20, 22, 36, 40, 47, 48, 54]
lights_on: medoids [3, 4, 5, 11, 12, 15, 32, 40, 44, 55]


In [None]:
# File name of the generated C header that holds the quantized templates.
EXPORT_PATH = "kws_templates.h"

def quantize_mfcc(M):
    """Quantize an MFCC matrix to int16 with a single per-matrix scale.

    Args:
        M: Array of MFCC values.

    Returns:
        (Q, s): Q is an int16 array shaped like M; s is the float scale
        max(|M|) + 1e-6. The original values are approximately
        Q * s / 32767.
    """
    # The small offset keeps the scale non-zero for an all-zero matrix.
    s = float(np.max(np.abs(M)) + 1e-6)
    # Round to nearest: a bare astype() truncates toward zero, which biases
    # every value towards 0 and doubles the worst-case quantization error.
    # The clip is a guard so rounding can never leave the int16 range.
    Q = np.clip(np.rint(M / s * 32767.0), -32767, 32767).astype(np.int16)
    return Q, s

# Classes are exported in alphabetical order; a template's class index is its
# position in this list.
class_names = sorted(mfcc_db.keys())
templates_meta, blob, scales = [], [], []
offset = 0

# Quantize every selected medoid and append it to one flat int16 blob, while
# recording (class index, frames, coeffs, start offset, length) per template.
for ci, cls in enumerate(class_names):
    for i in selected_idxs_by_class[cls]:
        Q, s = quantize_mfcc(mfcc_db[cls][i])
        n_frames, n_coeffs = Q.shape
        values = Q.ravel().tolist()
        templates_meta.append((ci, n_frames, n_coeffs, offset, len(values)))
        blob.extend(values)
        scales.append(s)
        offset += len(values)


def _c_array(ctype, name, items, size=""):
    """Render one `static const` C array definition from string items."""
    return f"static const {ctype} {name}[{size}] = {{" + ",".join(items) + "};\n"


def _meta_column(col):
    """Return column `col` of templates_meta as a list of strings."""
    return [str(m[col]) for m in templates_meta]


header_text = (
    "// Auto-generated: MFCC templates (medoids) for ESP32 DTW\n"
    "#pragma once\n#include <stdint.h>\n\n"
    f"#define KWS_N_MFCC {N_MFCC}\n"
    f"static const int KWS_NUM_CLASSES = {len(class_names)};\n"
    f"static const int KWS_NUM_TEMPLATES = {len(templates_meta)};\n\n"
)

array_defs = [
    _c_array("int8_t", "KWS_TEMPL_CLASS_IDX", _meta_column(0)),
    _c_array("uint16_t", "KWS_TEMPL_FRAMES", _meta_column(1)),
    _c_array("uint16_t", "KWS_TEMPL_COEFFS", _meta_column(2)),
    _c_array("uint32_t", "KWS_TEMPL_OFFSET", _meta_column(3)),
    _c_array("uint32_t", "KWS_TEMPL_LENGTH", _meta_column(4)),
    _c_array("float", "KWS_TEMPL_SCALE", [f"{scale:.8g}" for scale in scales]),
    _c_array("int16_t", "KWS_TEMPL_DATA", [str(v) for v in blob], size=len(blob)),
]

with open(EXPORT_PATH, "w") as f:
    f.write(header_text)
    # Definitions are separated by one blank line.
    f.write("\n".join(array_defs))

print("Exported:", EXPORT_PATH)


Exported: kws_templates.h


In [None]:
# Re-importing here keeps this cell independent of whatever the name `files`
# was bound to by earlier cells.
from google.colab import files
# Hand the generated header to the browser through the Colab download helper.
files.download("kws_templates.h")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
# === Cell 7: Quick evaluation on held-out clips (vs. medoids) ===
import numpy as np

BAND = 10  # same Sakoe–Chiba band you used for medoid selection

# Reference templates as (class_idx, MFCC) pairs: every selected medoid of
# every class, in class_names order.
templ = [
    (ci, mfcc_db[cls][idx])
    for ci, cls in enumerate(class_names)
    for idx in selected_idxs_by_class[cls]
]

# Nearest-template classifier: the distance is banded DTW divided by the
# combined number of frames.
def dtw_norm(A, B):
    """Banded DTW cost between A and B divided by (len(A) + len(B))."""
    raw_cost = dtw_distance_band(A, B, band=BAND)
    # Dividing by the combined length makes clips of different duration comparable.
    total_frames = A.shape[0] + B.shape[0]
    return raw_cost / total_frames

def predict_mfcc(mfcc):
    """Classify one MFCC matrix by its closest template.

    Returns:
        (class_idx, distance) of the template with the smallest normalized
        DTW distance; (None, 1e30) when no template scores below 1e30.
    """
    winner, winner_dist = None, 1e30
    scored = ((class_idx, dtw_norm(mfcc, template)) for class_idx, template in templ)
    for class_idx, dist in scored:
        # Strict comparison: the earliest template wins ties.
        if dist < winner_dist:
            winner, winner_dist = class_idx, dist
    return winner, winner_dist

# Evaluation split: the medoids act as the reference set; every other clip of
# the same class is a held-out test clip.
num_classes = len(class_names)
cls_to_d = {ci: [] for ci in range(num_classes)}  # distances grouped by true class
results = []  # one (true class, predicted class, distance) triple per test clip

for ci, cls in enumerate(class_names):
    medoid_set = set(selected_idxs_by_class[cls])
    for i, clip_mfcc in enumerate(mfcc_db[cls]):
        if i in medoid_set:
            continue  # templates are not scored against themselves
        pred, dist = predict_mfcc(clip_mfcc)
        results.append((ci, pred, dist))
        cls_to_d[ci].append(dist)

y_true = np.array([r[0] for r in results])
y_pred = np.array([r[1] for r in results])
dists = np.array([r[2] for r in results])

# Confusion matrix: rows are true classes, columns are predictions.
cm = np.zeros((num_classes, num_classes), dtype=int)
for t, p in zip(y_true, y_pred):
    cm[t, p] += 1

# Accuracies; denominators are floored at 1 so an empty class gives 0, not NaN.
row_totals = cm.sum(axis=1)
per_cls_acc = cm.diagonal() / np.maximum(row_totals, 1)
overall_acc = np.trace(cm) / max(cm.sum(), 1)

print("Classes:", class_names)
print("\nConfusion matrix (rows=true, cols=pred):\n", cm)
print("\nPer-class accuracy:")
for cls, acc, n_clips in zip(class_names, per_cls_acc, row_totals):
    print(f"  {cls:>12s}: {acc*100:5.1f}%  (n={n_clips})")
print(f"\nOverall accuracy: {overall_acc*100:5.1f}%  on {cm.sum()} test clips")

# Spread of winning distances per true class, useful for picking thresholds.
print("\nDistance stats by TRUE class (normalized DTW):")
for ci, cls in enumerate(class_names):
    arr = np.array(cls_to_d[ci])
    if not arr.size:
        print(f"  {cls:>12s}: (no held-out clips)")
        continue
    print(f"  {cls:>12s}: mean={arr.mean():.2f}  median={np.median(arr):.2f}  min={arr.min():.2f}  max={arr.max():.2f}")


Classes: ['lights_off', 'lights_on', 'wakeword']

Confusion matrix (rows=true, cols=pred):
 [[43  6  1]
 [ 2 47  1]
 [ 0  3 47]]

Per-class accuracy:
    lights_off:  86.0%  (n=50)
     lights_on:  94.0%  (n=50)
      wakeword:  94.0%  (n=50)

Overall accuracy:  91.3%  on 150 test clips

Distance stats by TRUE class (normalized DTW):
    lights_off: mean=2.98  median=2.83  min=1.79  max=5.66
     lights_on: mean=2.74  median=2.52  min=1.60  max=4.86
      wakeword: mean=2.91  median=2.62  min=1.51  max=7.67
