In [None]:
import os
import random
import numpy as np

import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow.keras import layers, regularizers, Input, Sequential, optimizers, callbacks, Model
from sklearn.model_selection import train_test_split

from internal_methods import spectrogramFromFile
from KerasGenerator import SpectrogramGenerator


In [None]:
# Directories that are listed later in the notebook to collect the clips of
# each class (relative paths, so they resolve against the working directory).
wake_data_dir: str = "data/wake"
background_data_dir: str = "data/background"


# Audio settings.
# NOTE(review): neither value is referenced in the cells visible here —
# presumably `sr` is the sample rate in Hz and `seconds` the clip length;
# confirm against the helpers that consume them.
sr: int = 44100
seconds: int = 2

In [None]:
# Derive the per-sample input shape from one wake-word recording and count
# how many wake-word recordings are available.
# The directory is listed once and sorted, because os.listdir returns entries
# in arbitrary order and the probed file should not change between runs.
wake_dir_entries = sorted(os.listdir(wake_data_dir))
if not wake_dir_entries:
    # Fail with an explicit message instead of an IndexError on [0] below.
    raise FileNotFoundError(f"No files found in {wake_data_dir!r}")

# Number of wake-word files; the dataset cell caps each class at this size.
n_samples = len(wake_dir_entries)

sample_path = os.path.join(wake_data_dir, wake_dir_entries[0])
sample_shape = spectrogramFromFile(
    audio_filepath=sample_path, expand_last_dim=True
).shape

# Keep this as the last expression of the cell: a notebook only renders the
# final expression, so a bare name in the middle of the cell shows nothing.
sample_shape

# Assemble the Dataset

In [None]:
# Collect the clip paths of both classes.
# Sorting first makes the seeded shuffle below reproducible, because
# os.listdir returns entries in arbitrary order.
wake_word_filenames = sorted(
    os.path.join(wake_data_dir, item) for item in os.listdir(wake_data_dir)
)
background_filenames = sorted(
    os.path.join(background_data_dir, item) for item in os.listdir(background_data_dir)
)

# Dedicated seeded RNG: the subset of clips that survives the truncation
# below is the same on every run, and the global `random` state is untouched.
shuffle_rng = random.Random(42)
shuffle_rng.shuffle(wake_word_filenames)
shuffle_rng.shuffle(background_filenames)

# Cap each class at n_samples clips.
wake_word_filenames = wake_word_filenames[:n_samples]
background_filenames = background_filenames[:n_samples]

# Class 1: wake word, class 0: background.
# Size the label arrays from the lists themselves: a directory holding fewer
# than n_samples files would otherwise produce more labels than filenames,
# which silently truncates label_map and breaks the train/validation split.
wake_word_labels = np.full(shape=len(wake_word_filenames), fill_value=1)
background_labels = np.full(shape=len(background_filenames), fill_value=0)

# ---------------------- #

all_filenames = np.concatenate([wake_word_filenames, background_filenames])
all_labels = np.concatenate([wake_word_labels, background_labels])

# Lookup from clip path to its class label.
label_map = dict(zip(all_filenames, all_labels))

In [None]:
# Hold out 20% of the (path, label) pairs for validation; the fixed
# random_state keeps the partition identical between runs.
X_train, X_val, y_train, y_val = train_test_split(
    all_filenames,
    all_labels,
    test_size=0.2,
    random_state=42,
)

# Keyword arguments shared by the training and validation generators.
params = dict(
    sample_shape=sample_shape,
    batch_size=32,
    shuffle=True,
)

# One generator per split; both resolve labels through the same label_map.
train_Generator = SpectrogramGenerator(
    list_IDs=X_train,
    label_map=label_map,
    **params,
)
validation_Generator = SpectrogramGenerator(
    list_IDs=X_val,
    label_map=label_map,
    **params,
)

# Test the Generator

In [None]:
# Smoke test: pull a single batch out of the validation generator.
index = 0  # any batch index in range(len(validation_Generator))

X_batch, y_batch = validation_Generator[index]

# Report what came back: both shapes, then the raw labels.
print("X_batch shape:", X_batch.shape)
print("y_batch shape:", y_batch.shape)
print("y_batch: ", y_batch)

# Model Building

---


### Model Parameters

In [None]:
# Training hyperparameters.
learning_rate = 0.001  # starting learning rate handed to the optimizer
num_epochs = 70  # upper bound on the number of epochs passed to model.fit
# Reuse the generator settings so the model and the data pipeline agree.
batch_size = params["batch_size"]
input_shape = params["sample_shape"]

### Model Structure

In [None]:
def residual_block(x, filters:int, kernel_size:int|tuple[int]=3, strides:int|tuple[int]=1, activation:str="relu",padding:str="same" ):
    y = layers.Conv2D(filters=filters, kernel_size=kernel_size, strides=strides, padding=padding)(x)
    y = layers.BatchNormalization()(y)
    y = layers.Activation(activation)(y)
    y = layers.Conv2D(filters=filters, kernel_size=kernel_size, strides=1, padding=padding)(y)
    y = layers.BatchNormalization()(y)
    
    print(x.shape, y.shape)
    if x.shape[-1] != filters:
        # Use pointwise convolution to manipulate filter number without changing dimenstions of spatial data
        x = layers.Conv2D(filters=filters, kernel_size=1, strides=strides, padding=padding)(x)
    
    out = layers.Add()([x, y]) # Skip Connection
    out = layers.Activation(activation)(out)
    
    return out

In [None]:
def build_model(input_shape, batch_size=32):
    """Assemble the convolutional + recurrent binary classifier.

    Pipeline: a Conv2D/BatchNorm/ReLU/MaxPool stem, four residual blocks
    (64, 128, 256 and 512 filters; the last two with strides 2), a
    TimeDistributed flatten, three LSTM layers with L2 kernel regularisation
    and a dense head ending in a single sigmoid unit. Dropout (rate 0.3)
    follows the flatten, every LSTM and the hidden dense layer.

    Args:
        input_shape: Shape handed to the Input layer as ``shape``.
        batch_size: Batch size handed to the Input layer.

    Returns:
        The uncompiled functional ``Model``.
    """
    inputs = Input(shape=input_shape, batch_size=batch_size)

    # Stem: one plain convolution stage followed by 2x pooling.
    stem = layers.Conv2D(filters=32, kernel_size=3, padding="same")(inputs)
    stem = layers.BatchNormalization()(stem)
    stem = layers.Activation("relu")(stem)
    features = layers.MaxPooling2D(pool_size=2)(stem)

    # Residual stack, described as (filters, strides) pairs.
    for n_filters, stride in ((64, 1), (128, 1), (256, 2), (512, 2)):
        features = residual_block(features, filters=n_filters, strides=stride)

    # Flatten each slice along the first non-batch axis so the LSTMs receive
    # a sequence of vectors.
    sequence = layers.TimeDistributed(layers.Flatten())(features)
    sequence = layers.Dropout(0.3)(sequence)

    # Recurrent stack, described as (units, return_sequences) pairs; only the
    # last LSTM collapses the sequence into a single vector.
    for n_units, keep_sequence in ((256, True), (512, True), (512, False)):
        sequence = layers.LSTM(
            units=n_units,
            return_sequences=keep_sequence,
            kernel_regularizer=regularizers.l2(0.001),
        )(sequence)
        sequence = layers.Dropout(0.3)(sequence)

    # Classification head.
    head = layers.Dense(units=128, activation="relu")(sequence)
    head = layers.Dropout(0.3)(head)
    outputs = layers.Dense(1, activation="sigmoid")(head)

    return Model(inputs=inputs, outputs=outputs)


# Forward the batch size used by the generators: build_model passes it to the
# Input layer, so relying on its default of 32 would silently diverge from
# the data pipeline as soon as params["batch_size"] is changed.
model = build_model(input_shape=input_shape, batch_size=batch_size)
model.summary()

In [None]:
def build__old_model(input_shape):
    """Build the earlier Sequential CNN + LSTM variant of the classifier.

    Four Conv2D/BatchNorm/ReLU/MaxPool stages (64, 128, 256, 512 filters)
    feed a TimeDistributed flatten, three 512-unit LSTM layers with L2 kernel
    regularisation (0.01) and a dense head with a single sigmoid output.

    Note: the batch size is read from the notebook-level ``batch_size``
    variable rather than from an argument.

    Args:
        input_shape: Shape handed to the Input layer as ``shape``.

    Returns:
        The uncompiled ``Sequential`` model.
    """
    model = Sequential()
    model.add(Input(shape=input_shape, batch_size=batch_size))

    # Convolutional feature extractor: identical stages, growing filter count.
    for n_filters in (64, 128, 256, 512):
        for stage_layer in (
            layers.Conv2D(filters=n_filters, kernel_size=(3, 3), padding="same"),
            layers.BatchNormalization(),
            layers.Activation("relu"),
            layers.MaxPooling2D(pool_size=(2, 2)),
        ):
            model.add(stage_layer)

    # Reshape the output to be compatible with LSTM
    model.add(layers.TimeDistributed(layers.Flatten()))

    # Two sequence-returning LSTMs, each followed by dropout ...
    for _ in range(2):
        model.add(
            layers.LSTM(
                units=512,
                return_sequences=True,
                kernel_regularizer=regularizers.l2(0.01),
            )
        )
        model.add(layers.Dropout(0.3))
    # ... then a final LSTM that returns only its last output.
    model.add(layers.LSTM(units=512, kernel_regularizer=regularizers.l2(0.01)))

    # Dense layers for final classification
    for head_layer in (
        layers.Dense(units=128, activation="relu"),
        layers.Dropout(0.3),
        layers.Dense(1, activation="sigmoid"),
    ):
        model.add(head_layer)

    return model


# Disabled: the model is not rebuilt in this cell.
# NOTE(review): the disabled call below targets build_model, not
# build__old_model — confirm which one was intended before re-enabling.
# model = build_model(input_shape=input_shape)

# Report the input shape the model is built for.
print("Input shape: ", input_shape)
# model.summary()

### Compile the model 

In [None]:
# Optimise with Nesterov-momentum SGD, starting from the configured
# learning rate.
sgd_optimizer = optimizers.SGD(
    learning_rate=learning_rate,
    momentum=0.9,
    nesterov=True,
)

# Binary cross-entropy loss, with accuracy tracked as the only metric.
model.compile(
    optimizer=sgd_optimizer,
    loss="binary_crossentropy",
    metrics=["accuracy"],
)

### Define Callbacks

In [None]:
# Every callback watches the same quantity, in the same direction, with the
# same verbosity.
monitor = "val_loss"
mode = "min"
verbose = 1
watch_kwargs = dict(monitor=monitor, mode=mode, verbose=verbose)

# Stop after 15 epochs without improvement and restore the best weights.
early_stopping_callback = callbacks.EarlyStopping(
    patience=15,
    restore_best_weights=True,
    **watch_kwargs,
)

# Write the model to disk only when the monitored value improves.
checkpoint_callback = callbacks.ModelCheckpoint(
    "best_model.h5",
    save_best_only=True,
    **watch_kwargs,
)

# Halve the learning rate after 2 epochs without improvement, never going
# below 1e-7.
reduce_lr_callback = callbacks.ReduceLROnPlateau(
    factor=0.5,
    patience=2,
    min_lr=1e-7,
    **watch_kwargs,
)

### Train the Model

In [None]:
model_path = "WAKE_WORD_ENHANCED.keras"

# Callbacks applied during training, in the order they are handed to fit().
training_callbacks = [
    early_stopping_callback,
    checkpoint_callback,
    reduce_lr_callback,
]

# Train on the training generator and validate on the validation generator
# after every epoch.
history = model.fit(
    train_Generator,
    validation_data=validation_Generator,
    epochs=num_epochs,
    steps_per_epoch=len(train_Generator),
    callbacks=training_callbacks,
)

# Persist the trained model under both file names.
for save_target in (model_path, "WAKE_WORD_ENHANCED.h5"):
    model.save(save_target)

# Model Evaluation

---


### Plot Loss

In [None]:
# Draw the training and validation loss curves on the same axes.
for history_key, curve_label in (
    ("loss", "Training Loss"),
    ("val_loss", "Validation Loss"),
):
    plt.plot(history.history[history_key], label=curve_label)

plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.title("Training Loss over Epochs")
plt.legend()

# Write the figure to disk, then clear it so the next plot starts empty.
plt.savefig("loss.png")
plt.clf()

### Plot Accuracy

In [None]:
# Draw the training and validation accuracy curves on the same axes.
for history_key, curve_label in (
    ("accuracy", "Training Accuracy"),
    ("val_accuracy", "Validation Accuracy"),
):
    plt.plot(history.history[history_key], label=curve_label)

plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.title("Training Accuracy over Epochs")
plt.legend()

# Write the figure to disk.
plt.savefig("accuracy.png")

### Evaluate on the held-out validation data

In [None]:
# Reload the model saved under `model_path` and score it on every batch of
# the validation generator.
model = tf.keras.models.load_model(model_path)
evaluation = model.evaluate(
    validation_Generator,
    steps=len(validation_Generator),
)
test_loss, test_accuracy = evaluation

print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")

In [None]:
# NOTE(review): disabled scratch cell, kept for reference only.
#  - `_model` is not defined in any cell visible above; `model` is presumably
#    what was meant — confirm before re-enabling.
#  - np.round(predictions, 1) followed by astype(np.int32) truncates every
#    rounded value below 1.0 to 0, so the effective decision threshold is
#    not 0.5.
# index = 2  # Choose any index within the range [0, len(data_generator))
# mapping = []
# for i in range(20):

#     # Generate a batch of data
#     X_batch, y_batch = validation_Generator.__getitem__(i)

#     # Print the shapes of the generated batch
#     # print("X_batch shape:", X_batch.shape)
#     # print("y_batch shape:", y_batch.shape)
#     # print("y_batch: ", y_batch)

#     predictions = _model.predict(X_batch, steps=len(X_batch))
#     predictions = (
#         np.round(predictions, 1)
#         .reshape(
#             -1,
#         )
#         .astype(np.int32)
#     )

#     mapping.append(all(val == True for val in predictions == y_batch))