In [13]:
#Upload kaggle.json for downloading dataset

from google.colab import files
files.upload()

In [2]:
import os
os.makedirs("/root/.kaggle", exist_ok=True)
os.rename("kaggle.json", "/root/.kaggle/kaggle.json")


In [3]:
!kaggle datasets download -d vjcalling/speaker-recognition-audio-dataset


Dataset URL: https://www.kaggle.com/datasets/vjcalling/speaker-recognition-audio-dataset
License(s): unknown
Downloading speaker-recognition-audio-dataset.zip to /content
100% 3.64G/3.64G [00:43<00:00, 157MB/s]
100% 3.64G/3.64G [00:43<00:00, 90.0MB/s]


In [6]:
!unzip speaker-recognition-audio-dataset.zip -d /content/


Archive:  speaker-recognition-audio-dataset.zip
  inflating: /content/50_speakers_audio_data/Speaker0026/Speaker26_000.wav  
  inflating: /content/50_speakers_audio_data/Speaker0026/Speaker26_001.wav  
  inflating: /content/50_speakers_audio_data/Speaker0026/Speaker26_002.wav  
  inflating: /content/50_speakers_audio_data/Speaker0026/Speaker26_003.wav  
  inflating: /content/50_speakers_audio_data/Speaker0026/Speaker26_004.wav  
  inflating: /content/50_speakers_audio_data/Speaker0026/Speaker26_005.wav  
  inflating: /content/50_speakers_audio_data/Speaker0026/Speaker26_006.wav  
  inflating: /content/50_speakers_audio_data/Speaker0026/Speaker26_007.wav  
  inflating: /content/50_speakers_audio_data/Speaker0026/Speaker26_008.wav  
  inflating: /content/50_speakers_audio_data/Speaker0026/Speaker26_009.wav  
  inflating: /content/50_speakers_audio_data/Speaker0026/Speaker26_010.wav  
  inflating: /content/50_speakers_audio_data/Speaker0026/Speaker26_011.wav  
  inflating: /content/50_spe

In [7]:
import os
import torchaudio
import h5py
import numpy as np
from tqdm import tqdm
import torchaudio.transforms as T

# Function to extract MFCC features (without augmentation)
def extract_mfcc(file_path, n_mfcc=13):
    try:
        waveform, sample_rate = torchaudio.load(file_path)

        # Extract MFCC from the waveform
        transform = T.MFCC(
            sample_rate=sample_rate,
            n_mfcc=n_mfcc,
            melkwargs={"n_fft": 400, "hop_length": 160, "n_mels": 23, "center": False}
        )
        mfcc = transform(waveform)

        mfcc_mean = mfcc.mean(dim=-1).detach().numpy()  # Use .detach() to remove gradient tracking before calling .numpy()

        return mfcc_mean
    except Exception as e:
        print(f"Error processing {file_path}: {e}")
        return None

# Function to process a single speaker folder and save its MFCCs
def process_speaker_folder(speaker_folder, cache_dir="mfcc_cache_v1", n_samples_per_speaker=30, n_mfcc=13):
    speaker_name = os.path.basename(speaker_folder)
    speaker_cache_dir = os.path.join(cache_dir, speaker_name)
    if not os.path.exists(speaker_cache_dir):
        os.makedirs(speaker_cache_dir)

    audio_files = os.listdir(speaker_folder)[:n_samples_per_speaker]
    mfccs = []

    # Process each audio file
    for audio_file in tqdm(audio_files, desc=f"Processing {speaker_name}"):
        file_path = os.path.join(speaker_folder, audio_file)
        mfcc = extract_mfcc(file_path, n_mfcc=n_mfcc)

        if mfcc is not None:
            mfccs.append(mfcc)

            hdf5_filename = f"{audio_file.split('.')[0]}.h5"
            with h5py.File(os.path.join(speaker_cache_dir, hdf5_filename), "w") as f:
                f.create_dataset("mfcc", data=mfcc)
        else:
            print(f"Skipping {audio_file} due to error.")

    return speaker_name, mfccs

# Keeping only 30 samples as of now, for fast and equal processing
def process_all_speakers(data_path, cache_dir="mfcc_cache_v1", n_samples_per_speaker=30, n_mfcc=13):
    speakers = os.listdir(data_path)
    speaker_folders = [os.path.join(data_path, speaker) for speaker in speakers]

    results = []
    for speaker_folder in tqdm(speaker_folders, desc="Processing speakers"):
        try:
            result = process_speaker_folder(speaker_folder, cache_dir, n_samples_per_speaker, n_mfcc)
            results.append(result)
        except Exception as e:
            print(f"Error processing folder {speaker_folder}: {e}")

    return results

# Main function to process and cache MFCC features (without augmentation)
def main(data_path, cache_dir="mfcc_cache_v1", n_samples_per_speaker=30, n_mfcc=13):
    results = process_all_speakers(data_path, cache_dir, n_samples_per_speaker, n_mfcc)
    print(f"Processed {len(results)} speakers.")

data_path = "50_speakers_audio_data"
cache_dir = "mfcc_cache_v1"
main(data_path, cache_dir, n_samples_per_speaker=30, n_mfcc=13)


Processing speakers:   0%|          | 0/50 [00:00<?, ?it/s]
Processing Speaker_0015:   0%|          | 0/30 [00:00<?, ?it/s][A
Processing Speaker_0015:   3%|▎         | 1/30 [00:00<00:09,  2.92it/s][A
Processing Speaker_0015:  10%|█         | 3/30 [00:00<00:03,  7.46it/s][A
Processing Speaker_0015:  17%|█▋        | 5/30 [00:00<00:02, 10.25it/s][A
Processing Speaker_0015:  23%|██▎       | 7/30 [00:00<00:01, 12.08it/s][A
Processing Speaker_0015:  30%|███       | 9/30 [00:00<00:01, 12.75it/s][A
Processing Speaker_0015:  40%|████      | 12/30 [00:01<00:01, 15.39it/s][A
Processing Speaker_0015:  47%|████▋     | 14/30 [00:01<00:01, 15.65it/s][A
Processing Speaker_0015:  53%|█████▎    | 16/30 [00:01<00:00, 15.83it/s][A
Processing Speaker_0015:  60%|██████    | 18/30 [00:01<00:00, 15.65it/s][A
Processing Speaker_0015:  67%|██████▋   | 20/30 [00:01<00:00, 15.84it/s][A
Processing Speaker_0015:  73%|███████▎  | 22/30 [00:01<00:00, 15.96it/s][A
Processing Speaker_0015:  80%|████████  | 

Processed 50 speakers.


In [8]:
import os
import random
import h5py
import numpy as np
import time

base_dir = "mfcc_cache_v1"

speaker_folders = [f for f in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, f))]

def load_mfcc_from_h5(file_path):
    with h5py.File(file_path, 'r') as f:
        return np.array(f['mfcc'])  # Assuming 'mfcc' is the dataset name inside the H5 file

pairs = []
labels = []

start_time = time.time()

target_target_pairs = 0
target_non_target_pairs = 0

# Randomly select 30000 target-target pairs
while target_target_pairs < 30000:
    speaker_folder = random.choice(speaker_folders)
    speaker_path = os.path.join(base_dir, speaker_folder)

    speaker_files = [f for f in os.listdir(speaker_path) if f.endswith('.h5')]

    if len(speaker_files) > 1:
        file1, file2 = random.sample(speaker_files, 2)

        file1_path = os.path.join(speaker_path, file1)
        file2_path = os.path.join(speaker_path, file2)

        mfcc1 = load_mfcc_from_h5(file1_path)
        mfcc2 = load_mfcc_from_h5(file2_path)

        pairs.append((mfcc1, mfcc2))
        labels.append(1)  # 1 indicates a target-target pair

        target_target_pairs += 1

# Randomly select 30000 target-non-target pairs
while target_non_target_pairs < 30000:
    target_speaker_folder = random.choice(speaker_folders)
    target_speaker_path = os.path.join(base_dir, target_speaker_folder)

    non_target_speaker_folder = random.choice([f for f in speaker_folders if f != target_speaker_folder])
    non_target_speaker_path = os.path.join(base_dir, non_target_speaker_folder)

    target_file = random.choice([f for f in os.listdir(target_speaker_path) if f.endswith('.h5')])
    non_target_file = random.choice([f for f in os.listdir(non_target_speaker_path) if f.endswith('.h5')])

    target_file_path = os.path.join(target_speaker_path, target_file)
    non_target_file_path = os.path.join(non_target_speaker_path, non_target_file)

    target_mfcc = load_mfcc_from_h5(target_file_path)
    non_target_mfcc = load_mfcc_from_h5(non_target_file_path)

    pairs.append((target_mfcc, non_target_mfcc))
    labels.append(0)  # 0 indicates a target-non-target pair

    target_non_target_pairs += 1

# End timer and calculate execution time
end_time = time.time()
execution_time = end_time - start_time

# Print the number of pairs and execution time
print(f"Generated {len(pairs)} pairs: {target_target_pairs} target-target pairs and {target_non_target_pairs} target-non-target pairs.")
print(f"Execution time: {execution_time:.2f} seconds.")


# np.save('pairs.npy', pairs)
# np.save('labels.npy', labels)


Generated 60000 pairs: 30000 target-target pairs and 30000 target-non-target pairs.
Execution time: 69.39 seconds.


In [11]:
import tensorflow as tf
from tensorflow.keras import layers, models
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
import numpy as np

# Function to create a simple Siamese network model
def create_siamese_model(input_shape):
    # Define the base model for each input
    input_layer = layers.Input(shape=input_shape)
    x = layers.Dense(128, activation='relu')(input_layer)
    x = layers.Dropout(0.2)(x)
    x = layers.Dense(64, activation='relu')(x)
    x = layers.Dropout(0.2)(x)
    x = layers.Dense(32, activation='relu')(x)

    # Create the shared model for both inputs
    model = models.Model(inputs=input_layer, outputs=x)
    return model

# Custom layer to replace Lambda layer
class L1Distance(layers.Layer):
    def call(self, vectors):
        # Remove extra dimension from output
        return tf.reduce_sum(tf.abs(vectors[0] - vectors[1]), axis=1)

# Function to create the complete model
def create_complete_model(input_shape):
    # Create the shared model
    shared_model = create_siamese_model(input_shape)

    # Create inputs
    input1 = layers.Input(shape=input_shape)
    input2 = layers.Input(shape=input_shape)

    # Get encodings
    encoding1 = shared_model(input1)
    encoding2 = shared_model(input2)

    # Add custom distance layer instead of Lambda
    distance = L1Distance()([encoding1, encoding2])

    # Add final classification layer
    output_layer = layers.Dense(1, activation='sigmoid')(distance)

    # Create the final model
    model = models.Model(inputs=[input1, input2], outputs=output_layer)
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

    return model

# Main code
# Prepare pairs and labels (assuming they are loaded in `pairs` and `labels`)
X = np.array(pairs)
X1 = np.array([x[0] for x in X])
X2 = np.array([x[1] for x in X])
y = np.array(labels)

# Reshape labels to match model output
y = y.reshape(-1, 1)

# Split data
X1_train, X1_test, X2_train, X2_test, y_train, y_test = train_test_split(
    X1, X2, y, test_size=0.2, random_state=42
)

# Create and train model
input_shape = X1_train[0].shape
model = create_complete_model(input_shape)

# Train the model
history = model.fit(
    [X1_train, X2_train],
    y_train,
    epochs=10,
    batch_size=32,
    validation_split=0.2
)

# Save the model
model.save('siamese_model.keras')

# Load the model
loaded_model = models.load_model('siamese_model.keras', custom_objects={'L1Distance': L1Distance})

# Evaluate the loaded model
y_pred = loaded_model.predict([X1_test, X2_test])
y_pred_bin = (y_pred > 0.5).astype(int)

# Calculate metrics
accuracy = accuracy_score(y_test.ravel(), y_pred_bin.ravel())
f1 = f1_score(y_test.ravel(), y_pred_bin.ravel())
roc_auc = roc_auc_score(y_test.ravel(), y_pred.ravel())

print(f"Accuracy: {accuracy:.4f}")
print(f"F1-score: {f1:.4f}")
print(f"ROC-AUC: {roc_auc:.4f}")

Epoch 1/10
[1m1200/1200[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 4ms/step - accuracy: 0.6023 - loss: 0.8174 - val_accuracy: 0.8035 - val_loss: 0.4855
Epoch 2/10
[1m1200/1200[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 4ms/step - accuracy: 0.7993 - loss: 0.4492 - val_accuracy: 0.8400 - val_loss: 0.4276
Epoch 3/10
[1m1200/1200[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 5ms/step - accuracy: 0.8305 - loss: 0.3994 - val_accuracy: 0.8649 - val_loss: 0.3879
Epoch 4/10
[1m1200/1200[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 4ms/step - accuracy: 0.8481 - loss: 0.3662 - val_accuracy: 0.8789 - val_loss: 0.3558
Epoch 5/10
[1m1200/1200[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 5ms/step - accuracy: 0.8652 - loss: 0.3427 - val_accuracy: 0.9025 - val_loss: 0.3122
Epoch 6/10
[1m1200/1200[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 4ms/step - accuracy: 0.8761 - loss: 0.3209 - val_accuracy: 0.9243 - val_loss: 0.2697
Epoch 7/10
[1m1

In [13]:
import tensorflow as tf
import numpy as np
from tensorflow.keras import models


class L1Distance(layers.Layer):
    def call(self, vectors):
        # Remove extra dimension from output
        return tf.reduce_sum(tf.abs(vectors[0] - vectors[1]), axis=1)

loaded_model1 = models.load_model("siamese_model.keras", custom_objects={'L1Distance': L1Distance})


# Function to extract MFCC from audio file (you may already have this part)
import torchaudio
import torchaudio.transforms as T

def extract_mfcc(file_path, n_mfcc=13):
    try:
        waveform, sample_rate = torchaudio.load(file_path)

        # Extract MFCC from the waveform
        transform = T.MFCC(
            sample_rate=sample_rate,
            n_mfcc=n_mfcc,
            melkwargs={"n_fft": 400, "hop_length": 160, "n_mels": 23, "center": False}
        )
        mfcc = transform(waveform)

        # Take the mean across time steps (collapse the time dimension)
        mfcc_mean = mfcc.mean(dim=-1).detach().numpy()  # Use .detach() to remove gradient tracking before calling .numpy()

        return mfcc_mean
    except Exception as e:
        print(f"Error processing {file_path}: {e}")
        return None  # Return None if there was an error

# Function to predict whether the pair is target-target or target-non-target
def predict_pair(reference_mfcc, incoming_mfcc, model):

    reference_mfcc = np.expand_dims(reference_mfcc, axis=0)  # Add batch dimension
    incoming_mfcc = np.expand_dims(incoming_mfcc, axis=0)  # Add batch dimension

    # Concatenate the MFCC features (assuming your model is trained to take two inputs)
    input_pair = np.concatenate([reference_mfcc, incoming_mfcc], axis=-1)

    # Predict using the model
    prediction = model.predict([reference_mfcc, incoming_mfcc])


    if prediction[0] > 0.5:
        return "Target-Target Pair", prediction[0]
    else:
        return "Target-Non-Target Pair", prediction[0]

reference_audio_file = "/content/50_speakers_audio_data/Speaker_0001/Speaker_0001_00012.wav"
incoming_audio_file = "/content/50_speakers_audio_data/Speaker_0001/Speaker_0001_00010.wav"

# Extract MFCC features from the reference and incoming audio files
reference_mfcc = extract_mfcc(reference_audio_file)
incoming_mfcc = extract_mfcc(incoming_audio_file)

if reference_mfcc is not None and incoming_mfcc is not None:
    # Predict if the pair is target-target or target-non-target
    result, probability = predict_pair(reference_mfcc, incoming_mfcc, loaded_model)
    print(f"Prediction: {result} (Probability: {probability})")
else:
    print("Error: Could not extract MFCC features for the audio files.")


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 109ms/step
Prediction: Target-Target Pair (Probability: [0.88332])
