# Train Custom openWakeWord Model

Windows-compatible version of the openWakeWord training notebook.

## Prerequisites

### Setup the Jupyter kernel (one-time)

```bash
uv add ipykernel --dev
uv run python -m ipykernel install --user --name voice-gateway
```

### Run in VS Code

1. Open this notebook in VS Code
2. Click **Select Kernel** (top right) → **voice-gateway**
3. Run the cells!

## 1. Install Dependencies

In [None]:
# Set to True for NVIDIA GPU support (requires CUDA installed).
# The install cell below reads this flag: True installs PyTorch from the CUDA 12.4
# (cu124) wheel index, False installs it without a custom index URL.
USE_GPU = True

In [None]:
import subprocess
import sys

# Install PyTorch, pulling CUDA wheels from PyTorch's index when USE_GPU is set.
# `--python sys.executable` makes uv install into the interpreter backing this
# kernel rather than whichever environment uv discovers from the working directory.
torch_install_cmd = [
    "uv", "pip", "install",
    "--python", sys.executable,
    "torch>=2.0,<2.6", "torchaudio",
]
if USE_GPU:
    print("Installing PyTorch with CUDA support...")
    torch_install_cmd += ["--index-url", "https://download.pytorch.org/whl/cu124"]
else:
    print("Installing PyTorch (CPU only)...")
subprocess.run(torch_install_cmd, check=True)

# Verify GPU is available.
# torch is imported here (not at the top of the cell) because it may only exist
# after the install step above has run.
import torch
if torch.cuda.is_available():
    print(f"GPU available: {torch.cuda.get_device_name(0)}")
else:
    print("No GPU detected - will use CPU")

In [None]:
# Core dependencies (Windows-compatible)
deps = [
    "piper-tts>=1.2.0",
    "numpy",
    "scipy",
    "tqdm",
    "datasets==2.14.6",
    "pyyaml",
    "onnxruntime",
    "onnx",
    "pronouncing",
    "deep-phonemizer",
    "mutagen",
    "torchinfo",
    "torchmetrics",
    "speechbrain==0.5.14",
    "requests",
    "ipywidgets",
]

# Install dependencies one at a time so a single bad package does not abort the rest.
# `--python sys.executable` targets the interpreter backing this kernel.
failed = []
for dep in deps:
    result = subprocess.run(
        ["uv", "pip", "install", "--python", sys.executable, dep],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        failed.append(dep)
        print(f"Failed: {dep}")
        # Output is captured, so surface uv's error text; otherwise the reason
        # for the failure is lost.
        if result.stderr:
            print(result.stderr.strip())
    else:
        print(f"Installed: {dep}")

if failed:
    print(f"\nWarning: Failed to install: {failed}")
else:
    print("\nAll dependencies installed successfully!")

## 2. Setup

In [None]:
import os
import subprocess
import wave
import json
import random
from pathlib import Path
import requests
from tqdm.auto import tqdm
import numpy as np

# Every path used by this notebook is anchored to the kernel's working directory.
SCRIPT_DIR = Path.cwd().resolve()
print(f"Working directory: {SCRIPT_DIR}")

In [None]:
# Download piper voice model (ONNX format - works on Windows)
PIPER_MODELS_DIR = SCRIPT_DIR / "piper_models"
PIPER_MODELS_DIR.mkdir(exist_ok=True)

MODEL_NAME = "en_US-libritts_r-medium"
MODEL_PATH = PIPER_MODELS_DIR / f"{MODEL_NAME}.onnx"
CONFIG_PATH = PIPER_MODELS_DIR / f"{MODEL_NAME}.onnx.json"

base_url = "https://huggingface.co/rhasspy/piper-voices/resolve/main/en/en_US/libritts_r/medium"

for filepath in (MODEL_PATH, CONFIG_PATH):
    filename = filepath.name
    if filepath.exists():
        print(f"{filename} already exists")
        continue

    print(f"Downloading {filename}...")
    url = f"{base_url}/{filename}"
    # Stream into a ".part" file and rename only after the transfer finished.
    # Otherwise an interrupted download leaves a truncated file that the
    # exists() check above would accept as complete on the next run.
    partial_path = filepath.with_name(filename + ".part")
    # The timeout stops a stalled connection from hanging the cell forever.
    with requests.get(url, stream=True, timeout=30) as response:
        response.raise_for_status()
        total = int(response.headers.get("content-length", 0))
        with open(partial_path, "wb") as f:
            with tqdm(total=total, unit="B", unit_scale=True, desc=filename) as pbar:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
                    pbar.update(len(chunk))
    partial_path.replace(filepath)

In [None]:
# Fetch the openwakeword sources next to this notebook (skipped when already present)
OWW_DIR = SCRIPT_DIR / "openwakeword"

if OWW_DIR.exists():
    print("openwakeword already exists")
else:
    print("Cloning openwakeword...")
    clone_cmd = ["git", "clone", "https://github.com/dscripka/openwakeword"]
    subprocess.run(clone_cmd, cwd=SCRIPT_DIR, check=True)

In [None]:
# Download openwakeword embedding models
OWW_MODELS_DIR = OWW_DIR / "openwakeword" / "resources" / "models"
OWW_MODELS_DIR.mkdir(parents=True, exist_ok=True)

model_urls = {
    "embedding_model.onnx": "https://github.com/dscripka/openWakeWord/releases/download/v0.5.1/embedding_model.onnx",
    "melspectrogram.onnx": "https://github.com/dscripka/openWakeWord/releases/download/v0.5.1/melspectrogram.onnx",
}

for filename, url in model_urls.items():
    filepath = OWW_MODELS_DIR / filename
    if filepath.exists():
        print(f"{filename} already exists")
        continue

    print(f"Downloading {filename}...")
    # Write to a ".part" file first and rename on success, so an interrupted
    # download is never mistaken for a complete model on the next run.
    partial_path = filepath.with_name(filename + ".part")
    # The timeout stops a stalled connection from hanging the cell forever.
    with requests.get(url, stream=True, timeout=30) as response:
        response.raise_for_status()
        with open(partial_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
    partial_path.replace(filepath)

## 3. Test Wake Word Pronunciation

Before training, verify the TTS pronounces your wake word correctly.

**Tips:**
- If pronunciation is wrong, spell it phonetically with underscores: `"hey_seer_e"` for "hey siri"
- Spell out numbers: `"two"` not `"2"`
- Avoid punctuation except `?` and `!`

In [None]:
# Configure your wake word here!
# This exact text is passed to the TTS voice in the cells below, and later cells
# replace spaces with underscores to build the sample directory and model names.
TARGET_WORD = "Seraphina"  # Change this to your desired wake word

In [None]:
from piper import PiperVoice
from piper.config import SynthesisConfig
from IPython.display import Audio, display

# Load the voice model.
# The voice config path is derived from MODEL_PATH instead of reading CONFIG_PATH:
# the training-config cell further down rebinds CONFIG_PATH to a YAML file, so
# re-running this cell afterwards would otherwise load the wrong file.
voice_config_path = MODEL_PATH.with_name(MODEL_PATH.name + ".json")
voice = PiperVoice.load(str(MODEL_PATH), str(voice_config_path))

# Get number of speakers.
# The piper config is UTF-8 JSON; an explicit encoding avoids decode errors on
# Windows, where the default text encoding is a legacy code page.
with open(voice_config_path, encoding="utf-8") as f:
    voice_config = json.load(f)
num_speakers = voice_config.get("num_speakers", 1)
print(f"Voice model loaded with {num_speakers} speakers")

In [None]:
def generate_sample(text: str, output_path: Path, speaker_id: int = 0, 
                    length_scale: float = 1.0, noise_scale: float = 0.667, 
                    noise_w_scale: float = 0.8):
    """Generate a single audio sample using piper-tts.

    Args:
        text: Text to synthesize.
        output_path: Destination WAV file; overwritten if it already exists.
        speaker_id: Speaker index forwarded to the synthesis config.
        length_scale: Forwarded to ``SynthesisConfig.length_scale``.
        noise_scale: Forwarded to ``SynthesisConfig.noise_scale``.
        noise_w_scale: Forwarded to ``SynthesisConfig.noise_w_scale``.

    Raises:
        Whatever ``wave.open`` or ``voice.synthesize_wav`` raise. On any failure
        (including a kernel interrupt) the partially written file is removed
        before the exception propagates.
    """
    config = SynthesisConfig(
        speaker_id=speaker_id,
        length_scale=length_scale,
        noise_scale=noise_scale,
        noise_w_scale=noise_w_scale,
    )

    try:
        with wave.open(str(output_path), "wb") as wav_file:
            voice.synthesize_wav(text, wav_file, syn_config=config)
    except BaseException:
        # A failed or interrupted synthesis leaves a truncated WAV behind. Delete
        # it so callers that skip existing files do not treat it as a finished
        # sample. BaseException also covers KeyboardInterrupt; it is re-raised.
        Path(output_path).unlink(missing_ok=True)
        raise

# Synthesize the wake word once (slightly slowed down) and play it back inline
test_path = SCRIPT_DIR / "test_generation.wav"
generate_sample(TARGET_WORD, test_path, speaker_id=0, length_scale=1.1)
test_audio = Audio(str(test_path), autoplay=True)
display(test_audio)
print(f"\nTest audio saved to: {test_path}")

In [None]:
# Audition up to the first five speakers of the voice model
print("Generating samples with different speakers...")
n_preview = min(5, num_speakers)
for preview_speaker in range(n_preview):
    sample_path = SCRIPT_DIR / f"test_speaker_{preview_speaker}.wav"
    generate_sample(TARGET_WORD, sample_path, speaker_id=preview_speaker)
    print(f"Speaker {preview_speaker}:")
    display(Audio(str(sample_path), autoplay=False))

## 4. Generate Training Samples

Generate diverse audio samples of the wake word using different speakers and synthesis parameters.

In [None]:
# How many clips to synthesize and where to put them
N_SAMPLES = 1000  # Number of samples to generate (more = better, but slower)

# One sub-folder per wake word; spaces become underscores in the folder name
samples_subdir = TARGET_WORD.replace(" ", "_")
SAMPLES_DIR = SCRIPT_DIR / "generated_samples" / samples_subdir
SAMPLES_DIR.mkdir(parents=True, exist_ok=True)

print(f"Will generate {N_SAMPLES} samples to: {SAMPLES_DIR}")

In [None]:
# Generate diverse training samples
print(f"Generating {N_SAMPLES} training samples...")

# Parameter ranges for variation
length_scales = [0.8, 0.9, 1.0, 1.1, 1.2]  # Speech speed
noise_scales = [0.5, 0.667, 0.8]  # Voice variation
noise_w_scales = [0.6, 0.8, 1.0]  # Duration variation

# Base seed for the per-sample parameter draw. Each sample index gets its own RNG,
# so sample i receives the same speaker/parameters on every run, even when the
# loop resumes and skips files that already exist.
SAMPLE_SEED = 42

generated = 0
for i in tqdm(range(N_SAMPLES), desc="Generating samples"):
    output_path = SAMPLES_DIR / f"sample_{i:05d}.wav"

    # Resume support: clips from a previous run are kept and counted.
    if output_path.exists():
        generated += 1
        continue

    # Random (but reproducible) parameters for variation
    rng = random.Random(SAMPLE_SEED + i)
    speaker_id = rng.randint(0, num_speakers - 1)
    length_scale = rng.choice(length_scales)
    noise_scale = rng.choice(noise_scales)
    noise_w_scale = rng.choice(noise_w_scales)

    try:
        generate_sample(
            TARGET_WORD,
            output_path,
            speaker_id=speaker_id,
            length_scale=length_scale,
            noise_scale=noise_scale,
            noise_w_scale=noise_w_scale,
        )
        generated += 1
    except BaseException as e:
        # Remove any partially written clip; otherwise the exists() check above
        # would count a truncated file as a finished sample on the next run.
        output_path.unlink(missing_ok=True)
        if not isinstance(e, Exception):
            raise  # KeyboardInterrupt / SystemExit must still stop the cell
        print(f"Failed to generate sample {i}: {e}")

print(f"\nGenerated {generated} samples in {SAMPLES_DIR}")

## 5. Download Training Data

Download pre-computed negative examples for training.

In [None]:
# When True, the training-features cell below skips its download and prints a
# reduced-quality warning; the validation features are still downloaded.
SKIP_LARGE_DOWNLOAD = False  # Set to True to skip the 16GB download

In [None]:
# Download validation features (small, always download)
VAL_PATH = SCRIPT_DIR / "validation_set_features.npy"

if not VAL_PATH.exists():
    print("Downloading validation features...")
    url = "https://huggingface.co/datasets/davidscripka/openwakeword_features/resolve/main/validation_set_features.npy"
    # Stream into a ".part" file and rename only after the transfer finished, so
    # an interrupted download is not mistaken for a complete file on the next run.
    partial_path = VAL_PATH.with_name(VAL_PATH.name + ".part")
    # The timeout stops a stalled connection from hanging the cell forever.
    with requests.get(url, stream=True, timeout=30) as response:
        response.raise_for_status()
        total = int(response.headers.get("content-length", 0))
        with open(partial_path, "wb") as f:
            with tqdm(total=total, unit="B", unit_scale=True, desc="Validation features") as pbar:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
                    pbar.update(len(chunk))
    partial_path.replace(VAL_PATH)
else:
    print("Validation features already downloaded")

In [None]:
# Download training features (large)
FEATURES_PATH = SCRIPT_DIR / "openwakeword_features_ACAV100M_2000_hrs_16bit.npy"

if SKIP_LARGE_DOWNLOAD:
    print("Skipping large feature download (training quality will be reduced)")
elif not FEATURES_PATH.exists():
    print("Downloading training features (16GB, this will take a while)...")
    url = "https://huggingface.co/datasets/davidscripka/openwakeword_features/resolve/main/openwakeword_features_ACAV100M_2000_hrs_16bit.npy"
    # Stream into a ".part" file and rename only on success. For a transfer this
    # large an interruption is likely, and a truncated .npy must not pass the
    # exists() check above on the next run.
    partial_path = FEATURES_PATH.with_name(FEATURES_PATH.name + ".part")
    # The timeout stops a stalled connection from hanging the cell forever.
    with requests.get(url, stream=True, timeout=30) as response:
        response.raise_for_status()
        total = int(response.headers.get("content-length", 0))
        with open(partial_path, "wb") as f:
            with tqdm(total=total, unit="B", unit_scale=True, desc="Training features") as pbar:
                # 1 MiB chunks keep the Python-level loop overhead small for a multi-GB file.
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    f.write(chunk)
                    pbar.update(len(chunk))
    partial_path.replace(FEATURES_PATH)
else:
    print("Training features already downloaded")

## 6. Configure Training

Adjust these parameters:
- `N_STEPS`: Training steps (10000 is quick, more is better)
- `FALSE_ACTIVATION_PENALTY`: Higher = fewer false activations but may miss quiet/noisy speech

In [None]:
# Number of training steps; written to the training config as "steps".
N_STEPS = 10000
# Penalty for false activations; written to the training config as "max_negative_weight".
FALSE_ACTIVATION_PENALTY = 1500

In [None]:
import yaml

# Load default config shipped with the openwakeword checkout.
# Explicit UTF-8 keeps the read independent of the platform's default encoding.
with open(OWW_DIR / "examples" / "custom_model.yml", encoding="utf-8") as f:
    config = yaml.safe_load(f)

# Modify config
config["target_phrase"] = [TARGET_WORD]
config["model_name"] = TARGET_WORD.replace(" ", "_")
config["custom_model_dir"] = str(SAMPLES_DIR)
config["n_samples"] = N_SAMPLES
config["n_samples_val"] = max(100, N_SAMPLES // 10)
config["steps"] = N_STEPS
config["target_accuracy"] = 0.5
config["target_recall"] = 0.25
config["output_dir"] = str(SCRIPT_DIR / "my_custom_model")
config["max_negative_weight"] = FALSE_ACTIVATION_PENALTY

# Data paths
config["background_paths"] = []
config["false_positive_validation_data_path"] = str(VAL_PATH)

# Only reference the large feature file when it is actually on disk
if FEATURES_PATH.exists():
    config["feature_data_files"] = {"ACAV100M_sample": str(FEATURES_PATH)}
else:
    config["feature_data_files"] = {}

# Use pre-generated clips (skip generation step)
config["custom_clips_dir"] = str(SAMPLES_DIR)

# Save config.
# NOTE: this rebinds CONFIG_PATH, which the piper download cell earlier set to the
# voice's .onnx.json file. The training cells below expect the YAML path, so re-run
# this cell before them if the download cell has been re-run in the meantime.
CONFIG_PATH = SCRIPT_DIR / "my_model.yaml"
with open(CONFIG_PATH, "w", encoding="utf-8") as f:
    yaml.dump(config, f)

print(f"Config saved to: {CONFIG_PATH}")
print("\nTraining configuration:")
print(f"  Target word: {TARGET_WORD}")
print(f"  Samples: {N_SAMPLES}")
print(f"  Steps: {N_STEPS}")
print(f"  Output: {config['output_dir']}")

## 7. Train the Model

This runs the training pipeline. With default settings, this takes 30-60 minutes on CPU, faster on GPU.

In [None]:
TRAIN_SCRIPT = OWW_DIR / "openwakeword" / "train.py"

# Clip generation is skipped because the clips were produced earlier in this
# notebook; only the augmentation stage of train.py runs here.
banner = "=" * 50
print(banner)
print("Step 1: Augmenting clips")
print(banner)
augment_cmd = [
    sys.executable,
    str(TRAIN_SCRIPT),
    "--training_config",
    str(CONFIG_PATH),
    "--augment_clips",
]
subprocess.run(augment_cmd, check=True)

In [None]:
# Run the training stage of train.py with the saved config
banner = "=" * 50
print(banner)
print("Step 2: Training model")
print(banner)
train_cmd = [
    sys.executable,
    str(TRAIN_SCRIPT),
    "--training_config",
    str(CONFIG_PATH),
    "--train_model",
]
subprocess.run(train_cmd, check=True)

## 8. Done!

Your trained model is in the `my_custom_model` folder.

In [None]:
# Show what the training run left in the configured output directory
OUTPUT_DIR = Path(config["output_dir"])
print("\nTraining complete! Model files:")
if not OUTPUT_DIR.exists():
    print(f"  Output directory not found: {OUTPUT_DIR}")
else:
    for model_file in OUTPUT_DIR.glob("*"):
        print(f"  {model_file}")

In [None]:
# Optional: Copy to wakewords folder
import shutil

# Destination: ../wakewords/<lower-cased wake word with underscores>
WAKEWORD_DIR = SCRIPT_DIR.parent / "wakewords" / TARGET_WORD.lower().replace(" ", "_")
WAKEWORD_DIR.mkdir(parents=True, exist_ok=True)

model_name = TARGET_WORD.replace(" ", "_")
copied = []
for ext in [".onnx", ".tflite"]:
    src = OUTPUT_DIR / f"{model_name}{ext}"
    if src.exists():
        dst = WAKEWORD_DIR / f"{model_name}{ext}"
        shutil.copy(src, dst)
        copied.append(dst)
        print(f"Copied {src.name} to {dst}")

# Say so explicitly when nothing was copied; otherwise a missing or failed
# training run leaves this cell silent.
if not copied:
    print(f"No {model_name}.onnx or {model_name}.tflite found in {OUTPUT_DIR}; nothing copied")