In [1]:
import torch.utils.data as data
import torchaudio.compliance.kaldi as kaldi
import torch
import config
import torchaudio
import pandas as pd
import numpy as np
import os
import random



def compute_fbank(
    wavform,
    sample_rate=16000,
    num_mel_bins=80,
    frame_length=25,
    frame_shift=10,
    cmn=True,
):
    feat = kaldi.fbank(
        wavform,
        num_mel_bins=num_mel_bins,
        frame_length=frame_length,
        frame_shift=frame_shift,
        sample_frequency=sample_rate,
    )
    if cmn:
        feat = feat - torch.mean(feat, 0)
    return feat


class CustomNoisyEnrollSet(data.Dataset):
    """
    Dataset for enrollment with a custom directory structure where audio files
    are organized by speaker ID directly without video name subdirectories.

    The directory structure is expected to be:
    root_path/
        speaker_id1/
            audio1.wav
            audio2.wav
            ...
        speaker_id2/
            audio1.wav
            ...

    `__getitem__(idx)` returns `(utter, spk_id)` where:
    `spk_id:str`            - The speaker ID (directory name)
    `utter:torch.Tensor`    - Batched utterance features of shape [batch, time, features]
    """

    def __init__(
        self,
        root_path,
        audio_extension=".wav",
        sys_config=config.SysConfig(),
        exp_config=config.ExpConfig(),
    ) -> None:
        super(CustomNoisyEnrollSet, self).__init__()
        self.path_root_dir = root_path
        self.utter_length = exp_config.test_sample
        self.audio_extension = audio_extension

        # Build speaker-to-audio mapping
        self.samples = []
        for speaker_id in os.listdir(root_path):
            speaker_dir = os.path.join(root_path, speaker_id)
            if os.path.isdir(speaker_dir):
                for audio_file in os.listdir(speaker_dir):
                    if audio_file.endswith(audio_extension):
                        self.samples.append(
                            {
                                "speaker_id": speaker_id,
                                "audio_path": os.path.join(speaker_id, audio_file),
                            }
                        )
        print(f"Found {len(self.samples)} samples in {root_path}")

    def __len__(self):
        return len(self.samples)


    def __getitem__(self, idx):
        sample = self.samples[idx]
        speaker_id = sample["speaker_id"]
        audio_path = sample["audio_path"]

        # Get utterance
        path = os.path.join(self.path_root_dir, audio_path)

        # Load only a portion of the audio for quick debugging
        # Use a smaller segment for faster loading
        utter, sr = torchaudio.load(path, num_frames=self.utter_length)
        utter = torch.squeeze(utter)

        # Apply fbank directly without batching for now
        if utter.dim() == 1:  # If mono
            utter = utter.unsqueeze(0)  # Add channel dimension
        utter = compute_fbank(utter)

        utter = utter.unsqueeze(0)  # Add batch dimension

        return utter, audio_path

In [2]:
# Example usage
custom_dataset = CustomNoisyEnrollSet(

    root_path="../SV-eval/data/noisy/gaussian/vox1_test_segments_snr_10_noisy_gaussian/",  # Path containing speaker id directories

    audio_extension=".wav",  # Change if your files use a different extension

)


# Create a DataLoader

# Create a smaller subset of the dataset for testing


# Create a DataLoader with just a few samples

debug_loader = torch.utils.data.DataLoader(

    custom_dataset, batch_size=1, shuffle=False, num_workers=0  # Use 0 for debugging

)


# show first batch from the debug loader

for batch in debug_loader:

    print(batch[0].shape)  # utterance tensor

    print(batch[1])  # speaker ID

    break

Found 9119 samples in ../SV-eval/data/noisy/gaussian/vox1_test_segments_snr_10_noisy_gaussian/
torch.Size([1, 1, 318, 80])
('id10270\\00001_seg_0.wav',)


In [7]:
# Read the CSV file
import pandas as pd


df = pd.read_csv("label/vox1_test.csv", delim_whitespace=True)

# Display the first few rows to verify the data
print("Shape of dataframe:", df.shape)
print("\nFirst few rows:")
print(df.head())

# Basic statistics about the 'same' column (1 for same speaker, 0 for different)
print("\nDistribution of same/different pairs:")
print(df['label'].value_counts())

Shape of dataframe: (37611, 3)

First few rows:
   label                         audio1                         audio2
0      1  id10270/x6uYqmx31kE/00001.wav  id10270/8jEAjG6SegY/00008.wav
1      0  id10270/x6uYqmx31kE/00001.wav  id10300/ize_eiCFEg0/00003.wav
2      1  id10270/x6uYqmx31kE/00001.wav  id10270/GWXujl-xAVM/00017.wav
3      0  id10270/x6uYqmx31kE/00001.wav  id10273/0OCW1HUxZyg/00001.wav
4      1  id10270/x6uYqmx31kE/00001.wav  id10270/8jEAjG6SegY/00022.wav

Distribution of same/different pairs:
label
0    18809
1    18802
Name: count, dtype: int64


  df = pd.read_csv("label/vox1_test.csv", delim_whitespace=True)


In [9]:
# Create a function to generate trial list CSV for custom dataset with format id10270\00001_seg_0.wav
import random


def create_custom_trial_csv(
    dataset, output_path, num_positive_per_speaker=10, num_negative_total=None
):
    """
    Generate a trial list CSV file from a CustomNoisyEnrollSet dataset

    Args:
        dataset: CustomNoisyEnrollSet dataset
        output_path: Path to save the CSV file
        num_positive_per_speaker: Maximum number of positive (same speaker) trials per speaker
        num_negative_total: Total number of negative trials (if None, matches positive count)
    """
    # Group samples by speaker ID
    speakers = {}
    for sample in dataset.samples:
        speaker_id = sample["speaker_id"]
        audio_path = sample["audio_path"]

        if speaker_id not in speakers:
            speakers[speaker_id] = []
        speakers[speaker_id].append(audio_path)

    print(f"Found {len(speakers)} speakers")

    # Create list for CSV (label, audio1, audio2)
    trial_pairs = []

    # Create positive pairs (same speaker, label=1)
    positive_count = 0
    for speaker_id, audio_paths in speakers.items():
        # Skip speakers with too few samples
        if len(audio_paths) < 2:
            continue

        # Limit number of positive trials per speaker
        speaker_positive_count = 0
        max_positive = min(
            num_positive_per_speaker, len(audio_paths) * (len(audio_paths) - 1) // 2
        )

        # Create random positive pairs for this speaker
        audio_indices = list(range(len(audio_paths)))
        random.shuffle(audio_indices)

        for i in range(len(audio_indices)):
            for j in range(i + 1, len(audio_indices)):
                if speaker_positive_count >= max_positive:
                    break

                idx1, idx2 = audio_indices[i], audio_indices[j]
                trial_pairs.append((1, audio_paths[idx1], audio_paths[idx2]))
                speaker_positive_count += 1
                positive_count += 1

            if speaker_positive_count >= max_positive:
                break

    # Calculate how many negative trials to create
    if num_negative_total is None:
        num_negative_total = positive_count

    # Create negative pairs (different speakers, label=0)
    negative_count = 0
    speaker_ids = list(speakers.keys())
    max_attempts = num_negative_total * 3  # Allow for duplicates
    attempts = 0

    negative_pairs_set = set()  # To check for duplicates efficiently

    while negative_count < num_negative_total and attempts < max_attempts:
        attempts += 1

        # Select two random speakers
        i = random.randint(0, len(speaker_ids) - 1)
        j = random.randint(0, len(speaker_ids) - 1)

        if i != j:  # Ensure different speakers
            speaker1 = speaker_ids[i]
            speaker2 = speaker_ids[j]

            # Select random utterances from each speaker
            audio1 = random.choice(speakers[speaker1])
            audio2 = random.choice(speakers[speaker2])

            # Create a unique identifier for this pair (order doesn't matter for uniqueness check)
            pair_key = tuple(sorted([audio1, audio2]))

            # Check for duplicates
            if pair_key not in negative_pairs_set:
                negative_pairs_set.add(pair_key)
                trial_pairs.append((0, audio1, audio2))
                negative_count += 1

    # Shuffle the trials for randomness
    random.shuffle(trial_pairs)

    # Write to CSV
    with open(output_path, "w") as f:
        # Write trials directly without a header (match VoxCeleb format exactly)
        for label, audio1, audio2 in trial_pairs:
            f.write(f"{label} {audio1} {audio2}\n")

    print(
        f"Created trial list with {positive_count} positive and {negative_count} negative pairs"
    )
    print(f"Total trials: {len(trial_pairs)}")
    print(f"Saved to {output_path}")

    return trial_pairs, positive_count, negative_count


# Execute the function to create the CSV
output_file = "label/gaussian_trials.csv"
trials, pos_count, neg_count = create_custom_trial_csv(
    dataset=custom_dataset,
    output_path=output_file,
    num_positive_per_speaker=400,  # Adjust as needed
    num_negative_total=None,  # Will match positive count for balance
)


# Load and verify the created CSV
def verify_trial_csv(csv_path):
    """Verify the created trial list CSV"""
    df = pd.read_csv(
        csv_path,
        delim_whitespace=True,
        header=None,
        names=["label", "audio1", "audio2"],
    )

    print(f"\nVerifying trial list from {csv_path}")
    print(f"Shape of dataframe: {df.shape}")
    print("\nFirst few rows:")
    print(df.head())

    print("\nDistribution of same/different pairs:")
    print(df["label"].value_counts())

    # Check some of the speaker IDs
    print("\nSample speaker pairings:")
    for i in range(min(5, len(df))):
        row = df.iloc[i]
        speaker1 = (
            row["audio1"].split("/")[0]
            if "/" in row["audio1"]
            else row["audio1"].split("\\")[0]
        )
        speaker2 = (
            row["audio2"].split("/")[0]
            if "/" in row["audio2"]
            else row["audio2"].split("\\")[0]
        )
        print(f"{speaker1} vs {speaker2}: Same = {row['label']}")

    return df


# Verify the created CSV
trial_df = verify_trial_csv(output_file)

Found 40 speakers
Created trial list with 16000 positive and 16000 negative pairs
Total trials: 32000
Saved to label/gaussian_trials.csv

Verifying trial list from label/gaussian_trials.csv
Shape of dataframe: (32000, 3)

First few rows:
   label                    audio1                   audio2
0      1  id10299\00004_seg_18.wav  id10299\00006_seg_6.wav
1      1   id10286\00004_seg_7.wav  id10286\00007_seg_0.wav
2      1  id10282\00007_seg_24.wav  id10282\00005_seg_3.wav
3      1   id10308\00003_seg_7.wav  id10308\00004_seg_4.wav
4      1   id10271\00005_seg_4.wav  id10271\00004_seg_3.wav

Distribution of same/different pairs:
label
1    16000
0    16000
Name: count, dtype: int64

Sample speaker pairings:
id10299 vs id10299: Same = 1
id10286 vs id10286: Same = 1
id10282 vs id10282: Same = 1
id10308 vs id10308: Same = 1
id10271 vs id10271: Same = 1


  df = pd.read_csv(


In [10]:
import os
def load_custom_trial_list(csv_path):
    """Load a custom trial list from CSV file for evaluation"""
    if not os.path.exists(csv_path):
        raise FileNotFoundError(f"Trial list CSV not found at {csv_path}")

    trials = []
    with open(csv_path, "r") as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) == 3:
                label = int(parts[0])
                audio1 = parts[1]
                audio2 = parts[2]
                trials.append([audio1, audio2, bool(label)])

    return trials

In [11]:
trail_list = load_custom_trial_list(output_file)
# Print the first few trials to verify
print("First few trials:")
for trial in trail_list[:5]:
    print(trial)

First few trials:
['id10299\\00004_seg_18.wav', 'id10299\\00006_seg_6.wav', True]
['id10286\\00004_seg_7.wav', 'id10286\\00007_seg_0.wav', True]
['id10282\\00007_seg_24.wav', 'id10282\\00005_seg_3.wav', True]
['id10308\\00003_seg_7.wav', 'id10308\\00004_seg_4.wav', True]
['id10271\\00005_seg_4.wav', 'id10271\\00004_seg_3.wav', True]
