## Packages, Constants and File paths

The dataset's audio clips belong to 6 classes and are stored in 6 folders, one per speech command:
- `ddyo`
- `kkono`
- `mu maaso`
- `emabega`
- `yimirira`
- `unknown`

In [None]:
from packages.common_imports import *
from packages.dataset_path import *

In [None]:
def list_directory_contents(directory, label):
    """Return the entries of ``directory`` as a NumPy array and print them.

    Args:
        directory: Directory to list; converted with ``str()`` before listing.
        label: Caption placed at the start of the printed line.

    Returns:
        NumPy array holding the entry names reported by ``tf.io.gfile.listdir``.
    """
    entry_names = tf.io.gfile.listdir(str(directory))
    entries = np.array(entry_names)
    print(f'{label} commands labels: {entries}')
    return entries

In [None]:
train_commands = list_directory_contents(train_data_dir, 'Train')
test_commands = list_directory_contents(test_data_dir, 'Test')

In [None]:
dataset_folder = pathlib.Path('Dataset')

def print_directory_tree(root_dir, indent=''):
    """Recursively print the directory tree rooted at ``root_dir``.

    Only directories are printed; regular files are skipped. Entries are
    visited in sorted order so the output is the same on every run and
    filesystem.

    Args:
        root_dir: Directory to print (``str`` or path-like).
        indent: Prefix for the current level; four spaces are added per level.
    """
    root_dir = os.fspath(root_dir)
    # normpath strips a trailing separator, which would otherwise make
    # basename() return an empty string for inputs such as "Dataset/".
    print(indent + os.path.basename(os.path.normpath(root_dir)) + os.path.sep)
    indent += '    '
    for item in sorted(os.listdir(root_dir)):
        item_path = os.path.join(root_dir, item)
        if os.path.isdir(item_path):
            print_directory_tree(item_path, indent)

print_directory_tree(dataset_folder)

In [None]:
def extract_mfcc(dataset_path, num_mfcc=13, n_fft=2048, hop_length=512, num_segments=5):
    """Extract per-segment MFCC features for every audio file below ``dataset_path``.

    Every directory below ``dataset_path`` is treated as one class: its name is
    appended to ``mapping`` and its index in ``mapping`` is the integer label
    stored for each of its segments. Directories and files are visited in
    sorted order, so the label numbering is reproducible across machines.

    Args:
        dataset_path: Root directory of the dataset (``str`` or path-like).
        num_mfcc: Number of MFCC coefficients requested from librosa.
        n_fft: FFT window size passed to librosa.
        hop_length: Hop length passed to librosa.
        num_segments: Number of equal-length segments each clip is cut into.

    Returns:
        dict with the keys ``"mapping"`` (class names), ``"labels"`` (one int
        per stored segment) and ``"mfcc"`` (one nested list per stored
        segment). Segments whose number of MFCC vectors differs from the
        expected count are dropped.
    """
    # dictionary to store mapping, labels, and MFCCs
    data = {
        "mapping": [],
        "labels": [],
        "mfcc": []
    }

    samples_per_segment = int(SAMPLES_PER_AUDIO / num_segments)
    num_mfcc_vectors_per_segment = math.ceil(samples_per_segment / hop_length)

    # os.walk yields plain strings, so compare against the string form of the
    # root. The previous identity test (`is not`) was always true when a
    # pathlib.Path was passed, which registered the root itself as a class.
    root = os.fspath(dataset_path)

    # loop through all sub-folders
    for dirpath, dirnames, filenames in os.walk(root):
        # In-place sort makes os.walk descend in a deterministic order.
        dirnames.sort()

        # skip the dataset root: only sub-folders are classes
        if dirpath == root:
            continue

        # save the class label (i.e., sub-folder name) in the mapping
        semantic_label = os.path.basename(dirpath)
        data["mapping"].append(semantic_label)
        # The label is the index of the class inside `mapping`, so the two
        # can never drift apart.
        label_index = len(data["mapping"]) - 1
        print(f"\nProcessing: {semantic_label}")

        # process all audio files in the class sub-dir
        for f in sorted(filenames):

            # load audio file
            file_path = os.path.join(dirpath, f)
            signal, sample_rate = librosa.load(file_path, sr=SAMPLE_RATE)

            # process all segments of audio file
            for d in range(num_segments):

                # calculate start and finish sample for the current segment
                start = samples_per_segment * d
                finish = start + samples_per_segment

                # Clip shorter than expected: no samples left for this or any
                # later segment.
                if start >= len(signal):
                    break

                # extract mfcc
                mfcc = librosa.feature.mfcc(y=signal[start:finish],
                                            sr=sample_rate,
                                            n_mfcc=num_mfcc,
                                            n_fft=n_fft,
                                            hop_length=hop_length)

                mfcc = mfcc.T

                # store only mfcc feature with the expected number of vectors
                if len(mfcc) == num_mfcc_vectors_per_segment:
                    data["mfcc"].append(mfcc.tolist())
                    data["labels"].append(label_index)
                    print(f"{file_path}, segment:{d+1}")

    return data


In [None]:
# MFCC extraction settings (these repeat extract_mfcc's default values).
num_mfcc = 13
n_fft = 2048
hop_length = 512
num_segments = 5

# NOTE(review): the extracted features are only kept in memory as `data`; no visible cell
# writes them to disk, yet the loaders below read TRAIN_JSON_PATH / TEST_JSON_PATH —
# confirm where those JSON files are produced.
data = extract_mfcc(train_data_dir, num_mfcc, n_fft, hop_length, num_segments)


## Train and validation Datasets

In [None]:
# Function to load train and validation datasets
def load_train_dataset(json_path, batch_size, validation_split=0.2):
    """Load MFCCs from a JSON file and build disjoint train/validation datasets.

    The samples are shuffled once, split by sample count, and only then
    batched. Splitting the already batched and reshuffling dataset with
    ``take``/``skip`` counted batches as if they were samples, which left the
    validation set empty and did not keep the two subsets disjoint.

    Args:
        json_path: Path of a JSON file with the keys ``"mfcc"``, ``"labels"``
            and ``"mapping"``.
        batch_size: Number of samples per batch.
        validation_split: Fraction of the samples reserved for validation.

    Returns:
        Tuple ``(train_ds, val_ds, mapping)``: two batched ``tf.data.Dataset``
        objects and the ``"mapping"`` entry of the JSON file.
    """
    # Load MFCCs from JSON
    with open(json_path, "r") as fp:
        data = json.load(fp)

    mfccs = np.array(data["mfcc"])
    labels = np.array(data["labels"])

    # One-off shuffle on the sample level, then a fixed split.
    num_samples = len(mfccs)
    order = np.random.permutation(num_samples)
    train_size = int((1 - validation_split) * num_samples)
    train_idx, val_idx = order[:train_size], order[train_size:]

    train_ds = tf.data.Dataset.from_tensor_slices((mfccs[train_idx], labels[train_idx]))
    val_ds = tf.data.Dataset.from_tensor_slices((mfccs[val_idx], labels[val_idx]))

    # Cache before shuffling so the training order still changes every epoch;
    # shuffle() requires a positive buffer size even for an empty split.
    train_ds = (train_ds.cache()
                .shuffle(max(train_size, 1))
                .batch(batch_size)
                .prefetch(tf.data.AUTOTUNE))
    val_ds = val_ds.batch(batch_size).cache().prefetch(tf.data.AUTOTUNE)

    return train_ds, val_ds, data["mapping"]

In [None]:
train_mfcc_ds, val_mfcc_ds, mapping = load_train_dataset(TRAIN_JSON_PATH, BATCH_SIZE, VALIDATION_SPLIT)

## Test dataset

In [None]:
# Function to load test dataset
def load_test_dataset(json_path, batch_size):
    """Load MFCCs from a JSON file and build the batched test dataset.

    The test set is deliberately not shuffled: evaluation metrics do not
    depend on sample order, and a fixed order keeps labels and predictions
    aligned when the dataset is iterated more than once.

    Args:
        json_path: Path of a JSON file with the keys ``"mfcc"``, ``"labels"``
            and ``"mapping"``.
        batch_size: Number of samples per batch.

    Returns:
        Tuple ``(test_ds, mapping)``: the batched ``tf.data.Dataset`` and the
        ``"mapping"`` entry of the JSON file.
    """
    # Load MFCCs from JSON and create TensorFlow dataset
    with open(json_path, "r") as fp:
        data = json.load(fp)

    mfccs = np.array(data["mfcc"])
    labels = np.array(data["labels"])

    dataset = tf.data.Dataset.from_tensor_slices((mfccs, labels)).batch(batch_size)

    test_ds = dataset.cache().prefetch(tf.data.AUTOTUNE)

    return test_ds, data["mapping"]



In [None]:
test_mfcc_ds, mapping = load_test_dataset(TEST_JSON_PATH, BATCH_SIZE)

# Model 1

### Input shape 

In [None]:
# Take one batch from the training set; element [0] is the feature tensor
# (MFCCs loaded from the JSON file, despite the variable name).
example_spectrograms = next(iter(train_mfcc_ds))[0]
# Drop the leading batch dimension to get the per-sample shape.
input_shape = example_spectrograms.shape[1:]

print('Input shape:', input_shape)
# Number of classes = number of entries in the label mapping.
num_labels = len(mapping)

In [None]:
print(f'Labels {mapping}')
print(f'Number of labels: {num_labels}')

In [None]:
# Model architecture 1
def build_model(input_shape, num_labels):
    """Build the CNN classifier (four Conv2D stages followed by a dense head).

    Args:
        input_shape: Per-sample input shape without the batch dimension. A
            rank-2 shape gets a trailing channel axis of size 1, because
            Conv2D needs (height, width, channels) samples. A rank-3 shape is
            used unchanged.
        num_labels: Number of output classes (units of the softmax layer).

    Returns:
        An uncompiled ``Sequential`` model.
    """
    input_shape = tuple(input_shape)

    stack = [layers.Input(shape=input_shape)]
    if len(input_shape) == 2:
        # Add the channel axis inside the model so 2-D feature maps can be fed
        # to the network directly.
        stack.append(layers.Reshape(input_shape + (1,)))

    stack += [
        layers.Conv2D(16, 3, activation='relu', padding='same'),
        layers.MaxPooling2D(),
        layers.Conv2D(32, 3, activation='relu', padding='same'),
        layers.MaxPooling2D(),
        layers.Conv2D(64, 3, activation='relu', padding='same'),
        layers.MaxPooling2D(),
        layers.Conv2D(128, 3, activation='relu', padding='same'),
        layers.GlobalMaxPooling2D(),
        layers.Dense(128, activation='relu'),
        layers.Dropout(0.5),
        layers.Dense(num_labels, activation='softmax')
    ]

    return models.Sequential(stack)

# The builder has its own name so that `model` always refers to the network
# and this cell can be re-run without calling the previous model instance.
model = build_model(input_shape, num_labels)

### Model Architecture

In [None]:
model.summary()

### Compile and Train the model

In [None]:
# Training hyper-parameters read as globals by compile_and_train_model.
Epochs = 35  # maximum number of training epochs
patience = 10  # patience (in epochs) used by the training callbacks
learning_rate = 0.001
# NOTE: compile_and_train_model creates its own Adam optimizer, so this instance is not
# the one the model is compiled with.
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
# from_logits=False because the model's last layer already applies softmax.
loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)

In [None]:
# Function to compile and train the model
def compile_and_train_model(model, train_ds, val_ds, learning_rate=learning_rate):
    """Compile ``model`` with Adam and train it with early stopping and LR decay.

    Reads the module-level ``loss``, ``patience`` and ``Epochs`` values.

    Args:
        model: Keras model to compile and fit.
        train_ds: Training dataset passed to ``model.fit``.
        val_ds: Validation dataset monitored through ``val_loss``.
        learning_rate: Initial learning rate of the Adam optimizer.

    Returns:
        The ``History`` object returned by ``model.fit``, or ``None`` when
        compilation or training raised (the error is printed, not re-raised).
    """
    try:
        optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
        model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])
        early_stopping = EarlyStopping(monitor='val_loss', patience=patience, restore_best_weights=True)
        # The LR scheduler must react before early stopping does. With the
        # same patience both fire on the same epoch, so the reduced learning
        # rate would never be used.
        lr_patience = max(1, patience // 2)
        reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=lr_patience, min_lr=1e-6)
        history = model.fit(train_ds, validation_data=val_ds, epochs=Epochs, callbacks=[early_stopping, reduce_lr])
        return history
    except Exception as e:
        print(f"An error occurred during model compilation and training: {str(e)}")
        return None

In [None]:
history = compile_and_train_model(model, train_mfcc_ds, val_mfcc_ds)

### Plot Accuracy and Loss

In [None]:

# Function to plot the training history
def plot_training_history(history):
    """Draw accuracy and loss curves (training vs. validation) side by side.

    Args:
        history: Object whose ``history`` attribute maps ``'accuracy'``,
            ``'val_accuracy'``, ``'loss'`` and ``'val_loss'`` to per-epoch
            values.

    Any exception is caught and reported with a printed message.
    """
    try:
        records = history.history
        acc = records['accuracy']
        val_acc = records['val_accuracy']
        loss = records['loss']
        val_loss = records['val_loss']

        epochs = range(len(acc))

        # (training curve, validation curve, training legend, validation legend,
        #  panel title, y-axis label) for the left and right panel.
        panels = (
            (acc, val_acc, 'Training accuracy', 'Validation accuracy',
             'Training and validation accuracy', 'Accuracy'),
            (loss, val_loss, 'Training loss', 'Validation loss',
             'Training and validation loss', 'Loss'),
        )

        plt.figure(figsize=(12, 4))
        for position, panel in enumerate(panels, start=1):
            train_curve, val_curve, train_legend, val_legend, title, y_label = panel
            plt.subplot(1, 2, position)
            plt.plot(epochs, train_curve, 'r', label=train_legend)
            plt.plot(epochs, val_curve, 'b', label=val_legend)
            plt.title(title)
            plt.xlabel('Epochs')
            plt.ylabel(y_label)
            plt.legend()

        plt.tight_layout()
        plt.show()
    except Exception as e:
        print(f"An error occurred during plotting the training history: {str(e)}")

In [None]:
plot_training_history(history)

### Evaluate the model performance

Run the model on the test set and check the model's performance:

In [None]:
# Function to evaluate the model on the test dataset
def evaluate_model(model, test_ds):
    """Print loss, accuracy, precision, recall and F1 of ``model`` on ``test_ds``.

    Precision, recall and F1 are weighted averages over the classes, computed
    from the arg-max of the model output.

    Args:
        model: Compiled Keras model.
        test_ds: Dataset yielding ``(features, labels)`` batches.

    Any exception is caught and reported with a printed message.
    """
    try:
        y_true = []
        y_pred = []
        for audio, labels in test_ds:
            # predict_on_batch is the per-batch API; calling Model.predict
            # inside a Python loop rebuilds its input pipeline on every call.
            predictions = model.predict_on_batch(audio)
            y_true.extend(labels.numpy())
            y_pred.extend(tf.argmax(predictions, axis=1).numpy())

        loss, accuracy = model.evaluate(test_ds, verbose=0)
        precision = precision_score(y_true, y_pred, average='weighted', zero_division=0)
        recall = recall_score(y_true, y_pred, average='weighted', zero_division=0)
        f1 = f1_score(y_true, y_pred, average='weighted', zero_division=0)

        # Format as a percentage with two decimals; int() truncated the value
        # (e.g. 0.899 was reported as 89%).
        print(f"Test accuracy:      {accuracy:.2%}")
        print(f"Test loss:          {loss}")
        print(f"Precision:          {precision}")
        print(f"Recall:             {recall}")
        print(f"F1-score:           {f1}")
    except Exception as e:
        print(f"An error occurred during model evaluation: {str(e)}")

In [None]:
evaluate_model(model, test_mfcc_ds)

## Confusion matrix

In [None]:
# Collect ground truth and predictions in one pass over the test set so both
# sequences are aligned regardless of how the dataset orders its batches.
true_batches, pred_batches = [], []
for features, batch_labels in test_mfcc_ds:
    true_batches.append(batch_labels)
    pred_batches.append(tf.argmax(model.predict_on_batch(features), axis=1))
y_true = tf.concat(true_batches, axis=0)
y_pred = tf.concat(pred_batches, axis=0)
# Class names come from the dataset's own mapping (index == integer label)
# instead of a hand-maintained list that can drift from the data.
label_names_slice = list(mapping)

In [None]:
# Function to plot the confusion matrix
def plot_confusion_matrix(y_true, y_pred, label_names):
    """Plot the confusion matrix of ``y_pred`` against ``y_true`` as a heatmap.

    Args:
        y_true: Integer class labels.
        y_pred: Predicted integer class labels, aligned with ``y_true``.
        label_names: Tick labels for both axes. When a list or tuple is
            given, the matrix gets one row/column per name, so classes that
            are absent from the data still get an (empty) row and the tick
            labels stay aligned.

    Any exception is caught and reported with a printed message.
    """
    try:
        # Without num_classes the matrix size is inferred from the largest
        # label present, which can be smaller than the number of names.
        num_classes = None
        if isinstance(label_names, (list, tuple)):
            num_classes = len(label_names)
        confusion_mtx = tf.math.confusion_matrix(y_true, y_pred, num_classes=num_classes)
        plt.figure(figsize=(8, 6))
        sns.heatmap(confusion_mtx,
                    xticklabels=label_names,
                    yticklabels=label_names,
                    annot=True, fmt='g')
        plt.xlabel('Prediction')
        plt.ylabel('Label')
        plt.title('Confusion Matrix')
        plt.show()
    except Exception as e:
        print(f"An error occurred during plotting the confusion matrix: {str(e)}")

In [None]:
plot_confusion_matrix(y_true, y_pred, label_names_slice)

## save the Keras model

In [None]:
KERAS_MODEL_PATH = "model/mfcc_model_1.keras"

In [None]:
model.save(KERAS_MODEL_PATH)

## Size of the keras model

In [None]:
# Function to get the file size
def get_and_convert_file_size(file_path, unit=None):
    """Print the size of ``file_path`` and return it in the requested unit.

    Args:
        file_path: Path of the file to measure.
        unit: ``"KB"`` or ``"MB"`` for kilobytes/megabytes (1024-based,
            rounded to 3 decimals); any other value reports plain bytes.

    Returns:
        The size that was printed: a rounded ``float`` for ``"KB"``/``"MB"``,
        otherwise the ``int`` byte count. Previously the result of ``print()``
        (always ``None``) was returned, so callers could not store the size.
    """
    size = os.path.getsize(file_path)
    if unit == "KB":
        value = round(size / 1024, 3)
        print('File size: ' + str(value) + ' Kilobytes')
    elif unit == "MB":
        value = round(size / (1024 * 1024), 3)
        print('File size: ' + str(value) + ' Megabytes')
    else:
        value = size
        print('File size: ' + str(value) + ' bytes')
    return value


In [None]:
keras_model_size = get_and_convert_file_size(KERAS_MODEL_PATH, 'MB')

In [None]:
model.save(KERAS_MODEL_PATH)
keras_model_size = get_and_convert_file_size(KERAS_MODEL_PATH, 'KB')

In [None]:
# Assuming you have a Keras model named 'model'
# import tensorflow as tf

# converter = tf.lite.TFLiteConverter.from_keras_model(model)
# tflite_model = converter.convert()
# with open('model.tflite', 'wb') as f:
#     f.write(tflite_model)

# Run an inference

In [None]:
from tensorflow.keras.models import load_model

# NOTE(review): this path differs from the "model/mfcc_model_1.keras" file saved earlier in
# this notebook, so the model loaded here is not necessarily the one trained above —
# confirm this is intended.
KERAS_MODEL_PATH = "model/model_1.keras"
model = load_model(KERAS_MODEL_PATH)

In [None]:
from modules.inference import predict_audio

In [None]:
# file_path_inference = kkono_file_path
file_path_inference = 'ras1.wav'
predicted_label, probability = predict_audio(file_path_inference, model, SAMPLE_RATE)
print(f"Predicted label: {predicted_label}, Probability: {probability}")

In [None]:
file_path_inference = ddyo_file_path
predicted_label, probability = predict_audio(file_path_inference, model, SAMPLE_RATE)
print(f"Predicted label: {predicted_label}, Probability: {probability}")

In [None]:
file_path_inference = gaali_file_path
predicted_label, probability = predict_audio(file_path_inference, model, SAMPLE_RATE)
print(f"Predicted label: {predicted_label}, Probability: {probability}")

In [None]:
file_path_inference = yimirira_file_path
predicted_label, probability = predict_audio(file_path_inference, model, SAMPLE_RATE)
print(f"Predicted label: {predicted_label}, Probability: {probability}")


In [None]:
file_path_inference = emabega_file_path
predicted_label, probability = predict_audio(file_path_inference, model, SAMPLE_RATE)
print(f"Predicted label: {predicted_label}, Probability: {probability}")

In [None]:
file_path_inference = mumasso_file_path
predicted_label, probability = predict_audio(file_path_inference, model, SAMPLE_RATE)
print(f"Predicted label: {predicted_label}, Probability: {probability}") 
