In [21]:
import tensorflow as tf
import os
from os.path import isfile, join
import numpy as np
import shutil
from tensorflow import keras
from pathlib import Path
from IPython.display import display, Audio
import subprocess

In [22]:
data_directory = "./assets/politicians_voices"
audio_folder = "voices"
noise_folder = "noise"

audio_path = os.path.join(data_directory, audio_folder)
noise_path = os.path.join(data_directory, noise_folder)

In [23]:
dataset_split = 0.1

shuffle_seed = 13

sample_rate = 8000

scale = 0.5

batch_size = 128

epochs = 15

In [24]:
noise_paths = []
for filename in os.listdir(noise_path):
    if filename.endswith(".wav"):
        noise_paths.append(os.path.join(noise_path, filename))

In [25]:
command = (
    "for file in `ls -1 " + noise_path + "/*.wav`; do "
    "sample_rate=`ffprobe -hide_banner -loglevel panic -show_streams "
    "$file | grep sample_rate | cut -f2 -d=`; "
    "if [ $sample_rate -ne 16000 ]; then "
    "ffmpeg -hide_banner -loglevel panic -y "
    "-i $file -ar 16000 temp.wav; "
    "mv temp.wav $file; "
    "fi; done"
)


In [26]:
os.system(command)

0

In [27]:
def load_noise_sample(path):
    sample, sampling_rate = tf.audio.decode_wav(
        tf.io.read_file(path), desired_channels=1
    )
    if sampling_rate == sample_rate:
        slices = int(sample.shape[0] / sample_rate)
        sample = tf.split(sample[: slices * sample_rate], slices)
        return sample
    else:
        print("Sampling rate for",path, "is incorrect")
        return None


noises = []
for path in noise_paths:
    sample = load_noise_sample(path)
    if sample:
        noises.extend(sample)
noises = tf.stack(noises)

Sampling rate for ./assets/politicians_voices/noise/pink_noise.wav is incorrect
Sampling rate for ./assets/politicians_voices/noise/dude_miaowing.wav is incorrect
Sampling rate for ./assets/politicians_voices/noise/doing_the_dishes.wav is incorrect
Sampling rate for ./assets/politicians_voices/noise/exercise_bike.wav is incorrect
Sampling rate for ./assets/politicians_voices/noise/10convert.com_Audience-Claps_daSG5fwdA7o.wav is incorrect
Sampling rate for ./assets/politicians_voices/noise/running_tap.wav is incorrect


In [28]:
def path_to_audio(path):
    audio = tf.io.read_file(path)
    audio, _ = tf.audio.decode_wav(audio, 1, sample_rate)
    return audio

In [29]:
def paths_and_labels_to_dataset(audio_paths, labels):
    path_ds = tf.data.Dataset.from_tensor_slices(audio_paths)
    audio_ds = path_ds.map(lambda x: path_to_audio(x))
    label_ds = tf.data.Dataset.from_tensor_slices(labels)
    return tf.data.Dataset.zip((audio_ds, label_ds))

In [30]:
def add_noise(audio, noises=None, scale=0.5):
    if noises is not None:
        tf_rnd = tf.random.uniform(
            (tf.shape(audio)[0],), 0, noises.shape[0], dtype=tf.int32
        )
        noise = tf.gather(noises, tf_rnd, axis=0)

        prop = tf.math.reduce_max(audio, axis=1) / tf.math.reduce_max(noise, axis=1)
        prop = tf.repeat(tf.expand_dims(prop, axis=1), tf.shape(audio)[1], axis=1)

        audio = audio + noise * prop * scale

    return audio

In [31]:
def audio_to_fft(audio):
    audio = tf.squeeze(audio, axis=-1)
    fft = tf.signal.fft(
        tf.cast(tf.complex(real=audio, imag=tf.zeros_like(audio)), tf.complex64)
    )
    fft = tf.expand_dims(fft, axis=-1)

    return tf.math.abs(fft[:, : (audio.shape[1] // 2), :])

In [32]:
class_names = os.listdir(audio_path)
print(class_names,)

['Jens_Stoltenberg', 'Benjamin_Netanyau', 'Julia_Gillard', 'Magaret_Tarcher', 'Nelson_Mandela']


In [33]:
audio_paths = []
labels = []
for label, name in enumerate(class_names):
    print("Speaker:",(name))
    dir_path = Path(audio_path) / name
    speaker_sample_paths = [
        os.path.join(dir_path, filepath)
        for filepath in os.listdir(dir_path)
        if filepath.endswith(".wav")
    ]
    audio_paths += speaker_sample_paths
    labels += [label] * len(speaker_sample_paths)

Speaker: Jens_Stoltenberg
Speaker: Benjamin_Netanyau
Speaker: Julia_Gillard
Speaker: Magaret_Tarcher
Speaker: Nelson_Mandela


In [34]:
rng = np.random.RandomState(shuffle_seed)
rng.shuffle(audio_paths)
rng = np.random.RandomState(shuffle_seed)
rng.shuffle(labels)

In [35]:
num_val_samples = int(dataset_split * len(audio_paths))
train_audio_paths = audio_paths[:-num_val_samples]
train_labels = labels[:-num_val_samples]


valid_audio_paths = audio_paths[-num_val_samples:]
valid_labels = labels[-num_val_samples:]

In [36]:
train_ds = paths_and_labels_to_dataset(train_audio_paths, train_labels)
train_ds = train_ds.shuffle(buffer_size=batch_size * 8, seed=shuffle_seed).batch(
    batch_size
)

valid_ds = paths_and_labels_to_dataset(valid_audio_paths, valid_labels)
valid_ds = valid_ds.shuffle(buffer_size=32 * 8, seed=shuffle_seed).batch(32)

Tensor("DecodeWav:1", shape=(), dtype=int32)
Tensor("DecodeWav:1", shape=(), dtype=int32)


In [37]:
from tensorflow.keras.layers import Conv1D

In [38]:
def residual_block(x, filters, conv_num = 3, activation = "relu"):
    s = keras.layers.Conv1D(filters, 1, padding = "same")(x)
    
    for i in range(conv_num - 1):
        x = keras.layers.Conv1D(filters, 3, padding = "same")(x)
        x = keras.layers.Activation(activation)(x)
    
    x = keras.layers.Conv1D(filters, 3, padding = "same")(x)
    x = keras.layers.Add()([x, s])
    x = keras.layers.Activation(activation)(x)
    
    return keras.layers.MaxPool1D(pool_size = 2, strides = 2)(x)

def build_model(input_shape, num_classes):
    inputs = keras.layers.Input(shape = input_shape, name = "input")
    
    x = residual_block(inputs, 16, 2)
    x = residual_block(inputs, 32, 2)
    x = residual_block(inputs, 64, 3)
    x = residual_block(inputs, 128, 3)
    x = residual_block(inputs, 128, 3)
    x = keras.layers.AveragePooling1D(pool_size=3, strides=3)(x)
    x = keras.layers.Flatten()(x)
    x = keras.layers.Dense(256, activation="relu")(x)
    x = keras.layers.Dense(128, activation="relu")(x)
    
    outputs = keras.layers.Dense(num_classes, activation = "softmax", name = "output")(x)
    
    return keras.models.Model(inputs = inputs, outputs = outputs)

model = build_model((sample_rate, 1), len(class_names))

model.summary()

model.compile(optimizer="Adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]) 

model_save_filename = "model.keras"

earlystopping_cb = keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)

mdlcheckpoint_cb = keras.callbacks.ModelCheckpoint(model_save_filename, monitor="val_accuracy", save_best_only=True)

In [39]:
history = model.fit(
    train_ds,
    epochs=epochs,
    validation_data=valid_ds,
    callbacks=[earlystopping_cb, mdlcheckpoint_cb],
)

Epoch 1/15
[1m53/53[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m355s[0m 7s/step - accuracy: 0.3112 - loss: 1.8820 - val_accuracy: 0.5707 - val_loss: 0.9921
Epoch 2/15
[1m53/53[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m346s[0m 7s/step - accuracy: 0.6350 - loss: 0.8542 - val_accuracy: 0.6947 - val_loss: 0.7015
Epoch 3/15
[1m53/53[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m341s[0m 6s/step - accuracy: 0.7416 - loss: 0.6232 - val_accuracy: 0.7040 - val_loss: 0.6657
Epoch 4/15
[1m53/53[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m350s[0m 7s/step - accuracy: 0.7925 - loss: 0.4888 - val_accuracy: 0.7373 - val_loss: 0.6508
Epoch 5/15
[1m53/53[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m362s[0m 7s/step - accuracy: 0.8451 - loss: 0.3683 - val_accuracy: 0.7600 - val_loss: 0.6259
Epoch 6/15
[1m53/53[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m349s[0m 7s/step - accuracy: 0.8763 - loss: 0.2750 - val_accuracy: 0.7693 - val_loss: 0.7021
Epoch 7/15
[1m53/53[0m [32m━━━━

In [58]:
print("Accuracy of model:",model.evaluate(valid_ds))

[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 266ms/step - accuracy: 0.7441 - loss: 0.6533
Accuracy of model: [0.639505922794342, 0.7599999904632568]


In [45]:
SAMPLES_TO_DISPLAY = 10

test_ds = paths_and_labels_to_dataset(valid_audio_paths, valid_labels)
test_ds = test_ds.shuffle(buffer_size=batch_size * 8, seed=shuffle_seed).batch(
    batch_size
)

test_ds = test_ds.map(lambda x, y: (add_noise(x, noises, scale=scale), y))

for audios, labels in test_ds.take(1):
    ffts = audio_to_fft(audios)
    y_pred = model.predict(ffts)
    rnd = np.random.randint(0, batch_size, SAMPLES_TO_DISPLAY)
    audios = audios.numpy()[rnd, :, :]
    labels = labels.numpy()[rnd]
    y_pred = np.argmax(y_pred, axis=-1)[rnd]

    for index in range(SAMPLES_TO_DISPLAY):
        print(
            "Speaker:\33{} {}\33[0m\tPredicted:\33{} {}\33[0m".format(
                "[92m" if labels[index] == y_pred[index] else "[91m",
                class_names[labels[index]],
                "[92m" if labels[index] == y_pred[index] else "[91m",
                class_names[y_pred[index]],
            )
        )
        if labels[index] ==y_pred[index]:
            print("Welcome")
        else:
            print("Sorry")
        print("The speaker is" if labels[index] == y_pred[index] else "", class_names[y_pred[index]])

Tensor("DecodeWav:1", shape=(), dtype=int32)


ValueError: in user code:

    File "/var/folders/0_/nzr6zwms0wnbk944qwmv4bf40000gn/T/ipykernel_26784/2238601770.py", line 8, in None  *
        lambda x, y: (add_noise(x, noises, scale=scale), y)
    File "/var/folders/0_/nzr6zwms0wnbk944qwmv4bf40000gn/T/ipykernel_26784/2727129706.py", line 8, in add_noise  *
        prop = tf.math.reduce_max(audio, axis=1) / tf.math.reduce_max(noise, axis=1)

    ValueError: Invalid reduction dimension 1 for input with 1 dimensions. for '{{node Max_1}} = Max[T=DT_FLOAT, Tidx=DT_INT32, keep_dims=false](GatherV2, Max_1/reduction_indices)' with input shapes: [?], [] and with computed input tensors: input[1] = <1>.


In [64]:
def paths_to_dataset(audio_paths):
    path_ds = tf.data.Dataset.from_tensor_slices(audio_paths)
    return tf.data.Dataset.zip((path_ds))

def predict(path, labels):
    test = paths_and_labels_to_dataset(path, labels)


    test = test.shuffle(buffer_size=batch_size * 8, seed=shuffle_seed).batch(
    batch_size
    )
    test = test.prefetch(tf.data.experimental.AUTOTUNE)


    test = test.map(lambda x, y: (add_noise(x, noises, scale=scale), y))

    for audios, labels in test.take(1):
        ffts = audio_to_fft(audios)
        y_pred = model.predict(ffts)
        rnd = np.random.randint(0, 1, 1)
        audios = audios.numpy()[rnd, :]
        labels = labels.numpy()[rnd]
        y_pred = np.argmax(y_pred, axis=-1)[rnd]

    for index in range(1):
            print(
            "Speaker:\33{} {}\33[0m\tPredicted:\33{} {}\33[0m".format(
            "[92m",y_pred[index],
                "[92m", y_pred[index]
                )
            )
            
            print("Speaker Predicted:",class_names[y_pred[index]])

In [67]:
path = ["./assets/politicians_voices/voices/Jens_Stoltenberg/1013.wav"]
labels = ["unknown"]
try:
    predict(path, labels)
except:
    print("Error! Check if the file correctly passed or not!")


Tensor("DecodeWav:1", shape=(), dtype=int32)
Error! Check if the file correctly passed or not!
