In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.models import Model, load_model, Sequential
from tensorflow.keras.layers import Dense, Activation, Dropout, Input, Masking, TimeDistributed, LSTM, Conv1D
from tensorflow.keras.layers import GRU, Bidirectional, BatchNormalization, Reshape
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Precision, Recall

import tensorflow as tf
import keras_tuner as kt
from sklearn.model_selection import train_test_split
import numpy as np

In [None]:
import sys

# Extend the import path with the parent directory so RUN_TENSORBOARD can be
# imported (presumably a helper module that lives one folder up).
sys.path.append("..")
# Import only the entry point that is actually used; a wildcard import could
# silently shadow names brought in by the import cell above.
from RUN_TENSORBOARD import main

# Launch TensorBoard to monitor training.
# The folder is defined once and reused: the TensorBoard callback of the
# hyperparameter search writes its events to this same `events_folder`.
events_folder = "./logs"
main(events_folder)

In [3]:
import pickle

# Read the preprocessed feature and label arrays from disk.
# NOTE: unpickling can execute arbitrary code, so only load files you produced yourself.
with open('X.pkl', 'rb') as features_file:
    X = pickle.load(features_file)
with open('Y.pkl', 'rb') as labels_file:
    Y = pickle.load(labels_file)

# Stratified split: Train (80%), Val (10%), Test (10%).
# A sample is flagged when any of its label entries is above zero; the flag is
# the stratification key so every split keeps the same share of flagged samples.
y_has_trigger = (Y > 0).any(axis=(1, 2))
X_train, X_val_test, Y_train, Y_val_test = train_test_split(
    X,
    Y,
    test_size=0.2,
    random_state=42,
    stratify=y_has_trigger,
)

# Cut the held-out 20% in half: one half for validation, one half for testing.
y_has_trigger_val = (Y_val_test > 0).any(axis=(1, 2))
X_val, X_test, Y_val, Y_test = train_test_split(
    X_val_test,
    Y_val_test,
    test_size=0.5,
    random_state=42,
    stratify=y_has_trigger_val,
)

# Report the resulting shapes as a quick sanity check.
print(f"Training set shape: {X_train.shape}, {Y_train.shape}")
print(f"Dev (Val) set shape: {X_val.shape}, {Y_val.shape}")
print(f"Test set shape: {X_test.shape}, {Y_test.shape}")

# Handle Imbalance using Sample Weights
def get_sample_weights(y, weight_factor=10.0):
    """
    Build a per-timestep sample-weight matrix that up-weights positive labels.

    Args:
        y (np.ndarray): Binary labels shaped (samples, steps, 1) or
            (samples, steps).
        weight_factor (float): Weight assigned to timesteps whose label is 1.

    Returns:
        np.ndarray: Array of shape (samples, steps) holding 1.0 everywhere
        except at positive timesteps, which hold ``weight_factor``.
    """
    y = np.asarray(y)
    # Remove only the trailing channel axis. A blanket squeeze() would also drop
    # a samples or steps axis of length 1 (e.g. a single clip), and the boolean
    # mask below would then no longer line up with the weight matrix.
    labels = y.reshape(y.shape[:2])
    # Every timestep starts with weight 1.0 ...
    weights = np.ones(labels.shape)
    # ... and positive timesteps (label == 1) are raised to weight_factor.
    weights[labels == 1] = weight_factor
    return weights

# Derive the positive-class weight from the label balance of the training set.
# Count the label entries equal to 0 (negatives) and equal to 1 (positives).
neg, pos = (np.sum(Y_train == label) for label in (0, 1))
total = neg + pos
# Balanced class weights: each class contributes half of the total weight.
w0, w1 = ((1 / count) * (total / 2.0) for count in (neg, pos))
# Negatives keep a weight of 1, so positives are scaled by the ratio w1 / w0.
weight_factor = w1 / w0
print(f"Calculated class weight factor for positives: {weight_factor:.2f}")

# Per-timestep weight matrices for the training and validation labels.
train_sample_weights, val_sample_weights = (
    get_sample_weights(labels, weight_factor) for labels in (Y_train, Y_val)
)

# The full arrays and the intermediate hold-out split are no longer needed.
del X, Y, X_val_test, Y_val_test

Training set shape: (29914, 431, 256), (29914, 105, 1)
Dev (Val) set shape: (3739, 431, 256), (3739, 105, 1)
Test set shape: (3740, 431, 256), (3740, 105, 1)
Calculated class weight factor for positives: 3.76


In [4]:

def build_model(hp):
    """
    Assemble and compile the tunable Conv1D + stacked-GRU trigger word detector.

    Args:
        hp: Keras Tuner hyperparameter container. It is used to sample the
            convolution filter count, both GRU widths, the four dropout rates
            and the learning rate.

    Returns:
        A compiled Keras Model taking (431, 256) inputs and emitting one
        sigmoid probability per timestep left after the strided convolution.
    """
    def sample_dropout(name):
        # All dropout rates are searched over the same range.
        return hp.Float(name, min_value=0.2, max_value=0.6, step=0.1)

    def sample_gru_units(name):
        # Both GRU layers are searched over the same range of widths.
        return hp.Int(name, min_value=64, max_value=256, step=32)

    inputs = Input(shape=(431, 256))  # (Tx, n_freq)

    # Strided convolution: extracts local features and downsamples along time.
    n_filters = hp.Int('filters', min_value=128, max_value=512, step=32)
    net = Conv1D(filters=n_filters, kernel_size=15, strides=4)(inputs)
    net = BatchNormalization()(net)
    net = Activation("relu")(net)
    net = Dropout(sample_dropout('dropout_1'))(net)

    # Two recurrent stages; each one is GRU -> dropout -> batch normalization.
    for units_name, dropout_name in (
        ('gru_units_1', 'dropout_2'),
        ('gru_units_2', 'dropout_3'),
    ):
        net = GRU(sample_gru_units(units_name), return_sequences=True)(net)
        net = Dropout(sample_dropout(dropout_name))(net)
        net = BatchNormalization()(net)

    # One more dropout right before the classification head.
    net = Dropout(sample_dropout('dropout_4'))(net)

    # Classification head: the same single-unit sigmoid applied at every timestep.
    per_step_classifier = Dense(1, activation="sigmoid")
    outputs = TimeDistributed(per_step_classifier)(net)

    model = Model(inputs=inputs, outputs=outputs)

    # The learning rate is chosen from a small fixed set of candidates.
    optimizer = Adam(
        learning_rate=hp.Choice('learning_rate', values=[1e-4, 1e-5, 1e-6])
    )
    model.compile(
        loss='binary_crossentropy',
        optimizer=optimizer,
        metrics=["accuracy", Precision(name='precision'), Recall(name='recall')],
    )

    return model

In [5]:
# Model dimensions: input timesteps, mel frequency bins, output timesteps
Tx, n_freq, Ty = 431, 256, 105

# Settings passed to the mel spectrogram extraction (see get_mels_spectogram)
hop_length, n_mels = 256, 256


In [6]:
# Search objectives: validation loss is minimised, validation precision and
# validation recall are maximised.
tuning_objectives = [
    kt.Objective('val_loss', direction='min'),
    kt.Objective('val_precision', direction='max'),
    kt.Objective('val_recall', direction='max'),
]

# Bayesian optimisation over the search space declared in build_model.
# overwrite=False keeps any earlier results stored under
# keras_tuner_logs/trigger_word_tuning instead of starting from scratch.
tuner = kt.BayesianOptimization(
    build_model,
    objective=tuning_objectives,
    max_trials=20,
    executions_per_trial=1,
    directory='keras_tuner_logs',
    project_name='trigger_word_tuning',
    overwrite=False,
)


Reloading Tuner from keras_tuner_logs/trigger_word_tuning/tuner0.json


In [None]:
# Callbacks handed to the hyperparameter search.
# TensorBoard logging: histograms every epoch, graph included, scalars per batch.
tensorboard_cb = tf.keras.callbacks.TensorBoard(
    log_dir=events_folder,
    histogram_freq=1,
    write_graph=True,
    update_freq='batch',
)
# Stop after 40 epochs without val_loss improvement and keep the best weights.
early_stopping_cb = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=40,
    restore_best_weights=True,
)
callbacks_best = [tensorboard_cb, early_stopping_cb]

# Run the search; training and validation losses both use the per-timestep
# sample weights computed earlier.
tuner.search(
    X_train,
    Y_train,
    epochs=100,
    batch_size=8,
    sample_weight=train_sample_weights,
    validation_data=(X_val, Y_val, val_sample_weights),
    callbacks=callbacks_best,
)

# Keep the single best hyperparameter set found by the search
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]

print(f"""
The hyperparameter search is complete. The optimal number of filters in the first conv layer is {best_hps.get('filters')} 
and the optimal learning rate for the optimizer is {best_hps.get('learning_rate')}.
""")


Search: Running Trial #14

Value             |Best Value So Far |Hyperparameter
384               |128               |filters
0.5               |0.5               |dropout_1
256               |96                |gru_units_1
0.4               |0.4               |dropout_2
64                |128               |gru_units_2
0.5               |0.4               |dropout_3
0.3               |0.5               |dropout_4
0.0001            |0.0001            |learning_rate



I0000 00:00:1770888836.676714   24331 gpu_device.cc:2019] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 21265 MB memory:  -> device: 0, name: NVIDIA GeForce RTX 3090, pci bus id: 0000:01:00.0, compute capability: 8.6


Epoch 1/100


I0000 00:00:1770888852.147185   25090 cuda_dnn.cc:529] Loaded cuDNN version 91001


[1m3740/3740[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m60s[0m 15ms/step - accuracy: 0.6298 - loss: 1.0340 - precision: 0.3329 - recall: 0.7234 - val_accuracy: 0.7278 - val_loss: 0.7897 - val_precision: 0.4370 - val_recall: 0.9901
Epoch 2/100
[1m3740/3740[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m57s[0m 15ms/step - accuracy: 0.8274 - loss: 0.5353 - precision: 0.5561 - recall: 0.9050 - val_accuracy: 0.8660 - val_loss: 0.4269 - val_precision: 0.6246 - val_recall: 0.9198
Epoch 3/100
[1m3740/3740[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m55s[0m 15ms/step - accuracy: 0.8527 - loss: 0.4683 - precision: 0.5970 - recall: 0.9081 - val_accuracy: 0.8286 - val_loss: 0.4671 - val_precision: 0.5547 - val_recall: 0.9644
Epoch 4/100
[1m3740/3740[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m55s[0m 15ms/step - accuracy: 0.8679 - loss: 0.4281 - precision: 0.6281 - recall: 0.9138 - val_accuracy: 0.8451 - val_loss: 0.4112 - val_precision: 0.5803 - val_recall: 0.9710
Epoch 5/100
[1m

In [None]:
# Build the model with the optimal hyperparameters and train it for up to
# 300 epochs (early stopping normally ends the run well before that)
model = tuner.hypermodel.build(best_hps)

# callbacks
callbacks = [
    # Persist the best model (lowest val_loss) seen so far.
    tf.keras.callbacks.ModelCheckpoint('./keras_tuner_logs/best_model.keras', save_best_only=True, monitor='val_loss'),
    # Stop after 10 epochs without val_loss improvement. restore_best_weights
    # rolls the in-memory model back to its best epoch, so code that uses
    # `model` right after fit() does not run on the degraded final-epoch weights.
    tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
]

history = model.fit(
    X_train,
    Y_train,
    epochs=300,
    # Same batch size as the hyperparameter search, so the tuned learning rate
    # is used under the conditions it was selected for.
    batch_size=8,
    validation_data=(X_val, Y_val, val_sample_weights),
    callbacks=callbacks,
    sample_weight=train_sample_weights
)

In [None]:
# Evaluate on the Test set
# Score the best checkpoint (lowest val_loss) rather than whatever weights the
# last training epoch left in memory, so the reported metrics describe the same
# model that is later used for predictions.
model.load_weights('./keras_tuner_logs/best_model.keras')

print("Evaluating on Test set...")
# evaluate() returns the loss followed by the compiled metrics in their
# compile order: accuracy, precision, recall.
test_loss, test_acc, test_prec, test_recall = model.evaluate(X_test, Y_test)
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_acc}")
print(f"Test Precision: {test_prec}")
print(f"Test Recall: {test_recall}")

In [None]:
# Restore the weights stored in the best checkpoint written during training
best_checkpoint_path = './keras_tuner_logs/best_model.keras'
model.load_weights(best_checkpoint_path)

# Predictions

In [None]:
import matplotlib.pyplot as plt
from pydub import AudioSegment
import matplotlib.pyplot as plt
import librosa
import numpy as np
import IPython


In [None]:

def get_mels_spectogram(file):
    """
    Load an audio file and compute its Mel spectrogram.

    The number of mel bands and the hop length come from the module-level
    ``n_mels`` and ``hop_length`` settings. No sample rate is passed to
    ``librosa.load``, so librosa's default loading behaviour applies.

    Args:
        file (str): Path to the audio file

    Returns:
        tuple: ``(y_orig, S_orig, S_dB_orig, sr)`` where
            - y_orig (np.ndarray): Audio time series as loaded by librosa
            - S_orig (np.ndarray): Mel spectrogram
            - S_dB_orig (np.ndarray): Mel spectrogram converted to decibels,
              relative to its maximum value
            - sr (int): Sample rate returned by librosa
    """
    waveform, sample_rate = librosa.load(file)
    mel_power = librosa.feature.melspectrogram(
        y=waveform,
        sr=sample_rate,
        n_mels=n_mels,
        hop_length=hop_length,
    )
    # Decibel version, referenced to the spectrogram's own peak.
    mel_db = librosa.power_to_db(mel_power, ref=np.max)
    return waveform, mel_power, mel_db, sample_rate


def match_target_amplitude(sound, target_dBFS):
    """
    Adjust the volume of an audio segment to match a target amplitude level.

    Args:
        sound (AudioSegment): Audio segment to adjust
        target_dBFS (float): Target amplitude in decibels relative to full scale

    Returns:
        AudioSegment: Audio segment with adjusted volume. A completely silent
        segment is returned unchanged, because no finite gain can raise
        silence to the target level.
    """
    current_dBFS = sound.dBFS
    # Silence reports a level of -inf dBFS; subtracting it would request an
    # infinite gain, so skip the adjustment in that case.
    if current_dBFS == float("-inf"):
        return sound
    change_in_dBFS = target_dBFS - current_dBFS
    return sound.apply_gain(change_in_dBFS)


def trigger_word_detections(filename, out_filename = "tmp.wav"):
    """
    Detect trigger words in an audio file and visualize the results.

    Normalizes the clip's volume, writes it to ``out_filename``, computes its
    mel spectrogram, runs the module-level ``model`` on it and plots both the
    spectrogram (in dB) and the per-timestep prediction probabilities.

    Args:
        filename (str): Path to input WAV file
        out_filename (str): Path for temporary normalized audio file (default: "tmp.wav")

    Returns:
        np.ndarray: Model predictions with shape (1, timesteps, 1) containing
                   probabilities of trigger word detection at each timestep
    """
    # Importing `librosa` alone does not guarantee that the display submodule
    # is loaded on every librosa version, so import it explicitly.
    import librosa.display

    # Normalize audio amplitude before processing
    audio_clip = AudioSegment.from_wav(filename)
    audio_clip = match_target_amplitude(audio_clip, -20.0)

    # export() hands back the open output file; close it right away so the
    # handle is not leaked before the file is read back below.
    audio_clip.export(out_filename, format="wav").close()
    _, S_gen, S_dB_gen, sr = get_mels_spectogram(out_filename)

    # Reshape spectrogram for model: (freqs, timesteps) -> (timesteps, freqs),
    # then add a leading batch axis of size 1.
    x = S_gen.swapaxes(0, 1)
    x = np.expand_dims(x, axis=0)
    print(x.shape)
    predictions = model.predict(x)

    # Create the figure only after inference succeeded, so a failure above
    # does not leave an empty figure behind.
    fig, ax = plt.subplots(2, 1, figsize=(6, 4))

    # Plot spectrogram
    librosa.display.specshow(S_dB_gen, sr=sr, x_axis='time', y_axis='mel', ax=ax[0], cmap='magma')

    # Plot detection probabilities
    ax[1].plot(predictions[0, :, 0])
    ax[1].set_ylabel('probability')
    plt.show()

    return predictions

In [None]:
# Sample clip that is passed to trigger_word_detections in the next cell
file = "./Dataset/training_set/train_2.wav"
# Last expression of the cell: its value (an Audio object) is the cell's output
IPython.display.Audio(file)

In [None]:
# Run detection on the sample clip (this also draws the spectrogram and probability plots)
predictions  = trigger_word_detections(file)
# Last expression of the cell: shows the raw prediction array
predictions