# Training custom wake word "Тимошка" for openWakeWord

Full pipeline: Russian Piper TTS → Voice Conversion (FreeVC24) → openWakeWord Training → TFLite

**Requirements:** Google Colab Pro with A100 (recommended). Runtime: 3-6 hours.

## Stages
1. Install dependencies
2. Generate TTS samples (Piper, Russian voices)
3. Prepare target voices from Common Voice
4. Voice conversion (FreeVC24)
5. Download training data (ACAV100M features, validation set, room impulse responses, background noise)
6. Train openWakeWord
7. Convert to TFLite
8. Test the model

## Stage 1: Install dependencies

In [None]:
# Check GPU
!nvidia-smi

import torch
# Ask PyTorch once whether a CUDA device is visible, then report which one.
cuda_ok = torch.cuda.is_available()
print(f"CUDA available: {cuda_ok}")
if cuda_ok:
    print(f"GPU: {torch.cuda.get_device_name(0)}")

In [None]:
%%bash
# Clone repositories
# Each repository is cloned into /content only when its directory is not
# already there, so re-running the cell is cheap.
cd /content

for repo_url in \
    https://github.com/dscripka/openWakeWord.git \
    https://github.com/rhasspy/piper-sample-generator.git
do
    repo_dir=$(basename "$repo_url" .git)
    [ -d "$repo_dir" ] || git clone "$repo_url"
done

echo "Done cloning repos"

In [None]:
%%bash
# Install dependencies
# piper-phonemize needs wheels from GitHub releases
pip install -q piper-phonemize -f https://github.com/rhasspy/piper-phonemize/releases/latest
pip install -q webrtcvad
pip install -q -e /content/openWakeWord
pip install -q -e /content/piper-sample-generator
pip install -q coqui-tts
pip install -q mutagen
pip install -q torchinfo
pip install -q torchmetrics
pip install -q speechbrain
pip install -q audiomentations
pip install -q torch-audiomentations
pip install -q acoustics
pip install -q pronouncing
pip install -q datasets
pip install -q deep-phonemizer
# Use Colab's pre-installed TensorFlow, just add onnx conversion
pip install -q onnx onnx2tf tf2onnx flatbuffers

# printf instead of echo: bash's builtin echo prints "\n" literally unless
# it is given -e.
printf '\nAll dependencies installed\n'

In [None]:
import os

# Working directories
# Everything the pipeline reads or writes lives in its own subdirectory of
# BASE_DIR; all of them are created up front.
BASE_DIR = "/content/timoshka"

(
    VOICES_DIR,
    TTS_POSITIVE_DIR,
    TTS_NEGATIVE_DIR,
    VOICE_TARGETS_DIR,
    VC_POSITIVE_DIR,
    VC_NEGATIVE_DIR,
    OUTPUT_DIR,
) = (
    os.path.join(BASE_DIR, leaf)
    for leaf in (
        "piper_voices",
        "tts_positive",
        "tts_negative",
        "voice_targets",
        "vc_positive",
        "vc_negative",
        "output",
    )
)

for work_dir in (
    BASE_DIR, VOICES_DIR, TTS_POSITIVE_DIR, TTS_NEGATIVE_DIR,
    VOICE_TARGETS_DIR, VC_POSITIVE_DIR, VC_NEGATIVE_DIR, OUTPUT_DIR,
):
    os.makedirs(work_dir, exist_ok=True)

print("Directories created")

## Stage 2: Generate TTS samples (Piper)

In [None]:
%%bash
# Download Russian Piper voices
cd /content/timoshka/piper_voices

VOICES=(
    "ru/ru_RU/irina/medium/ru_RU-irina-medium"
    "ru/ru_RU/ruslan/medium/ru_RU-ruslan-medium"
    "ru/ru_RU/denis/medium/ru_RU-denis-medium"
    "ru/ru_RU/dmitri/medium/ru_RU-dmitri-medium"
)

BASE_URL="https://huggingface.co/rhasspy/piper-voices/resolve/main"

for voice in "${VOICES[@]}"; do
    name=$(basename "$voice")

    # A voice needs both the model and its JSON config. "-s" (exists and is
    # non-empty) also rejects the zero-byte files a failed "wget -O" leaves
    # behind, so a broken earlier attempt is retried instead of skipped.
    if [ -s "${name}.onnx" ] && [ -s "${name}.onnx.json" ]; then
        echo "${name} already downloaded"
        continue
    fi

    echo "Downloading ${name}..."
    for ext in onnx onnx.json; do
        if ! wget -q -O "${name}.${ext}" "${BASE_URL}/${voice}.${ext}?download=true"; then
            echo "WARNING: failed to download ${name}.${ext}" >&2
            rm -f "${name}.${ext}"
        fi
    done
done

# printf instead of echo: bash's builtin echo prints "\n" literally unless
# it is given -e.
printf '\nAll voices downloaded:\n'
ls -la *.onnx

In [None]:
import subprocess
import glob

# Positive samples: 250 per voice x 4 voices = 1000 base samples

voices = sorted(glob.glob(os.path.join(VOICES_DIR, "*.onnx")))
print(f"Found {len(voices)} Piper voices")

POSITIVE_PHRASE = "\u0442\u0438\u043c\u043e\u0448\u043a\u0430"  # тимошка
SAMPLES_PER_VOICE = 250

for voice_path in voices:
    voice_name = os.path.basename(voice_path).replace(".onnx", "")
    out_dir = os.path.join(TTS_POSITIVE_DIR, voice_name)
    os.makedirs(out_dir, exist_ok=True)
    wav_pattern = os.path.join(out_dir, "*.wav")

    # A voice whose directory already holds enough clips is left untouched,
    # which makes the cell safe to re-run.
    existing = len(glob.glob(wav_pattern))
    if existing < SAMPLES_PER_VOICE:
        print(f"  Generating {SAMPLES_PER_VOICE} positive samples with {voice_name}...")
        command = [
            "python3", "/content/piper-sample-generator/generate_samples.py",
            POSITIVE_PHRASE,
            "--model", voice_path,
            "--max-samples", str(SAMPLES_PER_VOICE),
            "--output-dir", out_dir,
        ]
        subprocess.run(command, check=True)
        print(f"    Generated: {len(glob.glob(wav_pattern))}")
    else:
        print(f"  {voice_name}: {existing} samples already exist, skipping")

# Total across every per-voice subdirectory.
total = 0
for entry in os.listdir(TTS_POSITIVE_DIR):
    entry_path = os.path.join(TTS_POSITIVE_DIR, entry)
    if os.path.isdir(entry_path):
        total += len(glob.glob(os.path.join(entry_path, "*.wav")))
print(f"\nTotal positive TTS samples: {total}")

In [None]:
# Adversarial negative samples: phonetically similar Russian words

NEGATIVE_PHRASES = [
    "\u0442\u0438\u043c\u043e\u0444\u0435\u0439",      # тимофей
    "\u043a\u043e\u0448\u043a\u0430",                    # кошка
    "\u043c\u043e\u0448\u043a\u0430",                    # мошка
    "\u0440\u043e\u043c\u0430\u0448\u043a\u0430",      # ромашка
    "\u043c\u0430\u0442\u0440\u0451\u0448\u043a\u0430", # матрёшка
    "\u0433\u0430\u0440\u043c\u043e\u0448\u043a\u0430", # гармошка
    "\u043a\u0430\u0440\u0442\u043e\u0448\u043a\u0430", # картошка
    "\u043e\u043a\u0440\u043e\u0448\u043a\u0430",      # окрошка
    "\u043c\u0438\u0448\u043a\u0430",                    # мишка
    "\u043c\u044b\u0448\u043a\u0430",                    # мышка
    "\u0442\u0438\u0448\u043a\u0430",                    # тишка
    "\u0442\u0438\u043c\u043e\u0448\u0430",            # тимоша
    "\u0442\u0438\u043c\u043e\u0448\u0435\u043d\u043a\u043e", # тимошенко
    "\u043c\u043e\u0440\u043e\u0448\u043a\u0430",      # морошка
    "\u043a\u0440\u043e\u0448\u043a\u0430",            # крошка
    "\u0434\u043e\u0440\u043e\u0436\u043a\u0430",      # дорожка
    "\u043b\u043e\u0436\u043a\u0430",                    # ложка
    "\u0442\u0438\u0448\u0438\u043d\u0430",            # тишина
    "\u0442\u0451\u043c\u0443\u0448\u043a\u0430",      # тёмушка
]

NEGATIVE_SAMPLES_PER_PHRASE_PER_VOICE = 50

for phrase in NEGATIVE_PHRASES:
    # "ё" is folded to "е" for the directory name only; the text handed to
    # the generator keeps the original spelling.
    dir_stem = phrase.replace("\u0451", "\u0435")

    for voice_path in voices:
        voice_name = os.path.basename(voice_path).replace(".onnx", "")
        out_dir = os.path.join(TTS_NEGATIVE_DIR, f"{dir_stem}_{voice_name}")
        os.makedirs(out_dir, exist_ok=True)

        # Only generate when this phrase/voice pair is not complete yet.
        have = len(glob.glob(os.path.join(out_dir, "*.wav")))
        if have < NEGATIVE_SAMPLES_PER_PHRASE_PER_VOICE:
            subprocess.run([
                "python3", "/content/piper-sample-generator/generate_samples.py",
                phrase,
                "--model", voice_path,
                "--max-samples", str(NEGATIVE_SAMPLES_PER_PHRASE_PER_VOICE),
                "--output-dir", out_dir,
            ], check=True)

    print(f"  Done: {phrase}")

# Count totals
total_neg = sum(
    1
    for _root, _dirs, names in os.walk(TTS_NEGATIVE_DIR)
    for name in names
    if name.endswith(".wav")
)
expected_neg = len(NEGATIVE_PHRASES) * len(voices) * NEGATIVE_SAMPLES_PER_PHRASE_PER_VOICE

print(f"\nTotal negative TTS samples: {total_neg}")
print(f"Expected: {len(NEGATIVE_PHRASES)} phrases x {len(voices)} voices x {NEGATIVE_SAMPLES_PER_PHRASE_PER_VOICE} = {expected_neg}")

In [None]:
# Listen to a few samples to verify
import IPython.display as ipd

# Play the first three positive clips found one level below TTS_POSITIVE_DIR.
for wav_path in glob.glob(os.path.join(TTS_POSITIVE_DIR, "*/*.wav"))[:3]:
    voice_dir, clip_name = os.path.split(wav_path)
    print(f"Playing: {os.path.basename(voice_dir)}/{clip_name}")
    ipd.display(ipd.Audio(wav_path))

## Stage 3: Prepare target voices (Common Voice)

**Option A** (below): Download automatically via HuggingFace datasets API.

**Option B** (manual):
1. Download Russian Common Voice dataset from https://commonvoice.mozilla.org/datasets
2. Upload the archive to Google Drive or directly to Colab
3. Extract to `/content/common_voice_ru/`

In [None]:
# Option A: Download via HuggingFace datasets API

from datasets import load_dataset
import soundfile as sf
import random

print("Loading Russian Common Voice from HuggingFace...")
print("(This may take a while on first download)")

cv_dataset = load_dataset(
    "mozilla-foundation/common_voice_16_1",
    "ru",
    split="validated",
    trust_remote_code=True,
)
n_clips = len(cv_dataset)

print(f"Total validated clips: {n_clips}")

# Pick one clip per unique client_id for speaker diversity
MAX_TARGETS = 100

seen_speakers = set()
selected = []

# Visit the clips in random order so the chosen speakers are not biased
# towards the start of the dataset.
indices = list(range(n_clips))
random.shuffle(indices)

for idx in indices:
    if len(selected) >= MAX_TARGETS:
        break

    row = cv_dataset[idx]
    speaker = row["client_id"]
    if speaker in seen_speakers:
        continue

    clip = row["audio"]
    seconds = len(clip["array"]) / clip["sampling_rate"]

    # Filter: keep only clips of 3-15 seconds
    if not (3.0 <= seconds <= 15.0):
        continue

    seen_speakers.add(speaker)
    selected.append(row)

print(f"Selected {len(selected)} unique speakers")

# Save as 16kHz mono WAV
import torchaudio
import torch

# Write each selected clip as speaker_NNNN.wav at 16 kHz, resampling only
# when the source rate differs.
for speaker_idx, row in enumerate(selected):
    clip = row["audio"]
    source_sr = clip["sampling_rate"]
    # Add a leading channel axis so the tensor is (1, samples).
    waveform = torch.tensor(clip["array"]).unsqueeze(0).float()

    if source_sr != 16000:
        waveform = torchaudio.transforms.Resample(source_sr, 16000)(waveform)

    torchaudio.save(
        os.path.join(VOICE_TARGETS_DIR, f"speaker_{speaker_idx:04d}.wav"),
        waveform,
        16000,
    )

print(f"Saved {len(selected)} target voice files to {VOICE_TARGETS_DIR}")

In [None]:
# Option B: If you downloaded Common Voice manually and uploaded to Colab,
# uncomment and set the path:

# import sys
# sys.path.insert(0, '/content')
# from voice_convert import prepare_common_voice_targets
#
# prepare_common_voice_targets(
#     cv_dir="/content/common_voice_ru",
#     output_dir=VOICE_TARGETS_DIR,
#     max_clips=100,
# )

In [None]:
# Verify target voice count
target_files = sorted(glob.glob(os.path.join(VOICE_TARGETS_DIR, "*.wav")))
n_targets = len(target_files)
print(f"Target voice files: {n_targets}")

# Grade the count: fewer than 10 is a warning, 10-49 is workable, 50+ is good.
if n_targets >= 50:
    verdict = "Good: Sufficient target voices for training."
elif n_targets >= 10:
    verdict = "OK: Minimum viable, but 100 targets will produce better results."
else:
    verdict = "WARNING: Too few target voices! Aim for 50-100 for good results."
print(verdict)

## Stage 4: Voice Conversion (FreeVC24)

This is the longest stage (~3-5 hours on A100).

Each TTS sample is converted with each target voice, so N TTS samples and M target voices produce N x M output files.

In [None]:
import time
from pathlib import Path
from TTS.api import TTS

# Load FreeVC24 model and move it to the GPU
FREEVC_MODEL_NAME = "voice_conversion_models/multilingual/vctk/freevc24"

print("Loading FreeVC24 model...")
vc_model = TTS(FREEVC_MODEL_NAME).to("cuda")
print("Model loaded!")

In [None]:
def run_voice_conversion(source_dir, output_dir, target_files, label=""):
    """Voice-convert every WAV under ``source_dir`` with every target voice.

    The conversion itself is done by the module-level ``vc_model``. Outputs
    that already exist are skipped, so an interrupted run can be resumed by
    calling the function again.

    Args:
        source_dir: Directory searched recursively for ``.wav`` source files.
        output_dir: Flat directory for the results; created if missing.
        target_files: Paths of the target-voice WAV files.
        label: Text printed in front of the initial summary line.

    Returns:
        The number of files converted by this call (skipped outputs and
        failed conversions are not counted).
    """
    os.makedirs(output_dir, exist_ok=True)

    # Sources may sit in subdirectories, so walk the whole tree.
    source_files = sorted(
        os.path.join(folder, fname)
        for folder, _subdirs, fnames in os.walk(source_dir)
        for fname in fnames
        if fname.endswith(".wav")
    )

    n_pairs = len(source_files) * len(target_files)
    print(f"{label}Sources: {len(source_files)}, Targets: {len(target_files)}, Total: {n_pairs}")

    source_root_name = Path(source_dir).name
    converted = 0
    skipped = 0
    errors = 0
    started = time.time()

    for src in source_files:
        src_path = Path(src)
        parent_name = src_path.parent.name
        # Files from subdirectories get the directory name as a prefix so
        # equally named clips from different folders do not collide.
        if parent_name == source_root_name:
            prefix = src_path.stem
        else:
            prefix = f"{parent_name}_{src_path.stem}"

        for tgt in target_files:
            tgt_name = Path(tgt).stem
            out_path = os.path.join(output_dir, f"{prefix}_vc{tgt_name}.wav")

            if os.path.exists(out_path):
                skipped += 1
                continue

            try:
                vc_model.voice_conversion_to_file(
                    source_wav=src,
                    target_wav=tgt,
                    file_path=out_path,
                )
            except Exception as exc:
                # Best effort: count the failure and carry on; only the
                # first five errors are printed to keep the log readable.
                errors += 1
                if errors <= 5:
                    print(f"  Error: {prefix} + {tgt_name}: {exc}")
            else:
                converted += 1

            attempted = converted + errors
            processed = attempted + skipped
            if processed % 500 == 0:
                elapsed = time.time() - started
                # Skipped files cost no conversion time, so they are left
                # out of the throughput estimate.
                rate = attempted / max(elapsed, 1)
                eta = (n_pairs - processed) / max(rate, 0.01)
                print(
                    f"  [{processed}/{n_pairs}] {rate:.1f}/s, "
                    f"ETA {eta/60:.0f}min, errors={errors}"
                )

    elapsed = time.time() - started
    print(
        f"  Done: {converted} converted, "
        f"{skipped} skipped, {errors} errors in {elapsed/60:.1f}min\n"
    )
    return converted

In [None]:
# Convert positive samples
banner = "=" * 60
print(banner)
print("POSITIVE SAMPLES: Voice Conversion")
print(banner)

# Every positive TTS clip is converted with every target voice.
n_positive = run_voice_conversion(
    TTS_POSITIVE_DIR,
    VC_POSITIVE_DIR,
    target_files,
    label="[POSITIVE] ",
)

print(f"Total positive voice-converted samples: {n_positive}")

In [None]:
# Convert negative samples
banner = "=" * 60
print(banner)
print("NEGATIVE SAMPLES: Voice Conversion")
print(banner)

# Every adversarial TTS clip is converted with every target voice.
n_negative = run_voice_conversion(
    TTS_NEGATIVE_DIR,
    VC_NEGATIVE_DIR,
    target_files,
    label="[NEGATIVE] ",
)

print(f"Total negative voice-converted samples: {n_negative}")

In [None]:
# Resample everything to 16kHz mono (safety check)
import torchaudio

def ensure_16k_mono(directory):
    """Rewrite in place any WAV directly inside ``directory`` that is not 16 kHz mono.

    Multi-channel audio is averaged down to one channel and any other sample
    rate is resampled to 16000 Hz. Files that already conform are not
    touched. A file that fails to load or save is reported and skipped.

    Returns:
        The number of files that were rewritten.
    """
    fixed = 0
    for wav_path in glob.glob(os.path.join(directory, "*.wav")):
        try:
            waveform, sr = torchaudio.load(wav_path)
            needs_downmix = waveform.shape[0] > 1
            needs_resample = sr != 16000

            if needs_downmix:
                waveform = waveform.mean(dim=0, keepdim=True)
            if needs_resample:
                waveform = torchaudio.transforms.Resample(sr, 16000)(waveform)

            if needs_downmix or needs_resample:
                torchaudio.save(wav_path, waveform, 16000)
                fixed += 1
        except Exception as exc:
            print(f"  Error resampling {wav_path}: {exc}")
    return fixed

# Run the safety pass over both voice-converted sets and report how many
# files had to be rewritten in each.
fixed_counts = {}
for kind, vc_dir in (("positive", VC_POSITIVE_DIR), ("negative", VC_NEGATIVE_DIR)):
    print(f"Checking {kind} samples...")
    fixed_counts[kind] = ensure_16k_mono(vc_dir)
    print(f"  Fixed {fixed_counts[kind]} files")

fixed_pos, fixed_neg = fixed_counts["positive"], fixed_counts["negative"]

In [None]:
# Listen to a few voice-converted samples (the first three found)
for wav_path in glob.glob(os.path.join(VC_POSITIVE_DIR, "*.wav"))[:3]:
    print(f"Playing: {os.path.basename(wav_path)}")
    ipd.display(ipd.Audio(wav_path))

## Stage 5: Download training data

In [None]:
%%bash
cd /content/timoshka

FEATURES_URL="https://huggingface.co/datasets/davidscripka/openwakeword_features/resolve/main"
ACAV_FILE="openwakeword_features_ACAV100M_2000_hrs_16bit.npy"
VAL_FILE="validation_set_features.npy"

# Each download goes to a ".part" name and is renamed only after wget
# succeeds, so an interrupted transfer is not mistaken for a complete file
# on the next run.

# ACAV100M features (~6 GB) — negative training data
if [ ! -f "$ACAV_FILE" ]; then
    echo "Downloading ACAV100M features (~6 GB)..."
    wget -q --show-progress -O "${ACAV_FILE}.part" "${FEATURES_URL}/${ACAV_FILE}" \
        && mv "${ACAV_FILE}.part" "$ACAV_FILE"
else
    echo "ACAV100M features already downloaded"
fi

# Validation set (~30 MB)
if [ ! -f "$VAL_FILE" ]; then
    echo "Downloading validation set..."
    wget -q --show-progress -O "${VAL_FILE}.part" "${FEATURES_URL}/${VAL_FILE}" \
        && mv "${VAL_FILE}.part" "$VAL_FILE"
else
    echo "Validation set already downloaded"
fi

# printf instead of echo: bash's builtin echo prints "\n" literally unless
# it is given -e.
printf '\nData files:\n'
ls -lh *.npy

In [None]:
%%bash
cd /content/timoshka

# MIT Room Impulse Responses for reverb augmentation.
# A missing OR empty directory counts as "not downloaded": the directory is
# created before the download starts, so a failed earlier attempt would
# otherwise be reported as "already present" and never retried.
# NOTE(review): if the archive unpacks into a subfolder, the rir_paths entry
# in the training config has to point at that subfolder — verify the layout.
if [ -z "$(ls -A mit_rirs 2>/dev/null)" ]; then
    echo "Downloading MIT RIRs..."
    mkdir -p mit_rirs
    wget -q --show-progress -O mit_rirs.zip \
        https://mcdermottlab.mit.edu/Reverb/IRMAudio/Audio.zip
    unzip -q mit_rirs.zip -d mit_rirs/ 2>/dev/null || true
    rm -f mit_rirs.zip
    if [ -n "$(ls -A mit_rirs 2>/dev/null)" ]; then
        echo "MIT RIRs downloaded"
    else
        echo "WARNING: MIT RIRs could not be downloaded or extracted; re-run this cell to retry."
    fi
else
    echo "MIT RIRs already present"
fi

# Background noise: AudioSet subset + FMA
# Same missing-or-empty test as above. This download is best effort: when
# it fails the directory is left in place (empty) and a note is printed.
if [ -z "$(ls -A audioset_16k 2>/dev/null)" ]; then
    echo "Downloading AudioSet background noise subset..."
    mkdir -p audioset_16k
    wget -q --show-progress -O audioset_16k.tar.gz \
        https://huggingface.co/datasets/davidscripka/openwakeword_features/resolve/main/audioset_16k_sample.tar.gz \
        2>/dev/null || echo "Note: AudioSet subset not found. Training will use ACAV100M as primary negative data."
    if [ -f audioset_16k.tar.gz ]; then
        tar -xzf audioset_16k.tar.gz -C audioset_16k/ 2>/dev/null || true
        rm -f audioset_16k.tar.gz
    fi
fi

# No FMA data is downloaded here; the directory is only created so the path
# listed in the training config exists.
mkdir -p fma
echo "Background data ready"

## Stage 6: Train openWakeWord

In [None]:
import yaml

# Count samples
# Voice-converted clips live in flat directories; the original TTS clips sit
# in per-voice (positive) or per-phrase/voice (negative) subdirectories.
n_pos = len(glob.glob(os.path.join(VC_POSITIVE_DIR, "*.wav")))
n_neg = len(glob.glob(os.path.join(VC_NEGATIVE_DIR, "*.wav")))

n_pos_tts = 0
for entry in os.listdir(TTS_POSITIVE_DIR):
    voice_dir = os.path.join(TTS_POSITIVE_DIR, entry)
    if os.path.isdir(voice_dir):
        n_pos_tts += len(glob.glob(os.path.join(voice_dir, "*.wav")))

n_neg_tts = sum(
    1
    for _root, _dirs, names in os.walk(TTS_NEGATIVE_DIR)
    for name in names
    if name.endswith(".wav")
)

print(f"Voice-converted positive: {n_pos}")
print(f"Voice-converted negative: {n_neg}")
print(f"TTS positive (original):  {n_pos_tts}")
print(f"TTS negative (original):  {n_neg_tts}")
print(f"Total positive: {n_pos + n_pos_tts}")
print(f"Total negative: {n_neg + n_neg_tts}")

In [None]:
# Merge all positive samples into a single directory
import shutil

ALL_POSITIVE_DIR = os.path.join(BASE_DIR, "all_positive")
ALL_NEGATIVE_DIR = os.path.join(BASE_DIR, "all_negative")


def _link_wav(src, dst):
    """Create the symlink ``dst`` -> ``src`` unless a usable entry already exists."""
    # os.path.exists() follows symlinks and is False for a dangling link, in
    # which case os.symlink() would raise FileExistsError. Drop such stale
    # links (their target is gone) and test with lexists() instead.
    if os.path.islink(dst) and not os.path.exists(dst):
        os.remove(dst)
    if not os.path.lexists(dst):
        os.symlink(src, dst)


def _merge_samples(vc_dir, tts_dir, merged_dir):
    """Symlink the flat VC clips and the nested TTS clips into ``merged_dir``."""
    os.makedirs(merged_dir, exist_ok=True)

    # Voice-converted clips keep their file name.
    for wav in glob.glob(os.path.join(vc_dir, "*.wav")):
        _link_wav(wav, os.path.join(merged_dir, os.path.basename(wav)))

    # Original TTS clips are prefixed with "tts_<parent dir>_" so equally
    # named files from different subdirectories do not collide.
    for root, _dirs, names in os.walk(tts_dir):
        parent = os.path.basename(root)
        for name in names:
            if name.endswith(".wav"):
                _link_wav(
                    os.path.join(root, name),
                    os.path.join(merged_dir, f"tts_{parent}_{name}"),
                )


# Symlink positive: VC + original TTS
_merge_samples(VC_POSITIVE_DIR, TTS_POSITIVE_DIR, ALL_POSITIVE_DIR)

# Symlink negative: VC + original TTS
_merge_samples(VC_NEGATIVE_DIR, TTS_NEGATIVE_DIR, ALL_NEGATIVE_DIR)

total_pos = len(glob.glob(os.path.join(ALL_POSITIVE_DIR, "*.wav")))
total_neg = len(glob.glob(os.path.join(ALL_NEGATIVE_DIR, "*.wav")))
print(f"All positive samples: {total_pos}")
print(f"All negative samples: {total_neg}")

In [None]:
# Create training config
# openWakeWord train.py expects a specific directory structure;
# we create the necessary symlinks in output_dir

TRAINING_DIR = os.path.join(BASE_DIR, "training")
os.makedirs(TRAINING_DIR, exist_ok=True)

background_paths = [
    os.path.join(BASE_DIR, "audioset_16k"),
    os.path.join(BASE_DIR, "fma"),
]

config = {
    "model_name": "timoshka",
    "target_phrase": ["\u0442\u0438\u043c\u043e\u0448\u043a\u0430"],
    "custom_negative_phrases": [],

    # We supply pre-generated clips, set to 0
    "n_samples": 0,
    "n_samples_val": 0,

    "augmentation_rounds": 1,
    "augmentation_batch_size": 16,

    "piper_sample_generator_path": "/content/piper-sample-generator",
    "output_dir": TRAINING_DIR,

    "rir_paths": [os.path.join(BASE_DIR, "mit_rirs")],
    "background_paths": background_paths,
    # One duplication factor per background path, so the two lists always
    # have the same length (previously a single factor for two paths).
    "background_paths_duplication_rate": [1] * len(background_paths),

    "feature_data_files": {
        "ACAV100M_sample": os.path.join(
            BASE_DIR, "openwakeword_features_ACAV100M_2000_hrs_16bit.npy"
        ),
    },
    "false_positive_validation_data_path": os.path.join(
        BASE_DIR, "validation_set_features.npy"
    ),

    "batch_n_per_class": {
        "ACAV100M_sample": 1024,
        "adversarial_negative": 50,
        "positive": 50,
    },

    "model_type": "dnn",
    "layer_size": 32,

    "steps": 50000,
    "max_negative_weight": 1500,
    "target_false_positives_per_hour": 0.2,
    "target_accuracy": 0.7,
    "target_recall": 0.5,
}

# The config contains Cyrillic text (allow_unicode=True), so the encoding is
# pinned for both the write and the read-back instead of relying on the
# platform default.
config_path = os.path.join(BASE_DIR, "timoshka_config.yaml")
with open(config_path, "w", encoding="utf-8") as f:
    yaml.dump(config, f, default_flow_style=False, allow_unicode=True)

print(f"Config saved to {config_path}")
print()
# Read back through a context manager so the file handle is closed.
with open(config_path, encoding="utf-8") as f:
    print(f.read())

In [None]:
# Prepare directory structure for train.py
# This cell lays the data out as:
#   output_dir/<phrase>/positive/  — positive WAVs
#   output_dir/<phrase>/negative/  — adversarial negative WAVs
# NOTE(review): upstream train.py appears to read clips from
# output_dir/<model_name>/{positive,negative}_{train,test}/ — confirm this
# layout against the checked-out openWakeWord version before training.

phrase_dir = os.path.join(TRAINING_DIR, "\u0442\u0438\u043c\u043e\u0448\u043a\u0430")
pos_link = os.path.join(phrase_dir, "positive")
neg_link = os.path.join(phrase_dir, "negative")

os.makedirs(phrase_dir, exist_ok=True)

# Create symlinks to our data, replacing any link left by an earlier run.
# lexists() is used because exists() follows symlinks: it is False for a
# dangling link, which would then make os.symlink() raise FileExistsError.
for link_path, data_dir in ((pos_link, ALL_POSITIVE_DIR), (neg_link, ALL_NEGATIVE_DIR)):
    if os.path.lexists(link_path):
        os.remove(link_path)
    os.symlink(data_dir, link_path)

print(f"Positive -> {pos_link} -> {ALL_POSITIVE_DIR}")
print(f"Negative -> {neg_link} -> {ALL_NEGATIVE_DIR}")
print(f"\nPositive samples: {len(os.listdir(pos_link))}")
print(f"Negative samples: {len(os.listdir(neg_link))}")

In [None]:
%%bash
# Abort on the first failing command so the completion message below is
# printed only when train.py actually succeeded.
set -e
cd /content

# Step 1: Augmentation (noise, reverb, speed variations)
# Skip --generate_clips since we already generated samples externally
echo "Starting augmentation..."

python openWakeWord/openwakeword/train.py \
    --training_config /content/timoshka/timoshka_config.yaml \
    --augment_clips \
    --overwrite

# printf instead of echo: bash's builtin echo prints "\n" literally unless
# it is given -e.
printf '\nAugmentation complete!\n'

In [None]:
%%bash
# Abort on the first failing command so the completion message below is
# printed only when train.py actually succeeded.
set -e
cd /content

# Step 2: Train model
echo "Starting training (this will take a while)..."

python openWakeWord/openwakeword/train.py \
    --training_config /content/timoshka/timoshka_config.yaml \
    --train_model

# printf instead of echo: bash's builtin echo prints "\n" literally unless
# it is given -e.
printf '\nTraining complete!\n'

## Stage 7: Convert to TFLite

In [None]:
%%bash
# Abort on the first failing command so the completion message below is
# printed only when the conversion actually succeeded.
set -e
cd /content

# Convert ONNX -> TFLite
python openWakeWord/openwakeword/train.py \
    --training_config /content/timoshka/timoshka_config.yaml \
    --convert_to_tflite

# printf instead of echo: bash's builtin echo prints "\n" literally unless
# it is given -e.
printf '\nConversion complete!\n'

In [None]:
# Find and display the result
import glob

# Search the whole training tree for exported models.
tflite_files = glob.glob(os.path.join(TRAINING_DIR, "**/*.tflite"), recursive=True)
onnx_files = glob.glob(os.path.join(TRAINING_DIR, "**/*.onnx"), recursive=True)

print("Generated model files:")
for model_file in [*tflite_files, *onnx_files]:
    size_kb = os.path.getsize(model_file) / 1024
    print(f"  {model_file} ({size_kb:.1f} KB)")

if not tflite_files:
    print("\nERROR: No .tflite file found. Check training logs above.")
    if onnx_files:
        print(f"Found ONNX model at: {onnx_files[0]}")
        print("Try manual conversion in the next cell.")
else:
    # Copy the first tflite found to a convenient, fixed location.
    final_path = os.path.join(BASE_DIR, "timoshka.tflite")
    shutil.copy2(tflite_files[0], final_path)
    print(f"\nFinal model: {final_path}")
    print(f"Size: {os.path.getsize(final_path)/1024:.1f} KB")

In [None]:
# Fallback: manual ONNX -> TFLite conversion (if automatic conversion failed)
# Uses onnx2tf instead of deprecated onnx_tf

# import onnx
# import subprocess
#
# onnx_path = onnx_files[0]
# output_dir_tf = os.path.join(BASE_DIR, "tf_conversion")
#
# # onnx2tf converts ONNX -> SavedModel -> TFLite in one step
# subprocess.run([
#     "onnx2tf", "-i", onnx_path,
#     "-o", output_dir_tf,
#     "-oiqt",  # output int8 quantized tflite
# ], check=True)
#
# # Find the generated tflite
# import glob
# tflite = glob.glob(os.path.join(output_dir_tf, "**/*.tflite"), recursive=True)
# if tflite:
#     shutil.copy2(tflite[0], os.path.join(BASE_DIR, "timoshka.tflite"))
#     print(f"Saved: timoshka.tflite ({os.path.getsize(tflite[0])/1024:.1f} KB)")

## Stage 8: Test the model

In [None]:
# Test the model on our samples
from openwakeword.model import Model
import numpy as np
import wave

# Load the exported model and look up the key its scores are reported under
# (the first entry of the model's ``models`` mapping).
model_path = os.path.join(BASE_DIR, "timoshka.tflite")
oww_model = Model(wakeword_models=[model_path])
model_name = [*oww_model.models.keys()][0]

print(f"Loaded model: {model_name}")

def test_wav(wav_path, model, name):
    """Run one WAV file through the wake word model and return its peak score.

    The audio is resampled to 16 kHz and downmixed to mono if necessary,
    converted to int16, and fed to the model in consecutive 1280-sample
    chunks (80 ms at 16 kHz). A trailing partial chunk is not scored.

    Args:
        wav_path: Path of the WAV file to score.
        model: Object providing ``reset()`` and ``predict(chunk)``;
            ``predict`` must return a mapping of model name to score.
        name: Key in that mapping whose score is tracked.

    Returns:
        The highest score seen over all chunks, or 0.0 when the clip is
        shorter than one chunk.
    """
    waveform, sr = torchaudio.load(wav_path)
    if sr != 16000:
        waveform = torchaudio.transforms.Resample(sr, 16000)(waveform)
    if waveform.shape[0] > 1:
        waveform = waveform.mean(dim=0, keepdim=True)

    # Clip before the int16 cast: resampling can overshoot [-1, 1], and an
    # out-of-range value would wrap around instead of saturating.
    # squeeze(0) drops only the channel axis, so a one-sample clip still
    # yields a 1-D array.
    scaled = waveform.squeeze(0).numpy() * 32767
    audio = np.clip(scaled, -32768, 32767).astype(np.int16)

    model.reset()
    chunk_size = 1280
    max_score = 0.0
    # "+ 1" keeps the last full chunk when the clip length is an exact
    # multiple of chunk_size (it used to be dropped, so a clip of exactly
    # one chunk was never scored at all).
    for start in range(0, len(audio) - chunk_size + 1, chunk_size):
        chunk = audio[start:start + chunk_size]
        prediction = model.predict(chunk)
        max_score = max(max_score, prediction[name])
    return max_score

In [None]:
import random

# Test on positive samples
print("=" * 50)
print("POSITIVE SAMPLES (should trigger)")
print("=" * 50)

# Score a random subset of at most 50 positive clips.
pos_files = glob.glob(os.path.join(ALL_POSITIVE_DIR, "*.wav"))
test_pos = random.sample(pos_files, min(50, len(pos_files)))

pos_scores = [test_wav(f, oww_model, model_name) for f in test_pos]

# A clip counts as triggered when its peak score reaches the 0.5 threshold.
triggered = sum(1 for s in pos_scores if s >= 0.5)
print(f"Tested: {len(test_pos)} samples")
if pos_scores:
    print(f"Triggered (>=0.5): {triggered}/{len(test_pos)} ({100*triggered/len(test_pos):.0f}%)")
    print(f"Mean score: {np.mean(pos_scores):.3f}")
    print(f"Min/Max: {np.min(pos_scores):.3f} / {np.max(pos_scores):.3f}")
else:
    # Without this guard the percentage divides by zero and np.min/np.max
    # raise on an empty list.
    print(f"No positive samples found in {ALL_POSITIVE_DIR}; nothing to evaluate.")

In [None]:
# Test on negative samples
print("=" * 50)
print("NEGATIVE SAMPLES (should NOT trigger)")
print("=" * 50)

# Score a random subset of at most 100 negative clips.
neg_files = glob.glob(os.path.join(ALL_NEGATIVE_DIR, "*.wav"))
test_neg = random.sample(neg_files, min(100, len(neg_files)))

neg_scores = []
false_positives = []
for f in test_neg:
    score = test_wav(f, oww_model, model_name)
    neg_scores.append(score)
    # A negative clip whose peak score reaches the 0.5 threshold is a
    # false positive.
    if score >= 0.5:
        false_positives.append((os.path.basename(f), score))

print(f"Tested: {len(test_neg)} samples")
if neg_scores:
    print(f"False positives (>=0.5): {len(false_positives)}/{len(test_neg)} ({100*len(false_positives)/len(test_neg):.1f}%)")
    print(f"Mean score: {np.mean(neg_scores):.3f}")
    print(f"Min/Max: {np.min(neg_scores):.3f} / {np.max(neg_scores):.3f}")
else:
    # Without this guard the percentage divides by zero and np.min/np.max
    # raise on an empty list.
    print(f"No negative samples found in {ALL_NEGATIVE_DIR}; nothing to evaluate.")

if false_positives:
    # Show the ten highest-scoring offenders.
    print("\nFalse positive files:")
    for fname, score in sorted(false_positives, key=lambda x: -x[1])[:10]:
        print(f"  {fname}: {score:.3f}")

In [None]:
# Summary
print("\n" + "=" * 50)
print("SUMMARY")
print("=" * 50)

if not pos_scores or not test_neg:
    # Both rates divide by the number of tested clips; report the problem
    # instead of failing with ZeroDivisionError.
    print("Cannot summarize: no positive or no negative samples were tested.")
else:
    # Rates use the same 0.5 score threshold as the per-set tests.
    tp_rate = sum(1 for s in pos_scores if s >= 0.5) / len(pos_scores) * 100
    fp_rate = len(false_positives) / len(test_neg) * 100

    print(f"True positive rate:  {tp_rate:.0f}%")
    print(f"False positive rate: {fp_rate:.1f}%")
    print()

    if tp_rate >= 70 and fp_rate < 5:
        print("Model looks good! Ready for deployment.")
    elif tp_rate >= 50:
        print("Model is acceptable. Consider more training or data.")
    else:
        print("Model needs improvement. Try:")
        print("  - More diverse target voices")
        print("  - More training steps")
        print("  - Adjusting threshold (lower for more sensitivity)")

## Download model

In [None]:
# Download via Colab
from google.colab import files

# Offer the final model as a browser download when it exists.
model_path = os.path.join(BASE_DIR, "timoshka.tflite")
if not os.path.exists(model_path):
    print("Model file not found. Check training logs above.")
else:
    files.download(model_path)
    size_kb = os.path.getsize(model_path) / 1024
    print(f"Downloaded: timoshka.tflite ({size_kb:.1f} KB)")

## Deploy to Home Assistant

After downloading `timoshka.tflite`:

```bash
# 1. Copy model to server
scp timoshka.tflite v@plex.local:/home/v/home-assistant/openwakeword-data/timoshka.tflite

# 2. The model will be picked up by the openwakeword container via volume mount.
#    If using /share/openwakeword/, copy there instead:
#    scp timoshka.tflite v@plex.local:/home/v/home-assistant/homeassistant/share/openwakeword/timoshka.tflite

# 3. Restart the openwakeword container
ssh v@plex.local 'cd /home/v/home-assistant && docker compose restart openwakeword'

# 4. In Home Assistant:
#    Settings -> Voice assistants -> your assistant -> Wake word -> select "timoshka"
#
# No changes needed to ESPHome/Atom Echo — wake word is processed server-side.
```