Loading the VoxCeleb dataset into the notebook directly from Hugging Face

In [None]:
%pip install datasets

In [None]:
from datasets import load_dataset

# Stream the dataset so samples are fetched lazily instead of downloading
# everything up front.
# NOTE(review): streaming relies on the dataset's loading script supporting
# it — confirm for "101arrowz/vox_celeb".
dataset = load_dataset("101arrowz/vox_celeb", "audio", trust_remote_code=True, streaming=True)

# No `split` was requested, so `dataset` is a dict-like object keyed by split
# name. Iterating it directly yields the split names (strings), not samples,
# so pick the first available split and iterate that instead.
first_split_name = next(iter(dataset))
for sample in dataset[first_split_name]:
    print(sample)
    break  # Process sample-by-sample without full download


In [None]:
import torchaudio
from torchaudio.datasets import VoxCeleb1Verification

# torchaudio.datasets exposes VoxCeleb1Identification and VoxCeleb1Verification
# (there is no public `VoxCeleb1` name); the verification variant matches the
# speaker-verification experiments in the rest of this notebook.
# NOTE(review): `root` is assumed to be the directory that CONTAINS the "wav"
# folder used by the other cells ("./wav") — confirm against your local layout.
VOXCELEB1_ROOT = "."

dataset = VoxCeleb1Verification(root=VOXCELEB1_ROOT, download=False)


In [None]:
import os

def read_voxceleb_trials(trial_file):
    """Read a VoxCeleb1 verification trial list.

    Every non-blank line must contain exactly three whitespace-separated
    fields: ``<label> <path1> <path2>``, where ``label`` is an integer.
    Blank (or whitespace-only) lines are skipped.

    Args:
        trial_file: Path to the trial list text file.

    Returns:
        A list of ``(path1, path2, label)`` tuples in file order, with
        ``label`` converted to ``int``.

    Raises:
        ValueError: If a non-blank line does not have exactly three fields,
            or if its label cannot be converted to ``int``.
    """
    pairs = []
    with open(trial_file, 'r') as f:
        for line_no, line in enumerate(f, start=1):
            fields = line.split()
            if not fields:
                # Tolerate empty lines (e.g. a trailing newline at end of file).
                continue
            if len(fields) != 3:
                raise ValueError(
                    f"{trial_file}:{line_no}: expected 'label path1 path2', "
                    f"got {len(fields)} field(s)"
                )
            label, path1, path2 = fields
            pairs.append((path1, path2, int(label)))
    return pairs

# Path handed to read_voxceleb_trials; "/path/to/..." looks like a placeholder,
# so point it at the real trial list before running this cell.
trial_file = "/path/to/voxceleb1_trials.txt"
# Parse the trial list into (path1, path2, label) tuples.
trials = read_voxceleb_trials(trial_file)
print(f"Loaded {len(trials)} test pairs.")


In [None]:
import numpy as np
from sklearn.metrics import roc_curve

# Toy cosine-similarity scores (swap in real values)
genuine_scores = np.array([0.85, 0.92, 0.88, 0.91])  # same-speaker trials
impostor_scores = np.array([0.30, 0.45, 0.28, 0.40])  # different-speaker trials

# Ground truth: 1 marks a same-speaker trial, 0 a different-speaker trial
genuine_labels = np.ones_like(genuine_scores)
impostor_labels = np.zeros_like(impostor_scores)

# Stack both groups into a single score vector and a single label vector
scores = np.concatenate([genuine_scores, impostor_scores])
labels = np.concatenate([genuine_labels, impostor_labels])

# ROC operating points over all score thresholds
fpr, tpr, thresholds = roc_curve(labels, scores, pos_label=1)

# The EER operating point is the threshold at which the false-positive rate
# is closest to the false-negative rate (1 - tpr)
fnr = 1 - tpr
eer_index = np.nanargmin(np.abs(fpr - fnr))
eer_threshold = thresholds[eer_index]
eer = fpr[eer_index]

print(f"Equal Error Rate (EER): {eer:.4f}")
print(f"EER Threshold: {eer_threshold:.4f}")


In [None]:
import os

# Root folder that the later cells search recursively for .wav files.
voxceleb_path = "./wav"  # Change this to your actual dataset path
# Echo the configured path and whether it exists, as a quick sanity check
# before running the slower cells.
print(f"Dataset path: {voxceleb_path}")
print(f"Path exists: {os.path.exists(voxceleb_path)}")


In [None]:
import os
import glob
import torch
import torchaudio
import numpy as np
from nemo.collections.asr.models import EncDecSpeakerLabelModel
from sklearn.metrics import roc_curve

# Step 1: Load the pretrained "titanet_large" model, on GPU when one is available
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
model = EncDecSpeakerLabelModel.from_pretrained("titanet_large")
model = model.to(device)

# Step 2: Collect every .wav file below the dataset root (recursive search)
voxceleb_path = "./wav"  # Change this to your actual dataset path
wav_pattern = os.path.join(voxceleb_path, "**", "*.wav")
wav_files = glob.glob(wav_pattern, recursive=True)

# Step 3: Function to Extract Speaker Embeddings
def extract_embedding(file_path):
    """Return the embedding of one audio file as a squeezed NumPy array.

    The module-level ``model`` produces the embedding tensor, which is
    detached, moved to CPU and converted to NumPy; size-1 axes are removed.
    """
    embedding_tensor = model.get_embedding(file_path)
    embedding = embedding_tensor.detach().cpu().numpy()
    return np.squeeze(embedding)


# Step 4: Generate Speaker Verification Pairs
PAIR_SEED = 0  # fixed seed so the sampled pairs (and therefore the EER) are reproducible
rng = np.random.default_rng(PAIR_SEED)

# Speaker ID = first path component below the dataset root. This is correct for
# both speaker_id/audio.wav and speaker_id/video_id/audio.wav layouts, whereas
# comparing os.path.dirname() would label two utterances of the same speaker
# taken from different video folders as "different speaker".
speaker_ids = [
    os.path.relpath(path, voxceleb_path).split(os.sep)[0] for path in wav_files
]

# At least two files are needed to pair a file with a *different* file.
num_samples = min(1000, len(wav_files)) if len(wav_files) > 1 else 0  # Use a subset if dataset is large
cos_sim = []
labels = []

# NOTE(review): partners are drawn uniformly at random, so same-speaker pairs
# will be rare on a dataset with many speakers — consider an official trial list.
for i in range(num_samples):
    # Draw the partner uniformly from every file except file i itself
    # (a file compared with itself trivially scores 1.0).
    j = int(rng.integers(0, len(wav_files) - 1))
    if j >= i:
        j += 1

    emb1 = extract_embedding(wav_files[i])
    emb2 = extract_embedding(wav_files[j])

    # Compute cosine similarity
    similarity = np.dot(emb1, emb2) / (np.linalg.norm(emb1) * np.linalg.norm(emb2))

    # Label: 1 if same speaker, 0 otherwise
    label = 1 if speaker_ids[i] == speaker_ids[j] else 0

    cos_sim.append(similarity)
    labels.append(label)


# Step 5: Compute EER
if len(set(labels)) < 2:
    # The ROC curve (and hence the EER) is undefined without both classes.
    raise ValueError(
        "EER needs both same-speaker and different-speaker pairs; "
        f"got labels {sorted(set(labels))} from {len(labels)} sampled pair(s)."
    )
fpr, tpr, thresholds = roc_curve(labels, cos_sim, pos_label=1)
fnr = 1 - tpr
eer_index = np.nanargmin(np.abs(fpr - fnr))  # point where FPR is closest to FNR
eer_threshold = thresholds[eer_index]
eer = fpr[eer_index]
print(f"Equal Error Rate (EER): {eer:.4f}")




In [None]:
import os
import glob
import torch
import torchaudio
import numpy as np
from nemo.collections.asr.models import EncDecSpeakerLabelModel
from sklearn.metrics import roc_curve

# Step 1: Load the pretrained "ecapa_tdnn" model, on GPU when one is available
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
model = EncDecSpeakerLabelModel.from_pretrained("ecapa_tdnn")
model = model.to(device)

# Step 2: Collect every .wav file below the dataset root (recursive search)
voxceleb_path = "./wav"  # Change this to your actual dataset path
wav_pattern = os.path.join(voxceleb_path, "**", "*.wav")
wav_files = glob.glob(wav_pattern, recursive=True)

# Step 3: Function to Extract Speaker Embeddings
def extract_embedding(file_path):
    """Return the embedding of one audio file as a squeezed NumPy array.

    The module-level ``model`` produces the embedding tensor, which is
    detached, moved to CPU and converted to NumPy; size-1 axes are removed.
    """
    embedding_tensor = model.get_embedding(file_path)
    embedding = embedding_tensor.detach().cpu().numpy()
    return np.squeeze(embedding)


# Step 4: Generate Speaker Verification Pairs
PAIR_SEED = 0  # fixed seed so the sampled pairs (and therefore the EER) are reproducible
rng = np.random.default_rng(PAIR_SEED)

# Speaker ID = first path component below the dataset root. This is correct for
# both speaker_id/audio.wav and speaker_id/video_id/audio.wav layouts, whereas
# comparing os.path.dirname() would label two utterances of the same speaker
# taken from different video folders as "different speaker".
speaker_ids = [
    os.path.relpath(path, voxceleb_path).split(os.sep)[0] for path in wav_files
]

# At least two files are needed to pair a file with a *different* file.
num_samples = min(1000, len(wav_files)) if len(wav_files) > 1 else 0  # Use a subset if dataset is large
cos_sim = []
labels = []

# NOTE(review): partners are drawn uniformly at random, so same-speaker pairs
# will be rare on a dataset with many speakers — consider an official trial list.
for i in range(num_samples):
    # Draw the partner uniformly from every file except file i itself
    # (a file compared with itself trivially scores 1.0).
    j = int(rng.integers(0, len(wav_files) - 1))
    if j >= i:
        j += 1

    emb1 = extract_embedding(wav_files[i])
    emb2 = extract_embedding(wav_files[j])

    # Compute cosine similarity
    similarity = np.dot(emb1, emb2) / (np.linalg.norm(emb1) * np.linalg.norm(emb2))

    # Label: 1 if same speaker, 0 otherwise
    label = 1 if speaker_ids[i] == speaker_ids[j] else 0

    cos_sim.append(similarity)
    labels.append(label)


# Step 5: Compute EER
if len(set(labels)) < 2:
    # The ROC curve (and hence the EER) is undefined without both classes.
    raise ValueError(
        "EER needs both same-speaker and different-speaker pairs; "
        f"got labels {sorted(set(labels))} from {len(labels)} sampled pair(s)."
    )
fpr, tpr, thresholds = roc_curve(labels, cos_sim, pos_label=1)
fnr = 1 - tpr
eer_index = np.nanargmin(np.abs(fpr - fnr))  # point where FPR is closest to FNR
eer_threshold = thresholds[eer_index]
eer = fpr[eer_index]
print(f"Equal Error Rate (EER): {eer:.4f}")




In [None]:
import os
import glob
import itertools
import torch
import numpy as np
from scipy.spatial.distance import cosine
from nemo.collections.asr.models import EncDecSpeakerLabelModel


# Load the pretrained "ecapa_tdnn" model on GPU when available and put it in eval mode
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
model = EncDecSpeakerLabelModel.from_pretrained("ecapa_tdnn")
model = model.to(device)
model.eval()

# Recursively list every .wav file below the dataset root
voxceleb_path = "./wav"  # Set your dataset path
wav_pattern = os.path.join(voxceleb_path, "**", "*.wav")
wav_files = glob.glob(wav_pattern, recursive=True)

# Map each speaker ID to the list of that speaker's audio files
speaker_dict = {}

for file in wav_files:
    # The path is split on the OS separator; the speaker ID is taken as the
    # third component from the end (layout: speaker_id/video_id/audio.wav).
    parts = file.split(os.sep)
    speaker_id = parts[-3]

    # Start a new list the first time a speaker is seen, then append
    speaker_dict.setdefault(speaker_id, []).append(file)

# Embedding extraction backed by the module-level `model` loaded above
def extract_embedding(file_path):
    """Return the embedding of one audio file as a squeezed NumPy array.

    The tensor returned by ``model.get_embedding`` is detached, moved to CPU,
    converted to NumPy, and stripped of size-1 axes.
    """
    raw_embedding = model.get_embedding(file_path)
    return np.squeeze(raw_embedding.detach().cpu().numpy())


# List to store cosine similarity results as (label, file1, file2, similarity)
cosine_results = []

# Extract each file's embedding exactly once. Every file takes part in many
# pairs below, so re-running the model inside the pair loops would repeat the
# same inference once per pair instead of once per file.
embedding_cache = {
    path: extract_embedding(path)
    for speaker_files in speaker_dict.values()
    for path in speaker_files
}

# 1) Cosine similarity for intra-speaker pairs (label 1)
for speaker, files in speaker_dict.items():
    for f1, f2 in itertools.combinations(files, 2):
        emb1 = embedding_cache[f1]
        emb2 = embedding_cache[f2]
        similarity = 1 - cosine(emb1, emb2)  # scipy's cosine() is a distance
        cosine_results.append((1, f1, f2, similarity))

# 2) Cosine similarity for inter-speaker pairs (label 0)
speakers = list(speaker_dict.keys())

for i in range(len(speakers)):
    spk1_files = speaker_dict[speakers[i]]
    for j in range(i + 1, len(speakers)):  # Ensure unique speaker pairs
        spk2_files = speaker_dict[speakers[j]]

        for f1, f2 in itertools.product(spk1_files, spk2_files):
            emb1 = embedding_cache[f1]
            emb2 = embedding_cache[f2]
            similarity = 1 - cosine(emb1, emb2)  # scipy's cosine() is a distance
            cosine_results.append((0, f1, f2, similarity))

# Print a sample of the cosine similarity results to verify format
for result in cosine_results[:10]:
    print(result[0], result[1], result[2], result[3])


In [None]:
import os
import pickle
import numpy as np
import torch
from nemo.collections.asr.models import EncDecSpeakerLabelModel
from omegaconf import OmegaConf
from tqdm import tqdm

def load_titanet():
    """Return the pretrained ``titanet_large`` model, switched to eval mode."""
    titanet = EncDecSpeakerLabelModel.from_pretrained(model_name="titanet_large")
    titanet.eval()
    return titanet

def extract_embedding(model, audio_path):
    """Compute the embedding of ``audio_path`` with ``model`` as a flat array.

    ``model.get_embedding`` is called with gradient tracking disabled; its
    result is moved to CPU, converted to NumPy and flattened to one dimension.
    """
    with torch.no_grad():
        embedding_tensor = model.get_embedding(audio_path)
        embedding_array = embedding_tensor.cpu().numpy()
        flat_embedding = embedding_array.flatten()
    return flat_embedding

def process_voxceleb(voxceleb_dir, model, output_pickle, output_txt, full_path_keys=False):
    """Embed every ``.wav`` file under ``voxceleb_dir`` and save the results.

    Args:
        voxceleb_dir: Root directory, walked recursively with ``os.walk``.
        model: Model object passed through to ``extract_embedding``.
        output_pickle: Destination of the pickled ``{key: embedding}`` dict.
        output_txt: Destination of a text dump with one ``key: [values]``
            line per processed file.
        full_path_keys: When False (default, the original behavior) a file's
            key is ``<name of its containing directory>@<file name>``. When
            True the key is the file's path relative to ``voxceleb_dir`` with
            path separators replaced by ``@``, which stays unique when the
            same directory/file name pair occurs in several places.

    Files whose key equals an earlier file's key overwrite that earlier
    embedding in the pickle; the number of such overwrites is reported.
    """
    embeddings = {}
    overwritten = 0  # files whose key was already taken by an earlier file

    with open(output_txt, "w") as txt_file:
        for root, _, files in os.walk(voxceleb_dir):
            for file in tqdm(files):
                if not file.endswith(".wav"):
                    continue
                file_path = os.path.join(root, file)
                if full_path_keys:
                    rel_path = os.path.relpath(file_path, voxceleb_dir)
                    key = "@".join(rel_path.split(os.sep))
                else:
                    # Directory that directly contains the file
                    speaker_id = os.path.basename(root)
                    key = f"{speaker_id}@{file}"
                if key in embeddings:
                    overwritten += 1
                embedding = extract_embedding(model, file_path)
                embeddings[key] = embedding

                # Save in text format
                txt_file.write(f"{key}: {embedding.tolist()}\n")

    with open(output_pickle, "wb") as f:
        pickle.dump(embeddings, f)
    print(f"Embeddings saved to {output_pickle} and {output_txt}")
    if overwritten:
        print(
            f"WARNING: {overwritten} embedding(s) were overwritten because several "
            "files produced the same key; pass full_path_keys=True to keep them apart."
        )

if __name__ == "__main__":
    # Directory walked for .wav files, and the two output files that receive
    # the pickled embedding dict and its text dump.
    voxceleb_dir = "./wav"  # Change this to your local VoxCeleb directory
    output_pickle = "speaker_embeddings.pkl"
    output_txt = "speaker_embeddings.txt"
    
    # Load the model once, then embed every .wav file under voxceleb_dir.
    model = load_titanet()
    process_voxceleb(voxceleb_dir, model, output_pickle, output_txt)


In [None]:
import os
import pickle
import numpy as np
import torch
from nemo.collections.asr.models import EncDecSpeakerLabelModel
from omegaconf import OmegaConf
from tqdm import tqdm

def load_titanet():
    """Restore the model stored in ``titanet-mera.nemo`` and return it in eval mode."""
    titanet = EncDecSpeakerLabelModel.restore_from("titanet-mera.nemo")
    titanet.eval()
    return titanet

def extract_embedding(model, audio_path):
    """Compute the embedding of ``audio_path`` with ``model`` as a flat array.

    ``model.get_embedding`` is called with gradient tracking disabled; its
    result is moved to CPU, converted to NumPy and flattened to one dimension.
    """
    with torch.no_grad():
        embedding_tensor = model.get_embedding(audio_path)
        embedding_array = embedding_tensor.cpu().numpy()
        flat_embedding = embedding_array.flatten()
    return flat_embedding

def process_voxceleb(voxceleb_dir, model, output_pickle, output_txt, full_path_keys=False):
    """Embed every ``.wav`` file under ``voxceleb_dir`` and save the results.

    Args:
        voxceleb_dir: Root directory, walked recursively with ``os.walk``.
        model: Model object passed through to ``extract_embedding``.
        output_pickle: Destination of the pickled ``{key: embedding}`` dict.
        output_txt: Destination of a text dump with one ``key: [values]``
            line per processed file.
        full_path_keys: When False (default, the original behavior) a file's
            key is ``<name of the parent of its directory>@<file name>``, so
            equally named files in sibling directories share one key. When
            True the key is the file's path relative to ``voxceleb_dir`` with
            path separators replaced by ``@``, which keeps such files apart.

    Files whose key equals an earlier file's key overwrite that earlier
    embedding in the pickle; the number of such overwrites is reported.
    """
    embeddings = {}
    overwritten = 0  # files whose key was already taken by an earlier file

    with open(output_txt, "w") as txt_file:
        for root, dirs, files in os.walk(voxceleb_dir):
            if not files:  # Only process directories that contain files
                continue
            # Name of the directory one level above the one holding the files
            speaker_id = os.path.basename(os.path.dirname(root))
            for file in tqdm(files):
                if not file.endswith(".wav"):
                    continue
                file_path = os.path.join(root, file)
                if full_path_keys:
                    rel_path = os.path.relpath(file_path, voxceleb_dir)
                    key = "@".join(rel_path.split(os.sep))
                else:
                    key = f"{speaker_id}@{file}"
                if key in embeddings:
                    overwritten += 1
                embedding = extract_embedding(model, file_path)
                embeddings[key] = embedding

                # Save in text format
                txt_file.write(f"{key}: {embedding.tolist()}\n")

    with open(output_pickle, "wb") as f:
        pickle.dump(embeddings, f)
    print(f"Embeddings saved to {output_pickle} and {output_txt}")
    if overwritten:
        print(
            f"WARNING: {overwritten} embedding(s) were overwritten because several "
            "files produced the same key; pass full_path_keys=True to keep them apart."
        )

if __name__ == "__main__":
    # Directory walked for .wav files, and the two output files that receive
    # the pickled embedding dict and its text dump.
    voxceleb_dir = "./wav"  # Change this to your local VoxCeleb directory
    output_pickle = "speaker_embeddings-mera-vox.pkl"
    output_txt = "speaker_embeddings-mera-vox.txt"
    
    # Load the model once, then embed every .wav file under voxceleb_dir.
    model = load_titanet()
    process_voxceleb(voxceleb_dir, model, output_pickle, output_txt)


In [None]:
import os
import re

def verify_files(text_file, root_directory):
    """Check that every entry listed in a text file exists under a root folder.

    Each non-blank line is expected to start with a key of the form
    ``folder@filename`` (more ``@``-separated components are allowed),
    optionally followed by ``:`` and arbitrary text. The key's components are
    joined onto ``root_directory`` to form the path that is checked. Lines
    whose key has fewer than two components are ignored.

    The key is split on ``@`` rather than matched with ``\\w``-based regex
    groups, so folder or file names containing characters such as ``-`` are
    checked instead of being silently skipped.

    Args:
        text_file: Path to the listing file.
        root_directory: Directory the listed entries are resolved against.

    Prints the missing paths, or a confirmation when nothing is missing.
    """
    missing_files = []
    with open(text_file, 'r') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            # Everything before the first ':' is the key; the rest is payload.
            key = line.split(":", 1)[0].strip()
            components = [part for part in key.split("@") if part]
            if len(components) < 2:
                continue
            file_path = os.path.join(root_directory, *components)
            if not os.path.exists(file_path):
                missing_files.append(file_path)

    if missing_files:
        print("The following files are missing:")
        for file in missing_files:
            print(file)
    else:
        print("All files are present.")

# Example usage
# text_file is the listing to check; each entry parsed from it is looked up
# under root_directory.
text_file = "./speaker_embeddings.txt"  # Text file containing the filenames
root_directory = "./wav"  # Update this to your actual root folder
verify_files(text_file, root_directory)

In [None]:
import argparse
import os
import pickle as pkl
import sys

import numpy as np
from scipy.interpolate import interp1d
from scipy.optimize import brentq
from sklearn.metrics import roc_curve
from tqdm import tqdm


"""
This script faciliates to get EER % based on cosine-smilarity 
for Voxceleb dataset.

Args:
    trial_file str: path to voxceleb trial file
    emb : path to pickle file of embeddings dictionary (generated from spkr_get_emb.py)
    save_kaldi_emb: if required pass this argument to save kaldi embeddings for KALDI PLDA training later
    Note: order of audio files in manifest file should match the embeddings
"""


def get_acc(trial_file='', emb='', save_kaldi_emb=False):
    """Score every trial of a trial list with cosine similarity.

    Each non-blank line of ``trial_file`` must read ``<label> <path1> <path2>``.
    The ``/`` separators of each path are replaced by ``@`` to form the key
    looked up in the embeddings dictionary. The cosine similarity of the two
    embeddings is rescaled with ``(score + 1) / 2``.

    Args:
        trial_file: Path to the trial list.
        emb: Path to a pickle file holding a ``{key: embedding}`` dictionary.
        save_kaldi_emb: When True, also save the unique trial embeddings and
            their keys as ``all_embs_voxceleb.npy`` / ``all_ids_voxceleb.npy``
            in the trial file's directory (the current directory when the
            trial file path has no directory part).

    Returns:
        ``(scores, labels)`` as two NumPy arrays in trial order.

    Side effects:
        Writes ``trial_score.txt`` (one ``score<TAB>label`` line per trial)
        into the current working directory.

    Raises:
        KeyError: If a trial references a key missing from the embeddings.
        ValueError: If a non-blank line does not have exactly three fields or
            its label is not an integer.
    """
    # An empty dirname would turn `dirname + '/file'` into a path at the
    # filesystem root, so fall back to the current directory.
    dirname = os.path.dirname(trial_file) or '.'
    # NOTE: unpickling can execute arbitrary code — only load embedding files
    # that you produced yourself or otherwise trust.
    with open(emb, 'rb') as f:
        emb = pkl.load(f)
    trial_embs = []
    keys = []
    seen_keys = set()  # O(1) membership test; `keys` preserves insertion order
    all_scores = []
    all_keys = []

    # for each trials in trial file; the score file is closed even if a trial fails
    with open(trial_file, 'r') as f, open('trial_score.txt', 'w') as trial_score:
        tmp_file = f.readlines()
        for line in tqdm(tmp_file):
            line = line.strip()
            if not line:
                # Tolerate empty lines (e.g. a trailing newline at end of file).
                continue
            truth, x_speaker, y_speaker = line.split()

            x_speaker = '@'.join(x_speaker.split('/'))
            y_speaker = '@'.join(y_speaker.split('/'))

            X = emb[x_speaker]
            Y = emb[y_speaker]

            if save_kaldi_emb:
                for key, vector in ((x_speaker, X), (y_speaker, Y)):
                    if key not in seen_keys:
                        seen_keys.add(key)
                        keys.append(key)
                        trial_embs.append(vector)

            # Cosine similarity, then mapped from [-1, 1] to [0, 1]
            score = np.dot(X, Y) / ((np.dot(X, X) * np.dot(Y, Y)) ** 0.5)
            score = (score + 1) / 2

            all_scores.append(score)
            trial_score.write(str(score) + "\t" + truth)
            truth = int(truth)
            all_keys.append(truth)

            trial_score.write('\n')

    if save_kaldi_emb:
        np.save(os.path.join(dirname, 'all_embs_voxceleb.npy'), np.asarray(trial_embs))
        np.save(os.path.join(dirname, 'all_ids_voxceleb.npy'), np.asarray(keys))
        print("Saved KALDI PLDA related embeddings to {}".format(dirname))

    return np.asarray(all_scores), np.asarray(all_keys)


if __name__ == "__main__":
    # NOTE(review): parse_args() reads sys.argv; inside a Jupyter kernel argv
    # presumably holds the kernel's own flags rather than these options, so
    # this cell likely needs to be run as a script — confirm.
    parser = argparse.ArgumentParser()
    parser.add_argument("--trial_file", type=str, required=True, help="path to voxceleb trial file")
    parser.add_argument("--emb", type=str, required=True, help="path to numpy file of embeddings")
    parser.add_argument(
        "--save_kaldi_emb",
        action='store_true',
        required=False,
        help=":save kaldi embeddings for KALDI PLDA training later",
    )

    args = parser.parse_args()
    trial_file = args.trial_file
    emb = args.emb
    save_kaldi_emb = args.save_kaldi_emb

    # Score all trials, then derive the ROC operating points
    y_score, y = get_acc(trial_file=trial_file, emb=emb, save_kaldi_emb=save_kaldi_emb)
    fpr, tpr, thresholds = roc_curve(y, y_score, pos_label=1)

    # EER is the root of 1 - x - TPR(x), i.e. the FPR at which FNR equals FPR;
    # the interpolated ROC curve is built once and reused by the root finder
    roc_interp = interp1d(fpr, tpr)
    eer = brentq(lambda x: 1.0 - x - roc_interp(x), 0.0, 1.0)
    sys.stdout.write("{0:.2f}\n".format(eer * 100))

In [None]:
import re

def remove_middle_folder(input_file, output_file):
    """Rewrite a trial list with the middle folder removed from each path.

    Every non-blank input line is split on whitespace. The first field is
    copied unchanged; in each remaining field every ``/<segment>/`` match is
    replaced by a single ``/`` (so ``id/video/file.wav`` becomes
    ``id/file.wav``). Fields are written back joined by single spaces.
    Blank lines are skipped instead of raising ``IndexError``.

    NOTE(review): paths that differ only in the removed folder become
    identical after this rewrite — confirm that is intended downstream.

    Args:
        input_file: Path of the trial list to read.
        output_file: Path of the rewritten trial list (overwritten).
    """
    with open(input_file, 'r') as infile, open(output_file, 'w') as outfile:
        for line in infile:
            parts = line.split()
            if not parts:
                continue
            label, paths = parts[0], parts[1:]
            processed_parts = [label] + [re.sub(r'/[^/]+/', '/', path) for path in paths]
            outfile.write(" ".join(processed_parts) + "\n")

# Example usage:
# Reads input_file and writes the rewritten trial list to output_file
# (an existing output_file is overwritten).
input_file = "./vox-test.txt"  # Replace with your input filename
output_file = "./vox_test2.txt"  # Replace with your output filename
remove_middle_folder(input_file, output_file)
