**1. Import**

In [None]:
!pip install pydub



In [None]:
import os
import tensorflow as tf
import soundfile as sf
import numpy as np
import librosa
from glob import glob
from IPython.display import Audio
from pydub import AudioSegment
import random
from math import sqrt

In [None]:
# Root folder of the dataset on the mounted Google Drive. Keep the trailing
# slash: several cells build file names with plain concatenation (path + ...).
# Alternative location that was used previously:
# path = '/content/drive/MyDrive/Kaggle/speech_recognition/'
path = '/content/gdrive/MyDrive/speech_recognition/'
# Words to recognise. Each word has its own sub-folder under `path`, and the
# position of a word in this list is the integer label given to its recordings.
classes = ['yes', 'no', 'one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'up', 'down']

In [None]:
# Mount Google Drive under /content/gdrive, the prefix that `path` points into.
# This has to run before any cell that reads from or writes to `path`.
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [None]:
# Pick one random recording of a single word and play it back.
word_index = 3
word_dir = path + classes[word_index]
picked_name = random.choice(os.listdir(word_dir))
audio_files = glob(word_dir + "/" + picked_name)

# Resample to 16 kHz while loading, then force the clip to exactly
# 16000 samples (one second at that rate).
loaded, sr = librosa.load(audio_files[0], sr=16000)
wav = librosa.util.fix_length(loaded, size=16000)

Audio(wav, rate=sr)

In [None]:
# Build `noises`: one 16000-sample excerpt (at 16 kHz) from every file in the
# "_noise" folder, attenuated so it can be added on top of the speech clips.
noises = []
temp_file = "reduced_volume.wav"  # scratch file used to hand audio from pydub to librosa
try:
  for filename in os.listdir(os.path.join(path, "_noise")):
    file_path = os.path.join(path, "_noise", filename)
    noise = AudioSegment.from_file(file_path)

    # Heuristic attenuation: subtracting a number from an AudioSegment lowers
    # its gain by that many dB; the amount grows with the clip's peak amplitude.
    max_volume_noise = noise.max
    volume_down = sqrt(max_volume_noise / 40)
    reduced_noise = noise - volume_down

    # export() returns the open output file handle; close it right away so the
    # data is flushed and no handle is leaked on each iteration.
    reduced_noise.export(temp_file, format="wav").close()

    noise, sr = librosa.load(temp_file, sr=16000)
    if len(noise) < 16000:
      # Clips shorter than the window are padded, otherwise randint below would
      # be called with a negative upper bound and raise ValueError.
      noise = librosa.util.fix_length(noise, size=16000)

    # Random 16000-sample window inside the clip.
    start_index = random.randint(0, len(noise) - 16000)
    noise = noise[start_index : start_index + 16000]
    noises.append(noise)
finally:
  # Do not leave the scratch file behind in the working directory.
  if os.path.exists(temp_file):
    os.remove(temp_file)
print(len(noises))

6


In [None]:
# Blend one randomly chosen noise excerpt into the sample clip and listen to it.
# randrange(len(noises)) yields 0 .. len(noises) - 1, so every excerpt is equally
# likely. (An inclusive randint(0, len(noises)) shifted by -1 also yields -1,
# which selects the last excerpt a second time.)
# Note: `wav` is overwritten, so re-running this cell stacks another noise layer.
random_index = random.randrange(len(noises))
wav = wav + noises[random_index]
Audio(wav, rate=sr)

**2. Creating the dataset**

In [None]:
# Per-class tf.data datasets of (MFCC, integer label) pairs, keyed by class name.
tf_dic_train = {}
tf_dic_val = {}
tf_dic_test = {}
FILES_TO_LOAD = 500

# Read file names from testing_list.txt and validation_list.txt
with open(os.path.join(path, "testing_list.txt"), 'r') as file:
    test_names = file.read().splitlines()
with open(os.path.join(path, "validation_list.txt"), 'r') as file:
    val_names = file.read().splitlines()

# Sets make the per-file membership test constant-time instead of a list scan.
test_name_set = set(test_names)
val_name_set = set(val_names)

# Maximum number of recordings kept per class and split (80 % / 10 % / 10 %).
split_quota = {
    "train": FILES_TO_LOAD * 0.8,
    "val": FILES_TO_LOAD * 0.1,
    "test": FILES_TO_LOAD * 0.1,
}


def _load_noisy_mfcc(file_path):
    """Load one recording, add a random noise excerpt and return its MFCCs.

    The recording is resampled to 16 kHz and padded/trimmed to 16000 samples so
    it lines up with the excerpts in the global `noises` list. The excerpt is
    drawn with random.choice, so every excerpt is equally likely.
    """
    audio, sample_rate = librosa.load(file_path, sr=16000)
    audio = librosa.util.fix_length(audio, size=16000)
    audio_blend = audio + random.choice(noises)
    return librosa.feature.mfcc(y=audio_blend, sr=sample_rate)


def _labelled_dataset(features, label_index):
    """Zip a list of MFCC matrices with a constant integer label per element."""
    features_ds = tf.data.Dataset.from_tensor_slices(features)
    labels_ds = tf.data.Dataset.from_tensor_slices(tf.fill((len(features),), label_index))
    return tf.data.Dataset.zip((features_ds, labels_ds))


for i, label in enumerate(classes):
    features_by_split = {"train": [], "val": [], "test": []}
    for filename in os.listdir(os.path.join(path, label)):
        # The split lists identify recordings as "<label>/<filename>".
        relative_name = "/".join([label, filename])
        if relative_name in test_name_set:
            split = "test"
        elif relative_name in val_name_set:
            split = "val"
        else:
            split = "train"

        bucket = features_by_split[split]
        if len(bucket) >= split_quota[split]:
            # Quota for this class/split already reached: skip the file.
            continue
        bucket.append(_load_noisy_mfcc(os.path.join(path, label, filename)))

    tf_dic_train[label] = _labelled_dataset(features_by_split["train"], i)
    tf_dic_val[label] = _labelled_dataset(features_by_split["val"], i)
    tf_dic_test[label] = _labelled_dataset(features_by_split["test"], i)

# One dataset per class, in the order of `classes`.
datasets_train = list(tf_dic_train.values())
datasets_val = list(tf_dic_val.values())
datasets_test = list(tf_dic_test.values())

In [None]:
from functools import reduce

def dataset_reduce(datasets, buffer_size=None):
  """Concatenate several datasets into one and shuffle the result.

  Args:
    datasets: non-empty sequence of datasets (objects providing `concatenate`
      and `shuffle`, e.g. tf.data.Dataset). They are joined in the given order.
    buffer_size: size of the shuffle buffer. When None (the default) the buffer
      covers the whole merged dataset, and never less than 1000 elements. The
      inputs are concatenated one after another, so a buffer smaller than the
      dataset would only mix neighbouring elements. If the merged length is not
      available, 1000 is used.

  Returns:
    The merged dataset with the shuffle applied.

  Raises:
    TypeError: if `datasets` is empty (raised by functools.reduce).
  """
  merged_dataset_reduce = reduce(lambda d1, d2: d1.concatenate(d2), datasets)
  if buffer_size is None:
    try:
      buffer_size = max(len(merged_dataset_reduce), 1000)
    except TypeError:
      # len() is unavailable (e.g. unknown or infinite cardinality).
      buffer_size = 1000
  return merged_dataset_reduce.shuffle(buffer_size=buffer_size)

In [None]:
datasets_train = dataset_reduce(datasets_train)
datasets_val = dataset_reduce(datasets_val)
datasets_test = dataset_reduce(datasets_test)

In [None]:
# Number of examples in the merged train, validation and test datasets.
len(datasets_train), len(datasets_val), len(datasets_test)

(5171, 650, 650)

In [None]:
def preprocess_multy(mfcc, label, num_classes=13):
  """Turn an (mfcc, integer label) pair into an (mfcc, one-hot label) pair.

  Args:
    mfcc: feature tensor, returned unchanged.
    label: integer class index.
    num_classes: length of the one-hot vector. Defaults to 13, the number of
      entries in `classes`; pass another value if the class list changes.

  Returns:
    Tuple (mfcc, one-hot encoded label).
  """
  return mfcc, tf.one_hot(label, num_classes)

**3. Preprocessing**

In [None]:
# Train data: one-hot encode the labels, cache, shuffle, batch, prefetch.
# The shuffle buffer spans the whole training set (never less than 1000).
# The per-class datasets were concatenated one after another, so a buffer
# smaller than the dataset would only mix neighbouring examples and leave the
# batches dominated by a few classes.
train_data = (
    datasets_train
    .map(preprocess_multy)
    .cache()
    .shuffle(buffer_size=max(len(datasets_train), 1000))
    .batch(32)
    .prefetch(16)
)

In [None]:
# Validation data: one-hot encode the labels, cache, shuffle, batch, prefetch.
val_data = (
    datasets_val
    .map(preprocess_multy)
    .cache()
    .shuffle(buffer_size=1000)
    .batch(32)
    .prefetch(16)
)

In [None]:
# Test data: one-hot encode the labels, cache, shuffle, batch, prefetch.
test_data = (
    datasets_test
    .map(preprocess_multy)
    .cache()
    .shuffle(buffer_size=1000)
    .batch(32)
    .prefetch(16)
)

In [None]:
# Pull a single batch out of the training pipeline to check the tensor shapes.
first_batch = next(train_data.as_numpy_iterator())
samples, labels = first_batch
samples.shape, labels.shape

((32, 20, 32), (32, 13))

**4. The model**

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras import layers

In [None]:
# Convolutional part: add a channel axis to the (20, 32) MFCC matrix, then three
# Conv2D stages, each followed by dropout. Only the second and third stage are
# followed by max-pooling.
conv_stack = [
    layers.Reshape(target_shape=(20, 32, 1), input_shape=(20, 32)),
    layers.Conv2D(filters=32, kernel_size=(3, 3), activation='relu'),
    layers.Dropout(rate=0.5),
    layers.Conv2D(filters=64, kernel_size=(3, 3), activation='relu'),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Dropout(rate=0.5),
    layers.Conv2D(filters=128, kernel_size=(3, 3), activation='relu'),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Dropout(rate=0.5),
]

# Classifier head: two dense layers (no dropout between them) and a 13-way
# softmax, one output per entry of the one-hot label.
dense_head = [
    layers.Flatten(),
    layers.Dense(units=128, activation='relu'),
    layers.Dense(units=64, activation='relu'),
    layers.Dense(units=13, activation='softmax'),
]

model = tf.keras.Sequential(conv_stack + dense_head)

In [None]:
# Learning-rate schedule: start at 0.001 and, because staircase=True, multiply
# the rate by 0.99 once every 200 optimizer steps.
initial_learning_rate = 0.001
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=initial_learning_rate,
    decay_steps=200,
    decay_rate=0.99,
    staircase=True,
)

# Adam optimizer driven by the schedule above.
optimizer_decay = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

In [None]:
# Metrics reported next to the loss. Their order here is the order in which
# evaluate() returns them later (after the loss).
tracked_metrics = ['accuracy', tf.keras.metrics.Recall(), tf.keras.metrics.Precision()]

# Compile with categorical cross-entropy, matching the one-hot labels.
model.compile(
    optimizer=optimizer_decay,
    loss=tf.keras.losses.CategoricalCrossentropy(),
    metrics=tracked_metrics,
)

# Show the layer-by-layer summary.
model.summary()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 reshape_1 (Reshape)         (None, 20, 32, 1)         0         
                                                                 
 conv2d_3 (Conv2D)           (None, 18, 30, 32)        320       
                                                                 
 dropout_3 (Dropout)         (None, 18, 30, 32)        0         
                                                                 
 conv2d_4 (Conv2D)           (None, 16, 28, 64)        18496     
                                                                 
 max_pooling2d_2 (MaxPooling  (None, 8, 14, 64)        0         
 2D)                                                             
                                                                 
 dropout_4 (Dropout)         (None, 8, 14, 64)         0         
                                                      

**5. Training the model**

In [None]:
# Stop once val_loss has not improved for 5 epochs. restore_best_weights=True
# rolls the model back to the epoch with the best val_loss when that happens.
# Without it the in-memory model (the one saved and evaluated later) keeps the
# weights of the last, already degraded, epoch.
early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss',
                                                  patience=5,
                                                  restore_best_weights=True)
# Also keep the epoch with the best validation accuracy on disk as model.h5.
model_checkpoint = tf.keras.callbacks.ModelCheckpoint('model.h5', monitor='val_accuracy', save_best_only=True)

# Train the model with callbacks
history = model.fit(train_data,
                    epochs=50,
                    validation_data=val_data,
                    callbacks=[early_stopping, model_checkpoint])

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50


In [None]:
# Persist the trained model next to the dataset on Drive.
saved_model_location = path + "model_MFCC_noise_drop_84"
model.save(saved_model_location)



In [None]:
# Load through tf.keras, the same Keras entry point that built and saved the
# model, rather than the standalone `keras` package, so both sides share one
# implementation.
from tensorflow.keras.models import load_model

model_loaded = load_model(path + "model_MFCC_noise_drop_84")

**6. The results**

In [None]:
# Score the reloaded model on the held-out test batches. evaluate() returns the
# loss followed by the metrics in the order they were passed to compile().
test_scores = model_loaded.evaluate(test_data)
test_loss, test_accuracy, test_recall, test_precision = test_scores

score_captions = ('Test Loss:', 'Test Accuracy:', 'Test Recall:', 'Test Precision:')
for caption, score in zip(score_captions, test_scores):
    print(caption, score)

Test Loss: 0.5350462198257446
Test Accuracy: 0.8276923298835754
Test Recall: 0.7984615564346313
Test Precision: 0.8841567039489746
