# ELECT_ENG 475: Machine Learning: Foundations, Applications, and Algorithms Project Notebook

**Eric Oliveira, Justin Ansell, Vivek Matta**

In [None]:
# Imports
# Standard-library modules cannot be missing, so they are imported outside the
# guarded block: later cells can then rely on `os` / `sys` even when one of the
# third-party packages below is not installed.
import os
import sys

try:
    import numpy as np
    from IPython.display import Audio, display
    import matplotlib.pyplot as plt
    import librosa
    import tensorflow as tf
    import keras

except ImportError as err:
    # Best-effort on purpose (the notebook keeps going), but name the package
    # that failed so the reader knows what to install.
    print("To run the code in its entirety, please install the requirements.txt file. ")
    print(f"    Missing dependency: {err.name or err}")

In [None]:
# Get essential code and data
# Imported locally so this cell still runs if the imports cell stopped early on
# a missing third-party package.
import os
import subprocess
import sys

REPO_URL = "https://github.com/Eclo19/smart-home-kws"
REPO_DIR = "smart-home-kws"  # path relative to the notebook

git = False # This flag will control code and data imports

try:
    if os.path.isdir(REPO_DIR):
        # Re-running the notebook: cloning into an existing directory fails,
        # so reuse what is already there.
        print(f"\n Repository already present at '{REPO_DIR}'; skipping clone.")
    else:
        # Get our code through GitHub, if developer tools are available.
        # The IPython `!git clone ...` escape never raises on a non-zero exit
        # status, so the surrounding try/except could not detect a failed clone.
        # subprocess lets us check the return code explicitly.
        clone = subprocess.run(
            ["git", "clone", REPO_URL, REPO_DIR],
            capture_output=True,
            text=True,
        )
        # Echo git's own output into the notebook (git reports progress on stderr).
        print(clone.stdout + clone.stderr, end="")
        clone.check_returncode()  # raises CalledProcessError on failure
        print("\n Successfully cloned the repository.")

    git = True
    if REPO_DIR not in sys.path:  # avoid stacking duplicates on re-runs
        sys.path.append(REPO_DIR)
except (OSError, subprocess.CalledProcessError) as err:
    # OSError covers a missing `git` executable (FileNotFoundError).
    print(f"Could not clone repository ({err}). Assuming 'Essential_Data' is defined.")

## Data Gathering and Processing

In [None]:
# --- 1) Show a raw data point (label is the first word in the name; 'blue' in this case.) ---

# Load one untouched recording, report its basic properties, play it and plot it.
if git:
    raw_m4a_path = 'smart-home-kws/Toy_Dataset/Full/blue_eric_03_full.m4a'
    # sr=None: no target rate is requested from librosa here
    raw_m4a_audio_data, sr = librosa.load(raw_m4a_path, sr=None)

    file_extension = raw_m4a_path.split('.')[1]
    sample_dtype = raw_m4a_audio_data[0].dtype
    print(
        "Raw audio File info: \n",
        f"    Audio file format: {file_extension}",
        f"    Audio Shape = {raw_m4a_audio_data.shape}",
        f"    Sample rate = {sr}",
        f"    Audio array datatype = {sample_dtype}",
        "\nDisplaying audio file:",
        sep="\n",
    )

    # Audio player widget
    display(Audio(data=raw_m4a_audio_data, rate=sr))

    # Waveform against time in seconds (sample index divided by sample rate)
    t = np.arange(len(raw_m4a_audio_data)) / sr
    fig, ax = plt.subplots(figsize=(6, 4))
    ax.plot(t, raw_m4a_audio_data, color='black')
    ax.set_title("Example Waveform: blue_eric_03_full.m4a")
    ax.set_xlabel('Time (s)')
    ax.set_ylabel("Amplitude (float32)")



In [None]:
if git:

    # Sanitize data
    print("Forcing project standards to the raw recording. This includes: \n")
    print("    - forcing our sample rate of 22.05 Khz")
    print("    - forcing wav format")
    print("    - summing to mono if stereo")
    print("    - ensuring dtype is float32")
    print("    - normalizing")

    try:
        import data_augmentation
    except ImportError:
        print("Could not import Python script from the cloned Repository.")
    else:
        # Only reached when the import succeeded. Previously the cell went on to
        # use `data_augmentation` after reporting the failed import, which
        # ended in a NameError.
        data_augmentation.VANILLA_DATA_PATH = "smart-home-kws/Toy_Dataset/Full"
        data_augmentation.sanitize_vanilla_dataset()

        # Reload the same recording, now from the .wav file expected after sanitizing.
        audio_path = 'smart-home-kws/Toy_Dataset/Full/blue_eric_03_full.wav'
        audio_data, sr = librosa.load(audio_path, sr=None)

        # splitext only looks at the final extension, so dots elsewhere in the
        # path cannot change the reported format.
        audio_format = os.path.splitext(audio_path)[1].lstrip('.')

        print("Sanitized audio File info: \n")
        print(f"    Audio file format: {audio_format}")
        print(f"    Audio Shape = {audio_data.shape}")
        print(f"    Sample rate = {sr}")
        # The array's dtype equals the per-sample dtype and is also valid for an empty array.
        print(f"    Audio array datatype = {audio_data.dtype}")

In [None]:
# --- 2) Show how sanitized files are chopped ---
if git:
    try:
        import file_chopper
    except ImportError:
        print("Could not import Python script from the cloned Repository.")
    else:
        # Only reached when the import succeeded; otherwise the calls below
        # would fail with a NameError right after the message above.

        # Adjacent string literals are joined with no separator, so the space
        # between "threshold" and "and" has to live inside the first literal.
        print("The default parameters should be a good fit, but the user has to input their threshold "
              "and their window_length choices. This is necessary to adapt to all recording situations.")

        print("\nNOTE: A prompt will show up to check/reset the parameters for chopping. ")
        print("If you do not wish to write the chops, press 'esc' in each prompt. This will throw an error.")
        print("For demo purposes, we recommend pressing 1 for both prompts and writting the chops.")
        file_chopper.FULL_DATA_PATH = "smart-home-kws/Toy_Dataset/Full"
        file_chopper.CHOPPED_DIR = "smart-home-kws/Toy_Dataset/Live_chops"

        # Adapting parameters to reasonable values for this waveform.
        # Both are given in samples: seconds * project sample rate (22.05 kHz).
        PROJECT_SAMPLE_RATE = 22050
        file_chopper.parse(
            wait=int(PROJECT_SAMPLE_RATE * 0.15),
            duration=int(PROJECT_SAMPLE_RATE * 0.45),
        )

In [None]:
live_chops_path = 'smart-home-kws/Toy_Dataset/Live_chops'
# Bound unconditionally: the feature-extraction cell further down reads
# `blue_1_path`, and it used to exist only when the branch below was taken.
blue_1_path = os.path.join(live_chops_path, 'blue_eric_03_01.wav')

if os.path.isdir(live_chops_path):

    # Sanitize chops again for safety and normalization
    # (relies on `data_augmentation` having been imported by the sanitize cell).
    data_augmentation.VANILLA_DATA_PATH = live_chops_path
    data_augmentation.sanitize_vanilla_dataset()

    print("Now displaying the first chop:")
    audio_data, sr = librosa.load(blue_1_path, sr=None)
    display(Audio(data=audio_data, rate=sr))

    # Plot waveform against time in seconds
    plt.figure(figsize=(6,4))
    t = np.arange(0, len(audio_data),1)/sr
    plt.plot(t, audio_data, color='black')
    plt.title("Example Waveform: blue_eric_03_01.wav")
    plt.xlabel('Time (s)')
    plt.ylabel("Amplitude (float32)")
    plt.show()

else:
    print("Live chops directory not found. Skipping...")
    

In [None]:
# --- 3) Show data augmentation transformations ---
# Explain the context, then play the chop loaded by the previous cell followed
# by every augmented variant of it.
intro_lines = (
    "We split the data with wrapper.split_dataset(). However, this function takes too much"
    " time and memory to run. We move on to show how we expanded the training set. ",
    "For filtering, we built the taps with pyfda and used scipy to convolve the responses with the input.",
    "For pitch-shifting, we used librosa's effects.",
    "\nNow displaying all audio transformations applied to each point in the training data:\n\n",
)
for intro_line in intro_lines:
    print(intro_line)

# (caption shown to the reader, name of the data_augmentation function to apply)
augmentation_steps = (
    ("Low-passed:", "low_pass"),
    ("High-passed:", "high_pass"),
    ("Band-passed:", "band_pass"),
    ("Pitched Up:", "pitch_up"),
    ("Pitched Down:", "pitch_down"),
    ("Added red noise: (low-passed white noise)", "add_noise"),
)

print("Original chop:")
display(Audio(data=audio_data, rate=sr))

for caption, transform_name in augmentation_steps:
    print(caption)
    # Each transform is looked up only when its turn comes and is always
    # applied to the untouched chop, never to a previous result.
    transformed_audio_data = getattr(data_augmentation, transform_name)(audio_data)
    display(Audio(data=transformed_audio_data, rate=sr))

In [None]:
# --- 5) Show Feature Extraction ---
print("\nWe build json feature dictionaries for our training, validation, and test data.")
# The module is `feature_extraction`; the message used to say "feature.extraction".
print("For that, we used the feature_extraction.build_feature_dict() function.")
print("This function calls the mfcc extraction and vectorize label functions shown below")
print("for each point in the dataset and writes a tuple(flat_mfccs, vec_label) in a json dict.")

if git:
    try:
        import feature_extraction
        import wrapper
    except ImportError:
        print("Could not import Python script from the cloned Repository.")
    else:
        # Only reached when both imports succeeded; otherwise the calls below
        # would fail with a NameError right after the message above.

        MODEL_INPUT_SAMPLES = 39243  # model's input size in samples
        chop_name = 'blue_eric_03_01.wav'
        # Rebuilt here instead of read from the chop-display cell, which only
        # defines it when the Live_chops directory exists.
        blue_1_path = os.path.join(live_chops_path, chop_name)

        # Force all data points to have the same time by zero padding
        wrapper.force_standard_size(dirname=live_chops_path, size=MODEL_INPUT_SAMPLES)

        # Get MFCCs and vectorized label
        audio_data, sr = librosa.load(blue_1_path, sr=None)
        print(f"Data length = {len(audio_data)}")
        mfccs = feature_extraction.extract_mfccs(audio_data=audio_data)
        print(f"\nMFCCs shape: {mfccs.shape}")
        # The label is the first underscore-separated token of the file name.
        label_str = chop_name.split('_')[0]
        vec_label = feature_extraction.vectorize_label(label=label_str)
        print(f"\nParsed label = {label_str}, vec label: \n{vec_label}")

        # Plot what the model actually sees: mask exact zeros so they can be
        # drawn in a separate colour.
        mfcc_masked = np.ma.masked_where(mfccs == 0.0, mfccs)

        # Copy a colormap and set the "bad" (masked) color to something obvious
        cmap = plt.cm.viridis.copy()
        cmap.set_bad(color="white")   # zero-padding will appear in white

        plt.figure(figsize=(8, 4))
        im = plt.imshow(
            mfcc_masked,
            aspect="auto",
            origin="lower",
            cmap=cmap
        )
        plt.title("Model's input - MFCC Visualization (Zero-padding shown in white)")
        plt.xlabel("Time frames")
        plt.ylabel("MFCC coefficient index")
        plt.colorbar(im, label="MFCC value")
        plt.tight_layout()
        plt.show()
    
    


## The Model

The full script for training the model is available at https://colab.research.google.com/drive/1zg0SPHg8Gk4uLPPREuo5_4F8gLJBUEz0?usp=sharing. We show some basics of how the model was trained and some test data inferencing as well, but showing the full training would require too much time and memory.

In [None]:
# --- 1) Loading the data ---
# Purely informational: describes how the processed data is loaded and which
# array shapes to expect. Nothing is read from disk here.
data_loading_notes = (
    "Our processed data is stored as a json feature dictionary of (x, y) tuples.",
    "We load it to runtime with wrapper.wrapper(dirname, augmented=False). ",
    "When loading the augmented training, validation, and test data, the shapes should be:\n",
    "Train's x shape: (15561, 4928)",
    "Train's y shape: (15561, 9)",
    "\nVal's x shape: (485, 4928)",
    "Val's y shape: (485, 9)",
    "\nTest's x shape: (473, 4928)",
    "Test's y shape: (473, 9)",
    "\nThe split is roughly 70% for training, 15% for test and 15% for validation (for the vanilla set).",
)
# One line per note, exactly as separate print() calls would emit them.
print("\n".join(data_loading_notes))

In [None]:
# --- 2) Defining the Model ---

keras_layers = tf.keras.layers


def _conv_block(tensor, filters, kernel_size, dropout_rate):
    """Apply Conv2D (relu, 'same' padding) -> BatchNorm -> 2x2 MaxPool -> SpatialDropout2D."""
    tensor = keras_layers.Conv2D(
        filters, kernel_size, activation='relu', padding='same'
    )(tensor)
    tensor = keras_layers.BatchNormalization()(tensor)
    tensor = keras_layers.MaxPool2D(pool_size=(2, 2))(tensor)
    return keras_layers.SpatialDropout2D(dropout_rate)(tensor)


# (filters, kernel size, spatial dropout rate) for blocks 1-3, in order
CONV_BLOCK_SPECS = (
    (64, (4, 4), 0.1),
    (64, (3, 3), 0.1),
    (128, (3, 3), 0.2),
)

# Input: 32 x 154 x 1; note 32 * 154 = 4928, the flat feature length quoted
# for the x arrays in the data-loading cell.
inputs = tf.keras.Input(shape=(32, 154, 1))

# Convolutional stack
x = inputs
for block_filters, block_kernel, block_dropout in CONV_BLOCK_SPECS:
    x = _conv_block(x, block_filters, block_kernel, block_dropout)

# Global pooling collapses the two spatial axes before the dense head
x = keras_layers.GlobalAveragePooling2D()(x)

# Dense head
x = keras_layers.Dense(128, activation='relu')(x)
x = keras_layers.Dropout(0.3)(x)

# Output: one softmax unit per class (9 classes)
outputs = keras_layers.Dense(9, activation='softmax')(x)

# Assemble the model and print its summary
cnn = tf.keras.Model(inputs=inputs, outputs=outputs)
cnn.summary()


In [None]:
# --- 3) Training sketch (no data provided to avoid computation) ---

# Leave False in this notebook: no training data is loaded here. The guarded
# block below only documents what the real training call looks like.
RUN_TRAINING = False

# Hyperparameters used
learning_rate = 1e-3
batch_size = 50
epochs = 20
stop_patience = 5
lr_patience = 2
lr_reduct_factor = 0.5

# Compile the model defined in the previous cell
cnn.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
    loss=tf.keras.losses.CategoricalCrossentropy(),
    metrics=['accuracy']
)

# Stop once val_loss has not improved for `stop_patience` epochs, keeping the best weights
early_stop = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=stop_patience,
    restore_best_weights=True,
    verbose=1
)
# Multiply the learning rate by `lr_reduct_factor` after `lr_patience` stagnant epochs
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=lr_reduct_factor,
    patience=lr_patience,
    min_lr=1e-6,
    verbose=1
)
# Save a checkpoint whenever val_loss improves; the metrics are embedded in the file name
checkpoint = tf.keras.callbacks.ModelCheckpoint(
    filepath="best_cnn_run4_padrobust-epoch{epoch:02d}-"
             "valloss{val_loss:.4f}-valacc{val_accuracy:.4f}.keras",
    monitor='val_loss',
    save_best_only=True,
    verbose=1
)

# Provide no data, simply show what the training structure looks like
if RUN_TRAINING:
    # Placeholders to be replaced with real arrays before enabling the flag.
    # (The previous `x, y = None` form could not even be executed: None cannot
    # be unpacked into two names.)
    x_train, y_train = None, None
    x_val, y_val = None, None
    if any(arr is None for arr in (x_train, y_train, x_val, y_val)):
        raise ValueError("Training/validation arrays must be loaded before setting RUN_TRAINING = True.")

    history = cnn.fit(
        x_train,
        y_train,
        batch_size=batch_size,
        epochs=epochs,
        callbacks=[early_stop, reduce_lr, checkpoint],
        validation_data=(x_val, y_val),
        verbose=1
    )


## Real-Time Inference

Real-time inference works in the following way:

1) Record a 4s long buffer when the button is pressed
2) Extract an input window of length 39243 (model's input size in samples) based on DSP techniques (Leaky integrator's maximum will determine the midpoint of the input window)
3) Normalize and force zeros around the midpoint of the window, to mimic zero-padded training data
4) Extract zeroed-out window's MFCCs and feed it to the model
5) Get a prediction
6) Take some action based on the prediction in the embedded device.

In [None]:
# --- 1) Run Inference on Test Data ---

# Explicit import so this cell does not depend on an earlier `if git:` branch
# having run (a no-op when the module is already loaded).
import feature_extraction

# Constants
test_path  = 'Toy_Dataset/Test Data'
# Every other dataset path in this notebook lives under the cloned repository.
# Keep the original location as first choice, but fall back to the copy inside
# the clone when only that one exists.
_repo_test_path = os.path.join('smart-home-kws', test_path)
if not os.path.isdir(test_path) and os.path.isdir(_repo_test_path):
    test_path = _repo_test_path
MODEL_PATH = 'smart-home-kws/Models/best_cnn_run4_padrobust-epoch15-valloss0.0025-valacc1.0000.keras'
N_MFCC   = feature_extraction.N_MFCC
T_FRAMES = feature_extraction.T_FRAMES

# Choose a random test point among the audio files in the test directory
test_points = [
    f for f in os.listdir(test_path)
    if f.lower().endswith(('.wav', '.m4a', '.flac', '.ogg'))
]
if not test_points:
    # np.random.choice on an empty list fails with a message that does not
    # mention the directory; report the actual problem instead.
    raise FileNotFoundError(f"No audio files found in '{test_path}'.")
test_filename = np.random.choice(test_points)
test_data_path = os.path.join(test_path, test_filename)
print(f"Random test file: {test_filename}")
# The label is the first underscore-separated token of the file name
true_label_str = test_filename.split('_')[0]
print(f"True label parsed from filename: {true_label_str}")

# Label mapping string --> output index, and its inverse for decoding predictions
label_mapping = {
    'red': 0, 'green': 1, 'blue': 2, 'white': 3, 'off': 4,
    'time': 5, 'temperature': 6, 'unknown': 7, 'noise': 8
}
idx_to_label = {v: k for k, v in label_mapping.items()}

# Get audio features and display audio.
# sr=22050 is librosa's default target rate, written out because it is also
# the project sample rate (22.05 kHz).
audio_data, sr = librosa.load(test_data_path, sr=22050)
display(Audio(data=audio_data, rate=sr))
mfccs = feature_extraction.extract_mfccs(audio_data=audio_data)
print("MFCCs shape:", mfccs.shape)

# Load model
print(f"\nLoading model from: {MODEL_PATH}")
model = keras.models.load_model(MODEL_PATH, compile=False)
print("Loaded model successfully.\n")

# Prepare input for CNN and predict: add batch and channel axes around the MFCC matrix
input_matrix = mfccs.reshape(1, N_MFCC, T_FRAMES, 1).astype(np.float32)
print("input_matrix shape:", input_matrix.shape, "dtype:", input_matrix.dtype)
y_pred = model.predict(input_matrix, verbose=0)[0]
pred_idx = int(np.argmax(y_pred))
pred_label = idx_to_label[pred_idx]
print("\nInference Result ")
print(f"    -True label      : {true_label_str}")
print(f"    -Predicted label : {pred_label}")
print(f"    -Model output (softmax in final layer, these are the probabilities): \n{y_pred.ravel()}")


In [None]:
# --- 2) Run Inference on Pre-Recorded Window, Simulating Embedded Behavior ---
# Relies on `model`, `N_MFCC`, `T_FRAMES`, `idx_to_label` and `feature_extraction`
# from the test-data inference cell.

window_path = "Toy_Dataset/Window/full_window.wav"
# Keep the original location as first choice, but fall back to the copy inside
# the cloned repository when only that one exists.
_repo_window_path = os.path.join("smart-home-kws", window_path)
if not os.path.isfile(window_path) and os.path.isfile(_repo_window_path):
    window_path = _repo_window_path

audio_data, sr = librosa.load(window_path, sr=None)
audio_data = audio_data.astype(np.float32)
# Peak-normalize. A completely silent recording has peak 0; dividing by it
# would fill the signal with NaNs, so it is left untouched in that case.
peak = np.max(np.abs(audio_data))
if peak > 0:
    audio_data = audio_data / peak
print("\nInput window test:")
display(Audio(data=audio_data, rate=sr))

try:
    import inference_test
except ImportError:
    print("Could not import Python script from the cloned Repository.")
else:
    # Only reached when the import succeeded; otherwise the calls below would
    # fail with a NameError right after the message above.

    # Extract best window and plot it
    input_window = inference_test.plot_signal_and_loudest_window_leaky(
        audio_data=audio_data,
        sr=sr,
        tau_ms=200
    )
    # Zero-out non-audio beginning and end (similar process as file_chopper)
    input_window_zeroed = inference_test.zero_out(input_window)

    # Plot audio data's pipeline: extracted window on top, zeroed-out version below
    plt.figure(figsize=(12,6))
    t = np.arange(0, len(input_window)) / sr
    plt.subplot(2, 1, 1)
    plt.plot(t, input_window)
    plt.title("Extracted Input Window")
    plt.xlabel("Time(s)")
    plt.ylabel("Amplitude")
    plt.subplot(2, 1, 2)
    plt.plot(t, input_window_zeroed)
    plt.title("Zeroed-Out Extracted Input Window")
    plt.xlabel("Time(s)")
    plt.ylabel("Amplitude")
    plt.tight_layout()
    plt.show()

    # Show clean input
    print("\nCleaned input window:")
    display(Audio(data=input_window_zeroed, rate=sr))

    # Extract features
    mfccs = feature_extraction.extract_mfccs(audio_data=input_window_zeroed)
    print("MFCCs shape:", mfccs.shape)

    # Prepare input for CNN and predict: add batch and channel axes around the MFCC matrix
    input_matrix = mfccs.reshape(1, N_MFCC, T_FRAMES, 1).astype(np.float32)
    y_pred = model.predict(input_matrix, verbose=0)[0]
    pred_idx = int(np.argmax(y_pred))
    pred_label = idx_to_label[pred_idx]
    print("\n=====Inference Result =====\n")
    print(f"    -Predicted label : {pred_label}\n")
    print(f"    -Model output (softmax in final layer, these are the probabilities): \n{y_pred.ravel()}")