<a href="https://colab.research.google.com/github/icees8/Speech-Understanding-Programming-Assignment-2/blob/main/Q1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import random
from pathlib import Path
from typing import List, Tuple, Union

import torch
from torch import Tensor
from torch.utils.data import Dataset
from torchaudio._internal import download_url_to_file
from torchaudio.datasets.utils import _extract_zip, _load_waveform

device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)

In [None]:
!pip install speechbrain

Collecting speechbrain
  Downloading speechbrain-1.0.0-py3-none-any.whl (760 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m760.1/760.1 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting hyperpyyaml (from speechbrain)
  Downloading HyperPyYAML-1.2.2-py3-none-any.whl (16 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch>=1.9->speechbrain)
  Downloading nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (23.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m23.7/23.7 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting nvidia-cuda-runtime-cu12==12.1.105 (from torch>=1.9->speechbrain)
  Downloading nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (823 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m823.6/823.6 kB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting nvidia-cuda-cupti-cu12==12.1.105 (from torch>=1.9->speechbrain)
  Downloading nvidia_cuda_cupti_cu12

# VoxCeleb Dataset from Torchaudio

The below code has been taken from [here](https://pytorch.org/audio/main/_modules/torchaudio/datasets/voxceleb1.html)

In [None]:
SAMPLE_RATE = 16000
_ARCHIVE_CONFIGS = {
    # "dev": {
    #     "archive_name": "vox1_dev_wav.zip",
    #     "urls": [
    #         "https://thor.robots.ox.ac.uk/~vgg/data/voxceleb/vox1a/vox1_dev_wav_partaa",
    #         "https://thor.robots.ox.ac.uk/~vgg/data/voxceleb/vox1a/vox1_dev_wav_partab",
    #         "https://thor.robots.ox.ac.uk/~vgg/data/voxceleb/vox1a/vox1_dev_wav_partac",
    #         "https://thor.robots.ox.ac.uk/~vgg/data/voxceleb/vox1a/vox1_dev_wav_partad",
    #     ],
    #     "checksums": [
    #         "21ec6ca843659ebc2fdbe04b530baa4f191ad4b0971912672d92c158f32226a0",
    #         "311d21e0c8cbf33573a4fce6c80e5a279d80736274b381c394319fc557159a04",
    #         "92b64465f2b2a3dc0e4196ae8dd6828cbe9ddd1f089419a11e4cbfe2e1750df0",
    #         "00e6190c770b27f27d2a3dd26ee15596b17066b715ac111906861a7d09a211a5",
    #     ],
    # },
    "test": {
        "archive_name": "vox1_test_wav.zip",
        "url": "https://thor.robots.ox.ac.uk/~vgg/data/voxceleb/vox1a/vox1_test_wav.zip",
        "checksum": "8de57f347fe22b2c24526e9f444f689ecf5096fc2a92018cf420ff6b5b15eaea",
    },
}
_IDEN_SPLIT_URL = "https://www.robots.ox.ac.uk/~vgg/data/voxceleb/meta/iden_split.txt"
_VERI_TEST_URL = "https://www.robots.ox.ac.uk/~vgg/data/voxceleb/meta/veri_test.txt"


def _download_extract_wavs(root: str):
    for archive in ["test"]:
        archive_name = _ARCHIVE_CONFIGS[archive]["archive_name"]
        archive_path = os.path.join(root, archive_name)
        # The zip file of dev data is splited to 4 chunks.
        # Download and combine them into one file before extraction.
        if archive == "dev":
            urls = _ARCHIVE_CONFIGS[archive]["urls"]
            checksums = _ARCHIVE_CONFIGS[archive]["checksums"]
            with open(archive_path, "wb") as f:
                for url, checksum in zip(urls, checksums):
                    file_path = os.path.join(root, os.path.basename(url))
                    download_url_to_file(url, file_path, hash_prefix=checksum)
                    with open(file_path, "rb") as f_split:
                        f.write(f_split.read())
        else:
            url = _ARCHIVE_CONFIGS[archive]["url"]
            checksum = _ARCHIVE_CONFIGS[archive]["checksum"]
            download_url_to_file(url, archive_path, hash_prefix=checksum)
        _extract_zip(archive_path)


def _get_flist(root: str, file_path: str, subset: str) -> List[str]:
    f_list = []
    if subset == "train":
        index = 1
    elif subset == "dev":
        index = 2
    else:
        index = 3
    with open(file_path, "r") as f:
        for line in f:
            id, path = line.split()
            if int(id) == index:
                f_list.append(path)
    return sorted(f_list)


def _get_paired_flist(root: str, veri_test_path: str):
    f_list = []
    with open(veri_test_path, "r") as f:
        for line in f:
            label, path1, path2 = line.split()
            f_list.append((label, path1, path2))
    return f_list


def _get_file_id(file_path: str, _ext_audio: str):
    speaker_id, youtube_id, utterance_id = file_path.split("/")[-3:]
    utterance_id = utterance_id.replace(_ext_audio, "")
    file_id = "-".join([speaker_id, youtube_id, utterance_id])
    return file_id


class VoxCeleb1(Dataset):
    """*VoxCeleb1* :cite:`nagrani2017voxceleb` dataset.

    Args:
        root (str or Path): Path to the directory where the dataset is found or downloaded.
        download (bool, optional):
            Whether to download the dataset if it is not found at root path. (Default: ``False``).
    """

    _ext_audio = ".wav"

    def __init__(self, root: Union[str, Path], download: bool = False) -> None:
        # Get string representation of 'root' in case Path object is passed
        root = os.fspath(root)
        self._path = os.path.join(root, "wav")
        if not os.path.isdir(self._path):
            if not download:
                raise RuntimeError(
                    f"Dataset not found at {self._path}. Please set `download=True` to download the dataset."
                )
            _download_extract_wavs(root)

    def get_metadata(self, n: int):
        raise NotImplementedError

    def __getitem__(self, n: int):
        raise NotImplementedError

    def __len__(self) -> int:
        raise NotImplementedError


class VoxCeleb1Identification(VoxCeleb1):
    """*VoxCeleb1* :cite:`nagrani2017voxceleb` dataset for speaker identification task.

    Each data sample contains the waveform, sample rate, speaker id, and the file id.

    Args:
        root (str or Path): Path to the directory where the dataset is found or downloaded.
        subset (str, optional): Subset of the dataset to use. Options: ["train", "dev", "test"]. (Default: ``"train"``)
        meta_url (str, optional): The url of meta file that contains the list of subset labels and file paths.
            The format of each row is ``subset file_path". For example: ``1 id10006/nLEBBc9oIFs/00003.wav``.
            ``1``, ``2``, ``3`` mean ``train``, ``dev``, and ``test`` subest, respectively.
            (Default: ``"https://www.robots.ox.ac.uk/~vgg/data/voxceleb/meta/iden_split.txt"``)
        download (bool, optional):
            Whether to download the dataset if it is not found at root path. (Default: ``False``).

    Note:
        The file structure of `VoxCeleb1Identification` dataset is as follows:

        └─ root/

         └─ wav/

         └─ speaker_id folders

        Users who pre-downloaded the ``"vox1_dev_wav.zip"`` and ``"vox1_test_wav.zip"`` files need to move
        the extracted files into the same ``root`` directory.
    """

    def __init__(
        self, root: Union[str, Path], subset: str = "train", meta_url: str = _IDEN_SPLIT_URL, download: bool = False
    ) -> None:
        super().__init__(root, download)
        if subset not in ["train", "dev", "test"]:
            raise ValueError("`subset` must be one of ['train', 'dev', 'test']")
        # download the iden_split.txt to get the train, dev, test lists.
        meta_list_path = os.path.join(root, os.path.basename(meta_url))
        if not os.path.exists(meta_list_path):
            download_url_to_file(meta_url, meta_list_path)
        self._flist = _get_flist(self._path, meta_list_path, subset)

    def get_metadata(self, n: int) -> Tuple[str, int, int, str]:
        """Get metadata for the n-th sample from the dataset. Returns filepath instead of waveform,
        but otherwise returns the same fields as :py:func:`__getitem__`.

        Args:
            n (int): The index of the sample

        Returns:
            Tuple of the following items;

            str:
                Path to audio
            int:
                Sample rate
            int:
                Speaker ID
            str:
                File ID
        """
        file_path = self._flist[n]
        file_id = _get_file_id(file_path, self._ext_audio)
        speaker_id = file_id.split("-")[0]
        speaker_id = int(speaker_id[3:])
        return file_path, SAMPLE_RATE, speaker_id, file_id


    def __getitem__(self, n: int) -> Tuple[Tensor, int, int, str]:
        """Load the n-th sample from the dataset.

        Args:
            n (int): The index of the sample to be loaded

        Returns:
            Tuple of the following items;

            Tensor:
                Waveform
            int:
                Sample rate
            int:
                Speaker ID
            str:
                File ID
        """
        metadata = self.get_metadata(n)
        waveform = _load_waveform(self._path, metadata[0], metadata[1])
        return (waveform,) + metadata[1:]


    def __len__(self) -> int:
        return len(self._flist)



class VoxCeleb1Verification(VoxCeleb1):
    """*VoxCeleb1* :cite:`nagrani2017voxceleb` dataset for speaker verification task.

    Each data sample contains a pair of waveforms, sample rate, the label indicating if they are
    from the same speaker, and the file ids.

    Args:
        root (str or Path): Path to the directory where the dataset is found or downloaded.
        meta_url (str, optional): The url of meta file that contains a list of utterance pairs
            and the corresponding labels. The format of each row is ``label file_path1 file_path2".
            For example: ``1 id10270/x6uYqmx31kE/00001.wav id10270/8jEAjG6SegY/00008.wav``.
            ``1`` means the two utterances are from the same speaker, ``0`` means not.
            (Default: ``"https://www.robots.ox.ac.uk/~vgg/data/voxceleb/meta/veri_test.txt"``)
        download (bool, optional):
            Whether to download the dataset if it is not found at root path. (Default: ``False``).

    Note:
        The file structure of `VoxCeleb1Verification` dataset is as follows:

        └─ root/

         └─ wav/

         └─ speaker_id folders

        Users who pre-downloaded the ``"vox1_dev_wav.zip"`` and ``"vox1_test_wav.zip"`` files need to move
        the extracted files into the same ``root`` directory.
    """

    def __init__(self, root: Union[str, Path], meta_url: str = _VERI_TEST_URL, download: bool = False) -> None:
        super().__init__(root, download)
        # download the veri_test.txt to get the list of training pairs and labels.
        meta_list_path = os.path.join(root, os.path.basename(meta_url))
        if not os.path.exists(meta_list_path):
            download_url_to_file(meta_url, meta_list_path)
        self._flist = _get_paired_flist(self._path, meta_list_path)

    def get_metadata(self, n: int) -> Tuple[str, str, int, int, str, str]:
        """Get metadata for the n-th sample from the dataset. Returns filepaths instead of waveforms,
        but otherwise returns the same fields as :py:func:`__getitem__`.

        Args:
            n (int): The index of the sample

        Returns:
            Tuple of the following items;

            str:
                Path to audio file of speaker 1
            str:
                Path to audio file of speaker 2
            int:
                Sample rate
            int:
                Label
            str:
                File ID of speaker 1
            str:
                File ID of speaker 2
        """
        label, file_path_spk1, file_path_spk2 = self._flist[n]
        label = int(label)
        file_id_spk1 = _get_file_id(file_path_spk1, self._ext_audio)
        file_id_spk2 = _get_file_id(file_path_spk2, self._ext_audio)
        return file_path_spk1, file_path_spk2, SAMPLE_RATE, label, file_id_spk1, file_id_spk2


    def __getitem__(self, n: int) -> Tuple[Tensor, Tensor, int, int, str, str]:
        """Load the n-th sample from the dataset.

        Args:
            n (int): The index of the sample to be loaded.

        Returns:
            Tuple of the following items;

            Tensor:
                Waveform of speaker 1
            Tensor:
                Waveform of speaker 2
            int:
                Sample rate
            int:
                Label
            str:
                File ID of speaker 1
            str:
                File ID of speaker 2
        """
        metadata = self.get_metadata(n)
        waveform_spk1 = _load_waveform(self._path, metadata[0], metadata[2])
        waveform_spk2 = _load_waveform(self._path, metadata[1], metadata[2])
        return (waveform_spk1, waveform_spk2) + metadata[2:]


    def __len__(self) -> int:
        return len(self._flist)


In [None]:
dataset = VoxCeleb1Verification(root="/content/", download = True)

100%|██████████| 1.00G/1.00G [00:57<00:00, 18.7MB/s]
100%|██████████| 2.23M/2.23M [00:00<00:00, 3.40MB/s]


# load models


## ecapa tdnn

In [None]:
import torchaudio
from speechbrain.inference.speaker import SpeakerRecognition

In [None]:
ecapa = SpeakerRecognition.from_hparams(source="speechbrain/spkrec-ecapa-voxceleb", savedir="pretrained_models/spkrec-ecapa-voxceleb", run_opts={"device":device_name})

hyperparams.yaml:   0%|          | 0.00/1.92k [00:00<?, ?B/s]

embedding_model.ckpt:   0%|          | 0.00/83.3M [00:00<?, ?B/s]

mean_var_norm_emb.ckpt:   0%|          | 0.00/1.92k [00:00<?, ?B/s]

classifier.ckpt:   0%|          | 0.00/5.53M [00:00<?, ?B/s]

label_encoder.txt:   0%|          | 0.00/129k [00:00<?, ?B/s]

## wavlm base plus

In [None]:
from transformers import AutoFeatureExtractor, WavLMForXVector

In [None]:
wavlm_base_plus_feature_extractor = AutoFeatureExtractor.from_pretrained("microsoft/wavlm-base-plus-sv")
wavlm_base_plus = WavLMForXVector.from_pretrained("microsoft/wavlm-base-plus-sv")

wavlm_base_plus = wavlm_base_plus.to(device)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


preprocessor_config.json:   0%|          | 0.00/215 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/58.6k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Some weights of the model checkpoint at microsoft/wavlm-base-plus-sv were not used when initializing WavLMForXVector: ['wavlm.encoder.pos_conv_embed.conv.weight_g', 'wavlm.encoder.pos_conv_embed.conv.weight_v']
- This IS expected if you are initializing WavLMForXVector from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing WavLMForXVector from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of WavLMForXVector were not initialized from the model checkpoint at microsoft/wavlm-base-plus-sv and are newly initialized: ['wavlm.encoder.pos_conv_embed.conv.parametrizations.weight.original0', 'wavlm.encoder.pos_conv_embed.conv.parametrizations.weight.original1']
You should probably TRAIN this model on a d

## wavlm large

In [None]:
from transformers import AutoFeatureExtractor, WavLMForXVector

In [None]:
wavlm_large_feature_extractor = AutoFeatureExtractor.from_pretrained("microsoft/wavlm-large")
wavlm_large = WavLMForXVector.from_pretrained("microsoft/wavlm-large")

wavlm_large = wavlm_large.to(device)

preprocessor_config.json:   0%|          | 0.00/214 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/2.22k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/1.26G [00:00<?, ?B/s]

Some weights of WavLMForXVector were not initialized from the model checkpoint at microsoft/wavlm-large and are newly initialized: ['classifier.bias', 'classifier.weight', 'encoder.pos_conv_embed.conv.parametrizations.weight.original0', 'encoder.pos_conv_embed.conv.parametrizations.weight.original1', 'feature_extractor.bias', 'feature_extractor.weight', 'objective.weight', 'projector.bias', 'projector.weight', 'tdnn.0.kernel.bias', 'tdnn.0.kernel.weight', 'tdnn.1.kernel.bias', 'tdnn.1.kernel.weight', 'tdnn.2.kernel.bias', 'tdnn.2.kernel.weight', 'tdnn.3.kernel.bias', 'tdnn.3.kernel.weight', 'tdnn.4.kernel.bias', 'tdnn.4.kernel.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


# Equal error rate

In [None]:
import numpy as np
from sklearn.metrics import roc_curve

def equal_error_rate(y_true, y_pred):

    fpr, tpr, _ = roc_curve(y_true, y_pred, pos_label=1)
    fnr = 1 - tpr

    EER_p = fpr[np.nanargmin(np.abs((fnr - fpr)))]
    EER_n = fnr[np.nanargmin(np.abs((fnr - fpr)))]

    return np.mean([EER_p , EER_n])

# Model EER on voxceleb

## ecapa tdnn

In [None]:
REAL = []
PRED = []

PATH = "/content/wav/"

from tqdm.notebook import tqdm

for index in tqdm(random.choices(range(len(dataset)),1000)):
    path1 , path2 , sr , label , _ , _ = dataset.get_metadata(index)

    PRED.append(ecapa.verify_files(PATH + path1,PATH + path2)[0].item())
    REAL.append(label)

  0%|          | 0/1000 [00:00<?, ?it/s]

In [None]:
ecapa_err = equal_error_rate(REAL , PRED)
print(ecapa_err)

0.46299999999999997


## wavlm base plus

In [None]:
PRED = []
REAL = []

def get_output_wavlm_base_plus(audio1 , audio2 , sr):

    audio1 = audio1.tolist()[0]
    audio2 = audio2.tolist()[0]

    with torch.no_grad():
        inputs = wavlm_base_plus_feature_extractor([audio1, audio2],sampling_rate = sr, padding=True, return_tensors="pt")
        inputs = inputs.to(device)

        embeddings = wavlm_base_plus(**inputs).embeddings

        embeddings = torch.nn.functional.normalize(embeddings, dim=-1)
        cosine_sim = torch.nn.CosineSimilarity(dim=-1)
        similarity = cosine_sim(embeddings[0], embeddings[1])

    return similarity.item()

for index in tqdm(random.choices(range(len(dataset)),1000)):

    audio1 , audio2 , sr , label , _ , _ = dataset[index]

    output = get_output_wavlm_base_plus(audio1 , audio2 , sr)

    PRED.append(output)
    REAL.append(label)

  0%|          | 0/1000 [00:00<?, ?it/s]



In [None]:
wavlm_base_plus_eer = equal_error_rate(REAL , PRED)
print(wavlm_base_plus_eer)

0.010000000000000005


## wavlm large

In [None]:
PRED = []
REAL = []

def get_output_wavlm_large(audio1 , audio2 , sr):

    audio1 = audio1.tolist()[0]
    audio2 = audio2.tolist()[0]

    with torch.no_grad():
        inputs = wavlm_large_feature_extractor([audio1, audio2],sampling_rate = sr, padding=True, return_tensors="pt")
        inputs = inputs.to(device)

        embeddings = wavlm_large(**inputs).embeddings

        embeddings = torch.nn.functional.normalize(embeddings, dim=-1)
        cosine_sim = torch.nn.CosineSimilarity(dim=-1)
        similarity = cosine_sim(embeddings[0], embeddings[1])

    return similarity.item()

for index in tqdm(random.choices(range(len(dataset)),1000)):

    audio1 , audio2 , sr , label , _ , _ = dataset[index]

    output = get_output_wavlm_large(audio1 , audio2 , sr)

    PRED.append(output)
    REAL.append(label)

  0%|          | 0/1000 [00:00<?, ?it/s]



In [None]:
wavlm_large_eer = equal_error_rate(REAL , PRED)
print(wavlm_large_eer)

0.381
