# Train Custom "Claudinho" Wake Word Model

Trains an openWakeWord-compatible model using **real voice recordings**.

**Runtime: GPU** (Runtime > Change runtime type > T4 GPU)

Total time: ~15-20 minutes

In [None]:
# === 1. SETUP ===
!pip install -q openwakeword onnxruntime numpy scipy torch

# Get the voice recordings
!git clone -q https://github.com/claudinhocoding/claudinho.git claudinho_repo

# Download pre-computed negative features
!wget -q https://huggingface.co/datasets/davidscripka/openwakeword_features/resolve/main/openwakeword_features_ACAV100M_2000_hrs_16bit.npy
!wget -q https://huggingface.co/datasets/davidscripka/openwakeword_features/resolve/main/validation_set_features.npy

# Download embedding models
import openwakeword, os
oww_dir = os.path.join(os.path.dirname(openwakeword.__file__), 'resources', 'models')
os.makedirs(oww_dir, exist_ok=True)
!wget -q https://github.com/dscripka/openWakeWord/releases/download/v0.5.1/melspectrogram.onnx -O {oww_dir}/melspectrogram.onnx
!wget -q https://github.com/dscripka/openWakeWord/releases/download/v0.5.1/embedding_model.onnx -O {oww_dir}/embedding_model.onnx

import glob
samples = sorted(glob.glob('claudinho_repo/training/positive/claudinho/*.wav'))
print(f'\n=== Setup complete: {len(samples)} voice samples found ===')

In [None]:
# === 2. EXTRACT FEATURES FROM RECORDINGS ===
import numpy as np
import scipy.io.wavfile
import scipy.signal
import onnxruntime as ort
from pathlib import Path

# ONNX front-end models that the setup cell placed in the openWakeWord
# resources folder.
_models_dir = Path(oww_dir)
mel_session = ort.InferenceSession(str(_models_dir / 'melspectrogram.onnx'))
emb_session = ort.InferenceSession(str(_models_dir / 'embedding_model.onnx'))

# Framing constants; the durations quoted assume 16 kHz audio.
WINDOW = 1280                     # samples handed to the mel model per call (80 ms)
N_FRAMES = 76                     # mel outputs grouped into one embedding input
MIN_SAMPLES = WINDOW * N_FRAMES   # 97,280 samples (6.08 s); shorter clips are padded

def audio_to_features(audio):
    """Extract embedding vectors from a 16 kHz audio clip.

    The clip is converted to float32, scaled down by 32768 when its peak
    suggests int16-range samples, padded with centred silence up to
    ``MIN_SAMPLES``, pushed through the mel model one ``WINDOW`` at a time,
    and finally embedded with a sliding window of ``N_FRAMES`` mel outputs
    (hop of 8 outputs).

    Args:
        audio: Array-like of samples. A 2-D ``(samples, channels)`` array is
            averaged down to mono. Empty input is accepted.

    Returns:
        A 2-D array with one flattened embedding per row, or an empty
        ``(0, 96)`` array when the clip is empty or yields no embedding.
    """
    audio = np.asarray(audio, dtype=np.float32)
    if audio.size == 0:
        # `.max()` on an empty array raises; there is nothing to embed anyway.
        return np.empty((0, 96))
    if audio.ndim > 1:
        # Mix multi-channel audio down to mono so padding/windowing act on time.
        audio = audio.mean(axis=1)
    if np.abs(audio).max() > 1.5:  # likely int16-range samples
        audio = audio / 32768.0

    # Pad short clips with silence (centered)
    if len(audio) < MIN_SAMPLES:
        pad = MIN_SAMPLES - len(audio)
        audio = np.pad(audio, (pad // 2, pad - pad // 2))

    # Ask each session for its input tensor name instead of hard-coding one,
    # so a differently named input in the downloaded model does not break
    # extraction.
    mel_input = mel_session.get_inputs()[0].name
    emb_input = emb_session.get_inputs()[0].name

    # Mel spectrograms, one model call per non-overlapping WINDOW of samples
    # (a trailing partial window is dropped).
    mels = [
        mel_session.run(None, {mel_input: audio[s:s + WINDOW].reshape(1, -1)})[0]
        for s in range(0, len(audio) - WINDOW + 1, WINDOW)
    ]

    # Embeddings (sliding window of N_FRAMES mel outputs, step 8)
    # NOTE(review): the reshape below assumes N_FRAMES mel outputs together
    # hold exactly N_FRAMES * 32 values and that the embedding model accepts
    # a (1, N_FRAMES, 32) input - confirm against the downloaded models.
    feats = []
    for i in range(0, len(mels) - N_FRAMES + 1, 8):
        batch = np.concatenate(mels[i:i + N_FRAMES]).reshape(1, N_FRAMES, 32)
        emb = emb_session.run(None, {emb_input: batch.astype(np.float32)})[0]
        feats.append(emb.flatten())
    return np.array(feats) if feats else np.empty((0, 96))

def augment(audio):
    """Return the clip plus 23 perturbed copies (24 arrays in total).

    Order of the returned list: the float32 original, 4 additive-noise
    copies, 6 gain-scaled copies, 6 copies padded with leading or trailing
    zeros (alternating), 4 linearly re-interpolated (speed-changed) copies
    and 3 copies with noise followed by gain. Noise standard deviations are
    expressed as fractions of 32768.
    """
    base = audio.astype(np.float32)
    n = len(base)

    def noisy(level):
        # Gaussian noise with standard deviation `level` * 32768.
        return base + np.random.randn(n) * level * 32768

    out = [base]

    # Additive noise at increasing levels
    out.extend(noisy(level) for level in (0.003, 0.008, 0.015, 0.025))

    # Plain gain changes
    out.extend(base * gain for gain in (0.4, 0.6, 0.8, 1.2, 1.5, 2.0))

    # Shift in time by prepending / appending zeros
    for pad_len in (1600, 3200, 6400):
        silence = np.zeros(pad_len)
        out.append(np.concatenate([silence, base]))
        out.append(np.concatenate([base, silence]))

    # Speed change via linear interpolation onto a shorter/longer time grid
    positions = np.arange(n)
    for rate in (0.85, 0.92, 1.08, 1.15):
        query = np.linspace(0, n - 1, int(n / rate))
        out.append(np.interp(query, positions, base))

    # Noise followed by gain
    out.extend(
        noisy(level) * gain
        for level, gain in ((0.005, 0.7), (0.01, 1.3), (0.02, 0.5))
    )
    return out

def _to_mono_int16_scale(audio):
    """Return `audio` as mono float32 samples on the int16 amplitude scale.

    The augmentation step sizes its noise as a fraction of 32768, so every
    recording is brought to that scale first, based on the dtype the WAV
    reader returned: float data is multiplied by 32768, unsigned 8-bit data
    is re-centred and multiplied by 256, and other integer widths are scaled
    by 32768 / full-scale (a no-op for int16). 2-D (samples, channels) data
    is averaged across channels.
    """
    audio = np.asarray(audio)
    if np.issubdtype(audio.dtype, np.floating):
        scaled = audio.astype(np.float32) * 32768.0
    elif audio.dtype == np.uint8:
        scaled = (audio.astype(np.float32) - 128.0) * 256.0
    else:
        full_scale = float(np.iinfo(audio.dtype).max) + 1.0
        scaled = audio.astype(np.float32) * (32768.0 / full_scale)
    if scaled.ndim > 1:
        scaled = scaled.mean(axis=1)
    return scaled


print('Extracting features from recordings (with augmentation)...')
if not samples:
    # Fail here with a clear message instead of a concatenate/divide error below.
    raise FileNotFoundError('No voice samples found; nothing to extract features from.')

all_feats = []
for i, wav in enumerate(samples):
    sr, audio = scipy.io.wavfile.read(wav)
    audio = _to_mono_int16_scale(audio)
    if len(audio) == 0:
        # An empty recording cannot be interpolated or embedded; skip it.
        continue
    if sr != 16000:
        # Keep the float result: casting back to int16 would wrap around on
        # resampling overshoot.
        audio = scipy.signal.resample(audio, int(len(audio) * 16000 / sr))
    for aug in augment(audio):
        f = audio_to_features(aug)
        if len(f) > 0:
            all_feats.append(f)
    if (i+1) % 10 == 0:
        print(f'  processed {i+1}/{len(samples)} recordings...')

if not all_feats:
    raise ValueError('No features could be extracted from the recordings.')

pos_features = np.concatenate(all_feats)
print(f'\n=== Positive: {pos_features.shape[0]} embeddings from {len(samples)} recordings ===')
print(f'    ({pos_features.shape[0] / len(samples):.0f}x augmentation multiplier)')

In [None]:
# === 3. LOAD NEGATIVE FEATURES ===
def _feature_rows(path):
    """Memory-map a .npy feature file and view it as rows of feature vectors.

    Memory-mapping avoids reading the whole file into RAM; only the rows
    that are indexed later are actually loaded.
    """
    raw = np.load(path, mmap_mode='r')
    if raw.ndim > 2:
        # NOTE(review): the published feature files may store windows of
        # several embeddings per row (N, T, D) - confirm. Flattening keeps
        # one vector per row so the rows line up with `pos_features`; it is
        # a no-op for files that are already 2-D.
        raw = raw.reshape(-1, raw.shape[-1])
    return raw


print('Loading negative features...')
neg_raw = _feature_rows('openwakeword_features_ACAV100M_2000_hrs_16bit.npy')

# Subsample: 50:1 negative:positive ratio. Pick the rows first and convert
# only those to float32; sorted indices read the file front to back (row
# order is irrelevant because the training set is shuffled later).
max_neg = min(len(neg_raw), pos_features.shape[0] * 50)
idx = np.sort(np.random.choice(len(neg_raw), max_neg, replace=False))
neg_features = np.array(neg_raw[idx], dtype=np.float32)
if neg_raw.dtype == np.int16:
    neg_features = neg_features / 256.0

val_raw = _feature_rows('validation_set_features.npy')
val_neg = np.array(val_raw, dtype=np.float32)
if val_raw.dtype == np.int16:
    val_neg = val_neg / 256.0

print(f'=== Negative: {neg_features.shape[0]} train, {val_neg.shape[0]} validation ===')
print(f'    Ratio: {neg_features.shape[0] / pos_features.shape[0]:.0f}:1')

In [None]:
# === 4. TRAIN THE MODEL ===
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Normalize features (per-dimension statistics over positives + sampled negatives)
all_X = np.concatenate([pos_features, neg_features])
feat_mean = all_X.mean(axis=0)
feat_std = all_X.std(axis=0) + 1e-8  # epsilon avoids dividing by zero on constant dims

X_pos = (pos_features - feat_mean) / feat_std
X_neg = (neg_features - feat_mean) / feat_std

# Train/val split for positives
# NOTE(review): the split is made over individual embeddings, so augmented
# copies of one recording can land on both sides; validation recall is
# therefore optimistic. Splitting per recording would need the recording id
# to be tracked during feature extraction.
n_val = max(int(len(X_pos) * 0.15), 10)
perm = np.random.permutation(len(X_pos))
X_pos_val, X_pos_train = X_pos[perm[:n_val]], X_pos[perm[n_val:]]

# Assemble training set
X_train = np.concatenate([X_pos_train, X_neg])
y_train = np.concatenate([np.ones(len(X_pos_train)), np.zeros(len(X_neg))])
shuf = np.random.permutation(len(X_train))
X_train, y_train = X_train[shuf], y_train[shuf]

# Validation set (held-out positives + the first 2000 validation negatives)
val_neg_norm = ((val_neg - feat_mean) / feat_std)[:2000]
X_val = np.concatenate([X_pos_val, val_neg_norm])
y_val = np.concatenate([np.ones(len(X_pos_val)), np.zeros(len(val_neg_norm))])

train_loader = DataLoader(
    TensorDataset(torch.FloatTensor(X_train), torch.FloatTensor(y_train)),
    batch_size=512, shuffle=True
)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Device: {device}')
print(f'Train: {len(X_pos_train)} pos + {len(X_neg)} neg')
print(f'Val:   {len(X_pos_val)} pos + {len(val_neg_norm)} neg\n')

# The validation inputs never change, so move them to the device once.
X_val_t = torch.FloatTensor(X_val).to(device)

# Model: simple 2-layer DNN producing one logit per 96-dim embedding
model = nn.Sequential(
    nn.Linear(96, 32),
    nn.ReLU(),
    nn.Linear(32, 1),
).to(device)

# Weight positives by the negative:positive ratio to offset class imbalance
pos_weight = torch.tensor([len(X_neg) / max(len(X_pos_train), 1)]).to(device)
criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=75)

# Start below any reachable accuracy so the first epoch always records a state.
best_val_acc, best_state = float('-inf'), None
for epoch in range(75):
    model.train()
    total_loss = 0
    for xb, yb in train_loader:
        xb, yb = xb.to(device), yb.to(device)
        optimizer.zero_grad()
        # squeeze(-1) drops only the output dim; a bare squeeze() would also
        # drop the batch dim of a final batch of size 1 and break the loss.
        loss = criterion(model(xb).squeeze(-1), yb)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    scheduler.step()

    model.eval()
    with torch.no_grad():
        vo = model(X_val_t).squeeze(-1)
        vp = (torch.sigmoid(vo) > 0.5).cpu().numpy()
        acc = (vp == y_val).mean()
        recall = vp[y_val == 1].mean() if (y_val == 1).sum() > 0 else 0
        fpr = vp[y_val == 0].mean() if (y_val == 0).sum() > 0 else 0

    # Keep a CPU copy of the weights from the best-accuracy epoch
    if acc > best_val_acc:
        best_val_acc = acc
        best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()}

    if (epoch + 1) % 10 == 0:
        print(f'Epoch {epoch+1:3d} | Loss {total_loss/len(train_loader):.4f} | Acc {acc:.3f} | Recall {recall:.3f} | FPR {fpr:.4f}')

model.load_state_dict(best_state)
print(f'\n=== Training done! Best val accuracy: {best_val_acc:.3f} ===')

In [None]:
# === 5. EXPORT TO ONNX ===

class ExportModel(nn.Module):
    """Wrap a logit-producing module so its forward pass returns sigmoid scores."""

    def __init__(self, base):
        super().__init__()
        # The wrapped module is stored under the attribute name `base`.
        self.base = base

    def forward(self, x):
        logits = self.base(x)
        return logits.sigmoid()

# Wrap the trained network so the exported graph outputs probabilities
export_model = ExportModel(model.cpu()).eval()
dummy = torch.randn(1, 96)

os.makedirs('output', exist_ok=True)
onnx_path = 'output/claudinho.onnx'

torch.onnx.export(
    export_model, dummy, onnx_path,
    input_names=['input'], output_names=['output'],
    dynamic_axes={'input': {0: 'batch'}, 'output': {0: 'batch'}},
    opset_version=11,
)

# Save normalization params (inference must apply the same mean/std)
np.savez('output/claudinho_norm.npz', mean=feat_mean, std=feat_std)

# Verify the exported file loads and runs
test_sess = ort.InferenceSession(onnx_path)
test_out = test_sess.run(None, {'input': dummy.numpy()})
print(f'Model: {os.path.getsize(onnx_path)/1024:.1f} KB')
print(f'Output shape: {test_out[0].shape}, test score: {test_out[0][0][0]:.3f}')


def _peak_score(audio):
    """Return the highest exported-model score over a clip, or None if it has no features."""
    feats = audio_to_features(audio)
    if len(feats) == 0:
        return None
    normed = ((feats - feat_mean) / feat_std).astype(np.float32)
    return test_sess.run(None, {'input': normed})[0].max()


# Quick accuracy check
print('\nTesting on real samples:')
for wav in samples[:5]:
    sr, audio = scipy.io.wavfile.read(wav)
    if sr != 16000:
        # Same 16 kHz resampling as during feature extraction; without it a
        # non-16 kHz recording is scored at the wrong speed.
        audio = scipy.signal.resample(audio, int(len(audio) * 16000 / sr))
    score = _peak_score(audio)
    if score is not None:
        print(f'  {os.path.basename(wav)}: {score:.3f} {"OK" if score > 0.5 else "LOW"}')

print('\nNoise (should be low):')
for i in range(3):
    noise = np.random.randn(48000).astype(np.float32) * 0.1
    score = _peak_score(noise)
    if score is not None:
        print(f'  noise_{i}: {score:.3f}')

print('\n=== Export complete ===')

In [None]:
# === 6. DOWNLOAD ===
from google.colab import files
# Hand both artifacts (model + normalization stats) to the browser for download.
for artifact in ('output/claudinho.onnx', 'output/claudinho_norm.npz'):
    files.download(artifact)
print('\nDone! Copy both files to Pi: ~/claudinho/models/')