<a href="https://colab.research.google.com/github/eugeniapramesti/Speaker-Recognition/blob/main/Speaker_Recognition_Pert_9.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!wget 'https://www.googleapis.com/drive/v3/files/15E6vIBf8BBw5VKujsyLKgOFqEGXCEKvN?alt=media&key=AIzaSyAJRCmePTVCsSV6-knDKpcNWLRoSw7Jfrg' -O "data.zip"

--2023-09-25 13:28:43--  https://www.googleapis.com/drive/v3/files/15E6vIBf8BBw5VKujsyLKgOFqEGXCEKvN?alt=media&key=AIzaSyAJRCmePTVCsSV6-knDKpcNWLRoSw7Jfrg
Resolving www.googleapis.com (www.googleapis.com)... 64.233.182.95, 64.233.183.95, 173.194.193.95, ...
Connecting to www.googleapis.com (www.googleapis.com)|64.233.182.95|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 242551504 (231M) [application/x-zip-compressed]
Saving to: ‘data.zip’


2023-09-25 13:28:47 (73.2 MB/s) - ‘data.zip’ saved [242551504/242551504]



In [2]:
!unzip "./data.zip" -d "./data"

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  inflating: ./data/16000_pcm_speeches/audio/Nelson_Mandela/300.wav  
  inflating: ./data/16000_pcm_speeches/audio/Nelson_Mandela/469.wav  
  inflating: ./data/16000_pcm_speeches/audio/Julia_Gillard/615.wav  
  inflating: ./data/16000_pcm_speeches/audio/Nelson_Mandela/230.wav  
  inflating: ./data/16000_pcm_speeches/audio/Benjamin_Netanyau/735.wav  
  inflating: ./data/16000_pcm_speeches/audio/Jens_Stoltenberg/157.wav  
  inflating: ./data/16000_pcm_speeches/audio/Julia_Gillard/59.wav  
  inflating: ./data/16000_pcm_speeches/audio/Nelson_Mandela/702.wav  
  inflating: ./data/16000_pcm_speeches/audio/Magaret_Tarcher/766.wav  
  inflating: ./data/16000_pcm_speeches/audio/Magaret_Tarcher/360.wav  
  inflating: ./data/16000_pcm_speeches/audio/Julia_Gillard/383.wav  
  inflating: ./data/16000_pcm_speeches/audio/Benjamin_Netanyau/737.wav  
  inflating: ./data/16000_pcm_speeches/audio/Julia_Gillard/710.wav  
  inflating: ./data/

In [3]:
import os
import numpy as np
import tensorflow as tf
from pathlib import Path
from IPython.display import display, Audio
from tensorflow import keras

In [4]:
# Define Parameters

DATASET_ROOT = "./data/16000_pcm_speeches"
DATASET_AUDIO_PATH = os.path.join(DATASET_ROOT, "audio")
DATASET_NOISE_PATH = os.path.join(DATASET_ROOT, "noise")

In [5]:
VALID_SPLIT = 0.1
SHUFFLE_SEED = 10
SAMPLING_RATE = 16000
EPOCH = 5
BATCH_SIZE = 128

In [6]:
noise_paths = []
for subdir in os.listdir(DATASET_NOISE_PATH):
  subdir_path = os.path.join(DATASET_NOISE_PATH, subdir)

  for file in os.listdir(subdir_path):
    noise_paths += [os.path.join(subdir_path, file)]

print(noise_paths)

['./data/16000_pcm_speeches/noise/other/exercise_bike.wav', './data/16000_pcm_speeches/noise/other/pink_noise.wav', './data/16000_pcm_speeches/noise/_background_noise_/doing_the_dishes.wav', './data/16000_pcm_speeches/noise/_background_noise_/10convert.com_Audience-Claps_daSG5fwdA7o.wav', './data/16000_pcm_speeches/noise/_background_noise_/dude_miaowing.wav', './data/16000_pcm_speeches/noise/_background_noise_/running_tap.wav']


In [7]:
def load_noise_sample(file_path):
  sample, _ = tf.audio.decode_wav(tf.io.read_file(file_path) , desired_channels=1)
  slices = int(sample.shape[0] / SAMPLING_RATE)
  sample = tf.split(sample[ :slices * SAMPLING_RATE], slices)

  return sample

noises = []
for path in noise_paths:
  sample = load_noise_sample(path)
  if sample:
    noises.extend(sample)
noises = tf.stack(noises)

print("Noise File : {}".format(len(noise_paths)))
print("Noise Total Samples : {}".format(noises.shape[0]))

Noise File : 6
Noise Total Samples : 510


In [8]:
def path_to_audio(file_path):
  sample, _ = tf.audio.decode_wav(tf.io.read_file(file_path), 1, SAMPLING_RATE)
  return sample

def paths_and_labels_to_dataset(audio_paths, labels):
  path_ds = tf.data.Dataset.from_tensor_slices(audio_paths) # Path
  audio_ds = path_ds.map(lambda path: path_to_audio(path)) # Audio Dataset
  label_ds = tf.data.Dataset.from_tensor_slices(labels) # Label Dataset

  return tf.data.Dataset.zip((audio_ds, label_ds))

In [9]:
def add_noise(audio, noises):
  tf_random_index = tf.random.uniform( (tf.shape(audio)[0],) , 0, noises.shape[0], dtype = tf.int32)

  # Gather Noise
  noise = tf.gather(noises, tf_random_index, axis = 0)

  # Reduce Max (Normalisasi)
  prop = tf.math.reduce_max(audio, axis = 1)/tf.math.reduce_max(noise, axis = 1)

  # Reshape Dimension
  prop = tf.repeat(tf.expand_dims(prop, axis = 1), tf.shape(audio)[1], axis = 1)

  scale = 0.5
  audio = audio + (noise * prop * scale)

  return audio

In [10]:
audio_paths = []
labels = []

class_names = os.listdir(DATASET_AUDIO_PATH)

for index, name in enumerate(class_names):
  # print(index, name)

  subdir_path = os.path.join(DATASET_AUDIO_PATH, name)

  speaker_sample_paths = []
  for file_path in os.listdir(subdir_path):
    speaker_sample_paths += [os.path.join(subdir_path, file_path)]

  audio_paths += speaker_sample_paths
  labels += [index] * len(speaker_sample_paths)

# print(audio_paths)
# print(labels)

# Load Dataset

rng = np.random.RandomState(SHUFFLE_SEED)
rng.shuffle(audio_paths)
rng = np.random.RandomState(SHUFFLE_SEED)
rng.shuffle(labels)

# Split Train and Validation
num_val_samples = int(VALID_SPLIT * len(audio_paths)) # 10 % from Total Data
print("Total {} Files For Train Set".format(len(audio_paths) -  num_val_samples))
print("Total {} Files For Validation Set".format(num_val_samples))

Total 6751 Files For Train Set
Total 750 Files For Validation Set


In [11]:
# Train Set
train_audio_paths = audio_paths[:-num_val_samples]
train_labels = labels[:-num_val_samples]

# Validation Set
val_audio_paths = audio_paths[-num_val_samples:]
val_labels = labels[-num_val_samples:]

In [12]:
# Create Datasets
train_ds = paths_and_labels_to_dataset(train_audio_paths, train_labels)
val_ds = paths_and_labels_to_dataset(val_audio_paths, val_labels)

# Shuffle Dataset + Create Batch
train_ds = train_ds.shuffle(buffer_size = BATCH_SIZE * 8, seed = SHUFFLE_SEED).batch(BATCH_SIZE)
val_ds = val_ds.shuffle(buffer_size = 32 * 8, seed = SHUFFLE_SEED).batch(32)

# Add Noise to Train & Val Dataset
train_ds = train_ds.map(
    lambda data, label:  (add_noise(data, noises), label)
)


# Fourier Transform
def audio_to_fft(audio):
  audio = tf.squeeze(audio, axis = -1)

  # FTT
  audio_complex = tf.complex(real = audio, imag = tf.zeros_like(audio))
  audio_complex_cast = tf.cast(audio_complex, tf.complex64)
  fft = tf.signal.fft(audio_complex_cast)

  # Expand Dims
  fft = tf.expand_dims(fft, axis = -1)

  return tf.math.abs(fft[:, :(audio.shape[1] // 2), :])

In [13]:
train_ds= train_ds.map(
    lambda x, y: (audio_to_fft(x), y), num_parallel_calls= tf.data.AUTOTUNE
)

train_ds= train_ds.prefetch(tf.data.AUTOTUNE)

val_ds= val_ds.map(
    lambda x, y: (audio_to_fft(x), y), num_parallel_calls= tf.data.AUTOTUNE
)

val_ds= val_ds.prefetch(tf.data.AUTOTUNE)


In [14]:
def residual_block(x, filters, conv_num=3, activation="relu"):
  s = keras.layers.Conv1D(filters, 1, padding = "same")(x)
  for i in range (conv_num-1):
    x = keras.layers.Conv1D(filters, 3, padding="same")(x)
    x = keras.layers.Activation(activation)(x)
  x = keras.layers.Conv1D(filters, 3, padding = "same")(x)
  x = keras.layers.Add()([x,s])
  x = keras.layers.Activation(activation)(x)
  return keras.layers.MaxPool1D(pool_size=2, strides=2)(x)


def build_model(input_shape, num_classes):
  inputs = keras.layers.Input(shape = input_shape, name = "input")

  x = residual_block(inputs, 16, 2)
  x = residual_block(x, 32, 2)
  x = residual_block(x, 64, 2)
  x = residual_block(x, 128, 2)
  x = residual_block(x, 128, 2)

  x = keras.layers.AveragePooling1D(pool_size=3, strides=3)(x)
  x = keras.layers.Flatten()(x)

  x = keras.layers.Dense(256, activation= "relu")(x)
  x = keras.layers.Dense(128, activation = "relu")(x)

  outputs = keras.layers.Dense(num_classes, activation = "softmax", name = "output")(x)
  return keras.models.Model(inputs = inputs, outputs = outputs)

model = build_model((SAMPLING_RATE // 2, 1), len(class_names))

model.summary()
model.compile(
    optimizer = "Adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
)

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input (InputLayer)          [(None, 8000, 1)]            0         []                            
                                                                                                  
 conv1d_1 (Conv1D)           (None, 8000, 16)             64        ['input[0][0]']               
                                                                                                  
 activation (Activation)     (None, 8000, 16)             0         ['conv1d_1[0][0]']            
                                                                                                  
 conv1d_2 (Conv1D)           (None, 8000, 16)             784       ['activation[0][0]']          
                                                                                              

In [15]:
# cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
#                                                  save_weights_only=True,
#                                                  verbose=1)

history= model.fit(
    train_ds,
    epochs= EPOCH,
    validation_data = val_ds,
    # callbacks=[cp_callback],
)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


In [16]:
skor= model.evaluate(val_ds)
skor[1]



0.9453333616256714

In [17]:
SAMPLE_TO_DISPLAY=2
test_ds= paths_and_labels_to_dataset(val_audio_paths, val_labels)
test_ds= test_ds.shuffle(buffer_size= BATCH_SIZE*8, seed= SHUFFLE_SEED).batch(BATCH_SIZE)

test_ds= test_ds.map(
    lambda x,y: (add_noise(x, noises), y)
)

for audios, labels in test_ds.take(1):
  ffts= audio_to_fft(audios)

  y_pred= model.predict(ffts)

  rnd= np.random.randint(0, BATCH_SIZE, SAMPLE_TO_DISPLAY)
  audios= audios.numpy()[rnd, :, :]
  labels= labels.numpy()[rnd]

  y_pred= np.argmax(y_pred, axis=-1)[rnd]

  for index in range(SAMPLE_TO_DISPLAY):
    print("speaker: {} predicted: {}".format(class_names[labels[index]], class_names[y_pred[index]]))
    display(Audio(audios[index, :, :].squeeze(), rate=SAMPLING_RATE))

speaker: Julia_Gillard predicted: Julia_Gillard


speaker: Benjamin_Netanyau predicted: Julia_Gillard
