# Training a microWakeWord Model

**The generated model will most likely not be usable for everyday use; it may be difficult to trigger, or it may falsely activate too frequently. You will most likely have to experiment with many different settings to obtain a decent model!**

At the end of this notebook, you will be able to download a tflite file. To use this in ESPHome, you need to write a model manifest JSON file. See the [ESPHome documentation](https://esphome.io/components/micro_wake_word) for the details and the [model repo](https://github.com/esphome/micro-wake-word-models/tree/main/models/v2) for examples.

In [None]:

import os

from IPython.display import Audio
from urllib.parse import urlparse

# target_word = 'okay_GLAD_ohss'
target_word = "oʊˈkeɪ ɡlˈæ dˈoʊs↑"
piper_model_dir = "piper-models/"
piper_models = [
    {
        "model_url": "https://huggingface.co/rhasspy/piper-voices/resolve/main/en/en_US/libritts_r/medium/en_US-libritts_r-medium.onnx?download=true",
        "metadata_url": "https://huggingface.co/rhasspy/piper-voices/resolve/main/en/en_US/libritts_r/medium/en_US-libritts_r-medium.onnx.json?download=true",
    },
]
SAMPLES_DIR = "data/generated_samples"
LENGTH_SCALES = 0.8
NOISE_SCALE = 0.2

os.makedirs(piper_model_dir, exist_ok=True)
for piper_model in piper_models: 
    path = urlparse(piper_model["model_url"]).path
    model_path = os.path.basename(path)
    model_path = os.path.join(piper_model_dir, model_path)
    metadata_path = model_path + ".json"
    piper_model["model_path"] = model_path
    piper_model["metadata_path"] = metadata_path
    if not os.path.exists(model_path):
        pass
        !wget -O {model_path} "{model_url}"
        !wget -O {metadata_path} "{model_metadata_url}"


In [None]:

# Generates 1 sample of the target word for manual verification.
# The wake word is given as phonemes (--phoneme-input) and synthesized with the
# first configured Piper model; the result is written into SAMPLES_DIR.

!uv run libs/piper-sample-generator/generate_samples.py "{target_word}" \
--phoneme-input \
--length-scales {LENGTH_SCALES} \
--noise-scale-ws {NOISE_SCALE} \
--max-samples 1 \
--batch-size 1 \
--output-dir {SAMPLES_DIR} \
--model {piper_models[0]["model_path"]}

# Plays the generated clip inline.
# NOTE(review): assumes the generator names its first output "0.wav" - confirm.
Audio(f"{SAMPLES_DIR}/0.wav", autoplay=True)

In [None]:
# Generates a larger amount of wake word samples.
# Start here when trying to improve your model.
# See https://github.com/rhasspy/piper-sample-generator for the full set of
# parameters. In particular, experiment with noise-scales and noise-scale-ws,
# generating negative samples similar to the wake word, and generating many more
# wake word samples, possibly with different phonetic pronunciations.

!uv run libs/piper-sample-generator/generate_samples.py "{target_word}" \
--phoneme-input \
--length-scales {LENGTH_SCALES} \
--noise-scale-ws {NOISE_SCALE} \
--max-samples 100000 \
--batch-size 100 \
--output-dir {SAMPLES_DIR} \
--model {model_path}

In [None]:
# Downloads audio data for augmentation. This can be slow!
# Borrowed from openWakeWord's automatic_model_training.ipynb, accessed March 4, 2024

# **Important note!** The data downloaded here has a mixture of different
# licenses and usage restrictions. As such, any custom models trained with this
# data should be considered as appropriate for **non-commercial** personal use only.


import datasets
import scipy
import os

import numpy as np

from pathlib import Path
from tqdm.notebook import tqdm

SOURCE_URL = "davidscripka/MIT_environmental_impulse_responses"
OUTPUT_DIR = "data/augmentations/mit_rirs"
SPLIT = "train"
SAMPLE_RATE = 16_000

os.makedirs(OUTPUT_DIR, exist_ok=True)
builder = datasets.load_dataset_builder(SOURCE_URL)
assert builder.info.splits is not None
# The example count is only used to give the progress bar a total.
count = builder.info.splits[SPLIT].num_examples
dataset = builder.as_streaming_dataset(split=SPLIT)
assert isinstance(dataset, datasets.IterableDataset)
# Disable automatic decoding; each row is decoded explicitly below through an
# Audio feature configured for SAMPLE_RATE.
dataset = dataset.decode(False)
audio_feature = datasets.Audio(sampling_rate=SAMPLE_RATE)

# Save clips to 16-bit PCM wav files
for row in tqdm(dataset, total=count):
    decoder = audio_feature.decode_example(row["audio"])
    samples = decoder.get_all_samples()
    data = samples.data.detach().cpu().numpy()
    # Average over every leading axis so that only the last axis remains
    # (presumably channels -> mono; confirm the decoder's axis layout).
    if data.ndim > 1:
        data = np.mean(data, axis=tuple(range(data.ndim - 1)))
    # Clip before scaling: float samples outside [-1, 1] would otherwise
    # overflow when cast to int16.
    data = (np.clip(data, -1.0, 1.0) * 32767).astype(np.int16)
    name = Path(row["audio"]["path"]).with_suffix(".wav").name
    path = os.path.join(OUTPUT_DIR, name)
    scipy.io.wavfile.write(path, samples.sample_rate, data)


In [None]:

# Download noise and background audio

# Audioset Dataset (https://research.google.com/audioset/dataset/index.html)
# Download one part of the audioset .tar files, extract, and convert to 16khz
# For full-scale training, it's recommended to download the entire dataset from
# https://huggingface.co/datasets/agkphysics/AudioSet, and
# even potentially combine it with other background noise datasets (e.g., FSD50k, Freesound, etc.)

from urllib.parse import urlparse

URL = "https://huggingface.co/datasets/agkphysics/AudioSet/resolve/main/data/bal_train09.tar"
TEMP_DIR = "data/augmentations/audioset"
OUTPUT_DIR = "data/augmentations/audioset_16k"
SAMPLE_RATE = 16_000

file_name = Path(urlparse(URL).path).name
temp_path = os.path.join(TEMP_DIR, file_name)

# Download the data
os.makedirs(TEMP_DIR, exist_ok=True)
!wget -O {temp_path} {URL}
!cd {TEMP_DIR} && tar -xf {file_name}

# Save clips to 16-bit PCM wav files
os.makedirs(OUTPUT_DIR, exist_ok=True)
paths = [str(i) for i in Path(f"{TEMP_DIR}/audio").glob("**/*.flac")]
audioset_dataset = datasets.Dataset.from_dict({"audio": paths, "path": paths})
audioset_dataset = audioset_dataset.cast_column("audio", datasets.Audio(sampling_rate=16000))
for row in tqdm(audioset_dataset, total=len(paths)):
    data = (row["audio"]["array"] * 32767).astype(np.int16)
    name = Path(row["path"]).with_suffix(".wav").name
    path = os.path.join(OUTPUT_DIR, name)
    scipy.io.wavfile.write(path, SAMPLE_RATE, data)


In [None]:
# Download noise and background audio

# Free Music Archive dataset
# https://github.com/mdeff/fma
# (Third-party mchl914 extra small set)

from urllib.parse import urlparse

URL = "https://huggingface.co/datasets/mchl914/fma_xsmall/resolve/main/fma_xs.zip"
TEMP_DIR = "data/augmentations/fma"
OUTPUT_DIR = "data/augmentations/fma_16k"
SAMPLE_RATE = 16_000

file_name = Path(urlparse(URL).path).name
temp_path = os.path.join(TEMP_DIR, file_name)

# Download the data
os.makedirs(TEMP_DIR, exist_ok=True)
!wget -O {temp_path} {URL}
!cd {TEMP_DIR} && unzip -oq {file_name}

# Save clips to 16-bit PCM wav files
os.makedirs(OUTPUT_DIR, exist_ok=True)
paths = [str(i) for i in Path(f"{TEMP_DIR}/fma_small").glob("**/*.mp3")]
audioset_dataset = datasets.Dataset.from_dict({"audio": paths, "path": paths})
audioset_dataset = audioset_dataset.cast_column("audio", datasets.Audio(sampling_rate=16000))
for row in tqdm(audioset_dataset, total=len(paths)):
    data = (row["audio"]["array"] * 32767).astype(np.int16)
    name = Path(row["path"]).with_suffix(".wav").name
    path = os.path.join(OUTPUT_DIR, name)
    scipy.io.wavfile.write(path, SAMPLE_RATE, data)


In [None]:
# Sets up the augmentations.
# To improve your model, experiment with these settings and use more sources of
# background clips.

from microwakeword.audio.augmentation import Augmentation
from microwakeword.audio.clips import Clips
from microwakeword.audio.spectrograms import SpectrogramGeneration

# Source clips: every wav file the sample generator wrote into SAMPLES_DIR.
clips = Clips(
    input_directory=SAMPLES_DIR,
    file_pattern="*.wav",
    max_clip_duration_s=None,
    remove_silence=False,
    random_split_seed=10,
    split_count=0.1,
)

# Per-augmentation settings, keyed by augmentation name
# (presumably the chance of each one being applied to a clip - confirm
# against the microwakeword Augmentation documentation).
augmentation_probabilities = {
    "SevenBandParametricEQ": 0.1,
    "TanhDistortion": 0.1,
    "PitchShift": 0.1,
    "BandStopFilter": 0.1,
    "AddColorNoise": 0.1,
    "AddBackgroundNoise": 0.75,
    "Gain": 1.0,
    "RIR": 0.5,
}

# Directories filled by the download cells: impulse responses and background audio.
impulse_paths = ["data/augmentations/mit_rirs"]
background_paths = ["data/augmentations/fma_16k", "data/augmentations/audioset_16k"]

augmenter = Augmentation(
    augmentation_duration_s=3.2,
    augmentation_probabilities=augmentation_probabilities,
    impulse_paths=impulse_paths,
    background_paths=background_paths,
    background_min_snr_db=-5,
    background_max_snr_db=10,
    min_jitter_s=0.195,
    max_jitter_s=0.205,
)


In [None]:
# Augment a random clip and play it back to verify it works well

from IPython.display import Audio
from microwakeword.audio.audio_utils import save_clip

# Where the augmented preview clip is written before playback.
preview_path = "data/augmented_clip.wav"

random_clip = clips.get_random_clip()
augmented_clip = augmenter.augment_clip(random_clip)
save_clip(augmented_clip, preview_path)

# Last expression of the cell, so the audio player is rendered as its output.
Audio(preview_path, autoplay=True)

In [None]:
# Augment samples and save the training, validation, and testing sets.
# Validating and testing samples generated the same way can make the model
# benchmark better than it performs in real-world use. Use real samples or TTS
# samples generated with a different TTS engine to potentially get more accurate
# benchmarks.

import os
from typing import TypedDict
from mmap_ninja.ragged import RaggedMmap


class Split(TypedDict):
    """Settings for generating the features of one dataset split."""

    # Split name passed to the spectrogram generator (e.g. "train").
    name: str
    # Sub-directory of OUTPUT_DIR that receives this split's features.
    path: str
    # Passed to the spectrogram generator as its `repeat` argument.
    repetition: int
    # Passed to SpectrogramGeneration as `slide_frames`.
    slide_frames: int
    # Passed to SpectrogramGeneration as `step_ms`.
    step_ms: int


OUTPUT_DIR = "data/generated_augmented_features"

# slide_frames=10 for train/validation: uses the same spectrogram repeatedly,
# just shifted over by one frame. This simulates the streaming inferences while
# training/validating in nonstreaming mode.
# slide_frames=1 for test: the testing set uses the streaming version of the
# model, so no artificial repetition is necessary.
splits: list[Split] = [
    {"name": "train", "path": "training", "repetition": 2, "slide_frames": 10, "step_ms": 10},
    {"name": "validation", "path": "validation", "repetition": 1, "slide_frames": 10, "step_ms": 10},
    {"name": "test", "path": "testing", "repetition": 1, "slide_frames": 1, "step_ms": 10},
]

for split in splits:
    # Each split gets its own sub-directory under OUTPUT_DIR.
    split_dir = os.path.join(OUTPUT_DIR, split["path"])
    os.makedirs(split_dir, exist_ok=True)

    # Stream of spectrograms for this split, built from the augmented clips.
    feature_stream = SpectrogramGeneration(
        clips=clips,
        augmenter=augmenter,
        slide_frames=split["slide_frames"],
        step_ms=split["step_ms"],
    ).spectrogram_generator(split=split["name"], repeat=split["repetition"])

    # Write the stream to a ragged mmap folder inside the split directory.
    RaggedMmap.from_generator(
        out_dir=os.path.join(split_dir, "wakeword_mmap"),
        sample_generator=feature_stream,
        batch_size=100,
        verbose=True,
    )

In [None]:
# Downloads pre-generated spectrogram features (made for microWakeWord in
# particular) for various negative datasets. This can be slow!

OUTPUT_DIR = 'data/negative_datasets'
os.makedirs(OUTPUT_DIR, exist_ok=True)
ROOT_URL = "https://huggingface.co/datasets/kahrendt/microwakeword/resolve/main/"
FILE_NAMES = ['dinner_party.zip', 'dinner_party_eval.zip', 'no_speech.zip', 'speech.zip']
for file_name in FILE_NAMES:
    link = ROOT_URL + file_name
    zip_path = f"{OUTPUT_DIR}/{file_name}"
    !wget -O {zip_path} {link}
    !unzip -q {zip_path} -d {OUTPUT_DIR}

In [None]:
# Save a yaml config that controls the training process
# These hyperparameters can make a huge difference in model quality.
# Experiment with sampling and penalty weights and increasing the number of
# training steps.

import yaml
import os

def _feature_set(features_dir, sampling_weight, truth, truncation_strategy, penalty_weight=1.0):
    """Build one entry of the "features" list in the training config.

    features_dir: directory holding the set's ragged mmap folders
    sampling_weight: weight for choosing a spectrogram from this set in the batch
    truth: whether this set has positive samples (True) or negative samples (False)
    truncation_strategy: how spectrograms longer than necessary for training are truncated
        - random: choose a random portion of the entire spectrogram - useful for long negative samples
        - truncate_start: remove the start of the spectrogram
        - truncate_end: remove the end of the spectrogram
        - split: split the longer spectrogram into separate spectrograms offset by 100 ms. Only for ambient sets
    penalty_weight: penalizing weight for incorrect predictions from this set
    """
    return {
        "features_dir": features_dir,
        "sampling_weight": sampling_weight,
        "penalty_weight": penalty_weight,
        "truth": truth,
        "truncation_strategy": truncation_strategy,
        "type": "mmap",
    }


config = {
    "window_step_ms": 10,
    "train_dir": "output",
    # Each features_dir should have at least one of the following folders with this structure:
    #  training/
    #    ragged_mmap_folders_ending_in_mmap
    #  testing/
    #    ragged_mmap_folders_ending_in_mmap
    #  testing_ambient/
    #    ragged_mmap_folders_ending_in_mmap
    #  validation/
    #    ragged_mmap_folders_ending_in_mmap
    #  validation_ambient/
    #    ragged_mmap_folders_ending_in_mmap
    "features": [
        # Positive samples: the augmented wake word features generated above.
        _feature_set(
            features_dir="data/generated_augmented_features",
            sampling_weight=3.0,
            truth=True,
            truncation_strategy="truncate_start",
        ),
        # Negative samples.
        _feature_set(
            features_dir="data/negative_datasets/speech",
            sampling_weight=10.0,
            truth=False,
            truncation_strategy="random",
        ),
        _feature_set(
            features_dir="data/negative_datasets/dinner_party",
            sampling_weight=10.0,
            truth=False,
            truncation_strategy="random",
        ),
        _feature_set(
            features_dir="data/negative_datasets/no_speech",
            sampling_weight=5.0,
            truth=False,
            truncation_strategy="random",
        ),
        # Only used for validation and testing
        _feature_set(
            features_dir="data/negative_datasets/dinner_party_eval",
            sampling_weight=0.0,
            truth=False,
            truncation_strategy="split",
        ),
    ],
    # Number of training steps in each iteration - various other settings are configured as lists that correspond to different steps
    "training_steps": [100_000],
    # Penalizing weight for incorrect class predictions - lists that correspond to training steps
    "positive_class_weight": [1],
    "negative_class_weight": [20],
    # Learning rates for Adam optimizer - list that corresponds to training steps
    "learning_rates": [0.001],
    "batch_size": 128,
    # SpecAugment settings - each is a list that corresponds to training steps
    "time_mask_max_size": [0],
    "time_mask_count": [0],
    "freq_mask_max_size": [0],
    "freq_mask_count": [0],
    # Test the validation sets after every this many steps
    "eval_step_interval": 500,
    # Maximum length of wake word that the streaming model will accept
    "clip_duration_ms": 1500,
    # The best model weights are chosen first by minimizing the specified minimization metric below the specified target_minimization
    # Once the target has been met, it chooses the maximum of the maximization metric. Set 'minimization_metric' to None to only maximize
    # Available metrics:
    #   - "loss" - cross entropy error on validation set
    #   - "accuracy" - accuracy of validation set
    #   - "recall" - recall of validation set
    #   - "precision" - precision of validation set
    #   - "false_positive_rate" - false positive rate of validation set
    #   - "false_negative_rate" - false negative rate of validation set
    #   - "ambient_false_positives" - count of false positives from the split validation_ambient set
    #   - "ambient_false_positives_per_hour" - estimated number of false positives per hour on the split validation_ambient set
    "target_minimization": 0.9,
    # Set to None to disable
    "minimization_metric": None,
    "maximization_metric": "average_viable_recall",
}

# Write the config to the file the training command reads via --training_config.
with open("training_parameters.yaml", "w") as file:
    yaml.dump(config, file)

In [None]:
# Trains a model. When finished, it will quantize and convert the model to a
# streaming version suitable for on-device detection.
# It will resume if stopped, but it will start over at the configured training
# steps in the yaml file.
# Change --train 0 to only convert and test the best-weighted model.


!uv run python -m microwakeword.model_train_eval \
    --training_config='training_parameters.yaml' \
    --train 0 \
    --restore_checkpoint 1 \
    --test_tf_nonstreaming 0 \
    --test_tflite_nonstreaming 0 \
    --test_tflite_nonstreaming_quantized 0 \
    --test_tflite_streaming 0 \
    --test_tflite_streaming_quantized 1 \
    --use_weights "best_weights" \
    mixednet \
    --pointwise_filters "64,64,64,64" \
    --repeat_in_block  "1, 1, 1, 1" \
    --mixconv_kernel_sizes '[5], [7,11], [9,15], [23]' \
    --residual_connection "0,0,0,0" \
    --first_conv_filters 32 \
    --first_conv_kernel_size 5 \
    --stride 3

# For docker workflow:
# docker run --gpus all \
#   --rm -i --device=/dev/dxg \
#   -v /usr/lib/wsl:/usr/lib/wsl:ro \
#   -v "$(pwd)":"$(pwd)" \
#   -w "$(pwd)" \
#   tensorflow/tensorflow:2.16.2-gpu \
#   python train.py