# Wake Word Dataset Generation & Augmentation Pipeline

A notebook for generating, augmenting, and benchmarking wake-word datasets.

Datasets generated with this method: https://huggingface.co/collections/TigreGotico/synthetic-wakeword-datasets

For not-wake-word (negative) audio, see https://huggingface.co/collections/TigreGotico/notwakeword-datasets

For voice-cloning donors, you can get nearly infinite variety from https://mlcommons.org/datasets/multilingual-spoken-words/

Ongoing wake-word training experiments: https://github.com/OpenVoiceOS/ww-trainer

Original repository: https://github.com/TigreGotico/synthetic_dataset_generator

### Pipeline stages

| # | Stage | Description |
|---|---|---|
| 0 | Configuration | Set all paths, parameters, and flags via env vars |
| 1 | Adversarial Word Generation | Phonetically confusable hard-negatives (LLM + grapheme edits) |
| 2 | Normalize Word Lists | Lowercase, sort, deduplicate |
| 3 | TTS Synthesis + Voice Conversion | Multi-engine TTS → optional Chatterbox VC |
| 4 | Voice Cloning Augmentation | Revoice existing samples via `chatterbox_bulk_vc` |
| 5 | VC TTS Synthesis | Direct Chatterbox bulk TTS with varied exaggeration |
| 6 | Training Augmentation | Noise / reverb / pitch / speed for training data |
| 7 | Benchmark Generation | Structured test sets at fixed SNRs for evaluation |

---

> **Credits:** Funded through the [NGI0 Commons Fund](https://nlnet.nl/commonsfund) via [NLnet](https://nlnet.nl), with support from the European Commission's Next Generation Internet programme (grant No 101135429).

---
## 0 · Global Configuration

All parameters are set as environment variables via `os.environ.setdefault()`.  
Override any of them from your shell, a `.env` file, or `%env` magic **before** running the config cell.

**Edit the values below to match your setup, then run the cell.**

In [None]:
import os

# ──────────────────────────────────────────────
# Default values for every tunable, grouped by pipeline stage.
# Each pair is applied with os.environ.setdefault(), so a variable that is
# already present in the environment is never overwritten.
# ──────────────────────────────────────────────
_DEFAULTS = (
    # GENERAL
    ("WW_LANG",            "en"),                 # Language code (en, es, pt, …)
    ("WW_WORDS",           "hey_mycroft"),        # Space-separated wake words (use _ for multi-word)
    ("WW_BASE_DIR",        "/data/ww"),           # Root directory for all I/O

    # ADVERSARIAL GENERATION  (Stage 1)
    ("ADV_N_SAMPLES",      "20"),                 # Target unique samples per wake word
    ("ADV_LLM_URL",        "http://localhost:11434"),  # Ollama / LLM API URL
    ("ADV_LLM_MODEL",      "gemma3:4b"),          # LLM model name
    ("ADV_LLM_WEIGHT",     "1.0"),                # Weight for LLM-based generation (0 = skip)
    ("ADV_GRAPH_WEIGHT",   "0.2"),                # Weight for GraphemeAug generation (0 = skip)
    ("ADV_GRAPH_MIN_EDIT", "2"),                  # Min Levenshtein distance
    ("ADV_GRAPH_MAX_EDIT", "3"),                  # Max Levenshtein distance
    ("ADV_FILE_MODE",      "append"),             # append | overwrite | error

    # TTS SYNTHESIS + VOICE CONVERSION  (Stage 3)
    ("SYNTH_N",            "900"),                # Samples per wake word
    ("SYNTH_USE_EDGE",     "1"),                  # 1 = enable Edge TTS
    ("SYNTH_USE_GOOGLE",   "1"),                  # 1 = enable Google TTS
    ("SYNTH_USE_PIPER",    "0"),                  # 1 = enable Piper TTS
    ("SYNTH_SEED",         ""),                   # Random seed (empty = none)
    ("SYNTH_VC_REFS",      ""),                   # Reference voice WAVs dir (empty = no VC)
    ("SYNTH_VC_DEVICE",    "cuda"),               # VC model device: cuda | cpu

    # VOICE CLONING AUGMENTATION  (Stage 4)
    ("VC_AUG_N_RANDOM",    "1"),                  # Random voice refs per sample
    ("VC_AUG_VOICES_PATH", ""),                   # Voice reference dataset dir

    # VC TTS SYNTHESIS  (Stage 5)
    ("VC_TTS_VOICES_PATH", ""),                   # Voice references dir
    ("VC_TTS_EXAGG_RANGE", "0.4 0.9 0.1"),        # Exaggeration range: min max step

    # TRAINING AUGMENTATION  (Stage 6)
    ("AUG_INPUT_DIR",      ""),                   # Preprocessed dataset dir (with metadata.csv)
    ("AUG_BG_NOISE_DIR",   ""),                   # General background noise
    ("AUG_MUSIC_DIR",      ""),                   # Music folder
    ("AUG_BG_SPEECH_DIR",  ""),                   # Competing-speaker / babble
    ("AUG_MIC_NOISE_DIR",  ""),                   # Mic-specific silence / hiss
    ("AUG_RIR_DIR",        ""),                   # Room impulse responses
    ("AUG_SNR_MIN",        "0"),                  # dB
    ("AUG_SNR_MAX",        "20"),                 # dB
    ("AUG_PITCH_MIN",      "-1.0"),               # semitones
    ("AUG_PITCH_MAX",      "1.0"),
    ("AUG_SPEED_MIN",      "0.95"),
    ("AUG_SPEED_MAX",      "1.05"),
    ("AUG_PROB",           "0.9"),                # Per-sample augmentation probability

    # BENCHMARK GENERATION  (Stage 7)
    ("BENCH_CLEAN_DIR",    ""),                   # Clean wake-word samples
    ("BENCH_NOISE_DIR",    ""),                   # Ambient / environmental noise
    ("BENCH_MUSIC_DIR",    ""),                   # Music noise
    ("BENCH_SPEECH_DIR",   ""),                   # Competing-speaker audio
    ("BENCH_RIR_DIR",      ""),                   # RIR files
    ("BENCH_SNRS",         "20,15,10,5,0"),       # Comma-separated SNR list (dB)
)
for _name, _default in _DEFAULTS:
    os.environ.setdefault(_name, _default)

# ──────────────────────────────────────────────
# Derived paths
# ──────────────────────────────────────────────
BASE    = os.environ["WW_BASE_DIR"]
LANG    = os.environ["WW_LANG"]
WW_LIST = os.environ["WW_WORDS"].split()

# Output directories default to sub-folders of BASE unless already set.
_DERIVED_DIRS = (
    ("SYNTH_OUTPUT_DIR",   ("synth_output", LANG)),
    ("VC_AUG_OUTPUT_DIR",  ("vc_output",)),
    ("VC_TTS_OUTPUT_DIR",  ("vc_synth_output",)),
    ("ADV_OUTPUT_DIR",     ("adversarial",)),
    ("AUG_OUTPUT_DIR",     ("augmented_data",)),
    ("BENCH_OUTPUT_DIR",   ("benchmark_data",)),
)
for _name, _parts in _DERIVED_DIRS:
    os.environ.setdefault(_name, os.path.join(BASE, *_parts))

def _env(key, default=""):
    """Return environment variable *key*, or *default* when it is not set."""
    try:
        return os.environ[key]
    except KeyError:
        return default

def _flag(key):
    """Interpret environment variable *key* as a boolean switch.

    The value is stripped and lower-cased; "1", "true" and "yes" count as
    enabled. Anything else, including an unset variable, is disabled.
    """
    raw_value = _env(key, "0")
    normalized = raw_value.strip().lower()
    return normalized in ("1", "true", "yes")

# Echo the effective core settings so a wrong override is noticed early.
print("\n".join((
    "Configuration loaded.",
    f"  Language       : {LANG}",
    f"  Wake words     : {WW_LIST}",
    f"  Base directory : {BASE}",
)))

---
## 1 · Adversarial Word Generation

Generates phonetically confusable hard-negatives for each wake word using:
- **GraphemeAug** - [arxiv](https://arxiv.org/abs/2505.14814v2) - single-grapheme edits (insertion / deletion / substitution) constrained by Levenshtein distance
- **LLM** - prompted to produce rhyming, rhythm-matching adversarial phrases

The two methods are weighted by `ADV_LLM_WEIGHT` and `ADV_GRAPH_WEIGHT`.

| Key env vars | |
|---|---|
| `ADV_LLM_URL` / `ADV_LLM_MODEL` | LLM endpoint |
| `ADV_N_SAMPLES` | Target number of unique samples per wake word |
| `ADV_LLM_WEIGHT` / `ADV_GRAPH_WEIGHT` | Relative generation weights |
| `ADV_FILE_MODE` | `append` \| `overwrite` \| `error` |

In [None]:
!pip install --quiet --break-system-packages requests click

In [None]:
import random
from typing import List, Set, Optional

import requests

# ═══════════════════════════════════════════════════════════════════
# GraphemeAugmenter — single-grapheme-edit hard-negative generation
# ═══════════════════════════════════════════════════════════════════

class GraphemeAugmenter:
    """
    Generates hard-negative (confusable) keyword-spotting examples based on
    single-grapheme edits (Insertion, Deletion, Substitution), filtered by
    Levenshtein distance from the original keyword.

    Randomness comes from the module-level ``random`` generator. Every random
    pick is made from a sorted or insertion-ordered sequence (never from raw
    set iteration order), so seeding ``random`` gives reproducible output.

    Args:
        max_edits: Largest accepted Levenshtein distance (>= 1).
        min_edits: Smallest accepted Levenshtein distance (>= 0, <= max_edits).
        language_vowels: Extra vowel graphemes added to the standard set.
        language_consonants: Extra consonant graphemes added to the standard set.

    Raises:
        ValueError: If the edit bounds are inconsistent.
    """
    STANDARD_VOWELS = set('aeiou')
    STANDARD_CONSONANTS = set('bcdfghjklmnpqrstvwxyz')

    def __init__(self, max_edits: int = 1, min_edits: int = 1,
                 language_vowels: Optional[Set[str]] = None,
                 language_consonants: Optional[Set[str]] = None):
        if max_edits < 1:
            raise ValueError("max_edits must be 1 or greater.")
        if min_edits < 0:
            raise ValueError("min_edits cannot be negative.")
        if min_edits > max_edits:
            raise ValueError("min_edits cannot be greater than max_edits.")

        self.max_edits = max_edits
        self.min_edits = min_edits
        self.VOWELS = self.STANDARD_VOWELS.union({c.lower() for c in (language_vowels or set())})
        self.CONSONANTS = self.STANDARD_CONSONANTS.union({c.lower() for c in (language_consonants or set())})
        self.ALL_GRAPHEMES = self.VOWELS.union(self.CONSONANTS)

    def _get_char_class(self, char: str) -> Optional[str]:
        """Return 'vowel', 'consonant', or None for characters in neither set."""
        c = char.lower()
        if c in self.VOWELS:
            return 'vowel'
        if c in self.CONSONANTS:
            return 'consonant'
        return None

    @staticmethod
    def _levenshtein_distance(s1: str, s2: str) -> int:
        """Return the Levenshtein edit distance between ``s1`` and ``s2``."""
        # Keep the shorter string on the inner axis so rows stay small.
        if len(s1) < len(s2):
            s1, s2 = s2, s1
        if not s2:
            return len(s1)
        previous_row = list(range(len(s2) + 1))
        for i, c1 in enumerate(s1):
            current_row = [i + 1]
            for j, c2 in enumerate(s2):
                cost = 0 if c1 == c2 else 1
                current_row.append(min(
                    previous_row[j + 1] + 1,   # deletion
                    current_row[j] + 1,        # insertion
                    previous_row[j] + cost,    # substitution / match
                ))
            previous_row = current_row
        return previous_row[-1]

    def _random_grapheme(self) -> str:
        """Pick one known grapheme; sorted so the pick is seed-reproducible."""
        return random.choice(sorted(self.ALL_GRAPHEMES))

    def _insert_random(self, text: str) -> str:
        """Insert one random grapheme at a random position of ``text``."""
        pos = random.randint(0, len(text))
        return text[:pos] + self._random_grapheme() + text[pos:]

    @staticmethod
    def _delete_random(text: str) -> str:
        """Delete one randomly chosen character of non-empty ``text``."""
        pos = random.randint(0, len(text) - 1)
        return text[:pos] + text[pos + 1:]

    def _get_random_one_edit(self, text: str) -> str:
        """Apply exactly one random insertion, deletion or substitution.

        Substitutions stay within the character class (vowel for vowel,
        consonant for consonant). An empty ``text`` yields a single random
        grapheme.
        """
        if not text:
            return self._random_grapheme()
        edit_type = random.randint(0, 2)

        if edit_type == 0:  # Insertion
            return self._insert_random(text)

        if edit_type == 1:  # Deletion
            return self._delete_random(text)

        # Substitution (same-class: vowel↔vowel, consonant↔consonant)
        for _ in range(10):
            pos = random.randint(0, len(text) - 1)
            char_class = self._get_char_class(text[pos])
            if char_class == 'vowel':
                target_set = self.VOWELS
            elif char_class == 'consonant':
                target_set = self.CONSONANTS
            else:
                # Separator, digit, etc.: try another position.
                continue
            possible = sorted(target_set - {text[pos].lower()})
            if possible:
                return text[:pos] + random.choice(possible) + text[pos + 1:]

        # No substitutable character was found. Fall back to an edit that
        # always succeeds, rather than recursing without a bound.
        if random.random() < 0.5:
            return self._insert_random(text)
        return self._delete_random(text)

    def generate_confusables(self, keyword: str, n_samples: int) -> List[str]:
        """Return up to ``n_samples`` unique confusable variants of ``keyword``.

        Candidates are produced by repeatedly applying one random edit to the
        keyword or to an earlier candidate that is still below ``max_edits``.
        A candidate is kept when its distance to the lower-cased keyword lies
        within ``[min_edits, max_edits]``. The search stops after
        ``n_samples * 10 + 100`` attempts, so fewer results may be returned.

        Returns:
            A sorted list of unique, non-empty strings (never the keyword).
        """
        if n_samples <= 0:
            return []

        original = keyword.lower()
        generated: Set[str] = set()
        # The list gives O(1), order-stable random picks; the set dedupes it.
        seed_pool: List[str] = [original]
        known_seeds: Set[str] = {original}
        max_attempts = n_samples * 10 + 100

        for _ in range(max_attempts):
            if len(generated) >= n_samples:
                break
            candidate = self._get_random_one_edit(random.choice(seed_pool))
            # An empty string (a short seed deleted away) is not a usable
            # hard negative, and the keyword itself is not a negative.
            if not candidate or candidate == original:
                continue
            distance = self._levenshtein_distance(original, candidate)
            if self.min_edits <= distance <= self.max_edits:
                generated.add(candidate)
            # Candidates with room left can be edited further.
            if distance < self.max_edits and candidate not in known_seeds:
                known_seeds.add(candidate)
                seed_pool.append(candidate)

        return sorted(generated)


# ═══════════════════════════════════════════════════════════════════
# LLM-based adversarial generation (Ollama API)
# ═══════════════════════════════════════════════════════════════════

# System prompt sent with every LLM request. It asks for one adversarial
# sample per line with no extra text; generate_llm_samples() splits the
# reply on newlines accordingly.
_ADV_SYSTEM_PROMPT = """You are an expert in computational linguistics, phonetics, and adversarial machine learning. Your core function is to generate lists of strings that are acoustically and linguistically similar to a given wake word or keyword. These generated strings serve as adversarial samples intended to trick an ASR system or keyword spotter.

Generation Goal: The adversarial samples must primarily rhyme with the components of the input keyword or mimic its overall rhythm and length.

Output Constraints (CRITICAL):
1. Format: Output only the generated adversarial strings.
2. Delimiter: Use a newline to separate each sample (one per line).
3. Exclusion: DO NOT include any introductory text, numbering, bullet points, explanations, or concluding remarks.
4. Diversity: All samples MUST be unique and PHONETICALLY similar.

Example:
[Input Keyword]: "hey computer"
[Target Output]:
say scooter
pay intruder
gray maneuver
stay pewter"""

# User prompt template; filled via str.format() with `n_samples` and `word`.
_ADV_USER_TEMPLATE = "Generate {n_samples} adversarial, rhyming samples for the following keyword: {word}"


def generate_llm_samples(url: str, model: str, wakeword: str, n_samples: int) -> List[str]:
    """Call an Ollama-compatible /api/generate endpoint for adversarial phrases.

    Args:
        url: Base URL of the LLM server (a trailing "/" is tolerated).
        model: Model name passed to the endpoint.
        wakeword: Wake word; "_" and "-" are treated as word separators,
            matching how the synthesis stages turn wake words into phrases.
        n_samples: Maximum number of phrases to return.

    Returns:
        Up to ``n_samples`` unique phrases, in the order the model produced
        them, each with the same word count as the wake word. Returns an
        empty list when ``n_samples <= 0`` or when the request or response
        is unusable (the problem is printed, not raised).
    """
    if n_samples <= 0:
        return []

    # Compare word counts on the spoken phrase. Using the raw "hey_mycroft"
    # form would give a count of 1 and discard every multi-word answer.
    phrase = " ".join(wakeword.replace("_", " ").replace("-", " ").split())
    word_count = len(phrase.split())

    print(f"  LLM (target: {n_samples}) ... ", end="", flush=True)
    try:
        resp = requests.post(
            f"{url.rstrip('/')}/api/generate",
            json={
                "model": model,
                # Ask for twice the target, since filtering drops some lines.
                "prompt": _ADV_USER_TEMPLATE.format(word=phrase, n_samples=n_samples * 2),
                "system": _ADV_SYSTEM_PROMPT,
                "stream": False,
            },
            timeout=60,
        )
        resp.raise_for_status()
        lines = resp.json()["response"].strip().split("\n")
    except requests.exceptions.RequestException as e:
        print(f"API error: {e}")
        return []
    except (KeyError, TypeError, AttributeError, ValueError) as e:
        # Reply was not the expected {"response": "<text>"} JSON object.
        print(f"unexpected LLM response: {e!r}")
        return []

    uniq: List[str] = []
    seen = set()
    for line in lines:
        candidate = line.strip()
        if len(candidate.split()) != word_count:
            continue
        # The wake word echoed back is not a negative sample.
        if candidate.lower() == phrase.lower() or candidate in seen:
            continue
        seen.add(candidate)
        uniq.append(candidate)
        if len(uniq) == n_samples:
            break

    print(f"got {len(uniq)}")
    return uniq


def generate_grapheme_samples(augmenter: GraphemeAugmenter, wakeword: str, n_samples: int) -> List[str]:
    """Produce grapheme-edit adversarial samples for ``wakeword``.

    Best-effort wrapper around ``augmenter.generate_confusables``: progress is
    printed, and any exception is reported and turned into an empty result.
    A non-positive ``n_samples`` returns an empty list without calling the
    augmenter.
    """
    outcome: List[str] = []
    if n_samples <= 0:
        return outcome

    print(f"  GraphemeAug (target: {n_samples}) ... ", end="", flush=True)
    try:
        found = augmenter.generate_confusables(wakeword, n_samples=n_samples)
        print(f"got {len(found)}")
        outcome = found
    except Exception as err:
        print(f"error: {err}")
    return outcome


# Progress marker: printed once the definitions in this cell have executed.
print("Adversarial generation functions ready.")

In [None]:
# ═══════════════════════════════════════════════════════════════════
# Run adversarial generation for each wake word
# ═══════════════════════════════════════════════════════════════════

n_samples     = int(_env("ADV_N_SAMPLES", "20"))
llm_weight    = float(_env("ADV_LLM_WEIGHT", "1.0"))
graph_weight  = float(_env("ADV_GRAPH_WEIGHT", "0.2"))
llm_url       = _env("ADV_LLM_URL")
llm_model     = _env("ADV_LLM_MODEL")
file_mode     = _env("ADV_FILE_MODE", "append").strip().lower()
adv_out_dir   = _env("ADV_OUTPUT_DIR")

# Fail fast on a mistyped mode. Without this check any unknown value would
# be treated as "overwrite" and could silently replace existing word lists.
if file_mode not in ("append", "overwrite", "error"):
    raise ValueError(
        f"ADV_FILE_MODE must be one of append | overwrite | error, got {file_mode!r}"
    )
# A negative weight would produce a negative or inflated per-method target.
if llm_weight < 0 or graph_weight < 0:
    raise ValueError("ADV_LLM_WEIGHT and ADV_GRAPH_WEIGHT must be >= 0.")

total_weight = llm_weight + graph_weight
if total_weight <= 0:
    print("Both weights are 0 — skipping adversarial generation.")
else:
    llm_ratio   = llm_weight / total_weight
    graph_ratio = graph_weight / total_weight
    llm_n       = int(n_samples * llm_ratio)
    graph_n     = n_samples - llm_n  # remainder goes to grapheme

    print(f"Target: {n_samples} samples/word  (LLM: {llm_n}, Grapheme: {graph_n})")
    if llm_n > 0 and not llm_url:
        # Say so explicitly instead of quietly dropping the LLM share.
        print("  ADV_LLM_URL is empty — LLM generation will be skipped.")

    augmenter = None
    if graph_weight > 0:
        augmenter = GraphemeAugmenter(
            max_edits=int(_env("ADV_GRAPH_MAX_EDIT", "3")),
            min_edits=int(_env("ADV_GRAPH_MIN_EDIT", "2")),
        )

    os.makedirs(adv_out_dir, exist_ok=True)

    for ww in WW_LIST:
        output_file = os.path.join(adv_out_dir, f"{ww}.txt")
        print(f"\n--- Processing '{ww}' -> {output_file} ---")

        # Decide how to open the file; in append mode load what is already
        # there so only genuinely new samples get written.
        existing: Set[str] = set()
        appending = False
        if os.path.exists(output_file):
            if file_mode == "error":
                raise FileExistsError(
                    f"Output file exists: {output_file}. "
                    f"Set ADV_FILE_MODE=append or overwrite."
                )
            if file_mode == "append":
                appending = True
                with open(output_file, "r") as f:
                    existing = {l.strip() for l in f if l.strip()}
                print(f"  Loaded {len(existing)} existing samples (append mode)")
        open_mode = "a" if appending else "w"

        all_samples: Set[str] = existing.copy()

        # LLM generation
        if llm_n > 0 and llm_url:
            candidates = (s.strip() for s in generate_llm_samples(llm_url, llm_model, ww, llm_n))
            all_samples.update(s for s in candidates if s)

        # Grapheme generation
        if graph_n > 0 and augmenter:
            candidates = (s.strip() for s in generate_grapheme_samples(augmenter, ww, graph_n))
            all_samples.update(s for s in candidates if s)

        # Write results. `existing` is only populated in append mode, so the
        # difference equals all samples when the file is written afresh.
        new_samples = sorted(all_samples - existing)
        if new_samples:
            with open(output_file, open_mode) as f:
                # Start on a fresh line when appending to a non-empty file.
                prefix = "\n" if (open_mode == "a" and os.path.getsize(output_file) > 0) else ""
                f.write(prefix + "\n".join(new_samples))

        print(f"  Wrote {len(new_samples)} new samples (total unique: {len(all_samples)})")

    print("\nAdversarial generation complete.")

---
## 2 · Normalize Adversarial Word Lists

Lowercase, sort, and deduplicate each generated text file in place.

In [None]:
# Rewrite each per-word adversarial list in place: lower-cased, unique, sorted.
adv_out_dir = _env("ADV_OUTPUT_DIR")

for ww in WW_LIST:
    txt_file = os.path.join(adv_out_dir, f"{ww}.txt")
    if os.path.isfile(txt_file):
        with open(txt_file, "r") as reader:
            trimmed = (raw.strip() for raw in reader)
            # Blank lines are dropped; the set removes duplicates.
            lines = {entry.lower() for entry in trimmed if entry}
        ordered = sorted(lines)
        with open(txt_file, "w") as writer:
            writer.write("\n".join(ordered))
        print(f"Normalized {txt_file}: {len(lines)} unique entries")
    else:
        print(f"Skipping (not found): {txt_file}")

print("Done.")

---
## 3 · TTS Synthesis + Voice Conversion

Uses Edge TTS, Google TTS, and/or Piper to generate wake-word audio samples.  
Optionally applies voice conversion with a Chatterbox ONNX model and reference voice WAVs.

| Key env vars | |
|---|---|
| `SYNTH_N` | Samples per wake word |
| `SYNTH_USE_EDGE` / `SYNTH_USE_GOOGLE` / `SYNTH_USE_PIPER` | Toggle engines |
| `SYNTH_VC_REFS` | Reference voices dir (empty = skip VC) |
| `SYNTH_VC_DEVICE` | `cuda` or `cpu` |
| `SYNTH_SEED` | Reproducibility seed |

In [None]:
!pip install --quiet --break-system-packages \
    ovos_tts_plugin_edge_tts \
    ovos_tts_plugin_google_tx \
    ovos_tts_plugin_piper \
    chatterbox_onnx \
    torch torchaudio rich click tqdm ovos-utils

In [None]:
import sys
import time
import warnings
from os.path import join
from typing import Dict
from uuid import uuid4
from pathlib import Path

import torch
import torchaudio as ta
from tqdm import tqdm

# ═══════════════════════════════════════════════════════════════════
# Collect metadata for all available TTS voice configurations
# ═══════════════════════════════════════════════════════════════════

def _collect_tts_metadata(
    lang: str,
    edge: bool = True,
    google: bool = True,
    piper: bool = False,
) -> List[Dict]:
    """
    Build the list of candidate TTS configurations for ``lang``.

    Only metadata is gathered here; no TTS plugin object is created. Each
    entry is a dict with the keys ``type`` ("edge" | "google" | "piper"),
    ``name`` and ``config``. Plugin packages are imported lazily, so only
    the enabled engines need to be installed.

    Random choices (Edge speaking rate, Google "slow" flag) are drawn once
    per entry while the list is built.
    """
    prefix = lang.split("-")[0].lower()
    entries: List[Dict] = []

    # --- Edge TTS voices ---
    if edge:
        from ovos_tts_plugin_edge_tts import VOICES
        faster = [f"+{pct}%" for pct in range(1, 35)]
        slower = [f"-{pct}%" for pct in range(1, 30)]
        rate_choices = faster + slower
        for locale, voice_names in VOICES.items():
            if locale.startswith(prefix):
                for voice_name in voice_names:
                    entries.append({
                        "type": "edge",
                        "name": f"edge_{voice_name}",
                        "config": {
                            "voice": voice_name,
                            "rate": random.choice(rate_choices),
                        },
                    })

    # --- Google Translate TTS ---
    if google:
        google_config = {"lang": lang, "slow": random.choice([True, False])}
        entries.append({
            "type": "google",
            "name": f"google_{lang}",
            "config": google_config,
        })

    # --- Piper TTS ---
    if piper:
        from ovos_tts_plugin_piper.voice_models import get_available_voices
        from ovos_utils.lang import standardize_lang_tag
        catalogue = get_available_voices(update_voices=False)
        for model_id, info in catalogue.items():
            tag = standardize_lang_tag(info["language"]["code"])
            if not tag.startswith(prefix):
                continue
            speaker_count = len(info["speaker_id_map"])
            if speaker_count > 0:
                # Multi-speaker model: one entry per speaker index.
                variants = [f"{model_id}#{idx}" for idx in range(speaker_count)]
            else:
                variants = [model_id]
            entries.extend(
                {
                    "type": "piper",
                    "name": f"piper_{tag}_{variant}",
                    "config": {"lang": tag, "voice": variant},
                }
                for variant in variants
            )

    print(f"  Total TTS configurations: {len(entries)}")
    return entries


def _instantiate_plugin(meta: Dict):
    """Build the TTS plugin described by ``meta``.

    ``meta["type"]`` selects the engine and ``meta["config"]`` is passed as
    the plugin's ``config``. The plugin package is imported only for the
    selected engine.

    Raises:
        ValueError: If ``meta["type"]`` is not "edge", "google" or "piper".
    """
    kind = meta["type"]
    if kind == "edge":
        from ovos_tts_plugin_edge_tts import EdgeTTSPlugin
        plugin_cls = EdgeTTSPlugin
    elif kind == "google":
        from ovos_tts_plugin_google_tx import GoogleTranslateTTS
        plugin_cls = GoogleTranslateTTS
    elif kind == "piper":
        from ovos_tts_plugin_piper import PiperTTSPlugin
        plugin_cls = PiperTTSPlugin
    else:
        raise ValueError(f"Unknown TTS type: {meta['type']}")
    return plugin_cls(config=meta["config"])


# ═══════════════════════════════════════════════════════════════════
# Core synthesis + voice conversion loop
# ═══════════════════════════════════════════════════════════════════

def synthesize_and_convert(
    wake_word: str,
    lang: str,
    output_dir: str,
    reference_voices_dir: str,
    n: int,
    device: str = "cuda",
    edge: bool = True,
    google: bool = True,
    piper: bool = False,
):
    """
    Generate N samples, each with a randomly-chosen (TTS plugin, reference voice)
    combination.  Saves results to output_dir.

    Args:
        wake_word: Wake word; "_" and "-" are spoken as spaces.
        lang: Language code passed to the TTS plugins.
        output_dir: Directory that receives the WAV files (created if needed).
        reference_voices_dir: Directory searched recursively for ``*.wav``
            reference voices. Empty or ``None`` disables voice conversion.
        n: Number of samples to attempt.
        device: Device for the voice-conversion model.
        edge, google, piper: Which TTS engines to draw from.

    With voice conversion enabled the result is ``<id>.wav`` and the
    intermediate ``<id>_tts.wav`` is deleted. Without it the TTS file
    ``<id>_tts.wav`` is the result. A failed sample is counted and reported,
    and its partial files are removed. Returns ``None``.
    """
    all_meta = _collect_tts_metadata(lang, edge=edge, google=google, piper=piper)
    if not all_meta:
        print("  No TTS metadata found — aborting.")
        return

    # --- Load VC model + reference voices (optional) ---
    vc_model = None
    ref_voices: List[str] = []

    if reference_voices_dir:
        base_path = Path(reference_voices_dir)
        # Sorted: glob order depends on the filesystem, which would make
        # random.choice() irreproducible even with a fixed seed.
        ref_voices = sorted(str(p) for p in base_path.glob("**/*.wav"))
        if not ref_voices:
            print(f"  No reference voices (.wav) found in {reference_voices_dir}")
            return
        print(f"  Found {len(ref_voices)} reference voices. Loading VC model...")
        from chatterbox_onnx import ChatterboxOnnx
        vc_model = ChatterboxOnnx(device=device)
        print("  VC model loaded.")
    else:
        print("  Voice conversion not enabled (SYNTH_VC_REFS empty).")

    def _discard(path: str) -> None:
        """Best-effort removal; a missing or locked file must not fail a sample."""
        try:
            os.remove(path)
        except OSError:
            pass

    def _report(message: str) -> None:
        """Print a failure line without breaking the tqdm progress bar."""
        tqdm.write(f"  [fail] {message}", file=sys.stdout)

    os.makedirs(output_dir, exist_ok=True)
    phrase = wake_word.replace("_", " ").replace("-", " ")
    success, fail = 0, 0
    start = time.time()

    for _ in tqdm(range(n), total=n, desc="Synth + VC", unit="sample", file=sys.stdout):
        plugin_meta = random.choice(all_meta)

        # Instantiate TTS on the fly
        try:
            plugin = _instantiate_plugin(plugin_meta)
        except Exception as e:
            fail += 1
            _report(f"could not load {plugin_meta['name']}: {e}")
            continue

        # NOTE(review): the slice keeps the last 26 characters of the UUID;
        # kept as-is so file naming does not change.
        base_name = str(uuid4())[10:]
        tts_path = join(output_dir, f"{base_name}_tts.wav")
        vc_path  = join(output_dir, f"{base_name}.wav")

        try:
            # 1) Generate TTS audio
            plugin.get_tts(
                phrase,
                tts_path,
                lang=lang,
                voice=plugin_meta["config"].get("voice"),
            )

            # 2) Voice conversion (if model loaded)
            if vc_model is not None:
                ref_file = random.choice(ref_voices)
                vc_model.voice_convert(
                    source_audio_path=tts_path,
                    target_voice_path=ref_file,
                    output_file_name=vc_path,
                )
        except Exception as e:
            fail += 1
            _report(f"{plugin_meta['name']}: {e}")
            # Do not leave partial or unconverted audio in the dataset.
            _discard(tts_path)
            _discard(vc_path)
        else:
            success += 1
            if vc_model is not None:
                _discard(tts_path)  # clean up intermediate TTS file

    elapsed = time.time() - start
    print(f"  Completed {success} ok / {fail} failed in {elapsed:.1f}s")


# Progress marker: printed once the definitions in this cell have executed.
print("TTS + VC functions ready.")

In [None]:
# ═══════════════════════════════════════════════════════════════════
# Run TTS synthesis (+ optional VC) for each wake word
# ═══════════════════════════════════════════════════════════════════

# Silence library warnings for the (long) synthesis run.
warnings.filterwarnings("ignore")

use_edge, use_google, use_piper = (
    _flag(flag_name)
    for flag_name in ("SYNTH_USE_EDGE", "SYNTH_USE_GOOGLE", "SYNTH_USE_PIPER")
)

if use_edge or use_google or use_piper:
    # Optional seeding of the `random` module for repeatable choices.
    seed_val = _env("SYNTH_SEED")
    if seed_val:
        random.seed(int(seed_val))
        print(f"Using random seed: {seed_val}")

    synth_out = _env("SYNTH_OUTPUT_DIR")
    synth_n   = int(_env("SYNTH_N", "900"))
    vc_refs   = _env("SYNTH_VC_REFS") or None
    vc_device = _env("SYNTH_VC_DEVICE", "cuda")

    engine_flags = {"edge": use_edge, "google": use_google, "piper": use_piper}

    # One output sub-directory per wake word.
    for ww in WW_LIST:
        ww_out = os.path.join(synth_out, ww)
        print(f"\n{'='*50}")
        print(f"Synthesising: {ww}  ->  {ww_out}")
        print(f"{'='*50}")

        synthesize_and_convert(
            ww,
            LANG,
            ww_out,
            vc_refs,
            synth_n,
            device=vc_device,
            **engine_flags,
        )

    print("\nTTS Synthesis complete.")
else:
    print(
        "No TTS engine enabled — set at least one of "
        "SYNTH_USE_EDGE / SYNTH_USE_GOOGLE / SYNTH_USE_PIPER to 1."
    )

---
## 4 · Voice Cloning Augmentation

Revoices an existing dataset (e.g. output of Stage 3) with random speaker references using `chatterbox_bulk_vc` (a CLI tool installed via the `chatterbox_onnx` package).

| Key env vars | |
|---|---|
| `VC_AUG_VOICES_PATH` | Directory of reference voice WAVs |
| `VC_AUG_N_RANDOM` | Random voices per sample |

In [None]:
# -- Reinstall task-specific deps --
!pip install --quiet --break-system-packages chatterbox_onnx

In [None]:
import subprocess

# Stage 4 settings; the input is the Stage 3 output tree (SYNTH_OUTPUT_DIR).
synth_out     = _env("SYNTH_OUTPUT_DIR")
vc_aug_out    = _env("VC_AUG_OUTPUT_DIR")
vc_aug_voices = _env("VC_AUG_VOICES_PATH")
n_random      = _env("VC_AUG_N_RANDOM", "1")

if vc_aug_voices:
    for ww in WW_LIST:
        ww_in, ww_out = (os.path.join(root, ww) for root in (synth_out, vc_aug_out))
        os.makedirs(ww_out, exist_ok=True)

        # Argument list (no shell): paths with spaces are passed through intact.
        cmd = ["chatterbox_bulk_vc"]
        cmd += ["--output-path", ww_out]
        cmd += ["--audios-path", ww_in]
        cmd += ["--voices-path", vc_aug_voices]
        cmd += ["--n-random", n_random]

        print(f">>> {' '.join(cmd)}")
        # check=True: a failing CLI run raises and stops the remaining words.
        subprocess.run(cmd, check=True)

    print("\nVoice cloning augmentation complete.")
else:
    print("VC_AUG_VOICES_PATH not set — skipping voice cloning augmentation.")

---
## 5 · VC TTS Synthesis

Direct TTS via `chatterbox_bulk_tts` (CLI from the `chatterbox_onnx` package) with multiple exaggeration levels and voice references.

| Key env vars | |
|---|---|
| `VC_TTS_VOICES_PATH` | Voice references directory |
| `VC_TTS_EXAGG_RANGE` | Exaggeration range (`min max step`) |

In [None]:
!pip install --quiet --break-system-packages chatterbox_onnx

In [None]:
# Imported here (not only in the Stage 4 cell) so this cell also runs when
# Stage 4 was skipped or the kernel was restarted.
import shlex
import subprocess

vc_tts_voices = _env("VC_TTS_VOICES_PATH")
vc_tts_out    = _env("VC_TTS_OUTPUT_DIR")
exagg_range   = _env("VC_TTS_EXAGG_RANGE", "0.4 0.9 0.1").split()

if not vc_tts_voices:
    print("VC_TTS_VOICES_PATH not set — skipping VC TTS synthesis.")
else:
    # Catch a malformed range here rather than as an opaque CLI failure.
    if len(exagg_range) != 3:
        raise ValueError(
            "VC_TTS_EXAGG_RANGE must contain exactly three values "
            f"('min max step'), got {exagg_range!r}"
        )
    for value in exagg_range:
        float(value)  # raises ValueError on a non-numeric entry

    for ww in WW_LIST:
        # The spoken phrase uses spaces where the wake-word id uses _ or -.
        ww_phrase = ww.replace("_", " ").replace("-", " ")
        ww_out = os.path.join(vc_tts_out, ww)
        os.makedirs(ww_out, exist_ok=True)

        cmd = [
            "chatterbox_bulk_tts",
            "--output-path", ww_out,
            "--voices-path", vc_tts_voices,
            "--exaggeration-range", *exagg_range,
            ww_phrase,
        ]
        # shlex.join quotes arguments containing spaces (e.g. the phrase),
        # so the echoed command can be pasted into a shell as-is.
        print(f">>> {shlex.join(cmd)}")
        subprocess.run(cmd, check=True)

    print("\nVC TTS synthesis complete.")

---
## 6 · Training Data Augmentation

Applies stochastic augmentations to preprocessed training data:
background noise mixing, music mixing, competing speech, reverb, pitch shifting, and speed perturbation.

Reads a `metadata.csv` (`path,label`) from `AUG_INPUT_DIR` and writes augmented copies + new metadata to `AUG_OUTPUT_DIR`.

| Key env vars | |
|---|---|
| `AUG_INPUT_DIR` | Preprocessed dataset with `metadata.csv` |
| `AUG_BG_NOISE_DIR` / `AUG_MUSIC_DIR` / `AUG_BG_SPEECH_DIR` / `AUG_MIC_NOISE_DIR` | Noise sources |
| `AUG_RIR_DIR` | Room impulse responses |
| `AUG_SNR_MIN` / `AUG_SNR_MAX` | SNR range (dB) |
| `AUG_PITCH_MIN/MAX`, `AUG_SPEED_MIN/MAX` | Transform ranges |
| `AUG_PROB` | Per-sample augmentation probability |

In [None]:
!pip install --quiet --break-system-packages librosa soundfile numpy click tqdm

In [None]:
import numpy as np
import soundfile as sf
import librosa
from pathlib import Path

# ═══════════════════════════════════════════════════════════════════
# Audio utility functions  (shared by Stages 6 and 7)
# ═══════════════════════════════════════════════════════════════════

def load_audio_mono(path, sr=16000):
    """Read ``path`` as mono float32 audio at sample rate ``sr``.

    Multi-channel input is averaged across channels and the signal is
    resampled when the file's rate differs from ``sr``. Returns ``None``
    (after printing the error) if the file cannot be read.
    """
    try:
        samples, native_sr = sf.read(str(path))
    except Exception as e:
        print(f"Error reading {path}: {e}")
        return None

    # Down-mix: average the channel axis.
    if samples.ndim > 1:
        samples = samples.mean(axis=1)

    needs_resample = native_sr != sr
    if needs_resample:
        samples = librosa.resample(
            samples.astype(np.float32), orig_sr=native_sr, target_sr=sr
        )
    return samples.astype(np.float32)


def save_wav(path, wav, sr=16000):
    """Write ``wav`` to ``path`` at sample rate ``sr``, creating parent folders."""
    parent_dir = Path(path).parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    sf.write(str(path), wav, sr)


def mix_audio(clean, bg, snr_db):
    """
    Mix a background track into clean audio at a given SNR (dB).

    The background is tiled when it is shorter than the clean signal, then
    a random window of the clean signal's length is cropped from it (drawn
    from the global ``random`` state).  The crop is scaled so that
    RMS(clean) / RMS(background) matches ``snr_db`` and is added to the
    clean signal.  If the sum exceeds full scale it is divided by its peak
    so the result does not clip.

    Args:
        clean: 1-D array holding the foreground signal.
        bg: 1-D array holding the background signal.
        snr_db: Target signal-to-noise ratio in decibels.

    Returns:
        float32 array with the same length as ``clean``.  When either input
        is empty there is nothing to mix, so ``clean`` is returned (as
        float32) instead of raising.
    """
    clean = np.asarray(clean)
    bg = np.asarray(bg)
    clean_len = len(clean)
    if clean_len == 0 or len(bg) == 0:
        # Tiling an empty background would divide by zero, and an empty
        # clean signal has no peak to normalise against.
        return clean.astype(np.float32)

    if len(bg) < clean_len:
        bg = np.tile(bg, int(np.ceil(clean_len / len(bg))))
    start = random.randint(0, len(bg) - clean_len)
    bg = bg[start:start + clean_len]

    # The epsilon keeps both RMS values strictly positive, so the gain
    # applied to the background is always finite (silence stays silence).
    rms_clean = np.sqrt(np.mean(clean ** 2) + 1e-9)
    rms_bg    = np.sqrt(np.mean(bg ** 2) + 1e-9)
    desired   = rms_clean / (10 ** (snr_db / 20.0))
    bg = bg * (desired / rms_bg)

    mixed = clean + bg
    peak = np.max(np.abs(mixed))
    if peak > 1.0:
        mixed = mixed / peak
    return mixed.astype(np.float32)


def apply_reverb(wav, rir, attenuation=0.5):
    """
    Apply a Room Impulse Response to ``wav`` by convolution.

    The convolved signal is truncated to the input length, rescaled to the
    input's RMS level, and then multiplied by ``attenuation``.  The result
    is returned as float32.
    """
    def _rms(signal):
        # Epsilon avoids a zero RMS (and a division by zero) on silence.
        return np.sqrt(np.mean(signal ** 2) + 1e-9)

    n_samples = len(wav)
    wet = np.convolve(wav, rir)[:n_samples]
    level_match = _rms(wav) / _rms(wet)
    wet = wet * level_match * attenuation
    return wet.astype(np.float32)


def pitch_shift(wav, sr, n_steps):
    """Shift the pitch of ``wav`` (sampled at ``sr``) by ``n_steps`` semitones; returns float32."""
    shifted = librosa.effects.pitch_shift(wav, sr=sr, n_steps=n_steps)
    return shifted.astype(np.float32)


def speed_perturb(wav, factor):
    """Time-stretch ``wav`` with librosa using ``factor`` as the rate; returns float32."""
    stretched = librosa.effects.time_stretch(wav, rate=factor)
    return stretched.astype(np.float32)


def collect_audio_files(base_folder):
    """
    Recursively collect audio files (.wav .flac .mp3 .m4a .ogg).

    Extensions are matched case-insensitively (``CLIP.WAV`` is found too)
    and only regular files are returned, so a directory that happens to be
    named like ``takes.wav`` is ignored.  The tree is walked once.

    Args:
        base_folder: Directory to search (str or Path).  A falsy value
            (``None`` / ``""``) means "not configured".

    Returns:
        Sorted list of ``Path`` objects.  Empty when ``base_folder`` is
        falsy, or when it is not an existing directory (a warning is
        printed in that case).
    """
    exts = (".wav", ".flac", ".mp3", ".m4a", ".ogg")
    if not base_folder:
        return []
    root = Path(base_folder)
    if not root.is_dir():
        print(f"Warning: folder not found: {base_folder}")
        return []
    return sorted(
        entry for entry in root.rglob("*")
        if entry.name.lower().endswith(exts) and entry.is_file()
    )


print("Audio utilities ready.")

In [None]:
# ═══════════════════════════════════════════════════════════════════
# Run training augmentation
# ═══════════════════════════════════════════════════════════════════
# Reads `path,label` rows from AUG_INPUT_DIR/metadata.csv, applies a random
# chain of augmentations to each sample, and writes every sample (augmented
# or not) plus a new metadata.csv to AUG_OUTPUT_DIR.

aug_input_dir = _env("AUG_INPUT_DIR")
aug_out_dir   = _env("AUG_OUTPUT_DIR")

if not aug_input_dir:
    print("AUG_INPUT_DIR not set — skipping training augmentation.")
else:
    input_dir = Path(aug_input_dir)
    out_dir   = Path(aug_out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    metadata_in  = input_dir / "metadata.csv"
    metadata_out = out_dir / "metadata.csv"

    if not metadata_in.exists():
        raise FileNotFoundError(f"metadata.csv not found in {input_dir}")

    # Collect augmentation source files
    bg_files      = collect_audio_files(_env("AUG_BG_NOISE_DIR"))
    music_files   = collect_audio_files(_env("AUG_MUSIC_DIR"))
    speech_files  = collect_audio_files(_env("AUG_BG_SPEECH_DIR"))
    rir_files     = collect_audio_files(_env("AUG_RIR_DIR"))
    mic_files     = collect_audio_files(_env("AUG_MIC_NOISE_DIR"))

    snr_min   = float(_env("AUG_SNR_MIN", "0"))
    snr_max   = float(_env("AUG_SNR_MAX", "20"))
    pitch_min = float(_env("AUG_PITCH_MIN", "-1.0"))
    pitch_max = float(_env("AUG_PITCH_MAX", "1.0"))
    speed_min = float(_env("AUG_SPEED_MIN", "0.95"))
    speed_max = float(_env("AUG_SPEED_MAX", "1.05"))
    prob      = float(_env("AUG_PROB", "0.9"))

    # Parse `path,label` rows.  The label is the last field, so each row is
    # split from the right: a comma inside the path must not break parsing.
    # Blank lines are ignored instead of crashing the tuple unpacking below.
    source_data = []
    with open(metadata_in, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            path_str, sep, label = line.rpartition(",")
            if not sep or not path_str:
                print(f"  Skipping malformed metadata row: {line!r}")
                continue
            source_data.append((path_str, label.strip()))

    print(f"Processing {len(source_data)} samples from {metadata_in}")
    print(f"Augmentation probability: {prob*100:.0f}%")

    avail = []
    if bg_files:     avail.append(f"bg_noise ({len(bg_files)})")
    if music_files:  avail.append(f"music ({len(music_files)})")
    if speech_files: avail.append(f"speech ({len(speech_files)})")
    if rir_files:    avail.append(f"rir ({len(rir_files)})")
    if mic_files:    avail.append(f"mic ({len(mic_files)})")
    avail += ["pitch_shift", "speed_perturb"]
    print(f"Available augmentations: {', '.join(avail)}")

    augmented = []
    aug_count = 0
    used_names = set()  # relative output paths already written in this run

    # Additive-noise stages, applied in this order:
    # (source files, chance of applying, SNR low, SNR high)
    mix_stages = [
        (bg_files,     0.5, snr_min, snr_max),  # 1. background noise
        (mic_files,    0.8, snr_min, snr_max),  #    mic-specific noise
        (music_files,  0.5, 0.0,     10.0),     # 2. music (louder: SNR 0-10 dB)
        (speech_files, 0.6, 10.0,    25.0),     # 3. background speech (quieter: SNR 10-25 dB)
    ]

    for path_str, label in tqdm(source_data, desc="Augmenting", unit="sample"):
        # Validate the label before doing any audio work, so a header row or
        # a bad label is skipped instead of crashing after augmentation.
        try:
            is_wake = int(label) == 1
        except ValueError:
            tqdm.write(f"  Skipping row with non-integer label {label!r}: {path_str}")
            continue

        src_path = Path(path_str)
        if not src_path.is_absolute():
            src_path = input_dir.parent / src_path

        try:
            wav = load_audio_mono(src_path)
        except Exception as e:
            tqdm.write(f"  Could not load {src_path}: {e}")
            continue
        if wav is None or len(wav) == 0:
            # Unreadable or zero-length audio cannot be augmented or normalized.
            continue

        was_augmented = False

        if random.random() < prob:
            for files, chance, snr_lo, snr_hi in mix_stages:
                if files and random.random() < chance:
                    noise = load_audio_mono(random.choice(files))
                    if noise is not None and len(noise) > 0:
                        wav = mix_audio(wav, noise, random.uniform(snr_lo, snr_hi))
                        was_augmented = True

            # 4. Reverb (30% chance)
            if rir_files and random.random() < 0.3:
                rir = load_audio_mono(random.choice(rir_files))
                if rir is not None and len(rir) > 0:
                    wav = apply_reverb(wav, rir)
                    was_augmented = True

            # 5. Pitch shift (30% chance)
            if random.random() < 0.3:
                wav = pitch_shift(wav, 16000, random.uniform(pitch_min, pitch_max))
                was_augmented = True

            # 6. Speed perturbation (30% chance)
            if random.random() < 0.3:
                wav = speed_perturb(wav, random.uniform(speed_min, speed_max))
                was_augmented = True

        if was_augmented:
            aug_count += 1

        # Peak-normalize and save
        wav = wav / (np.max(np.abs(wav)) + 1e-9)
        subdir = "wakes_aug" if is_wake else "negatives_aug"
        rel_path = Path(subdir) / (src_path.stem + "_aug.wav")
        # Sources in different folders may share a stem; add a counter so a
        # later sample never overwrites an earlier one.
        dup = 1
        while rel_path in used_names:
            rel_path = Path(subdir) / f"{src_path.stem}_aug_{dup}.wav"
            dup += 1
        used_names.add(rel_path)

        dst_path = out_dir / rel_path
        save_wav(dst_path, wav)
        augmented.append((str(dst_path), label))

    with metadata_out.open("w", encoding="utf-8") as f:
        for p, lbl in augmented:
            f.write(f"{p},{lbl}\n")

    print("\n--- Augmentation Complete ---")
    print(f"  Generated {len(augmented)} samples ({aug_count} received transformations)")
    print(f"  Metadata: {metadata_out}")

---
## 7 · Benchmark Dataset Generation

Creates structured test sets at fixed SNRs and with acoustic variance (pitch shifts, speed perturbations, reverb) for evaluating false-negative rates.

| Key env vars | |
|---|---|
| `BENCH_CLEAN_DIR` | Clean wake-word samples |
| `BENCH_NOISE_DIR` / `BENCH_MUSIC_DIR` / `BENCH_SPEECH_DIR` | Noise sources |
| `BENCH_RIR_DIR` | RIR files |
| `BENCH_SNRS` | Comma-separated SNR list (e.g. `20,15,10,5,0`) |

In [None]:
!pip install --quiet --break-system-packages librosa soundfile numpy click tqdm

In [None]:
# ═══════════════════════════════════════════════════════════════════
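# Benchmark augmentation: test sets at fixed SNRs and perturbation levels
# (background and RIR files are picked with `random`; seed it for repeatable sets)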
# ═══════════════════════════════════════════════════════════════════

def _bench_mix(clean_files, bg_files, snr_list, output_dir, noise_type):
    """
    Mix each clean sample with one randomly chosen background file at
    every SNR in ``snr_list``.  Creates one output per (sample, SNR).

    Outputs are written to
    ``output_dir/<noise_type lowercased>/<stem>_<noise_type>_<snr>db.wav``.
    The SNR is formatted with ``:g`` so integer-valued SNRs keep their
    plain form (``20db``) while fractional ones (``7.5db``) are no longer
    truncated onto the same filename as a neighbouring integer SNR.

    Args:
        clean_files: Paths of the clean samples.
        bg_files: Paths of candidate background files; nothing is done
            (apart from a message) when this is empty.
        snr_list: SNR values in dB; nothing is done when this is empty.
        output_dir: Base output directory (a ``Path``).
        noise_type: Label used in the progress text, the sub-directory
            name (lowercased) and the output filenames.
    """
    if not bg_files:
        print(f"  Skipping {noise_type}: no background files.")
        return
    if not snr_list:
        return

    target_dir = output_dir / noise_type.lower()
    print(f"  Mixing {noise_type} at SNRs: {snr_list} dB")
    for clean_path in tqdm(clean_files, desc=f"  {noise_type}"):
        clean_wav = load_audio_mono(clean_path)
        if clean_wav is None or len(clean_wav) == 0:
            continue
        bg_path = random.choice(bg_files)
        bg_wav = load_audio_mono(bg_path)
        if bg_wav is None or len(bg_wav) == 0:
            # Unreadable or zero-length background: nothing to mix for this sample.
            continue

        stem = clean_path.stem
        for snr in snr_list:
            mixed = mix_audio(clean_wav, bg_wav, snr)
            out_name = f"{stem}_{noise_type}_{float(snr):g}db.wav"
            save_wav(target_dir / out_name, mixed)


def _bench_variance(clean_files, rir_files, output_dir):
    """
    Apply fixed pitch and speed perturbations, plus reverb from up to three
    randomly selected RIRs, to each clean sample.

    All outputs go to ``output_dir/variance`` and are named
    ``<stem>_pitch_<step>.wav``, ``<stem>_speed_<factor>.wav`` and
    ``<stem>_reverb_<rir id>.wav``.  The RIR id is the first 8 characters
    of the RIR file stem; if two selected RIRs share that prefix, the later
    one gets its selection index appended so it does not overwrite the
    earlier output.

    Args:
        clean_files: Paths of the clean samples.
        rir_files: Paths of room impulse responses (may be empty).
        output_dir: Base output directory (a ``Path``).
    """
    PITCH_STEPS   = [-3, -2, -1, 1, 2, 3]  # semitones
    SPEED_FACTORS = [0.9, 1.1]

    variance_dir = output_dir / "variance"
    print("  Applying acoustic variance (pitch / speed / reverb)")
    for clean_path in tqdm(clean_files, desc="  Variance"):
        clean_wav = load_audio_mono(clean_path)
        if clean_wav is None or len(clean_wav) == 0:
            continue
        stem = clean_path.stem

        # Pitch shifts
        for step in PITCH_STEPS:
            out = pitch_shift(clean_wav, 16000, step)
            save_wav(variance_dir / f"{stem}_pitch_{step}.wav", out)

        # Speed perturbations
        for factor in SPEED_FACTORS:
            out = speed_perturb(clean_wav, factor)
            save_wav(variance_dir / f"{stem}_speed_{factor:.1f}.wav", out)

        # Reverb (up to 3 randomly-selected RIRs per sample)
        if rir_files:
            selected = random.sample(rir_files, k=min(3, len(rir_files)))
            used_ids = set()
            for idx, rir_path in enumerate(selected):
                rir_wav = load_audio_mono(rir_path)
                if rir_wav is None or len(rir_wav) == 0:
                    continue
                out = apply_reverb(clean_wav, rir_wav)
                rir_id = rir_path.stem[:8]
                if rir_id in used_ids:
                    # Truncated stems can coincide (e.g. "room_imp" for
                    # room_impulse_001 / _002); keep the files distinct.
                    rir_id = f"{rir_id}_{idx}"
                used_ids.add(rir_id)
                save_wav(variance_dir / f"{stem}_reverb_{rir_id}.wav", out)


print("Benchmark functions ready.")

In [None]:
# ═══════════════════════════════════════════════════════════════════
# Run benchmark generation
# ═══════════════════════════════════════════════════════════════════
# Collects the clean samples and the noise / music / speech / RIR sources,
# then writes the mixed and variance test sets under BENCH_OUTPUT_DIR.

bench_clean_dir = _env("BENCH_CLEAN_DIR")
bench_out_dir   = Path(_env("BENCH_OUTPUT_DIR"))

if bench_clean_dir:
    clean_files  = collect_audio_files(bench_clean_dir)
    noise_files  = collect_audio_files(_env("BENCH_NOISE_DIR"))
    music_files  = collect_audio_files(_env("BENCH_MUSIC_DIR"))
    speech_files = collect_audio_files(_env("BENCH_SPEECH_DIR"))
    rir_files    = collect_audio_files(_env("BENCH_RIR_DIR"))

    # Comma-separated SNR list; empty entries are ignored.
    snr_str    = _env("BENCH_SNRS", "20,15,10,5,0")
    snr_tokens = (tok.strip() for tok in snr_str.split(","))
    snr_list   = [float(tok) for tok in snr_tokens if tok]

    if clean_files:
        bench_out_dir.mkdir(parents=True, exist_ok=True)
        print(f"Found {len(clean_files)} clean files. Output -> {bench_out_dir}")

        # One mixed test set per background category, then the variance set.
        for noise_label, noise_sources in (
            ("Noise", noise_files),
            ("Music", music_files),
            ("Speech", speech_files),
        ):
            _bench_mix(clean_files, noise_sources, snr_list, bench_out_dir, noise_label)
        _bench_variance(clean_files, rir_files, bench_out_dir)

        print(f"\nBenchmark generation complete. Files saved under {bench_out_dir}")
    else:
        print("No clean files found — aborting.")
else:
    print("BENCH_CLEAN_DIR not set — skipping benchmark generation.")

---
## 8 · Output Summary

Quick sanity check: list all output directories and count generated files.

In [None]:
# Sanity check: for every stage's output directory, report how many
# .wav / .txt / .csv files it contains (searched recursively).
dirs_to_check = {
    "Adversarial word lists": _env("ADV_OUTPUT_DIR"),
    "TTS Synth output":      _env("SYNTH_OUTPUT_DIR"),
    "VC augmented output":   _env("VC_AUG_OUTPUT_DIR"),
    "VC TTS output":         _env("VC_TTS_OUTPUT_DIR"),
    "Training augmented":    _env("AUG_OUTPUT_DIR"),
    "Benchmark test sets":   _env("BENCH_OUTPUT_DIR"),
}

print("=" * 60)
print("  OUTPUT SUMMARY")
print("=" * 60)

for label, d in dirs_to_check.items():
    # An unset value must not reach Path(): None raises TypeError, and ""
    # would silently be treated as the current directory and counted.
    if not d:
        print(f"  {label:30s}  (not set)")
        continue

    p = Path(d)
    if not p.exists():
        print(f"  {label:30s}  (not created)")
        continue

    # Count lazily instead of materialising a list per extension.
    counts = {ext: sum(1 for _ in p.rglob(f"*.{ext}")) for ext in ("wav", "txt", "csv")}
    parts = [f"{n} {ext}" for ext, n in counts.items() if n]
    total = sum(counts.values())
    print(f"  {label:30s}  {total:>6} files  ({', '.join(parts) if parts else 'empty'})")

print("=" * 60)