In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import cupy as np # linear algebra


In [2]:
# Copy the Kaggle input dataset into the working directory. The source path has
# no trailing slash, so rsync creates train/16000_pcm_speeches, which is the
# directory the `dataset` variable points at and whose sub-folders later cells
# rearrange with shutil.move.
!rsync -av /kaggle/input/speaker-recognition-dataset/16000_pcm_speeches train

In [25]:
import tensorflow as tf
import os
from os.path import isfile, join
import numpy as np
import shutil
from tensorflow import keras
from pathlib import Path
from IPython.display import display, Audio
from tensorflow.keras.layers import Conv1D
import subprocess
import numpy

Define Variables

In [20]:
# Root of the copied dataset and the two sub-folders its contents get sorted into.
dataset = os.path.join("train/16000_pcm_speeches")
audios, noises = "audio", "noise"
DATASET_AUDIOS_PATH, DATASET_NOISES_PATH = (
    os.path.join(dataset, subfolder) for subfolder in (audios, noises)
)

# Fraction of the audio files held out for validation.
VALID_SPLIT = 0.1

# Seed shared by the path/label shuffle and the tf.data shuffle buffers.
SHUFFLE_SEED = 43

# Expected sampling rate of the WAV files; also used as the number of samples
# decoded per clip.
SAMPLING_RATE = 16000

# Passed as `scale` to add_noise when augmenting the training set.
SCALE = 0.5

# Training batch size and number of training epochs.
BATCH_SIZE, EPOCHS = 128, 25

Arrange Audio and Noise

In [5]:
# Sort the extracted dataset into two top-level folders: noise recordings go
# under DATASET_NOISES_PATH, every other folder (one per speaker) goes under
# DATASET_AUDIOS_PATH.
#
# The folder names to skip are derived from the path constants instead of the
# `audios` / `noises` variables: `noises` is rebound to the stacked noise tensor
# later in the notebook, so re-running this cell afterwards would otherwise
# compare folder names against a tensor.
os.makedirs(DATASET_AUDIOS_PATH, exist_ok=True)
os.makedirs(DATASET_NOISES_PATH, exist_ok=True)

already_sorted = {
    os.path.basename(DATASET_AUDIOS_PATH),
    os.path.basename(DATASET_NOISES_PATH),
}
noise_folders = {"other", "_background_noise_"}

for folder in os.listdir(dataset):
    source_dir = os.path.join(dataset, folder)
    # Skip plain files and the two destination folders themselves.
    if not os.path.isdir(source_dir) or folder in already_sorted:
        continue
    target_root = DATASET_NOISES_PATH if folder in noise_folders else DATASET_AUDIOS_PATH
    shutil.move(source_dir, os.path.join(target_root, folder))

In [6]:
# Collect every .wav file that sits one level below DATASET_NOISES_PATH.
# Listings are sorted because os.listdir() returns entries in arbitrary order,
# which would make the order of `noise_paths` differ between runs.
noise_dirs = [
    os.path.join(DATASET_NOISES_PATH, entry)
    for entry in sorted(os.listdir(DATASET_NOISES_PATH))
    if os.path.isdir(os.path.join(DATASET_NOISES_PATH, entry))
]

noise_paths = [
    os.path.join(noise_dir, filename)
    for noise_dir in noise_dirs
    for filename in sorted(os.listdir(noise_dir))
    if filename.endswith(".wav")
]

# Report the number of directories that were actually scanned, not every entry
# (stray files included) found in DATASET_NOISES_PATH.
print(
    "Found {} files belonging to {} directories".format(
        len(noise_paths), len(noise_dirs)
    )
)

Convert all noise files to a 16000 Hz sampling rate

In [7]:
# Resample every noise .wav whose sampling rate is not 16000 Hz, in place.
# The work is done by a /bin/sh loop that calls ffprobe and ffmpeg:
#   * the files are enumerated with a quoted glob rather than by parsing `ls`,
#     and "$file" is quoted everywhere, so paths containing spaces survive;
#   * a file is only converted when ffprobe reported a sample rate that differs
#     from 16000 (an empty probe result leaves the file untouched);
#   * ffmpeg writes to a temporary file next to the original, and the original
#     is replaced only if ffmpeg succeeded, so a failed conversion can no
#     longer overwrite it with a partial file;
#   * any failure is remembered and reported through the loop's exit status.
command = (
    'status=0; '
    'for file in "' + DATASET_NOISES_PATH + '"/*/*.wav; do '
    '[ -f "$file" ] || continue; '
    'sample_rate=`ffprobe -hide_banner -loglevel panic -show_streams '
    '"$file" | grep sample_rate | cut -f2 -d=`; '
    'if [ -n "$sample_rate" ] && [ "$sample_rate" -ne 16000 ]; then '
    'ffmpeg -hide_banner -loglevel panic -y '
    '-i "$file" -ar 16000 "$file.tmp.wav" '
    '&& mv "$file.tmp.wav" "$file" '
    '|| { rm -f "$file.tmp.wav"; status=1; }; '
    'fi; done; exit $status'
)

exit_status = os.system(command)
if exit_status != 0:
    # Keep going: files that are still at the wrong rate are reported and
    # skipped when the noise samples are loaded.
    print(
        "Warning: resampling the noise files failed (exit status {})".format(
            exit_status
        )
    )

Split each noise recording into chunks of 16000 samples (1 second at 16000 Hz)

In [8]:
def load_noise_sample(path):
    """Load one noise recording and cut it into SAMPLING_RATE-sample chunks.

    Args:
        path: Path of a WAV file.

    Returns:
        A list of tensors, each holding SAMPLING_RATE consecutive samples of
        the single-channel recording (a trailing remainder shorter than one
        chunk is dropped), or None when the file's sampling rate differs from
        SAMPLING_RATE or the recording is shorter than one full chunk. A
        message is printed in both None cases.
    """
    sample, sampling_rate = tf.audio.decode_wav(
        tf.io.read_file(path), desired_channels=1
    )
    if sampling_rate != SAMPLING_RATE:
        print("Sampling rate for {} is incorrect. Ignoring it".format(path))
        return None

    slices = sample.shape[0] // SAMPLING_RATE
    if slices == 0:
        # tf.split cannot split a tensor into zero pieces, so a recording
        # shorter than one chunk is skipped instead of raising.
        print(
            "{} is shorter than {} samples. Ignoring it".format(path, SAMPLING_RATE)
        )
        return None

    return tf.split(sample[: slices * SAMPLING_RATE], slices)

In [9]:
# Load every noise file, cut it into chunks and stack all chunks into a single
# tensor. The chunks are accumulated under a separate name so that `noises` is
# only ever rebound to the finished tensor, never left as a half-filled list if
# the cell is interrupted.
noise_chunks = []
for path in noise_paths:
    chunks = load_noise_sample(path)
    # load_noise_sample returns None for files it decided to ignore.
    if chunks is not None:
        noise_chunks.extend(chunks)

if not noise_chunks:
    raise ValueError(
        "No usable noise samples were loaded from {} noise files".format(
            len(noise_paths)
        )
    )

noises = tf.stack(noise_chunks)

print(
    "{} noise files were split into {} noise samples where each is {} sec. long".format(
        len(noise_paths), noises.shape[0], noises.shape[1] // SAMPLING_RATE
    )
)

Create Dataset

In [10]:
def paths_and_labels_to_dataset(audio_paths, labels):
    """Build a tf.data.Dataset of (audio, label) pairs.

    Args:
        audio_paths: Sequence of WAV file paths.
        labels: Sequence of labels, aligned element by element with
            `audio_paths`.

    Returns:
        A dataset zipping each path decoded through `path_to_audio` with the
        label at the same position.
    """
    label_ds = tf.data.Dataset.from_tensor_slices(labels)
    audio_ds = tf.data.Dataset.from_tensor_slices(audio_paths).map(path_to_audio)
    return tf.data.Dataset.zip((audio_ds, label_ds))


def path_to_audio(path):
    """Read a WAV file and decode it to one channel of SAMPLING_RATE samples.

    Args:
        path: Path of the WAV file (a string or string tensor).

    Returns:
        The decoded audio tensor; the sample rate reported by the decoder is
        discarded.
    """
    raw_bytes = tf.io.read_file(path)
    decoded = tf.audio.decode_wav(
        raw_bytes, desired_channels=1, desired_samples=SAMPLING_RATE
    )
    # decode_wav returns (audio, sample_rate); only the audio is needed here.
    return decoded[0]


Add Noise

In [11]:
def add_noise(audio, noises=None, scale=0.5):
    """Mix a randomly chosen noise sample into every clip of a batch.

    Each clip gets one noise sample drawn uniformly from `noises`. The noise is
    rescaled by the ratio of the clip's maximum to the noise's maximum (both
    taken along axis 1) and by `scale` before being added.

    Args:
        audio: Batch of clips; axis 0 is the batch and axis 1 the samples.
            Presumably shaped (batch, samples, 1) as produced by the batched
            `path_to_audio` output -- confirm against the caller.
        noises: Tensor of noise samples indexed along axis 0, with the same
            per-clip shape as `audio`, or None to disable augmentation.
        scale: Multiplier applied to the rescaled noise.

    Returns:
        `audio` with noise added, or `audio` unchanged when `noises` is None.
    """
    if noises is None:
        return audio

    # One random noise index per clip in the batch.
    noise_indices = tf.random.uniform(
        (tf.shape(audio)[0],), 0, noises.shape[0], dtype=tf.int32
    )
    noise = tf.gather(noises, noise_indices, axis=0)

    # keepdims leaves a size-1 axis in place of axis 1, so the ratio broadcasts
    # over the samples without an explicit tf.repeat.
    audio_peak = tf.math.reduce_max(audio, axis=1, keepdims=True)
    noise_peak = tf.math.reduce_max(noise, axis=1, keepdims=True)

    # A noise sample whose maximum is 0 would otherwise produce inf/NaN and
    # poison the whole clip; divide_no_nan yields 0 there, i.e. no noise added.
    prop = tf.math.divide_no_nan(audio_peak, noise_peak)

    return audio + noise * prop * scale

In [12]:
def audio_to_fft(audio):
    """Return the magnitude of the first half of each clip's FFT.

    Args:
        audio: Batch of clips whose last axis has size 1 (it is squeezed away
            before the transform).

    Returns:
        A real tensor with the last axis restored, keeping only the first
        `samples // 2` frequency bins along axis 1.
    """
    waveform = tf.squeeze(audio, axis=-1)
    # tf.signal.fft needs complex input, so pair the signal with a zero
    # imaginary part.
    as_complex = tf.cast(
        tf.complex(real=waveform, imag=tf.zeros_like(waveform)), tf.complex64
    )
    spectrum = tf.expand_dims(tf.signal.fft(as_complex), axis=-1)

    half_bins = waveform.shape[1] // 2
    return tf.math.abs(spectrum[:, :half_bins, :])

In [13]:
# One class per speaker directory. The names are sorted so that the integer
# label assigned to each speaker is the same on every run (directory listings
# come back in arbitrary order), and stray files are ignored because only
# directories can hold speaker samples.
class_names = sorted(
    name
    for name in os.listdir(DATASET_AUDIOS_PATH)
    if os.path.isdir(os.path.join(DATASET_AUDIOS_PATH, name))
)
print("Our class names: {}".format(class_names,))

audio_paths = []
labels = []

for label, name in enumerate(class_names):
    print("Processing speaker {}".format(name,))
    speaker_dir = os.path.join(DATASET_AUDIOS_PATH, name)
    speaker_sample_paths = [
        os.path.join(speaker_dir, filename)
        for filename in sorted(os.listdir(speaker_dir))
        if filename.endswith(".wav")
    ]
    audio_paths += speaker_sample_paths
    # Every file of this speaker gets the speaker's index as its label.
    labels += [label] * len(speaker_sample_paths)

# The full label list (one entry per file) is no longer printed; the summary
# below is enough to check the result.
print(
    "Found {} files belonging to {} classes.".format(len(audio_paths), len(class_names))
)

In [14]:
# Shuffle paths and labels in place. Each list gets a fresh generator seeded
# with the same value, so both receive the same permutation and stay aligned.
for sequence in (audio_paths, labels):
    rng = np.random.RandomState(SHUFFLE_SEED)
    rng.shuffle(sequence)

Split into training and validation

In [15]:

# Hold out the last VALID_SPLIT fraction of the shuffled files for validation.
# The split uses an explicit index: slicing with -num_val_samples breaks when
# num_val_samples is 0, because [:-0] is empty and [-0:] is the whole list.
num_val_samples = int(VALID_SPLIT * len(audio_paths))
split_index = len(audio_paths) - num_val_samples

print("Using {} files for training.".format(split_index))
train_audio_paths = audio_paths[:split_index]
train_labels = labels[:split_index]

print("Using {} files for validation.".format(num_val_samples))
valid_audio_paths = audio_paths[split_index:]
valid_labels = labels[split_index:]

# Create 2 datasets, one for training and the other for validation
train_ds = paths_and_labels_to_dataset(train_audio_paths, train_labels)
train_ds = train_ds.shuffle(buffer_size=BATCH_SIZE * 8, seed=SHUFFLE_SEED).batch(
    BATCH_SIZE
)

valid_ds = paths_and_labels_to_dataset(valid_audio_paths, valid_labels)
valid_ds = valid_ds.shuffle(buffer_size=32 * 8, seed=SHUFFLE_SEED).batch(32)

Feature Extraction

In [16]:
# Add noise to the training set
train_ds = train_ds.map(
    lambda x, y: (add_noise(x, noises, scale=SCALE), y),
    num_parallel_calls=tf.data.experimental.AUTOTUNE,
)

# Transform audio wave to the frequency domain using `audio_to_fft`
train_ds = train_ds.map(
    lambda x, y: (audio_to_fft(x), y), num_parallel_calls=tf.data.experimental.AUTOTUNE
)

train_ds = train_ds.prefetch(tf.data.experimental.AUTOTUNE)

valid_ds = valid_ds.map(
    lambda x, y: (audio_to_fft(x), y), num_parallel_calls=tf.data.experimental.AUTOTUNE
)
valid_ds = valid_ds.prefetch(tf.data.experimental.AUTOTUNE)

Create Model 

In [17]:
def residual_block(x, filters, conv_num=3, activation='relu'):
    """Stack `conv_num` 1-D convolutions with a projected shortcut, then pool.

    `conv_num` used to be accepted but ignored, so every block contained a
    single convolution regardless of what callers passed. It now controls the
    depth of the block. This changes the layer and parameter counts of any
    model built with conv_num > 1, so weights saved from the earlier,
    shallower architecture will not load into the new one.

    Args:
        x: Input tensor of the block.
        filters: Number of filters of every convolution in the block.
        conv_num: Number of kernel-size-3 convolutions on the main path.
        activation: Activation applied after each intermediate convolution and
            after the residual addition.

    Returns:
        The block output after max pooling with pool size 2 and stride 2.
    """
    # Shortcut: a kernel-size-1 convolution so the channel count matches the
    # main path and the two can be added.
    s = keras.layers.Conv1D(filters, 1, padding='same')(x)

    # All but the last convolution are followed directly by the activation.
    for _ in range(conv_num - 1):
        x = keras.layers.Conv1D(filters, 3, padding='same')(x)
        x = keras.layers.Activation(activation)(x)

    # The last convolution is activated only after the shortcut is added.
    x = keras.layers.Conv1D(filters, 3, padding='same')(x)
    x = keras.layers.Add()([x, s])
    x = keras.layers.Activation(activation)(x)
    return keras.layers.MaxPool1D(pool_size=2, strides=2)(x)


def build_model(input_shape, num_classes):
    """Assemble the classifier: four residual blocks followed by a dense head.

    Args:
        input_shape: Shape of one input example, without the batch axis.
        num_classes: Number of units of the softmax output layer.

    Returns:
        An uncompiled keras Model mapping the input to class probabilities.
    """
    inputs = keras.layers.Input(shape=input_shape, name='audio_input')

    # (filters, conv_num) of each residual block, in order.
    block_specs = ((16, 2), (32, 2), (64, 3), (128, 3))
    x = inputs
    for filters, conv_num in block_specs:
        x = residual_block(x, filters, conv_num)

    x = keras.layers.AveragePooling1D(pool_size=3, strides=3)(x)
    x = keras.layers.Flatten()(x)
    for units in (256, 128):
        x = keras.layers.Dense(units, activation="relu")(x)
    outputs = keras.layers.Dense(num_classes, activation="softmax", name="output")(x)

    return keras.models.Model(inputs=inputs, outputs=outputs)


# The input holds SAMPLING_RATE // 2 values per clip with one channel, which is
# what audio_to_fft keeps from a clip of SAMPLING_RATE samples.
model = build_model(
    input_shape=(SAMPLING_RATE // 2, 1),
    num_classes=len(class_names),
)

model.summary()

Compile model and fit

In [18]:
# Labels are integer class indices, hence the sparse categorical loss.
model.compile(
    optimizer='Adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

# Callbacks
# TensorBoard logging: https://keras.io/api/callbacks/tensorboard/
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir="./logs")

# Stop after 10 epochs without improvement and restore the best weights.
earlystopping_cb = keras.callbacks.EarlyStopping(
    patience=10,
    restore_best_weights=True,
)

# Save the model to disk only when validation accuracy improves.
model_save_filename = "model.h5"
mdlcheckpoint_cb = keras.callbacks.ModelCheckpoint(
    model_save_filename,
    monitor='val_accuracy',
    save_best_only=True,
)

In [21]:
# Train for at most EPOCHS epochs, validating on valid_ds after each one.
training_callbacks = [earlystopping_cb, mdlcheckpoint_cb, tensorboard_callback]
history = model.fit(
    train_ds,
    validation_data=valid_ds,
    epochs=EPOCHS,
    callbacks=training_callbacks,
)

Accuracy

In [22]:
# The model is compiled with a single metric, so evaluate() returns
# [loss, accuracy]. Unpack the pair so that the number reported as accuracy
# really is the accuracy and not the whole list.
val_loss, val_accuracy = model.evaluate(valid_ds)
print("Validation loss: {:.4f}".format(val_loss))
print("Validation accuracy: {:.4f}".format(val_accuracy))