<a href="https://colab.research.google.com/github/TevinMusau/Integrating_Voice_to_Mobile_Payment_Systems_Using_Convolutional_Neural_Networks-A_Case_of_MPESA/blob/model/Keyword_Spotting_Model_Training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import json
import numpy as np
import tensorflow as tf
from tensorflow import keras
from sklearn.model_selection import train_test_split

In [None]:
# Mount Google Drive; the dataset and the saved model both live under /content/drive
from google.colab import drive
drive.mount('/content/drive')

# ---- File locations on the mounted drive ----

# Preprocessed dataset (JSON) that the training pipeline reads
DATA_PATH = "/content/drive/MyDrive/ICS_PROJECT/Modules/Keyword_Spotting/Outputs/prepared_data.json"

# File the trained model is written to
SAVED_MODEL_PATH = "/content/drive/MyDrive/ICS_PROJECT/Modules/Keyword_Spotting/Outputs/model.h5"

# ---- Training hyperparameters ----

# Learning rate handed to the optimizer
LEARNING_RATE = 0.0001

# Number of passes over the training data (passed as `epochs` to model.fit)
EPOCHS = 40

# Number of samples per batch (passed as `batch_size` to model.fit)
BATCH_SIZE = 32

# ---- Model size ----

# Number of units in the output (softmax) layer
NUM_KEYWORDS = 12

In [None]:
def load_dataset(data_path):
  """Load the prepared inputs and targets from a JSON file.

  Args:
    data_path: Path to a JSON file whose top-level object contains an
      "MFCCs" entry (the inputs) and a "labels" entry (the targets).

  Returns:
    A tuple (X, y) of numpy arrays built from data["MFCCs"] and
    data["labels"] respectively.

  Raises:
    OSError: If the file cannot be opened.
    json.JSONDecodeError: If the file does not contain valid JSON.
    KeyError: If "MFCCs" or "labels" is missing from the loaded object.
  """

  # Decode explicitly as UTF-8 so the result does not depend on the
  # platform's default locale encoding.
  with open(data_path, "r", encoding="utf-8") as fp:
    data = json.load(fp)

  # The JSON lists are converted to numpy arrays for downstream use.
  X = np.array(data["MFCCs"])
  y = np.array(data["labels"])

  return X, y

def get_data_splits(data_path, test_size = 0.1, test_validation = 0.1):
  """Load the dataset and split it into train, validation and test sets.

  Args:
    data_path: Path to the prepared JSON dataset (forwarded to load_dataset).
    test_size: Fraction of the full dataset held out for testing.
    test_validation: Fraction of the remaining (non-test) data held out for
      validation. Note this is relative to what is left after the test
      split, not to the full dataset.

  Returns:
    A tuple (X_train, X_validation, X_test, y_train, y_validation, y_test).
    Every X array has one extra trailing axis of size 1 compared with the
    loaded data; the y arrays are returned as split.
  """

  # load the dataset
  X, y = load_dataset(data_path)

  # first carve off the test set, then split the remainder into
  # train and validation
  X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = test_size)
  X_train, X_validation, y_train, y_validation = train_test_split(X_train, y_train, test_size = test_validation)

  # Append a trailing axis of size 1 to EVERY input split. The model's
  # input shape is derived from X_train, so the validation and test inputs
  # must have the same rank or fit/evaluate will receive mismatched shapes.
  X_train = X_train[..., np.newaxis]
  X_validation = X_validation[..., np.newaxis]
  X_test = X_test[..., np.newaxis]

  return X_train, X_validation, X_test, y_train, y_validation, y_test


In [None]:
# Fetch the six arrays (inputs and targets for train, validation and test)
# produced from the dataset at DATA_PATH

(
  X_train,
  X_validation,
  X_test,
  y_train,
  y_validation,
  y_test,
) = get_data_splits(DATA_PATH)

In [None]:
def build_model(input_shape, learning_rate, error = "sparse_categorical_crossentropy"):
  """Assemble and compile the convolutional classifier.

  The network is three (Conv2D -> BatchNormalization -> MaxPool2D) stages,
  followed by Flatten, a 64-unit ReLU Dense layer, Dropout(0.3) and a
  softmax Dense layer with NUM_KEYWORDS units (module-level constant).

  Args:
    input_shape: Shape given to the first Conv2D layer as `input_shape`.
    learning_rate: Learning rate for the Adam optimizer.
    error: Loss passed to `model.compile`.

  Returns:
    The compiled keras Sequential model. A summary of the model is printed
    as a side effect.
  """

  layers = keras.layers

  # One entry per convolutional stage:
  #   (number of filters, kernel size, max-pooling window)
  conv_stages = (
    (64, (3, 3), (3, 3)),
    (32, (3, 3), (3, 3)),
    (32, (2, 2), (2, 2)),
  )

  model = keras.Sequential()

  for stage_index, (filters, kernel_size, pool_size) in enumerate(conv_stages):
    # only the very first layer is told the input shape
    first_layer_kwargs = {"input_shape": input_shape} if stage_index == 0 else {}

    # convolution with ReLU non-linearity; l2 weight penalty to limit overfitting
    model.add(
      layers.Conv2D(
        filters,
        kernel_size,
        activation="relu",
        kernel_regularizer=keras.regularizers.l2(0.001),
        **first_layer_kwargs,
      )
    )

    # batch normalization after each convolution
    model.add(layers.BatchNormalization())

    # max pooling with stride 2 and "same" padding
    model.add(layers.MaxPool2D(pool_size, strides=(2, 2), padding="same"))

  # classifier head: flatten -> dense(64, relu) -> dropout(30%) -> softmax
  model.add(layers.Flatten())
  model.add(layers.Dense(64, activation="relu"))
  model.add(layers.Dropout(0.3))
  model.add(layers.Dense(NUM_KEYWORDS, activation="softmax"))

  # compile with Adam, the requested loss, and accuracy tracked during training
  model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
    loss=error,
    metrics=["accuracy"],
  )

  # print model overview
  model.summary()

  return model


In [None]:
# Construct the CNN

# The model's input shape is taken from axes 1, 2 and 3 of X_train
# (axis 0 is left out; presumably it indexes the samples - confirm against get_data_splits)
  # axis 1 -> number of segments (per the original notes: total_samples / hop_length)
  # axis 2 -> number of MFCC coefficients (per the original notes: 13)
  # axis 3 -> depth / channel axis (per the original notes: 1)
input_shape = tuple(X_train.shape[axis] for axis in (1, 2, 3))

# build_model receives the input shape and the optimizer learning rate (LEARNING_RATE)
model = build_model(input_shape, LEARNING_RATE)

In [None]:
# Fit the model on the training split

# Arguments
  # X_train / y_train = training inputs and targets
  # epochs            = number of passes over the training data
  # batch_size        = number of samples per batch
  # validation_data   = (inputs, targets) pair passed to fit as the validation set
model.fit(
  X_train,
  y_train,
  epochs = EPOCHS,
  batch_size = BATCH_SIZE,
  validation_data = (X_validation, y_validation),
)

In [None]:
# Score the trained model on the test split

# The result is unpacked into two values: the loss and the accuracy
# (build_model compiles the model with metrics = ["accuracy"])
test_error, test_accuracy = model.evaluate(
  X_test,
  y_test,
)
print(f"Test Error: {test_error}, Test Accuracy: {test_accuracy}")

In [None]:
# save the model
# Writes the trained model to SAVED_MODEL_PATH (a "model.h5" file on the mounted Google Drive)

model.save(SAVED_MODEL_PATH)