This first block of code simply imports all of our dependencies and sets some environment settings for our code.

In [None]:
!pip install livelossplot
!pip install -q -U keras-tuner
!pip install visualkeras

import keras_tuner as kt
import librosa
import math
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import pathlib
import random
import scipy.io as sio
import seaborn as sns
import shutil
import sys
import tensorflow as tf
import time
import visualkeras

from collections import defaultdict
from google.colab import drive
from keras import regularizers
from keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from keras.preprocessing.image import ImageDataGenerator, NumpyArrayIterator
from keras.utils import to_categorical
from keras.utils.vis_utils import plot_model
from livelossplot import PlotLossesKeras
from os import listdir
from os.path import dirname, join as pjoin
from scipy import signal
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay
from tensorflow.keras.utils import to_categorical, image_dataset_from_directory
from tensorflow.keras import optimizers, regularizers
from tensorflow.keras.models import load_model, Model, Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization, LeakyReLU
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.python.client import device_lib

%matplotlib inline
mpl.rcParams['figure.dpi'] = 200

device_name = tf.test.gpu_device_name()
if device_name != '/device:GPU:0':
  print('GPU device not found\nProceeding without one.')
else:
  print('Found GPU at: {}'.format(device_name))

# This is a bit of magic to make matplotlib figures appear inline in the
# notebook rather than in a new window.
%matplotlib inline
plt.rcParams['figure.figsize'] = (2.0, 2.0) # set default size of plots
plt.rcParams['image.interpolation'] = 'nearest'
plt.rcParams['image.cmap'] = 'gray'

# Some more magic so that the notebook will reload external python modules;
# see http://stackoverflow.com/questions/1907993/autoreload-of-modules-in-ipython
%load_ext autoreload
%autoreload 2

drive.mount('/content/drive')

**Load and Plot dataset**

This block of code loads the dataset and then plots a random sample of the training data as both a spectrogram and a waveform.

In [None]:
# Root folder of the dataset (the local alternative is kept for reference)
# data_dir = 'data/'
data_dir = '/content/drive/My Drive/Oticon/hearing_aids_sound_classifier/data/'

# Read the training samples and their labels from disk
training_data = np.load(data_dir + 'npy/training.npy')
training_labels = np.load(data_dir + 'npy/training_labels.npy')

# Element-wise NaN mask, reduced to the unique first-axis (sample) indices
# that contain at least one NaN
nan_samples = np.isnan(training_data)
nan_indices = np.unique(np.nonzero(nan_samples)[0])

# Drop those samples together with their labels so both arrays stay aligned
training_data, training_labels = (
    np.delete(training_data, nan_indices, axis=0),
    np.delete(training_labels, nan_indices, axis=0),
)

# Sanity check of what is left after cleaning
print("Training data shape: ", training_data.shape)
print('Training data datatype: ', training_data.dtype)

# One figure with two side-by-side panels
fig, axs = plt.subplots(ncols=2, figsize=(12, 5))
spectrogram_ax, waveform_ax = axs

# Pick one training sample at random
sample = training_data[random.randint(0, len(training_data) - 1)]

# Time array sized to the second dimension of the sample.
# NOTE(review): this value is overwritten further down before it is used.
t = np.linspace(0, 2, len(sample[0]))

# Frequency array sized to the first dimension of the sample.
# NOTE(review): not used by any plotting call in this cell.
f = np.linspace(0, 11025 , len(sample), endpoint=False)

# Left panel: the transposed sample as an image, with a colour bar
spectrogram_img = spectrogram_ax.imshow(
    sample.T,
    origin='lower',
    aspect='auto',
    cmap='inferno',
    extent=[0, 2, 0, 11025],
)
spectrogram_ax.set_xlabel('Time (seconds)')
spectrogram_ax.set_ylabel('Frequency (Hz)')
spectrogram_ax.set_title('Spectrogram of Audio Sample')
fig.colorbar(spectrogram_img, ax=spectrogram_ax)

# Time array sized to the first dimension of the sample (the x values of the line plot)
t = np.linspace(0, 2, len(sample))

# Right panel: the sample plotted against that time array
waveform_ax.plot(t, sample)
waveform_ax.set_xlabel('Time (seconds)')
waveform_ax.set_ylabel('Amplitude (dB)')
waveform_ax.set_title('Audio Waveform')

# Tidy the spacing between the panels and render the figure
fig.tight_layout()
plt.show()

**Data preprocessing**

This block of code normalises the dataset and splits it into an 80/10/10 training/validation/test set.

In [None]:
# Fixed seed so that the train/validation/test split is the same on every run.
# A time-based seed gave a different split each time, which made results
# (and the evaluation of previously saved models) impossible to reproduce.
SPLIT_SEED = 42

# Mean and standard deviation over the sample axis (axis 0) of the training data
training_mean = np.mean(training_data, axis=0)
training_std = np.std(training_data, axis=0)
# Positions that are constant across all samples have std 0; divide by 1 there
# instead of producing inf/NaN
training_std = np.where(training_std == 0, 1.0, training_std)

# Normalize the training data
training_data_norm = (training_data - training_mean) / training_std

# Z-score based outlier removal (threshold 3.50) was tried at this point and
# dropped again, as it improved neither robustness nor accuracy.

# Reshape the data into the (samples, 96, 32, 1) layout the CNN expects
training_data_norm = training_data_norm.reshape(-1, 96, 32, 1)

# Hold back 20 % of the data ...
X_train, X_holdout, y_train, y_holdout = train_test_split(
    training_data_norm, training_labels, test_size=0.2, random_state=SPLIT_SEED)

# ... and split the held-back part in half: 10 % validation, 10 % test
X_val, X_test, y_val, y_test = train_test_split(
    X_holdout, y_holdout, test_size=0.5, random_state=SPLIT_SEED)

# Setting up the datagenerators
# ImageDataGenerator for data augmentation (only used by the optional,
# commented-out augmentation path in the training cell)
train_datagen = ImageDataGenerator(
    rotation_range=10,
    zoom_range=0.2,
    width_shift_range=0.1,
    height_shift_range=0.1,
    fill_mode="nearest"
)

# One-hot encode the labels
y_train = tf.keras.utils.to_categorical(y_train, num_classes=5, dtype='float32')
y_val = tf.keras.utils.to_categorical(y_val, num_classes=5, dtype='float32')
y_test = tf.keras.utils.to_categorical(y_test, num_classes=5, dtype='float32')

# Verify proper data preprocessing
print('Raw data min & max:', training_data.min(), training_data.max())
print('Normalized data min & max:', training_data_norm.min(), training_data_norm.max())
print('Processed data min & max:', X_train.min(), X_train.max())

# Test shapes of the sets
print('Training data shape: ', X_train.shape)
print('Training labels shape: ', y_train.shape)
print('Validation data shape: ', X_val.shape)
print('Validation labels shape: ', y_val.shape)
print('Test data shape: ', X_test.shape)
print('Test labels shape: ', y_test.shape)

**Setup of NN**

Considering the size of the dataset and the nature of the task (sound classification), a Convolutional Neural Network (CNN) would be a good choice. CNNs are effective in processing and extracting features from images and sound, and they are computationally efficient due to their shared weight architecture.

This uses Keras' automatic hyperparameter tuner to find the best possible combination for our case.

In [None]:
# Number of sound classes in the dataset (size of the softmax output)
num_classes = 5

def model_builder(hp):
  """Build and compile the CNN for one hyperparameter-tuner trial.

  Args:
    hp: hyperparameter object supplied by Keras Tuner; the layer sizes,
      dropout rates, LeakyReLU slope and learning rate are drawn from it.

  Returns:
    A compiled `tf.keras.Sequential` model taking (96, 32, 1) inputs and
    producing `num_classes` softmax probabilities.
  """
  # Search space (registered in this order)
  conv1_filters = hp.Int('layer_1', min_value=8, max_value=128, step=8)
  conv2_filters = hp.Int('layer_2', min_value=8, max_value=128, step=16)
  conv3_filters = hp.Int('layer_3', min_value=8, max_value=128, step=32)
  dense_units = hp.Int('layer_4', min_value=8, max_value=128, step=32)
  dropout_after_conv1 = hp.Float('dropout_1', min_value=0.0, max_value=0.8, step=0.05)
  dropout_after_dense = hp.Float('dropout_2', min_value=0.0, max_value=0.8, step=0.05)
  leaky_slope = hp.Float('alpha_1', min_value=0.001, max_value=0.3, step=0.001)
  learning_rate = hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4])

  def leaky_relu():
    # A fresh activation object per layer, all sharing the tuned slope
    return LeakyReLU(alpha=leaky_slope)

  layers = tf.keras.layers
  model = tf.keras.Sequential([
      # Three convolution stages, each followed by 2x2 max pooling
      layers.Conv2D(filters=conv1_filters, kernel_size=(5,3), activation=leaky_relu(), input_shape=(96, 32, 1)),
      layers.MaxPooling2D((2, 2)),
      layers.Dropout(dropout_after_conv1),
      layers.Conv2D(filters=conv2_filters, kernel_size=(3,3), activation=leaky_relu()),
      layers.MaxPooling2D((2, 2)),
      layers.Conv2D(filters=conv3_filters, kernel_size=(3,3), activation=leaky_relu()),
      layers.MaxPooling2D((2, 2)),
      # Classifier head
      layers.Flatten(),
      layers.Dense(units=dense_units, activation=leaky_relu()),
      layers.Dropout(dropout_after_dense),
      layers.Dense(num_classes, activation='softmax'),
  ])

  model.compile(
      optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
      loss=tf.keras.losses.CategoricalCrossentropy(),
      metrics=['accuracy'],
  )

  return model

# Hyperband tuner that builds candidate models with `model_builder` and ranks
# them by validation accuracy; its state is stored under ./dir/x
tuner = kt.Hyperband(model_builder,
                     objective='val_accuracy',
                     max_epochs=25,
                     directory='dir',
                     project_name='x')

# Callbacks used during the search. Neither sets `monitor`, so both watch the
# Keras default quantity.
early_stop = EarlyStopping(patience=5, verbose=1)
# The plateau patience has to be shorter than the early-stopping patience:
# with both at 5 the learning rate was only lowered in the very epoch in which
# training was stopped, so the reduction never had any effect.
reduce_lr = ReduceLROnPlateau(factor=0.1, patience=2, verbose=1)

**Hyperparameter tuning**

Next, the tuner tries a TON of combinations and saves the best one.

In [None]:
# Epoch count handed to the search.
# NOTE(review): the tuner was created with max_epochs=25 - confirm whether
# this value has any effect on the individual trials.
num_epochs = 50

# Batch size and verbosity settings.
# NOTE(review): both are defined here but not passed to the search call below.
batch_size = 32
verbosity = 2

# Run the hyperparameter search on the training split, scoring on the validation split
tuner.search(
    X_train,
    y_train,
    epochs=num_epochs,
    validation_data=(X_val, y_val),
    callbacks=[early_stop, reduce_lr],
)

**Model training**

Here, the model with the best hyperparameters found by the tuner is built; it is trained in the training cell further below.

In [None]:
# Ask the tuner for its single best hyperparameter set
top_hyperparameters = tuner.get_best_hyperparameters(num_trials=1)
best_hps = top_hyperparameters[0]

# Build a fresh (untrained) model from those hyperparameters
model = tuner.hypermodel.build(best_hps)

# Print the layer-by-layer overview of the model
model.summary()

If one wants to manually set up the CNN, that can be done here.

In [None]:
# The types of sounds that exist in the dataset
num_classes = 5

# Hand-configured CNN: three Conv -> BatchNorm -> MaxPool stages (dropout after
# the first two), followed by a dense layer and the softmax classifier.
# Running this cell replaces any `model` built from the tuner results.
model = Sequential([
    Conv2D(32, (5, 3), activation=LeakyReLU(alpha=0.035), input_shape=(96, 32, 1)),
    BatchNormalization(),
    MaxPooling2D((2, 2)),
    Dropout(0.5),
    Conv2D(48, (3, 3), activation=LeakyReLU(alpha=0.035)),
    BatchNormalization(),
    MaxPooling2D((2, 2)),
    Dropout(0.25),
    Conv2D(128, (3, 3), activation=LeakyReLU(alpha=0.035)),
    BatchNormalization(),
    MaxPooling2D((2, 2)),
    Flatten(),
    Dense(96, activation=LeakyReLU(alpha=0.1)),
    BatchNormalization(),
    Dense(num_classes, activation='softmax')
])

# Compile the model with categorical crossentropy loss and Adam optimizer.
# - `Adam` was misspelled (`Ad0am`), which raised an AttributeError.
# - The metric must be the lowercase string 'accuracy': Keras then picks the
#   categorical accuracy matching the loss. 'Accuracy' selects the exact-match
#   metric, which compares one-hot labels with raw softmax outputs and is
#   therefore meaningless here.
model.compile(loss='categorical_crossentropy', optimizer=tf.keras.optimizers.Adam(learning_rate=1e-2), metrics=['accuracy'])
# Get a summary of the model
model.summary()

In [None]:
# Define the callbacks
# Stop when the monitored quantity has not improved for 10 epochs
early_stop = EarlyStopping(patience=10, verbose=1)
# Keep only the best model seen so far on Drive
# model_checkpoint = ModelCheckpoint('models/best_model.h5', save_best_only=True, verbose=1)
model_checkpoint = ModelCheckpoint('/content/drive/My Drive/Oticon/hearing_aids_sound_classifier/models/my_model.h5', save_best_only=True, verbose=1)
# Multiply the learning rate by 0.9 after 5 epochs without improvement
reduce_lr = ReduceLROnPlateau(factor=0.9, patience=5, verbose=1)

# How many epochs to train for (upper bound; early stopping usually ends sooner)
num_epochs = 500
# Batch size for training
batch_size = 32
# Verbosity of training
verbosity = 1

# Train the model on fixed dataset with callbacks.
# perf_counter() measures elapsed wall-clock time; process_time() only counts
# CPU time of this process and therefore under-reports the training duration.
start = time.perf_counter()
history = model.fit(X_train, y_train, epochs=num_epochs, batch_size=batch_size,
          validation_data=(X_val, y_val), verbose=verbosity,
          callbacks=[early_stop, model_checkpoint, reduce_lr])
print('Training the model took ', (time.perf_counter() - start)/60, 'minutes')

# Alternative: fit the model on batches with real-time data augmentation
# train_datagen.fit(X_train)
# history = model.fit(train_datagen.flow(X_train, y_train, batch_size=batch_size),
#          validation_data=(X_val, y_val),
#          steps_per_epoch=len(X_train) / batch_size, epochs=num_epochs,
#          callbacks=[early_stop, model_checkpoint, reduce_lr])

Load the best model so far and evaluate it on the held-out test split.

In [None]:
# Replace `model` with a previously saved model from the models folder
model = tf.keras.models.load_model(
    '/content/drive/My Drive/Oticon/hearing_aids_sound_classifier/models/317k_LReLU_97-percent.h5'
)

# Print the layer-by-layer overview, including nested models
model.summary(expand_nested=True)

# Layered architecture drawing; Dense layers are filled orange
color_map = defaultdict(dict, {Dense: {'fill': 'orange'}})
architecture_view = visualkeras.layered_view(model, legend=True, color_map=color_map)
architecture_view.show()

# Write a graph diagram of the model (with tensor shapes, without layer names) to Drive
plot_model(
    model,
    to_file='/content/drive/My Drive/Oticon/hearing_aids_sound_classifier/models/visualisations/model_plot.png',
    show_shapes=True,
    show_layer_names=False,
)

# Evaluate on the labelled hold-out split (X_test / y_test) taken from the
# training data; the separate test file has no labels
model.evaluate(X_test, y_test)

In [None]:
# Write the current `model` to Drive.
# NOTE(review): this is the same file the ModelCheckpoint callback of the
# training cell writes to - confirm that overwriting it here is intended.
saved_model_path = '/content/drive/My Drive/Oticon/hearing_aids_sound_classifier/models/my_model.h5'
model.save(saved_model_path)

**Confusion Matrix**


In [None]:
# Predict class probabilities for the hold-out split with the current `model`
y_pred = model.predict(X_test)

# Convert predicted probabilities and one-hot ground truth to class indices
y_pred_labels = np.argmax(y_pred, axis=1)
y_true = np.argmax(y_test, axis=1)

# Display names of the classes.
# NOTE(review): assumes label ids 0-4 map to these names in this order - confirm.
classes = ["Other", "Music", "Voice", "Engine", "Alarm"]

# Passing `labels` explicitly keeps the matrix at len(classes) x len(classes)
# even if a class does not occur in this split.
cm = confusion_matrix(y_true, y_pred_labels, labels=np.arange(len(classes)))
print("Confusion matrix")
print(cm)

print("\nConfusion matrix (normalised)")
# Row-normalise: every row (true class) is divided by its own total.
# keepdims=True keeps the totals as a column vector; without it the totals were
# broadcast along the columns and each column was divided by the wrong sum.
row_totals = cm.sum(axis=1, keepdims=True).astype(float)
# Rows without any sample stay all-zero instead of becoming NaN
cm_norm = np.divide(cm, row_totals, out=np.zeros(cm.shape, dtype=float), where=row_totals != 0)
print(cm_norm)

**Making histogram over class frequency**

In [None]:
# Count how often every class index occurs in `y_true`
# (np.unique returns the sorted distinct values together with their counts)
unique, counts = np.unique(y_true, return_counts=True)

# Default size for the figure created below
plt.rcParams['figure.figsize'] = (10.0, 7.0)

# Bar chart with one bar per class present in `y_true`
figure, ax = plt.subplots()
ax.bar(unique, counts, align='center', alpha=0.6)
ax.set_ylabel("Class frequency", fontsize=15)

# Label the five tick positions with the class names
ax.set_xticks(np.arange(5))
ax.set_xticklabels(classes)

ax.set_title("Class frequency Histogram", fontsize=20)

# Write the chart to disk, then render it
figure.savefig('histogram.png', transparent=True, dpi=500)
plt.show()

**Displaying Confusion Matrix**

In [None]:
# Default figure size and font size for this plot
plt.rcParams.update({
    'figure.figsize': (10.0, 9.0),
    'font.size': 20,
})

# Visualise the normalised confusion matrix with the class names on both axes
display_cm = ConfusionMatrixDisplay(confusion_matrix=cm_norm, display_labels=classes)

# Draw it with the chosen colour map and slightly rotated x tick labels.
# Other possible options for colour map are:
# 'autumn_r', 'Blues', 'cool', 'Greens', 'Greys', 'PuRd', 'copper_r'
display_cm.plot(xticks_rotation=25, cmap='OrRd')

# Smaller font for the tick labels than the global default set above
plt.xticks(fontsize=15)
plt.yticks(fontsize=15)

# Title of the plot
plt.title('Confusion Matrix', fontsize=24)

# Write the plot to disk, then render it
plt.savefig('confusion_matrix.png', transparent=True, dpi=500)
plt.show()

In [None]:
# Load the desired model from the models folder
model = tf.keras.models.load_model('/content/drive/My Drive/Oticon/hearing_aids_sound_classifier/models/317k_LReLU_97-percent.h5')

# Load the unlabeled test data
unlabeled_test_data = np.load(data_dir + 'npy/test.npy')

# Apply the same normalisation as for the training data (statistics from the
# preprocessing cell). The model was trained and evaluated on normalised input,
# so feeding it raw values would give unreliable predictions.
# assumes the test file has the same per-sample layout as the training file - TODO confirm
safe_std = np.where(training_std == 0, 1.0, training_std)  # no division by zero
unlabeled_test_data = (unlabeled_test_data - training_mean) / safe_std
# NOTE(review): NaN samples were removed from the training data; test samples
# cannot be dropped without breaking the line-per-sample output - check the
# test file for NaNs.

# Same (samples, 96, 32, 1) layout as used for training
unlabeled_test_data = unlabeled_test_data.reshape(-1, 96, 32, 1)

# Get a summary of the model
model.summary(expand_nested=True)

# Predict class probabilities for the test set
preds_labels = model.predict(unlabeled_test_data)

# Get highest probable prediction for each sample
preds_labels = np.argmax(preds_labels, axis=1)

print('predicted labels array size: ', preds_labels.size)
print('predicted labels array shape: ', preds_labels.shape)
print('predicted labels array dtype: ', preds_labels.dtype)
print(preds_labels)

# Write one predicted class index per line; create the output folder if needed
predictions_path = data_dir + "predictions/predictions.txt"
os.makedirs(os.path.dirname(predictions_path), exist_ok=True)
np.savetxt(predictions_path, preds_labels, fmt='%i', header='', comments='')

# Read the file back and print its number of lines as a check
with open(predictions_path) as f:
    row_count = sum(1 for _ in f)
print(row_count)