<a href="https://colab.research.google.com/github/ameer-alwadiya/binary-claasification-ml/blob/main/ACS61011_Project_Starter_2024.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ACS61011 Project Starter Python Code

**You should first go to File and 'Save a copy in Drive' for this file**

This is the starter code for the ACS61011 project on automated speech recognition.

The code does the following:

*   Pulls the data in from github
*   Unzips the data
*   Creates Keras training and validation datasets
*   Extracts input-output data from the Keras datasets


In [None]:
# NOTE: YOU SHOULD ONLY NEED TO RUN THIS STEP THE FIRST TIME IN A SESSION
import tensorflow as tf

# get the data from github and unzip
# The term "wget" stands for "World Wide Web get." It's a command-line utility for downloading files from the internet.
!wget https://raw.githubusercontent.com/andrsn/data/main/speechImageData.zip
!unzip -q /content/speechImageData.zip


## Pre-process data into training and validation sets, using Keras dataset objects

Note that when the data is unzipped it is stored locally to Google Colab in the content folder and the unzipped folder is called

'speechImageData - Copy'

and it contains:

the training data in the folder TrainData and

the validation data in the folder ValData

There are 12 classes of different spoken words, and the spectrograms, which form the input image data, are of size 98x50 pixels.

In [None]:
import tensorflow as tf

# Loader settings shared by both splits: labels are inferred from the
# directory structure and returned one-hot ('categorical'); images are loaded
# as single-channel 98x50 arrays in batches of 128.
_loader_options = dict(
    labels='inferred',
    color_mode="grayscale",
    label_mode='categorical',
    batch_size=128,
    image_size=(98, 50),
)

# Training split
train_ds = tf.keras.utils.image_dataset_from_directory(
    directory='/content/speechImageData - Copy/TrainData',
    **_loader_options,
)

# Validation split
val_ds = tf.keras.utils.image_dataset_from_directory(
    directory='/content/speechImageData - Copy/ValData',
    **_loader_options,
)

## Extract input-output data into arrays, which can be more useful

In [None]:
import numpy as np

def _dataset_to_arrays(dataset):
    """Collect every (images, labels) batch of ``dataset`` into two NumPy arrays.

    ``take(-1)`` builds a dataset containing all elements of the original one,
    so the loop visits every batch once. The per-batch arrays are then joined
    along axis 0, i.e. stacked sample after sample.
    """
    image_batches = []
    label_batches = []
    for batch_images, batch_labels in dataset.take(-1):
        image_batches.append(batch_images.numpy())
        label_batches.append(batch_labels.numpy())
    return (
        np.concatenate(image_batches, axis=0),
        np.concatenate(label_batches, axis=0),
    )


# Training input images and output class labels
x_train, y_train = _dataset_to_arrays(train_ds)

# Validation input images and output class labels
x_val, y_val = _dataset_to_arrays(val_ds)

In [None]:
# Report the array shapes in three sections (whole training set, a single
# training example, whole validation set), each followed by a ruler line.
_ruler = '--------------------------------------------------'
_shape_sections = [
    [
        ('Shape of the feature training data: ', x_train.shape),
        ('Shape of the label training data: ', y_train.shape),
    ],
    [
        ('Shape of an example of the feature training data: ', x_train[0].shape),
        ('Shape of an example of the label training data: ', y_train[0].shape),
    ],
    [
        ('Shape of the feature testing data: ', x_val.shape),
        ('Shape of the label testing data: ', y_val.shape),
    ],
]
for _section in _shape_sections:
    for _description, _shape in _section:
        print(_description, _shape)
    print(_ruler)

# Class names as recorded on the training dataset object
label_names = train_ds.class_names
print('Label names: ', label_names)

In [None]:
import matplotlib.pyplot as plt
import random

# Show a few randomly chosen training spectrograms next to each other
num_examples = 5
fig, axes = plt.subplots(1, num_examples, figsize=(10, 5), squeeze=False)

for ax in axes[0]:
    rand_index = np.random.randint(0, x_train.shape[0])
    ax.imshow(x_train[rand_index], cmap=plt.cm.binary)
    # np.argmax() returns the index of the maximum value in the label vector,
    # which is used to look up the class name.
    class_name = label_names[np.argmax(y_train[rand_index])]
    ax.set_title(f"Label: {class_name}")
    ax.axis('off')
plt.show()

# Spectrogram Augmentation:

**1- Frequency and time masks:**

In [None]:
import numpy as np

def spec_augment(spec, num_freq_masks=1, num_time_masks=1, freq_masking_max_percentage=0.1, time_masking_max_percentage=0.1):
    """Return a copy of ``spec`` with randomly placed bands set to zero.

    "Frequency" masks zero a band of consecutive indices along axis 0 and
    "time" masks zero a band along axis 1.
    NOTE(review): confirm that axis 0 / axis 1 really are the frequency / time
    axes of the stored spectrogram images.

    Args:
        spec: NumPy array with at least two dimensions. It is not modified.
        num_freq_masks: number of bands to zero along axis 0.
        num_time_masks: number of bands to zero along axis 1.
        freq_masking_max_percentage: largest band width along axis 0, as a
            fraction (0..1) of ``spec.shape[0]``.
        time_masking_max_percentage: largest band width along axis 1, as a
            fraction (0..1) of ``spec.shape[1]``.

    Returns:
        A new array with the same shape as ``spec``.

    Raises:
        ValueError: if a masking percentage is outside the range [0, 1].
    """
    def _max_width(percentage, size):
        # Largest allowed band width (in indices) for an axis of length `size`.
        if not 0 <= percentage <= 1:
            raise ValueError(
                f"masking percentage must be between 0 and 1, got {percentage}"
            )
        return int(percentage * size)

    spec_aug = spec.copy()
    max_freq_width = _max_width(freq_masking_max_percentage, spec.shape[0])
    max_time_width = _max_width(time_masking_max_percentage, spec.shape[1])

    for _ in range(num_freq_masks):
        # randint's upper bound is exclusive, hence the "+ 1": the width may
        # reach the maximum (and a maximum of 0 no longer raises), and the
        # band may end on the very last index of the axis.
        f = np.random.randint(0, max_freq_width + 1)
        f0 = np.random.randint(0, spec.shape[0] - f + 1)
        spec_aug[f0:f0 + f, :] = 0

    for _ in range(num_time_masks):
        t = np.random.randint(0, max_time_width + 1)
        t0 = np.random.randint(0, spec.shape[1] - t + 1)
        spec_aug[:, t0:t0 + t] = 0

    return spec_aug

In [None]:
# Build one masked copy of every training spectrogram, in the original order
_masked_specs = []
for _spec in x_train:
    _masked_specs.append(spec_augment(_spec))
x_train_augmented = np.array(_masked_specs)

print(x_train_augmented.shape)

In [None]:
# Join the original spectrograms with their masked copies. Each masked copy
# keeps the label of the sample it was derived from, so the labels are simply
# repeated.
x_train_combined1 = np.concatenate([x_train, x_train_augmented], axis=0)
y_train_combined1 = np.concatenate([y_train, y_train], axis=0)

# Shuffle images and labels with the same permutation. Keras' validation_split
# takes the *last* fraction of the arrays before any shuffling, so without this
# step fit(..., validation_split=0.5) would train on the originals only and
# validate on nothing but the augmented copies.
_shuffle_order = np.random.permutation(x_train_combined1.shape[0])
x_train_combined1 = x_train_combined1[_shuffle_order]
y_train_combined1 = y_train_combined1[_shuffle_order]

print(x_train_combined1.shape)

In [None]:
# Side-by-side comparison of one training sample and its masked copy
fig, (ax_original, ax_augmented) = plt.subplots(1, 2, figsize=(5, 5))

# Left panel: the original spectrogram
ax_original.set_title('Original Spectrogram')
ax_original.imshow(x_train[3], cmap=plt.cm.binary)

# Right panel: the augmented spectrogram at the same index
ax_augmented.set_title('Augmented Spectrogram')
ax_augmented.imshow(x_train_augmented[3], cmap=plt.cm.binary)

fig.tight_layout()
plt.show()

**2- Spectrogram mixing (mixup):**

In [None]:
def mixup(original_melspecs, original_labels, alpha=0.5):
    """Blend every sample with a randomly chosen partner sample.

    A random permutation of the sample indices picks the partner of each
    sample, and a single weight drawn from Beta(alpha, alpha) is shared by
    the whole call. Inputs and labels are blended with the same weight and
    the same partners.

    Args:
        original_melspecs: array of inputs, indexed by sample along axis 0.
        original_labels: array of labels, indexed by sample along axis 0.
        alpha: both shape parameters of the Beta distribution.

    Returns:
        Tuple ``(augmented_melspecs, augmented_labels)`` with the same shapes
        as the inputs.
    """
    n_samples = original_melspecs.shape[0]
    partner = np.random.permutation(n_samples)

    # One mixing weight for all samples of this call
    weight = np.random.beta(alpha, alpha)
    partner_weight = 1 - weight

    mixed_specs = original_melspecs * weight + original_melspecs[partner] * partner_weight
    mixed_labels = original_labels * weight + original_labels[partner] * partner_weight
    return mixed_specs, mixed_labels

In [None]:
# Create one mixed sample (and mixed label) per training example
_mixed_pair = mixup(original_melspecs=x_train, original_labels=y_train)
augmented_x_train, augmented_y_train = _mixed_pair
print(augmented_x_train.shape)

In [None]:
# Extend the first combined set with the mixup samples and their mixed labels
x_train_combined2 = np.concatenate([x_train_combined1, augmented_x_train], axis=0)
y_train_combined2 = np.concatenate([y_train_combined1, augmented_y_train], axis=0)

# Shuffle images and labels with the same permutation. Keras' validation_split
# takes the *last* fraction of the arrays before any shuffling, so without this
# step the mixup samples appended at the end would land in the validation part
# of fit(..., validation_split=0.5) and never be trained on.
_shuffle_order = np.random.permutation(x_train_combined2.shape[0])
x_train_combined2 = x_train_combined2[_shuffle_order]
y_train_combined2 = y_train_combined2[_shuffle_order]

print(x_train_combined2.shape)
print(y_train_combined2.shape)

In [None]:
# Side-by-side comparison of one training sample and its mixup counterpart
fig, (ax_original, ax_mixed) = plt.subplots(1, 2, figsize=(5, 5))

# Left panel: the original spectrogram
ax_original.set_title('Original Spectrogram')
ax_original.imshow(x_train[6], cmap=plt.cm.binary)

# Right panel: the mixed spectrogram at the same index
ax_mixed.set_title('Augmented Spectrogram')
ax_mixed.imshow(augmented_x_train[6], cmap=plt.cm.binary)

fig.tight_layout()
plt.show()

# Main Model:

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, MaxPooling2D, Flatten, Dense, Softmax, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l2  # Import l2 regularizer

def create_model(num_layers=4, filters=(32, 64, 128), dropout_rate=0.25, learning_rate=0.001, weight_decay=0.001,
                 input_shape=(98, 50, 1), num_classes=12):
    """Build and compile the CNN classifier.

    The network is a stack of convolution blocks (Conv2D 3x3 -> BatchNorm ->
    ReLU -> MaxPooling), followed by dropout, a flatten and a softmax output:

    * one input block with ``filters[0]`` channels,
    * ``num_layers - 1`` middle blocks with ``filters[1]`` channels,
    * one final block with ``filters[2]`` channels, pooled with a (12, 1)
      window at stride 1.

    Args:
        num_layers: controls the depth; the model has ``num_layers + 1``
            convolution blocks in total.
        filters: channel counts for the (first, middle, last) blocks.
        dropout_rate: dropout applied after the last convolution block.
        learning_rate: learning rate of the Adam optimizer.
        weight_decay: L2 penalty factor applied to every convolution kernel.
        input_shape: shape of one input sample (height, width, channels).
        num_classes: number of output classes (units of the final layer).

    Returns:
        A compiled Sequential model using categorical cross-entropy loss and
        the accuracy metric.
    """
    model = Sequential()

    def add_conv_block(n_filters, pool_size, pool_strides, **conv_kwargs):
        # Conv -> BatchNorm -> ReLU -> MaxPool; every conv kernel gets its own L2 penalty.
        model.add(Conv2D(n_filters, kernel_size=(3, 3), padding='same',
                         kernel_regularizer=l2(weight_decay), **conv_kwargs))
        model.add(BatchNormalization())
        model.add(Activation('relu'))
        model.add(MaxPooling2D(pool_size=pool_size, strides=pool_strides, padding='same'))

    # Input block: the only one that declares the input shape
    add_conv_block(filters[0], (2, 2), (2, 2), input_shape=input_shape)

    # Middle blocks, each pooling with a 2x2 window at stride 2
    for _ in range(num_layers - 1):
        add_conv_block(filters[1], (2, 2), (2, 2))

    # Final block: pooling window of 12 along the first image axis, stride 1
    add_conv_block(filters[2], (12, 1), (1, 1))
    model.add(Dropout(dropout_rate))

    model.add(Flatten())

    # Classification head; the Dense layer is left without L2 regularization
    model.add(Dense(num_classes))
    model.add(Softmax())

    optimizer = Adam(learning_rate=learning_rate)
    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

    return model

**1- Training using x_train:**

In [None]:
# Small baseline network without dropout or L2 penalty
optimal_model = create_model(
    num_layers=2,
    filters=(8, 16, 32),
    dropout_rate=0,
    weight_decay=0,
)

# Fit on the original spectrograms; half of the supplied samples are held
# out by Keras for validation during training
optimal_model.fit(
    x_train,
    y_train,
    batch_size=64,
    epochs=15,
    validation_split=0.5,
)

# Score the trained model on the arrays loaded from the ValData folder
test_loss, test_accuracy = optimal_model.evaluate(x_val, y_val)
print("Test Accuracy:", test_accuracy)

**2- Training using x_train_combined1:**

In [None]:
# Same small baseline network, rebuilt from scratch
optimal_model = create_model(
    num_layers=2,
    filters=(8, 16, 32),
    dropout_rate=0,
    weight_decay=0,
)

# Fit on originals + masked copies; half of the supplied samples are held
# out by Keras for validation during training
optimal_model.fit(
    x_train_combined1,
    y_train_combined1,
    batch_size=64,
    epochs=15,
    validation_split=0.5,
)

# Score the trained model on the arrays loaded from the ValData folder
test_loss, test_accuracy = optimal_model.evaluate(x_val, y_val)
print("Test Accuracy:", test_accuracy)

**3- Training using x_train_combined2:**

In [None]:
# Same small baseline network, rebuilt from scratch
optimal_model = create_model(
    num_layers=2,
    filters=(8, 16, 32),
    dropout_rate=0,
    weight_decay=0,
)

# Fit on originals + masked copies + mixup samples; half of the supplied
# samples are held out by Keras for validation during training
optimal_model.fit(
    x_train_combined2,
    y_train_combined2,
    batch_size=64,
    epochs=15,
    validation_split=0.5,
)

# Score the trained model on the arrays loaded from the ValData folder
test_loss, test_accuracy = optimal_model.evaluate(x_val, y_val)
print("Test Accuracy:", test_accuracy)

*The accuracy obtained by training the model on x_train_combined2 (spectrogram mixing) indicates that the mixing augmentation brought no improvement to the model, since training on x_train_combined1 (frequency and time masks) achieved the same accuracy.*

# Grid Search Optimization:

In [None]:
!pip install scikeras # need to install scikeras in colab

In [None]:
from scikeras.wrappers import KerasClassifier
from sklearn.model_selection import GridSearchCV

# Wrap the model builder so scikit-learn can drive it.
# - `model=` replaces the deprecated `build_fn=` argument.
# - The training settings (epochs, batch_size, validation_split) are set here
#   rather than passed to fit(), which scikeras has deprecated. Previously
#   the constructor's epochs=5 was silently overridden by epochs=15 in fit().
# - num_layers / filters are forwarded to create_model and are the values the
#   grid search varies.
model = KerasClassifier(
    model=create_model,
    num_layers=2,
    filters=(8, 16, 32),
    epochs=15,
    batch_size=32,
    validation_split=0.3,
    verbose=1,
)

# Define the grid search parameters
param_grid = {
    'num_layers': [2, 3, 4, 6, 8],
    'filters': [(8, 16, 32), (16, 32, 64), (32, 64, 128), (64, 128, 256), (128, 256, 512)],
}

# Perform the grid search with 3-fold cross-validation.
# NOTE: this trains 5 x 5 configurations x 3 folds, which is expensive.
grid = GridSearchCV(estimator=model, param_grid=param_grid, cv=3)
grid_result = grid.fit(x_train, y_train)

# Print the best configuration, then mean (std) of the CV score for every one
print(f"Best: {grid_result.best_score_:f} using {grid_result.best_params_}")
means = grid_result.cv_results_['mean_test_score']
stds = grid_result.cv_results_['std_test_score']
params = grid_result.cv_results_['params']
for mean, stdev, param in zip(means, stds, params):
    print(f"{mean:f} ({stdev:f}) with: {param!r}")

**1- Training using x_train with num_layers=4 and filters=(32, 64, 128):**

In [None]:
# Network with num_layers=4 and filters=(32, 64, 128), no dropout or L2 penalty
optimal_model = create_model(
    num_layers=4,
    filters=(32, 64, 128),
    dropout_rate=0,
    weight_decay=0,
)

# Fit on the original spectrograms; half of the supplied samples are held
# out by Keras for validation during training
optimal_model.fit(
    x_train,
    y_train,
    batch_size=64,
    epochs=15,
    validation_split=0.5,
)

# Score the trained model on the arrays loaded from the ValData folder
test_loss, test_accuracy = optimal_model.evaluate(x_val, y_val)
print("Test Accuracy:", test_accuracy)

**2- Training using x_train_combined1 with num_layers=4 and filters=(32, 64, 128):**

In [None]:
# Network with num_layers=4 and filters=(32, 64, 128), rebuilt from scratch
optimal_model = create_model(
    num_layers=4,
    filters=(32, 64, 128),
    dropout_rate=0,
    weight_decay=0,
)

# Fit on originals + masked copies; half of the supplied samples are held
# out by Keras for validation during training
optimal_model.fit(
    x_train_combined1,
    y_train_combined1,
    batch_size=64,
    epochs=15,
    validation_split=0.5,
)

# Score the trained model on the arrays loaded from the ValData folder
test_loss, test_accuracy = optimal_model.evaluate(x_val, y_val)
print("Test Accuracy:", test_accuracy)

**3- Training using x_train_combined2 with num_layers=4 and filters=(32, 64, 128):**

In [None]:
# Network with num_layers=4 and filters=(32, 64, 128), rebuilt from scratch
optimal_model = create_model(
    num_layers=4,
    filters=(32, 64, 128),
    dropout_rate=0,
    weight_decay=0,
)

# Fit on originals + masked copies + mixup samples; half of the supplied
# samples are held out by Keras for validation during training
optimal_model.fit(
    x_train_combined2,
    y_train_combined2,
    batch_size=64,
    epochs=15,
    validation_split=0.5,
)

# Score the trained model on the arrays loaded from the ValData folder
test_loss, test_accuracy = optimal_model.evaluate(x_val, y_val)
print("Test Accuracy:", test_accuracy)

# Bayesian Optimization:


In [None]:
!pip install hyperopt

In [None]:
import numpy as np
from hyperopt import fmin, tpe, hp, space_eval
# fmin: Stands for "Function Minimization".
# tpe: Stands for "Tree-structured Parzen Estimator".
# hp: Stands for "Hyperparameters".
# space_eval: Converts the result of fmin back into actual parameter values.

# Define the search space
space = {
    'num_layers': hp.choice('num_layers', [2, 3, 4, 6, 8]),
    'filters': hp.choice('filters', [(8, 16, 32), (16, 32, 64), (32, 64, 128), (64, 128, 256), (128, 256, 512)]),

    'dropout_rate': hp.choice('dropout_rate', [0.0, 0.1, 0.15, 0.2, 0.25]),
    'weight_decay': hp.choice('weight_decay', [0.0, 0.1, 0.01, 0.001, 0.0001]),
}

# Define the objective function
def objective(params):
    """Train a model with ``params`` and return the value hyperopt minimizes.

    hyperopt minimizes the reported 'loss', so the accuracy is negated in
    order to maximize it.
    NOTE(review): the score is computed on x_val / y_val, the same arrays the
    later cells report as "Test Accuracy", so those reported accuracies are
    optimistic for the selected configuration.
    """
    model = create_model(**params)  # Create model with current parameters

    # Train the model and evaluate on the held-out arrays
    model.fit(x_train, y_train, batch_size=64, epochs=15, validation_split=0.5)
    loss, accuracy = model.evaluate(x_val, y_val)
    return {'loss': -accuracy, 'status': 'ok'}

# Run the optimization
best = fmin(fn=objective, space=space, algo=tpe.suggest, max_evals=20)

# For hp.choice parameters fmin returns the *index* of the chosen option, not
# the option itself (e.g. 'dropout_rate': 4 means the fifth entry, 0.25).
# space_eval translates those indices back into real hyperparameter values.
best_hyperparameters = space_eval(space, best)

print("Best parameters found:")
print(best_hyperparameters)

**1- Training using x_train with dropout_rate=0.25, weight_decay=0.0, layers=4, and filters=(64, 128, 256):**




In [None]:
# Network with num_layers=4, filters=(64, 128, 256), dropout 0.25, no L2 penalty
optimal_model = create_model(
    num_layers=4,
    filters=(64, 128, 256),
    dropout_rate=0.25,
    weight_decay=0.0,
)

# Fit on the original spectrograms; half of the supplied samples are held
# out by Keras for validation during training
optimal_model.fit(
    x_train,
    y_train,
    batch_size=64,
    epochs=15,
    validation_split=0.5,
)

# Score the trained model on the arrays loaded from the ValData folder
test_loss, test_accuracy = optimal_model.evaluate(x_val, y_val)
print("Test Accuracy:", test_accuracy)

**2- Training using x_train_combined1 with dropout_rate=0.25, weight_decay=0.0, layers=4, and filters=(64, 128, 256):**

In [None]:
# Network with num_layers=4, filters=(64, 128, 256), dropout 0.25, rebuilt from scratch
optimal_model = create_model(
    num_layers=4,
    filters=(64, 128, 256),
    dropout_rate=0.25,
    weight_decay=0.0,
)

# Fit on originals + masked copies; half of the supplied samples are held
# out by Keras for validation during training
optimal_model.fit(
    x_train_combined1,
    y_train_combined1,
    batch_size=64,
    epochs=15,
    validation_split=0.5,
)

# Score the trained model on the arrays loaded from the ValData folder
test_loss, test_accuracy = optimal_model.evaluate(x_val, y_val)
print("Test Accuracy:", test_accuracy)

**3- Training using x_train_combined2 with dropout_rate=0.25, weight_decay=0.0, layers=4, and filters=(64, 128, 256):**

In [None]:
# Network with num_layers=4, filters=(64, 128, 256), dropout 0.25, rebuilt from scratch
optimal_model = create_model(
    num_layers=4,
    filters=(64, 128, 256),
    dropout_rate=0.25,
    weight_decay=0.0,
)

# Fit on originals + masked copies + mixup samples; half of the supplied
# samples are held out by Keras for validation during training
optimal_model.fit(
    x_train_combined2,
    y_train_combined2,
    batch_size=64,
    epochs=15,
    validation_split=0.5,
)

# Score the trained model on the arrays loaded from the ValData folder
test_loss, test_accuracy = optimal_model.evaluate(x_val, y_val)
print("Test Accuracy:", test_accuracy)

# Monitoring Training Progress:

In [None]:
# Define the model, compile it with appropriate optimizer and loss.
# A fresh model is built here so that the curves below start from an untrained
# network and re-running this cell gives a comparable result. Previously the
# cell kept training whatever `optimal_model` the last executed cell had left
# behind.
optimal_model = create_model(num_layers=4, filters=(64, 128, 256), dropout_rate=0.25, weight_decay=0.0)

# Train the model
history = optimal_model.fit(x_train_combined1, y_train_combined1, batch_size=16, epochs=20, validation_split=0.3)

# Access training history (one value per epoch)
train_loss = history.history['loss']
val_loss = history.history['val_loss']
train_accuracy = history.history['accuracy']
val_accuracy = history.history['val_accuracy']

# Plot training history
import matplotlib.pyplot as plt

epochs = range(1, len(train_loss) + 1)

# Figure 1: loss curves
plt.plot(epochs, train_loss, 'b', label='Training loss')
plt.plot(epochs, val_loss, 'r', label='Validation loss')
plt.title('Training and validation loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()

# Figure 2: accuracy curves
plt.figure()

plt.plot(epochs, train_accuracy, 'b', label='Training accuracy')
plt.plot(epochs, val_accuracy, 'r', label='Validation accuracy')
plt.title('Training and validation accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()

plt.show()


In [None]:
# Evaluate the model on test data
test_loss, test_accuracy = optimal_model.evaluate(x_val, y_val)
print("Test Accuracy:", test_accuracy)

In [None]:
# Imported from tensorflow.keras, like every other Keras object in this
# notebook, to avoid mixing the standalone `keras` package with tf.keras.
from tensorflow.keras.callbacks import EarlyStopping

# Build a fresh model so early stopping is demonstrated on a network that
# starts from initialisation, not on one that earlier cells already trained.
optimal_model = create_model(num_layers=4, filters=(64, 128, 256), dropout_rate=0.25, weight_decay=0.0)

# Define the early stopping callback: stop once val_loss has not improved for
# 3 consecutive epochs and roll back to the weights of the best epoch.
early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)

# Train the model with early stopping
history = optimal_model.fit(x_train_combined1, y_train_combined1, batch_size=64, epochs=15,
                            validation_split=0.5, callbacks=[early_stopping])

# Evaluate the model on test data
test_loss, test_accuracy = optimal_model.evaluate(x_val, y_val)
print("Test Loss:", test_loss)
print("Test Accuracy:", test_accuracy)


In [None]:
from sklearn.model_selection import ParameterGrid

# Define the grid search parameters
# NOTE: 5 x 5 x 5 = 125 full training runs of up to 60 epochs each, which is
# very expensive; shrink the lists for a quick run.
param_grid = {
    'batch_size': [16, 32, 64, 128, 256],
    'epochs': [20, 30, 40, 50, 60],
    'validation_split': [0.1, 0.2, 0.3, 0.4, 0.5],
}

# Generate combinations of hyperparameters
grid = ParameterGrid(param_grid)

best_params = None
best_score = float('inf')  # lowest evaluation loss seen so far

# Iterate over each combination of hyperparameters
for params in grid:
    print("Training with params: ", params)

    # Create a fresh model for every combination
    model = create_model(num_layers=4,
                         filters=(64, 128, 256),
                         dropout_rate=0.25,
                         weight_decay=0.0)

    # Fit model
    history = model.fit(x_train_combined1,
                        y_train_combined1,
                        batch_size=params['batch_size'],
                        epochs=params['epochs'],
                        validation_split=params['validation_split'],
                        )

    # Evaluate model. The model is compiled with one metric, so evaluate()
    # returns [loss, accuracy]; unpack it so the scalar loss is compared and
    # stored. Storing the whole list made the next comparison raise TypeError.
    eval_loss, eval_accuracy = model.evaluate(x_val, y_val, verbose=0)

    print("Validation Loss:", eval_loss)
    print("Validation Accuracy:", eval_accuracy)

    # Check if this is the best model so far
    if eval_loss < best_score:
        best_score = eval_loss
        best_params = params

print("Best parameters found:", best_params)
print("Best validation loss:", best_score)