<a href="https://colab.research.google.com/github/21bcs10985/-Functions-and-Errors---ETH-AVAX/blob/main/speaker-identification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [27]:
import shutil

# Reset the noise folder left over from a previous run. ignore_errors=True
# keeps this cell from raising FileNotFoundError when the folder does not
# exist yet (for example on a fresh runtime).
shutil.rmtree("/content/noise", ignore_errors=True)


In [20]:
import os
import shutil
import numpy as np
import tensorflow as tf
from tensorflow import keras
from pathlib import Path
from IPython.display import display, Audio

In [21]:
# Root folder that holds the dataset sub-folders.
DATASET_ROOT = "./"

# Names of the sub-folders for the audio clips and the noise clips.
AUDIO_SUBFOLDER, NOISE_SUBFOLDER = "audio", "noise"

# Paths of both sub-folders, built from the root folder.
DATASET_AUDIO_PATH, DATASET_NOISE_PATH = (
    os.path.join(DATASET_ROOT, subfolder)
    for subfolder in (AUDIO_SUBFOLDER, NOISE_SUBFOLDER)
)

In [22]:
# Fraction of the audio files held out for validation.
VALID_SPLIT = 0.1
# Seed shared by the NumPy shuffles and the tf.data shuffles.
SHUFFLE_SEED = 43
# Sampling rate, in Hz, that the audio clips are expected to have.
SAMPLING_RATE = 16000
# Value passed as `scale` to add_noise when noise is mixed into the audio.
SCALE = 0.5
# Number of samples per batch.
BATCH_SIZE = 128
# Number of epochs passed to model.fit.
EPOCHS = 100

Pre-processing the dataset

In [28]:
import os
import shutil
import zipfile

# Locations of the uploaded archive and of the folders this cell populates
zip_path = "/content/archive.zip"  # Update if needed
extract_path = "/content/extracted_data"
audio_folder = "/content/audio"
noise_folder = "/content/noise"

# Create the destination folders if they don't exist
os.makedirs(audio_folder, exist_ok=True)
os.makedirs(noise_folder, exist_ok=True)

# Unzip the dataset
with zipfile.ZipFile(zip_path, "r") as zip_ref:
    zip_ref.extractall(extract_path)

# Folder the archive is expected to contain
dataset_path = os.path.join(extract_path, "16000_pcm_speeches")

# Stop here when the expected folder is missing. Merely printing a warning
# would let the sorting step below walk a non-existent folder, move nothing
# and still report success.
if not os.path.isdir(dataset_path):
    raise FileNotFoundError(
        f"Dataset extraction failed! Check the ZIP structure: '{dataset_path}' is missing."
    )
print("Dataset extracted successfully.")

# Function to generate a unique filename
def get_unique_filename(directory, filename):
    """Return a name based on `filename` that does not exist in `directory`.

    `filename` itself is returned when it is free. Otherwise `_1`, `_2`, ...
    is inserted between the base name and the extension until a name is
    found that does not exist in `directory`.
    """
    stem, suffix = os.path.splitext(filename)
    candidate = filename
    attempt = 0

    while os.path.exists(os.path.join(directory, candidate)):
        attempt += 1
        candidate = f"{stem}_{attempt}{suffix}"

    return candidate

# Sort every .wav clip found in the sub-directories of the extracted dataset
# into the flat noise folder or the flat audio folder, depending on the name
# of the directory the clip sits in.
# NOTE: clips from all non-noise directories end up together in audio_folder,
# so the name of their source directory is not kept.
for parent_dir, subdir_names, _ in os.walk(dataset_path):
    for subdir_name in subdir_names:
        source_dir = os.path.join(parent_dir, subdir_name)
        is_noise_dir = "_background_noise_" in subdir_name.lower()
        target_dir = noise_folder if is_noise_dir else audio_folder

        for entry in os.listdir(source_dir):
            if not entry.endswith(".wav"):
                continue
            # Pick a name that is still free in the target folder
            unique_name = get_unique_filename(target_dir, entry)
            shutil.move(
                os.path.join(source_dir, entry),
                os.path.join(target_dir, unique_name),
            )

for message in (
    "All files sorted successfully!",
    f"Audio files are in: {audio_folder}",
    f"Noise files are in: {noise_folder}",
):
    print(message)


Dataset extracted successfully.
All files sorted successfully!
Audio files are in: /content/audio
Noise files are in: /content/noise


Noise

In [31]:
# List every regular file with a .wav name under /content/noise.
!find /content/noise -type f -name "*.wav"


/content/noise/10convert.com_Audience-Claps_daSG5fwdA7o.wav
/content/noise/dude_miaowing.wav
/content/noise/doing_the_dishes.wav
/content/noise/running_tap.wav


In [32]:
from pathlib import Path

# Folder that holds the background-noise clips
DATASET_NOISE_PATH = "/content/noise"  # Ensure this is correct

# Collect every .wav file below that folder, at any depth
noise_paths = list(map(str, Path(DATASET_NOISE_PATH).rglob("*.wav")))

print(f"Found {len(noise_paths)} noise files in '{DATASET_NOISE_PATH}'")

# Show one example path when the search found anything
if noise_paths:
    print("Sample file:", noise_paths[0])


Found 4 noise files in '/content/noise'
Sample file: /content/noise/10convert.com_Audience-Claps_daSG5fwdA7o.wav


In [34]:
!pip install pydub ffmpeg


Collecting pydub
  Downloading pydub-0.25.1-py2.py3-none-any.whl.metadata (1.4 kB)
Collecting ffmpeg
  Downloading ffmpeg-1.4.tar.gz (5.1 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Downloading pydub-0.25.1-py2.py3-none-any.whl (32 kB)
Building wheels for collected packages: ffmpeg
  Building wheel for ffmpeg (setup.py) ... [?25l[?25hdone
  Created wheel for ffmpeg: filename=ffmpeg-1.4-py3-none-any.whl size=6082 sha256=11bb2ce19c579f447b5f2c1c315debbca935ee340ceb934dabaa0461c46ef77d
  Stored in directory: /root/.cache/pip/wheels/56/30/c5/576bdd729f3bc062d62a551be7fefd6ed2f761901568171e4e
Successfully built ffmpeg
Installing collected packages: pydub, ffmpeg
Successfully installed ffmpeg-1.4 pydub-0.25.1


In [35]:
# Installs pydub and ffmpeg again (both are already installed by the previous
# cell) together with tensorflow, which is already imported near the top of
# the notebook.
!pip install pydub ffmpeg tensorflow




In [36]:
import os
import tensorflow as tf
from pydub import AudioSegment

# Noise folder and the sample rate (Hz) every noise clip is converted to
DATASET_NOISE_PATH, SAMPLING_RATE = "/content/noise", 16000

### 🛠️ STEP 1: Convert all noise files to 16kHz ###
def convert_to_16kHz(directory):
    """Resample, in place, every .wav file under `directory` to SAMPLING_RATE.

    The tree below `directory` is walked recursively. A file whose frame rate
    already equals SAMPLING_RATE is left untouched; any other file is
    resampled and exported back to the same path.
    """
    for folder, _, filenames in os.walk(directory):
        wav_names = (name for name in filenames if name.endswith(".wav"))
        for wav_name in wav_names:
            wav_path = os.path.join(folder, wav_name)
            clip = AudioSegment.from_wav(wav_path)

            # Nothing to do when the clip already has the target rate
            if clip.frame_rate == SAMPLING_RATE:
                continue

            print(f"Converting {wav_name} from {clip.frame_rate} Hz to {SAMPLING_RATE} Hz")
            clip.set_frame_rate(SAMPLING_RATE).export(wav_path, format="wav")

# Convert the noise files
convert_to_16kHz(DATASET_NOISE_PATH)

### 🛠️ STEP 2: Load noise samples ###
def load_noise_sample(path):
    """Decode a WAV file as mono audio and cut it into one-second slices.

    Args:
        path: Path of the WAV file to read.

    Returns:
        A list of tensors, one per whole second of audio (each holding
        SAMPLING_RATE samples), or None when the file's sampling rate differs
        from SAMPLING_RATE or the file is shorter than one second.
    """
    audio_binary = tf.io.read_file(path)
    sample, sampling_rate = tf.audio.decode_wav(audio_binary, desired_channels=1)
    file_rate = int(sampling_rate.numpy())

    # Ensure sample rate is correct
    if file_rate != SAMPLING_RATE:
        print(f"Skipping {path} due to incorrect sampling rate: {file_rate} Hz")
        return None

    slices = sample.shape[0] // SAMPLING_RATE  # Number of whole 1-second slices
    if slices == 0:
        # tf.split cannot split into zero pieces, so a clip shorter than one
        # second is skipped instead of crashing the whole loading loop.
        print(f"Skipping {path}: shorter than one second")
        return None

    # Drop the trailing partial second so the signal splits evenly
    return tf.split(sample[: slices * SAMPLING_RATE], slices)

# Gather every .wav file below the noise folder
noise_paths = [
    os.path.join(folder, name)
    for folder, _, names in os.walk(DATASET_NOISE_PATH)
    for name in names
    if name.endswith(".wav")
]

# Cut each file into slices; files for which the loader returns nothing are skipped
noises = []
for noise_path in noise_paths:
    slices = load_noise_sample(noise_path)
    if slices:
        noises += slices

# Refuse to stack when not a single slice was loaded
if len(noises) == 0:
    raise ValueError("No valid noise samples found! Check file paths and conversions.")

# Turn the list of slices into one tensor
noises = tf.stack(noises)

# Summary of what was loaded
summary = "{} noise files were split into {} noise samples where each is {} sec. long"
print(summary.format(len(noise_paths), noises.shape[0], noises.shape[1] // SAMPLING_RATE))


Converting 10convert.com_Audience-Claps_daSG5fwdA7o.wav from 44100 Hz to 16000 Hz
Converting dude_miaowing.wav from 22050 Hz to 16000 Hz
Converting doing_the_dishes.wav from 22050 Hz to 16000 Hz
Converting running_tap.wav from 22050 Hz to 16000 Hz
4 noise files were split into 233 noise samples where each is 1 sec. long


Dataset generation

In [39]:
# Get the list of audio file paths along with their corresponding labels
audio_paths = []
labels = []

# os.listdir returns entries in arbitrary order; sorting first makes the
# seeded shuffle below produce the same split on every machine.
for file in sorted(os.listdir(DATASET_AUDIO_PATH)):
    file_path = os.path.join(DATASET_AUDIO_PATH, file)

    # Ensure it is a .wav file before processing
    if os.path.isfile(file_path) and file.endswith(".wav"):
        audio_paths.append(file_path)
        labels.append(0)  # Assign a dummy label (Modify this if you have actual labels)

print("Found {} audio files.".format(len(audio_paths)))

# Shuffle paths and labels with two identically seeded generators so that
# both lists receive the same permutation and stay aligned.
rng = np.random.RandomState(SHUFFLE_SEED)
rng.shuffle(audio_paths)
rng = np.random.RandomState(SHUFFLE_SEED)
rng.shuffle(labels)

# Split into training and validation.
# An explicit split index is used because slicing with -num_val_samples breaks
# when num_val_samples is 0: [:-0] is empty and [-0:] is the whole list.
num_val_samples = int(VALID_SPLIT * len(audio_paths))
split_index = len(audio_paths) - num_val_samples

train_audio_paths = audio_paths[:split_index]
train_labels = labels[:split_index]

valid_audio_paths = audio_paths[split_index:]
valid_labels = labels[split_index:]

print("Using {} files for training.".format(len(train_audio_paths)))
print("Using {} files for validation.".format(len(valid_audio_paths)))

# NOTE(review): paths_and_labels_to_dataset, add_noise and audio_to_fft are not
# defined in the part of the notebook visible here; confirm they are defined
# before this cell runs.
AUTOTUNE = tf.data.experimental.AUTOTUNE


def _with_noise(audio, label):
    """Apply add_noise to the audio with the loaded noises; the label passes through."""
    return add_noise(audio, noises, scale=SCALE), label


def _with_fft(audio, label):
    """Apply audio_to_fft to the audio; the label passes through."""
    return audio_to_fft(audio), label


# Training pipeline: shuffle -> batch -> add noise -> FFT -> prefetch
train_ds = (
    paths_and_labels_to_dataset(train_audio_paths, train_labels)
    .shuffle(buffer_size=BATCH_SIZE * 8, seed=SHUFFLE_SEED)
    .batch(BATCH_SIZE)
    .map(_with_noise, num_parallel_calls=AUTOTUNE)
    .map(_with_fft, num_parallel_calls=AUTOTUNE)
    .prefetch(AUTOTUNE)
)

# Validation pipeline: the same steps without noise, in batches of 32
valid_ds = (
    paths_and_labels_to_dataset(valid_audio_paths, valid_labels)
    .shuffle(buffer_size=32 * 8, seed=SHUFFLE_SEED)
    .batch(32)
    .map(_with_fft, num_parallel_calls=AUTOTUNE)
    .prefetch(AUTOTUNE)
)


Found 7503 audio files.
Using 6753 files for training.
Using 750 files for validation.


Model

In [41]:
import tensorflow as tf
from tensorflow import keras

def residual_block(x, filters, conv_num=3, activation="relu"):
    """Build a 1-D convolutional residual block followed by max-pooling.

    Args:
        x: Input tensor of the block.
        filters: Number of filters of every convolution in the block.
        conv_num: Number of kernel-size-3 convolutions on the main branch.
        activation: Activation applied after the convolutions.

    Returns:
        The block output after MaxPool1D(pool_size=2, strides=2).
    """
    # Shortcut branch: a kernel-size-1 convolution of the block input
    shortcut = keras.layers.Conv1D(filters, 1, padding="same")(x)

    # Main branch: conv_num - 1 convolutions, each followed by the activation
    for _ in range(conv_num - 1):
        x = keras.layers.Conv1D(filters, 3, padding="same")(x)
        x = keras.layers.Activation(activation)(x)

    # The last convolution is activated only after the shortcut has been added
    x = keras.layers.Conv1D(filters, 3, padding="same")(x)
    merged = keras.layers.Add()([x, shortcut])
    activated = keras.layers.Activation(activation)(merged)

    return keras.layers.MaxPool1D(pool_size=2, strides=2)(activated)

def build_model(input_shape, num_classes):
    """Build the classifier: five residual blocks followed by a dense head.

    Args:
        input_shape: Shape of one input sample, passed to keras.layers.Input.
        num_classes: Number of units of the softmax output layer.

    Returns:
        A keras Model mapping the "input" layer to the "output" layer.
    """
    inputs = keras.layers.Input(shape=input_shape, name="input")

    # (filters, conv_num) of the five stacked residual blocks
    block_specs = [(16, 2), (32, 2), (64, 3), (128, 3), (128, 3)]
    features = inputs
    for filters, conv_num in block_specs:
        features = residual_block(features, filters, conv_num)

    # Pool and flatten, then two ReLU dense layers
    features = keras.layers.AveragePooling1D(pool_size=3, strides=3)(features)
    features = keras.layers.Flatten()(features)
    for units in (256, 128):
        features = keras.layers.Dense(units, activation="relu")(features)

    outputs = keras.layers.Dense(num_classes, activation="softmax", name="output")(features)

    return keras.models.Model(inputs=inputs, outputs=outputs)

# Define model
SAMPLING_RATE = 16000  # Adjust if needed
# NOTE(review): placeholder names; replace with the actual class names
class_names = ["class1", "class2", "class3"]  # Replace with actual class names

# One input sample has SAMPLING_RATE // 2 steps and a single channel;
# the output layer has one unit per class name
model = build_model(
    (SAMPLING_RATE // 2, 1),
    len(class_names),
)
model.summary()

# Compile model
model.compile(
    optimizer="adam",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)

# Callbacks: EarlyStopping (patience 10, restoring the best weights) and a
# checkpoint that keeps only the best model as measured by val_accuracy
model_save_filename = "model.h5"
earlystopping_cb = keras.callbacks.EarlyStopping(
    patience=10,
    restore_best_weights=True,
)
mdlcheckpoint_cb = keras.callbacks.ModelCheckpoint(
    model_save_filename,
    monitor="val_accuracy",
    save_best_only=True,
)


Training

In [None]:
# Train the model on train_ds, using valid_ds as validation data
training_callbacks = [earlystopping_cb, mdlcheckpoint_cb]
history = model.fit(
    train_ds,
    validation_data=valid_ds,
    epochs=EPOCHS,
    callbacks=training_callbacks,
)

Epoch 1/100
[1m45/53[0m [32m━━━━━━━━━━━━━━━━[0m[37m━━━━[0m [1m1:03[0m 8s/step - accuracy: 0.9878 - loss: 0.0239

Saving the models

In [None]:
# Save the model to an .h5 file
model.save("model.h5")

# Export a TensorFlow SavedModel folder. This is only a possible starting
# point for a later TFLite conversion for edge devices: this cell does not
# itself create a .tflite file.
export_dir = "model_keras_tflite"
tf.saved_model.save(model, export_dir)

# Zip the folder, then remove it. shutil is used instead of the `zip` / `rm`
# shell commands so the cell does not depend on those binaries being available.
shutil.make_archive(export_dir, "zip", root_dir=".", base_dir=export_dir)
shutil.rmtree(export_dir)

Accuracy

In [None]:
# Evaluate on the validation dataset and print what model.evaluate returns
evaluation = model.evaluate(valid_ds)
print(evaluation)

Predict

In [None]:
SAMPLES_TO_DISPLAY = 10

# Rebuild a noisy, batched dataset from the validation files
test_ds = paths_and_labels_to_dataset(valid_audio_paths, valid_labels)
test_ds = test_ds.shuffle(buffer_size=BATCH_SIZE * 8, seed=SHUFFLE_SEED).batch(
    BATCH_SIZE
)

test_ds = test_ds.map(lambda x, y: (add_noise(x, noises, scale=SCALE), y))

# Separate names are used for the batch so the module-level `labels` list is
# not overwritten by this loop.
for batch_audios, batch_labels in test_ds.take(1):
    # Get the signal FFT
    ffts = audio_to_fft(batch_audios)
    # Predict
    y_pred = model.predict(ffts)
    # Take random samples. The indices are drawn from the actual batch size:
    # the batch holds fewer than BATCH_SIZE items when fewer files are
    # available, and indexing up to BATCH_SIZE would then go out of range.
    batch_size = y_pred.shape[0]
    rnd = np.random.randint(0, batch_size, SAMPLES_TO_DISPLAY)
    sample_labels = batch_labels.numpy()[rnd]
    sample_preds = np.argmax(y_pred, axis=-1)[rnd]

    for index in range(SAMPLES_TO_DISPLAY):
        true_label = sample_labels[index]
        predicted_label = sample_preds[index]
        is_match = true_label == predicted_label
        # ANSI colour code: green for a correct prediction, red otherwise
        colour = "[92m" if is_match else "[91m"

        # For every sample, print the true and predicted label
        print(
            "Speaker:\33{} {}\33[0m\tPredicted:\33{} {}\33[0m".format(
                colour,
                class_names[true_label],
                colour,
                class_names[predicted_label],
            )
        )
        if is_match:
            print("Welcome")
        else:
            print("Sorry")
        print("The speaker is" if is_match else "", class_names[predicted_label])

In [None]:
# Predict the speaker from the test dataset for real-time prediction.

In [None]:
def paths_to_dataset(audio_paths):
	"""Build a tf.data dataset whose elements are the given audio paths.

	The paths are not decoded into audio here and no labels are attached.
	"""
	dataset_of_paths = tf.data.Dataset.from_tensor_slices(audio_paths)
	return tf.data.Dataset.zip((dataset_of_paths))

def predict(path, labels, authorized_speaker="Julia_Gillard"):
	"""Predict the speaker of the given audio file(s) and print a verdict.

	Only the first sample of the first batch is reported.

	Args:
		path: List of audio file paths, passed to paths_and_labels_to_dataset.
		labels: List with one label per path, passed along with the paths.
		authorized_speaker: Class name that is greeted with "Welcome"; any
			other predicted class name prints "Sorry".

	Returns:
		The predicted class name, or None when the dataset yields no batch.
	"""
	test = paths_and_labels_to_dataset(path, labels)
	test = test.shuffle(buffer_size=BATCH_SIZE * 8, seed=SHUFFLE_SEED).batch(
		BATCH_SIZE
	)
	test = test.map(lambda x, y: (add_noise(x, noises, scale=SCALE), y))
	# Prefetch is the last step so that it covers the mapped elements
	test = test.prefetch(tf.data.experimental.AUTOTUNE)

	# The batch labels are not needed here; `_` keeps the `labels` argument
	# from being shadowed.
	for audios, _ in test.take(1):
		# Get the signal FFT
		ffts = audio_to_fft(audios)
		# Predict, then keep the class index of the first sample
		y_pred = np.argmax(model.predict(ffts), axis=-1)
		predicted_index = y_pred[0]

		print(
			"Speaker:\33{} {}\33[0m\tPredicted:\33{} {}\33[0m".format(
				"[92m", predicted_index,
				"[92m", predicted_index,
			)
		)
		predicted_name = class_names[predicted_index]
		if predicted_name == authorized_speaker:
			print("Welcome")
		else:
			print("Sorry")
		print(predicted_name)
		return predicted_name

	return None

# predict("content/1000.wav")

# Single file to classify; the label is only a placeholder
path = ["/content/0.wav"]
labels = ["unknown"]

# Catch Exception rather than using a bare `except:` (which would also swallow
# KeyboardInterrupt and SystemExit), and show the cause so the failure can be
# diagnosed.
try:
    predict(path, labels)
except Exception as err:
    print("Error! Check if the file correctly passed or not!")
    print(f"Details: {type(err).__name__}: {err}")