In [None]:
import os
import kagglehub

import shutil

dataset_dir = "./dataset"
exist_dataset_dir = os.path.exists(dataset_dir)

# Download latest version. The returned value is used below as the local
# directory that contains the downloaded files.
path = kagglehub.dataset_download("jhonromero26/voice-commands-spanish")

# Copy the commands into the working directory only when they are missing.
# The previous guard (`not exist_dataset_dir or path`) was always true because
# `path` is a non-empty string, so an existing ./dataset was wiped and
# re-copied on every run. Delete ./dataset manually to force a refresh.
if not exist_dataset_dir:
    # shutil instead of `!cp -r` so paths containing spaces are handled and the
    # cell does not depend on a POSIX shell.
    shutil.copytree(os.path.join(path, "dataset"), dataset_dir)

In [None]:
import datasets
import scipy
import os

import numpy as np

from pathlib import Path
from tqdm import tqdm

## Download MIT RIR (room impulse response) data
ouput_folder = "./noising_dataset"
output_dir = f"{ouput_folder}/mit_rirs"
if not os.path.exists(output_dir):
    os.makedirs(output_dir, exist_ok=True)
    rir_dataset = datasets.load_dataset(
        "davidscripka/MIT_environmental_impulse_responses",
        split="train",
        streaming=True,
    )
    # Save clips to 16-bit PCM wav files
    for row in tqdm(rir_dataset):
        audio = row['audio']
        # Keep only the file name of the source path
        name = audio.metadata.path.rsplit('/', 1)[-1]
        # Scale the samples to the int16 range before writing
        pcm = (audio['array'] * 32767).astype(np.int16)
        scipy.io.wavfile.write(os.path.join(output_dir, name), 16000, pcm)

## Download noise and background audio

# Audioset Dataset (https://research.google.com/audioset/dataset/index.html)
# Download one part of the audioset .tar files, extract, and convert to 16 kHz
# For full-scale training, it's recommended to download the entire dataset from
# https://huggingface.co/datasets/agkphysics/AudioSet, and
# even potentially combine it with other background noise datasets (e.g., FSD50k, Freesound, etc.)

output_dir = f"{ouput_folder}/audioset"
if not os.path.exists(output_dir):
    os.makedirs(output_dir, exist_ok=True)

    fname = "bal_train09.tar"
    out_dir = f"{output_dir}/{fname}"  # full path of the downloaded archive
    link = "https://huggingface.co/datasets/agkphysics/AudioSet/resolve/main/data/" + fname
    !wget -O {out_dir} {link}
    !cd {output_dir} && tar -xf {fname}
    # Every `!` line runs in its own subshell, so the `cd` above does not carry
    # over. Remove the archive by its full path; `rm -rf {fname}` looked in the
    # notebook's working directory and left the tar file behind.
    !rm -rf {out_dir}

    subout_dir = f"{ouput_folder}/audioset_16k"
    os.makedirs(subout_dir, exist_ok=True)

    # Save clips to 16-bit PCM wav files
    audioset_dataset = datasets.Dataset.from_dict({"audio": [str(i) for i in Path(f"{output_dir}/audio").glob("**/*.flac")]})
    audioset_dataset = audioset_dataset.cast_column("audio", datasets.Audio(sampling_rate=16000))
    for row in tqdm(audioset_dataset):
        name = row['audio'].metadata.path.split('/')[-1].replace(".flac", ".wav")
        # Clamp to [-1, 1] first: samples outside that range would overflow the
        # int16 cast and wrap around instead of saturating.
        samples = np.clip(row['audio']['array'], -1.0, 1.0)
        scipy.io.wavfile.write(os.path.join(subout_dir, name), 16000, (samples*32767).astype(np.int16))

# Free Music Archive dataset
# https://github.com/mdeff/fma
# (Third-party mchl914 extra small set)

output_dir = f"{ouput_folder}/fma"
if not os.path.exists(output_dir):
    os.makedirs(output_dir, exist_ok=True)
    fname = "fma_xs.zip"
    link = "https://huggingface.co/datasets/mchl914/fma_xsmall/resolve/main/" + fname
    out_dir = f"{output_dir}/{fname}"  # full path of the downloaded archive
    !wget -O {out_dir} {link}
    !cd {output_dir} && unzip -q {fname}
    # Every `!` line runs in its own subshell, so the `cd` above does not carry
    # over. Remove the archive by its full path; `rm -rf {fname}` looked in the
    # notebook's working directory and left the zip file behind.
    !rm -rf {out_dir}

    subout_dir = f"{ouput_folder}/fma_16k"
    os.makedirs(subout_dir, exist_ok=True)

    # Save clips to 16-bit PCM wav files
    fma_dataset = datasets.Dataset.from_dict({"audio": [str(i) for i in Path(f"{output_dir}/fma_small").glob("**/*.mp3")]})
    fma_dataset = fma_dataset.cast_column("audio", datasets.Audio(sampling_rate=16000))
    for row in tqdm(fma_dataset):
        name = row['audio'].metadata.path.split('/')[-1].replace(".mp3", ".wav")
        # Clamp to [-1, 1] first: samples outside that range would overflow the
        # int16 cast and wrap around instead of saturating.
        samples = np.clip(row['audio']['array'], -1.0, 1.0)
        scipy.io.wavfile.write(os.path.join(subout_dir, name), 16000, (samples*32767).astype(np.int16))

In [None]:
# Sets up the augmentations.
# To improve your model, experiment with these settings and use more sources of
# background clips.

import os
from speech_commands.audio.augmentation import Augmentation
from speech_commands.audio.clips import Clips


commands_directory = './dataset'
# One sub-directory per voice command. Stray files (e.g. a README or
# .DS_Store) are skipped so they are not treated as commands, and the names are
# sorted because os.listdir returns entries in arbitrary order.
commands_dirs = sorted(
    entry
    for entry in os.listdir(commands_directory)
    if os.path.isdir(os.path.join(commands_directory, entry))
)

# Maps each command folder name to the Clips object built from its *.wav files.
clips = {}
for folder in commands_dirs:
    folder_path = f'{commands_directory}/{folder}'
    clips[folder] = Clips(input_directory=folder_path,
            file_pattern='*.wav',
            max_clip_duration_s=None,
            remove_silence=False,
            random_split_seed=10,
            split_count=0.1,
            )


# Per-augmentation settings passed to Augmentation as
# `augmentation_probabilities` (presumably the chance each effect is applied).
augmentation_probabilities = {
    "SevenBandParametricEQ": 0.1,
    "TanhDistortion": 0.1,
    "PitchShift": 0.1,
    "BandStopFilter": 0.1,
    "AddColorNoise": 0.1,
    "AddBackgroundNoise": 0.75,
    "Gain": 1.0,
    "RIR": 0.5,
}

# Folders holding the impulse responses and the 16 kHz background clips.
impulse_paths = ['./noising_dataset/mit_rirs']
background_paths = ['./noising_dataset/fma_16k', './noising_dataset/audioset_16k']

augmenter = Augmentation(
    augmentation_duration_s=3.2,
    augmentation_probabilities=augmentation_probabilities,
    impulse_paths=impulse_paths,
    background_paths=background_paths,
    background_min_snr_db=-5,
    background_max_snr_db=10,
    min_jitter_s=0.195,
    max_jitter_s=0.205,
)

In [None]:
# Augment a random clip and play it back to verify it works well

from IPython.display import Audio
from speech_commands.audio.audio_utils import save_clip
    
# File the augmented sample is written to and then played back from.
preview_path = 'augmented_clip.wav'

# Take one random clip of the 'adelante' command and run it through the augmenter.
random_clip = clips['adelante'].get_random_clip()
augmented_clip = augmenter.augment_clip(random_clip)
save_clip(augmented_clip, preview_path)

display(Audio(preview_path, autoplay=True))

In [None]:
# Augment samples and save the training, validation, and testing sets.
# Validating and testing samples generated the same way can make the model
# benchmark better than it performs in real-world use. Use real samples or TTS
# samples generated with a different TTS engine to potentially get more accurate
# benchmarks.

import os
from mmap_ninja.ragged import RaggedMmap
from speech_commands.audio.spectrograms import SpectrogramGeneration


output_dataset = 'commands_augmented'
commands_dir = f'{output_dataset}/commands'
negative_dir = f'{output_dataset}/negative'
splits = ["training", "validation", "testing"]

# Name of the RaggedMmap folder written inside every <command>/<split> directory.
nmap_name = 'command_mmap'

# Output split folder -> (split name given to the generator, repetitions, slide_frames).
# slide_frames=10 uses the same spectrogram repeatedly, just shifted over by
# one frame. This simulates the streaming inferences while training/validating
# in nonstreaming mode. The testing set uses the streaming version of the
# model, so no artificial repetition is necessary (slide_frames=1).
split_settings = {
    "training": ("train", 2, 10),
    "validation": ("validation", 1, 10),
    "testing": ("test", 1, 1),
}

os.makedirs(commands_dir, exist_ok=True)

for cmd in clips.keys():
    for split in splits:
        split_name, repetition, slide_frames = split_settings[split]

        out_dir = os.path.join(commands_dir, cmd, split)
        os.makedirs(out_dir, exist_ok=True)

        # Built once per split; previously the testing split constructed a
        # generator that was immediately discarded and replaced.
        spectrograms = SpectrogramGeneration(clips=clips[cmd],
                                             augmenter=augmenter,
                                             slide_frames=slide_frames,
                                             step_ms=10,
                                             )

        RaggedMmap.from_generator(
            out_dir=os.path.join(out_dir, nmap_name),
            sample_generator=spectrograms.spectrogram_generator(split=split_name, repeat=repetition),
            batch_size=100,
            verbose=True,
        )

In [None]:
# Downloads pre-generated spectrogram features (made for microWakeWord in
# particular) for various negative datasets. This can be slow!

# Fetch and unpack the negative feature archives, but only when the target
# folder does not exist yet.
output_dir = f'{output_dataset}/negative'
if not os.path.exists(output_dir):
    os.mkdir(output_dir)
    link_root = "https://huggingface.co/datasets/kahrendt/microwakeword/resolve/main/"
    filenames = [f"{stem}.zip" for stem in ("dinner_party", "dinner_party_eval", "no_speech", "speech")]
    for fname in filenames:
        link = f"{link_root}{fname}"
        zip_path = "/".join((output_dir, fname))

        # Download the archive next to its destination, then extract it in place.
        !wget -O {zip_path} {link}
        !unzip -q {zip_path} -d {output_dir}

In [5]:
# Save a yaml config that controls the training process
# These hyperparameters can make a huge difference in model quality.
# Experiment with sampling and penalty weights and increasing the number of
# training steps.

import yaml
import os

output_dataset = 'commands_augmented'
negative_dir = f'{output_dataset}/negative'
commands_dir = f'{output_dataset}/commands'

# Each sub-directory of commands_dir is one voice command. Label ids are later
# assigned from the position in this list, so it is sorted: os.listdir returns
# entries in arbitrary order, which would make the label mapping differ between
# runs or machines. Non-directory entries are skipped so a stray file cannot
# become a class.
voice_commands = sorted(
    entry
    for entry in os.listdir(commands_dir)
    if os.path.isdir(os.path.join(commands_dir, entry))
)
total_commands = len(voice_commands)
# One extra class for the negative (label 0) samples.
num_classes = total_commands + 1

# Base training configuration; several of these keys are assigned again further down.
config = dict(
    batch_size=128,
    clip_duration_ms=1500,
    training_steps=[20000],
    learning_rates=[0.001],
    window_step_ms=10,
    num_classes=num_classes,
    train_dir="trained",
)


def _negative_feature(name, sampling_weight):
    """Return the feature entry for the negative set stored in negative_dir/name."""
    return {
        "features_dir": f"{negative_dir}/{name}",
        "label": 0,
        "truth": False,
        "sampling_weight": sampling_weight,
        "penalty_weight": 1.0,
        "truncation_strategy": "truncate_end",
        "type": "mmap",
    }


# Negative (label 0) feature sets; only the folder and sampling weight differ.
config['features'] = [
    _negative_feature("speech", 10.0),
    _negative_feature("dinner_party", 10.0),
    _negative_feature("no_speech", 5.0),
]

# (name, id) pairs: id 0 is the negative class, commands are numbered from 1
# in the order they appear in voice_commands.
labels = [("negative", 0)]
for label_id, command in enumerate(voice_commands, start=1):
    labels.append((command, label_id))
    # Register the command's feature folder as a positive ("truth") set.
    command_features = {
        "features_dir": f"{commands_dir}/{command}",
        "label": label_id,
        "truth": True,
        "sampling_weight": 1.0,
        "penalty_weight": 1.0,
        "truncation_strategy": "truncate_start",
        "type": "mmap",
    }
    config['features'].append(command_features)

# The best model weights are chosen first by minimizing the specified minimization metric below the specified target_minimization
# Once the target has been met, it chooses the maximum of the maximization metric. Set 'minimization_metric' to None to only maximize
# Available metrics:
#   - "loss" - cross entropy error on validation set
#   - "accuracy" - accuracy of validation set
#   - "recall" - recall of validation set
#   - "precision" - precision of validation set
#   - "false_positive_rate" - false positive rate of validation set
#   - "false_negative_rate" - false negative rate of validation set
#   - "ambient_false_positives" - count of false positives from the split validation_ambient set
#   - "ambient_false_positives_per_hour" - estimated number of false positives per hour on the split validation_ambient set
# NOTE(review): "average_viable_recall" is used below but is not in the list above - confirm the trainer supports it.
config.update({
    # Number of training steps in each iteration - various other settings are
    # configured as lists that correspond to different steps
    "training_steps": [20000],
    # Penalizing weight for incorrect class predictions - lists that correspond to training steps
    "positive_class_weight": [1],
    "negative_class_weight": [20],
    # Learning rates for Adam optimizer - list that corresponds to training steps
    "learning_rates": [0.001],
    "batch_size": 128,
    # SpecAugment - lists that correspond to training steps
    "time_mask_max_size": [0],
    "time_mask_count": [0],
    "freq_mask_max_size": [0],
    "freq_mask_count": [0],
    # Test the validation sets after every this many steps
    "eval_step_interval": 500,
    # Maximum length of wake word that the streaming model will accept
    "clip_duration_ms": 1500,
    "target_minimization": 0.9,
    "minimization_metric": None,  # Set to None to disable
    "maximization_metric": "average_viable_recall",
})

# Write the training configuration next to the notebook. yaml.dump returns
# None when it is given a stream, so its result is not kept, and the
# single-argument os.path.join around the file name was a no-op.
with open("training_parameters.yaml", "w") as file:
    yaml.dump(config, file)

In [2]:
# Trains a model. When finished, it will quantize and convert the model to a
# streaming version suitable for on-device detection.
# It will resume if stopped, but it will start over at the configured training
# steps in the yaml file.
# Change --train 0 to only convert and test the best-weighted model.
# On Google colab, it doesn't print the mini-batch results, so it may appear
# stuck for several minutes! Additionally, it is very slow compared to training
# on a local GPU.

# Runs main.py with the YAML written above, selecting the "mixednet" model and
# its architecture flags.
# NOTE(review): the saved output of this cell shows main.py aborting with
# "ModuleNotFoundError: No module named 'microwakeword'". main.py imports
# `microwakeword.data`, so that package must be importable by the interpreter
# that `python` resolves to in this shell - TODO confirm the environment.
!python main.py \
--training_config='training_parameters.yaml' \
--train 1 \
--restore_checkpoint 1 \
--test_tf_nonstreaming 0 \
--test_tflite_nonstreaming 0 \
--test_tflite_nonstreaming_quantized 1 \
--use_weights "best_weights" \
mixednet \
--pointwise_filters "64,64,64,64" \
--repeat_in_block  "1, 1, 1, 1" \
--mixconv_kernel_sizes '[5], [7,11], [9,15], [23]' \
--residual_connection "0,0,0,0" \
--first_conv_filters 32 \
--first_conv_kernel_size 5

2025-09-08 08:24:06.397611: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
Traceback (most recent call last):
  File "/home/romero/Downloads/microwakemodel/main.py", line 34, in <module>
    import microwakeword.data as input_data
ModuleNotFoundError: No module named 'microwakeword'
