# Wake word detection (#2)

Detect a wake word in an audio sample. Each audio sample must be exactly `SAMPLE_LEN` milliseconds long.

* Prepare the dataset by overlaying positive or negative samples on top of background noise
* Extract MFCC features from each training example
* Create and fit binary classification models using `x` (MFCC features) as the input and `y` (`0` or `1`) as the label

In [None]:
from wwd import load_raw_audios, generate_audio
from common import CV_DATA_DIR
from IPython.display import Audio
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_fscore_support, confusion_matrix, roc_auc_score
import os
import numpy as np
import matplotlib.pyplot as plt
import sounddevice as sd
from scipy.io.wavfile import write
import librosa.core
import librosa.feature

In [None]:
# Set the TensorFlow native log level before TensorFlow is imported
# (presumably "1" hides INFO-level messages — confirm against the TF docs)
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "1"

import tensorflow as tf

# Report the installed TensorFlow version and the device types it can see
print(f"TF Version: {tf.__version__}")
print(f"TF Devices: {[d.device_type for d in tf.config.list_physical_devices()]}")

__Parameters__

In [None]:
# The sampling rate of the audio (samples per second); used for loading,
# recording and MFCC extraction
SAMPLE_RATE = 16000
# The length in ms of the audio (the segment to analyse for wake word)
SAMPLE_LEN = 1000
# The number of MFCC frames (depends on the length of the audio); must equal
# the frame count produced for a `SAMPLE_LEN` clip, which the assertion cells check
MFCC_FRAMES = 32
# The number of MFCC features to extract from the audio
MFCC_FEATURES = 13

In [None]:
# Root directory of the source audio (background noise and wake-word recordings)
WWD_ROOT = os.path.join(CV_DATA_DIR, "audio", "wwd")
# Working directory for generated artefacts (saved examples and trained models)
WWD_PLAY = os.path.join(CV_DATA_DIR, "playground", "audio", "wwd2")

# Preparing datasets

Source: [Open Voice OS](https://huggingface.co/datasets/OpenVoiceOS/synthetic-wakewords)

__Load audio files__

In [None]:
# Directories with background (noise) recordings
backgrounds_paths = [os.path.join(WWD_ROOT, "wake_word_noise")]
# Directories with recordings used as positive samples
positive_paths = [os.path.join(WWD_ROOT, "synthetic-wakewords", "hey_jarvis")]
# Directories with recordings used as negative samples
negative_paths = [
    os.path.join(WWD_ROOT, "synthetic-wakewords", word)
    for word in ("yes", "no")
]

In [None]:
# Load background audio samples; both length bounds are set to `SAMPLE_LEN`
# (presumably so every background is exactly one sample long — confirm in `wwd`)
backgrounds = load_raw_audios(
    backgrounds_paths,
    max_len=SAMPLE_LEN,
    min_len=SAMPLE_LEN)
# Load positive audio samples (no length bounds passed)
positives = load_raw_audios(positive_paths)
# Load negative audio samples (no length bounds passed)
negatives = load_raw_audios(negative_paths)

In [None]:
# The number of background samples
BACKGROUNDS_NUM = len(backgrounds)
# The number of positive samples
POSITIVES_NUM = len(positives)
# The number of negative samples
NEGATIVES_NUM = len(negatives)

print(f"Background samples: {BACKGROUNDS_NUM}")
print(f"Positive samples: {POSITIVES_NUM}")
print(f"Negative samples: {NEGATIVES_NUM}")

__Process audio file__

In [None]:
def process_audio(file_path, sr=SAMPLE_RATE, n_mfcc=MFCC_FEATURES, max_pad_len=None):
    """Load an audio file and return its MFCC matrix with time on the first axis.

    Args:
        file_path: Path of the audio file to read.
        sr: Sampling rate passed to `librosa.load`.
        n_mfcc: Number of MFCC coefficients to compute per frame.
        max_pad_len: When given, the number of frames the result is truncated
            or zero-padded to; when `None` the frame count is left as computed.

    Returns:
        Array of shape `(frames, n_mfcc)`.
    """
    signal, rate = librosa.load(file_path, sr=sr)
    # The MFCC matrix is transposed so that frames (time) come first
    features = librosa.feature.mfcc(y=signal, sr=rate, n_mfcc=n_mfcc).T
    if max_pad_len is None:
        return features
    n_frames = features.shape[0]
    if n_frames > max_pad_len:
        # Too long: keep only the leading frames
        return features[:max_pad_len, :]
    # Too short (or already exact): append zero frames at the end
    missing = max_pad_len - n_frames
    return np.pad(features, ((0, missing), (0, 0)), mode='constant')

In [None]:
# Scratch file for a single generated example
EXAMPLE_PATH = os.path.join("data", "example.wav")

# Generate one example from the loaded backgrounds, positives and negatives.
# Only the first returned value is used; it is treated as the number of
# positive clips in the example (presumably — confirm in `wwd.generate_audio`).
positive_num, _ = generate_audio(
    EXAMPLE_PATH,
    backgrounds,
    positives,
    negatives,
    sample_len=SAMPLE_LEN,
    max_negatives=2
)

# The example counts as positive when `positive_num` is greater than zero
is_positive = bool(positive_num > 0)
print(f"Is positive sample: {is_positive}")

In [None]:
# Play an example (displays the file at `EXAMPLE_PATH` as an audio widget)
Audio(EXAMPLE_PATH)

In [None]:
# Get MFCC of one example (no `max_pad_len`, so the natural frame count is kept)
mfcc = process_audio(EXAMPLE_PATH)

# Adjust MFCC_FRAMES/MFCC_FEATURES parameters according to audio file length
print("MFCC parameters:")
print(f".....frames={mfcc.shape[0]}")
print(f"...features={mfcc.shape[1]}")

In [None]:
# Fail fast if the configured MFCC parameters do not match the example's
# unpadded MFCC shape
assert mfcc.shape[0] == MFCC_FRAMES, \
    "The number of MFCC frames is invalid"
assert mfcc.shape[1] == MFCC_FEATURES, \
    "The number of MFCC features is invalid"

__Generate examples__

In [None]:
# The number of examples to generate
n_samples = 50000

# The same scratch path is passed on every iteration and read back right
# after generation, so no per-example audio file is kept
OUTPUT_PATH = os.path.join("data", "example.wav")

features, labels = [], []
for _step in range(n_samples):
    positive_num, _ = generate_audio(
        OUTPUT_PATH,
        backgrounds,
        positives,
        negatives,
        sample_len=SAMPLE_LEN,
        max_negatives=2)
    # MFCCs are forced to exactly `MFCC_FRAMES` frames so they stack cleanly
    features.append(process_audio(OUTPUT_PATH, max_pad_len=MFCC_FRAMES))
    # Label is 1 when `positive_num` is greater than zero, else 0
    labels.append(int(positive_num > 0))

X = np.array(features)
Y = np.array(labels)

In [None]:
# Expected: X is (n_samples, MFCC_FRAMES, MFCC_FEATURES), Y is (n_samples,)
print(f"X shape: {X.shape}")
print(f"Y shape: {Y.shape}")

In [None]:
# Verify the generated examples have the configured MFCC dimensions
assert X.shape[1] == MFCC_FRAMES, \
    "The number of MFCC frames is invalid"
assert X.shape[2] == MFCC_FEATURES, \
    "The number of MFCC features is invalid"

In [None]:
# Save all examples to a file
SAVE_DIR = os.path.join(WWD_PLAY, "XY")
# `exist_ok=True` creates the directory when needed and is a no-op when it
# already exists, without a separate existence check
os.makedirs(SAVE_DIR, exist_ok=True)
np.save(os.path.join(SAVE_DIR, "X.npy"), X)
np.save(os.path.join(SAVE_DIR, "Y.npy"), Y)

__Load examples__

In [None]:
# Load examples from a file
X_path = os.path.join(WWD_PLAY, "XY", "X.npy")
Y_path = os.path.join(WWD_PLAY, "XY", "Y.npy")

# Each array stays an empty list when its file is missing
X, Y = [], []

if not os.path.exists(X_path):
    print(f"File <{X_path}> not found")
else:
    X = np.load(X_path)

if not os.path.exists(Y_path):
    print(f"File <{Y_path}> not found")
else:
    Y = np.load(Y_path)

In [None]:
# NOTE: fails with AttributeError if a file was missing above, because the
# corresponding variable is then still a plain list
print(f"X shape: {X.shape}")
print(f"Y shape: {Y.shape}")

In [None]:
# Verify the loaded examples match the currently configured MFCC dimensions
assert X.shape[1] == MFCC_FRAMES, \
    "The number of MFCC frames is invalid"
assert X.shape[2] == MFCC_FEATURES, \
    "The number of MFCC features is invalid"

__Split examples into different sets__

In [None]:
# Split all examples into train / validation / test sets.
# A fixed seed makes the split reproducible: with the same saved X/Y, a model
# re-loaded from disk in a later session is evaluated on the same held-out
# test set instead of a fresh random one that overlaps its training data.
# Stratifying keeps the positive/negative ratio equal across the sets.
SPLIT_SEED = 42
X_train, X_test, Y_train, Y_test = train_test_split(
    X, Y, test_size=0.1, random_state=SPLIT_SEED, stratify=Y)
X_train, X_valid, Y_train, Y_valid = train_test_split(
    X_train, Y_train, test_size=0.1, random_state=SPLIT_SEED, stratify=Y_train)

print(f"Train set shape: {X_train.shape}")
print(f"Validation set shape: {X_valid.shape}")
print(f"Test set shape: {X_test.shape}")

# Build and train model

In [None]:
def _plot_history(history):
    """Plot training/validation loss and accuracy curves side by side."""
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Model Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(history.history['accuracy'], label='Training Accuracy')
    plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
    plt.title('Model Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()

    plt.tight_layout()
    plt.show()


def evaluate_model(model, X_test, y_test, history=None):
    """Print classification metrics for `model` and plot its training curves.

    Args:
        model: Object with a `predict` method returning one score per example.
        X_test: Test inputs passed to `model.predict`.
        y_test: Ground-truth binary labels for `X_test`.
        history: Optional training history (the value returned by `fit`).
            When omitted, a notebook-level variable named `history` is used
            if one exists; when there is none (for example the model was
            loaded from a file without training in this session) the curves
            are skipped instead of raising `NameError`.
    """
    # Flatten the scores so that a column-shaped output (n, 1) and a flat
    # output (n,) are treated the same by the metric functions
    y_pred = np.ravel(model.predict(X_test))
    # Scores above 0.5 are classified as positive
    y_pred_classes = (y_pred > 0.5).astype(int)

    precision, recall, f1, _ = precision_recall_fscore_support(
        y_test,
        y_pred_classes,
        average='binary')
    conf_matrix = confusion_matrix(y_test, y_pred_classes)
    # ROC AUC is computed from the raw scores, not the thresholded classes
    roc_auc = roc_auc_score(y_test, y_pred)

    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1-score: {f1:.4f}")
    print(f"ROC AUC: {roc_auc:.4f}")
    print("Confusion Matrix:")
    print(conf_matrix)

    if history is None:
        # Backward compatibility: existing callers rely on the global `history`
        history = globals().get("history")
    if history is None:
        print("No training history available; skipping learning curves.")
        return

    _plot_history(history)

## LSTM-based Model

In [None]:
def create_lstm_model(input_shape=(MFCC_FRAMES, MFCC_FEATURES)):
    """Build the (uncompiled) Conv1D + LSTM binary classifier.

    Args:
        input_shape: Shape of one example, `(frames, features)`.

    Returns:
        A `tf.keras.Sequential` model ending in a single sigmoid unit.
    """
    layers = tf.keras.layers
    stack = [tf.keras.Input(shape=input_shape)]

    # Layers 1-3: Conv1D -> BatchNorm -> MaxPool with growing filter counts
    for n_filters in (64, 128, 256):
        stack += [
            layers.Conv1D(filters=n_filters, kernel_size=3, activation="relu"),
            layers.BatchNormalization(),
            layers.MaxPooling1D(pool_size=2),
        ]

    # Layer 4: two stacked LSTMs; only the first returns the full sequence
    stack += [
        layers.LSTM(128, return_sequences=True),
        layers.Dropout(0.3),
        layers.LSTM(64),
        layers.Dropout(0.3),
    ]

    # Layer 5: dense head with a single sigmoid output
    stack += [
        layers.Dense(64, activation="relu"),
        layers.Dropout(0.3),
        layers.Dense(32, activation="relu"),
        layers.Dense(1, activation="sigmoid"),
    ]
    return tf.keras.Sequential(stack)

In [None]:
# Build a fresh (untrained) LSTM model
lstm_model = create_lstm_model()

# Binary cross-entropy loss for the single sigmoid output; accuracy is tracked
# so that it is recorded in the training history
lstm_model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'])

In [None]:
# Print the LSTM model's summary
lstm_model.summary()

In [None]:
# Upper bound on training epochs (early stopping may end training sooner)
EPOCHS = 30
BATCH_SIZE = 32

callbacks = [
    # Stop after 10 epochs without `val_loss` improvement and restore the best weights
    tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
    # Scale the learning rate by 0.1 after 5 epochs without `val_loss` improvement (floor 1e-6)
    tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, min_lr=1e-6)
]

# `history` is kept because `evaluate_model` plots the curves stored in it
history = lstm_model.fit(
    X_train, Y_train,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    validation_data=(X_valid, Y_valid),
    callbacks=callbacks,
)

In [None]:
# Save model into file; the target directory is created first because
# nothing else in this notebook creates it
os.makedirs(os.path.join(WWD_PLAY, "models"), exist_ok=True)
lstm_model.save(os.path.join(WWD_PLAY, "models", "model-lstm.keras"))

__Load model from file__

(load the model from a previously saved file to skip fitting it again)

In [None]:
# Replace the in-memory `lstm_model` with the copy stored on disk
lstm_model = tf.keras.models.load_model(os.path.join(WWD_PLAY, "models", "model-lstm.keras"))

__Evaluate model__

In [None]:
# Evaluate the model on the test set
evaluate_model(lstm_model, X_test, Y_test)

## GRU-based Model

In [None]:
def create_gru_model(input_shape=(MFCC_FRAMES, MFCC_FEATURES)):
    """Build the (uncompiled) Conv1D + GRU binary classifier.

    Args:
        input_shape: Shape of one example, `(frames, features)`.

    Returns:
        A `tf.keras.Sequential` model ending in a single sigmoid unit.
    """
    layers = tf.keras.layers
    stack = [tf.keras.Input(shape=input_shape)]

    # Layers 1-3: Conv1D -> BatchNorm -> MaxPool -> Dropout with growing filter counts
    for n_filters in (64, 128, 256):
        stack += [
            layers.Conv1D(filters=n_filters, kernel_size=3, activation="relu"),
            layers.BatchNormalization(),
            layers.MaxPooling1D(pool_size=2),
            layers.Dropout(0.3),
        ]

    # Layer 4: two stacked GRUs; only the first returns the full sequence
    stack += [
        layers.GRU(128, return_sequences=True),
        layers.Dropout(0.3),
        layers.GRU(64),
        layers.Dropout(0.3),
    ]

    # Layer 5: dense head with a single sigmoid output
    stack += [
        layers.Dense(64, activation="relu"),
        layers.Dropout(0.3),
        layers.Dense(32, activation="relu"),
        layers.Dense(1, activation="sigmoid"),
    ]
    return tf.keras.Sequential(stack)

In [None]:
# Build a fresh (untrained) GRU model
gru_model = create_gru_model()

# Binary cross-entropy loss for the single sigmoid output; accuracy is tracked
# so that it is recorded in the training history
gru_model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'])

In [None]:
# Print the GRU model's summary
gru_model.summary()

In [None]:
# Upper bound on training epochs (early stopping may end training sooner)
EPOCHS = 30
BATCH_SIZE = 32

callbacks = [
    # Stop after 10 epochs without `val_loss` improvement and restore the best weights
    tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
    # Scale the learning rate by 0.1 after 5 epochs without `val_loss` improvement (floor 1e-6)
    tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, min_lr=1e-6)
]

# Overwrites the `history` of the LSTM run; `evaluate_model` plots whatever
# `history` currently holds
history = gru_model.fit(
    X_train, Y_train,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    validation_data=(X_valid, Y_valid),
    callbacks=callbacks,
)

In [None]:
# Save model into file; the target directory is created first because
# nothing else in this notebook creates it
os.makedirs(os.path.join(WWD_PLAY, "models"), exist_ok=True)
gru_model.save(os.path.join(WWD_PLAY, "models", "model-gru.keras"))

__Load model from file__

(load the model from a previously saved file to skip fitting it again)

In [None]:
# Replace the in-memory `gru_model` with the copy stored on disk
gru_model = tf.keras.models.load_model(os.path.join(WWD_PLAY, "models", "model-gru.keras"))

__Evaluate model__

In [None]:
# Evaluate the model on the test set
evaluate_model(gru_model, X_test, Y_test)

# Predict

In [None]:
# List the audio devices reported by `sounddevice` (the query is not
# restricted to input devices)
devices = sd.query_devices()
print(devices)

In [None]:
# Select audio device to use by the index
# NOTE(review): 8 is hard-coded and presumably specific to one machine —
# pick the index from the device list printed by the previous cell
sd.default.device = 8

In [None]:
# Record audio
# Recording length in seconds (`SAMPLE_LEN` is in milliseconds)
duration = SAMPLE_LEN / 1000

print("Recording...")
# Capture `SAMPLE_RATE * duration` single-channel frames as 16-bit integers
sample_audio = sd.rec(
    int(SAMPLE_RATE * duration),
    samplerate=SAMPLE_RATE,
    channels=1,
    dtype='int16')
# Wait for the recording to complete before using `sample_audio`
sd.wait()
print("Recording finished.")
# Store the recording as a WAV file at the same path used for generated examples
write(os.path.join("data", "example.wav"), SAMPLE_RATE, sample_audio)

In [None]:
# NOTE(review): `audio` and `sr` are not used by any cell visible here; the
# file is loaded again inside `process_audio` — confirm before removing.
audio, sr = librosa.load(os.path.join("data", "example.wav"), sr=SAMPLE_RATE)
# Extract MFCCs, padded/truncated to `MFCC_FRAMES` frames
mfcc = process_audio(os.path.join("data", "example.wav"), max_pad_len=MFCC_FRAMES)
# Add a leading axis of size 1 so the single example forms a batch
mfcc = np.expand_dims(mfcc, axis=0)
print(f"MFCC shape: {mfcc.shape}")

In [None]:
# Locations of the two saved models
MODEL1_PATH = os.path.join(WWD_PLAY, "models", "model-lstm.keras")
MODEL2_PATH = os.path.join(WWD_PLAY, "models", "model-gru.keras")

# Select which model to use: LSTM (MODEL1_PATH) or GRU (MODEL2_PATH)
model = tf.keras.models.load_model(MODEL2_PATH)

# Run the model on the single-example batch built from the recording
prediction = model.predict(mfcc)

In [None]:
# `prediction[0][0]` is the model output for the single example, shown as a percentage
print(f"Prediction: {prediction[0][0] * 100:.2f}%")
# Same 0.5 decision threshold as used in `evaluate_model`
print(f"Positive: {prediction[0][0] > 0.5}")