<a href="https://colab.research.google.com/github/abdullahks7/mlproj/blob/main/Untitled43.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Keep NumPy on the 1.x series (anything below 2.0).
!pip install -q "numpy<2.0"

# Install torch and torchaudio from the PyTorch cu121 wheel index.
!pip install -q \
  torch torchaudio --index-url https://download.pytorch.org/whl/cu121

# Speaker-model libraries; transformers, datasets and modelscope are pinned
# to exact versions, speechbrain and soundfile are left unpinned.
!pip install -q \
  speechbrain \
  "transformers[audio]==4.45.2" \
  "datasets==2.18.0" \
  "modelscope==1.14.0" \
  soundfile


━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 44.4/44.4 kB 1.9 MB/s eta 0:00:00
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 9.9/9.9 MB 59.6 MB/s eta 0:00:00
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 3.0/3.0 MB 41.5 MB/s eta 0:00:00

In [4]:
import os
from typing import Dict, Any, Optional, List, Tuple

import numpy as np
import torch




def cosine_sim(a: np.ndarray, b: np.ndarray) -> float:
    """Return the cosine similarity of two vectors as a Python float.

    Each input is cast to float32 and divided by its L2 norm plus a small
    epsilon, so an all-zero vector does not divide by zero (it simply
    scores 0.0 against anything). The result is the dot product of the two
    scaled vectors.
    """
    eps = 1e-6
    scaled = []
    for vec in (a, b):
        vec32 = vec.astype(np.float32)
        scaled.append(vec32 / (np.linalg.norm(vec32) + eps))
    lhs, rhs = scaled
    return float(np.dot(lhs, rhs))


def verdict_from_cosine(cos: float, threshold: float = 0.6) -> str:
    """
    Turn a cosine score into a speaker decision.

    Returns:
      - "n/a"        when cos is None (no score available)
      - "same"       when cos >= threshold
      - "different"  when cos < threshold
    """
    if cos is None:
        return "n/a"
    if cos >= threshold:
        return "same"
    return "different"


def crop_center(wav: torch.Tensor, sr: int, max_duration_s: Optional[float]) -> torch.Tensor:
    """
    Keep only the middle max_duration_s seconds of a waveform.

    The crop is applied along the last dimension (callers in this file pass
    a [1, T] tensor). The input is returned unchanged when max_duration_s is
    None or when the waveform already fits within int(sr * max_duration_s)
    samples.
    """
    if max_duration_s is not None:
        keep = int(sr * max_duration_s)
        total = wav.shape[-1]
        if total > keep:
            # Split the surplus evenly; the extra sample (if odd) is dropped
            # from the tail.
            offset = (total - keep) // 2
            return wav[..., offset:offset + keep]
    return wav


def load_wave(
    path: str,
    target_sr: Optional[int] = 16000,
    device: str = "cpu",
    max_duration_s: Optional[float] = None,
) -> Tuple[torch.Tensor, int]:
    """
    Load audio with torchaudio, convert to mono, resample to target_sr,
    and optionally center-crop to max_duration_s.

    Which formats can be decoded (WAV, MP3, ...) depends on what the
    installed torchaudio backend supports.

    Args:
        path: Audio file to read.
        target_sr: Sample rate to resample to; None keeps the file's rate.
        device: Device the returned tensor is moved to.
        max_duration_s: If set, the (resampled) audio is center-cropped to
            at most this many seconds.

    Returns:
        (waveform, sr): float32 waveform of shape [1, T] on ``device`` and
        the sample rate it is at after optional resampling.

    Raises:
        FileNotFoundError: if ``path`` does not exist or is not a regular
            file (e.g. it is a directory).
    """
    # Validate the path BEFORE importing torchaudio: a bad path is then
    # reported as such even if the audio stack is missing, and a directory
    # is rejected here instead of failing obscurely inside torchaudio.load.
    if not os.path.isfile(path):
        raise FileNotFoundError(f"Audio file not found: {path}")

    import torchaudio

    wav, sr = torchaudio.load(path)  # [channels, time]

    # Ensure [1, time]
    if wav.dim() == 1:
        wav = wav.unsqueeze(0)

    # Down-mix multi-channel audio by averaging the channels.
    if wav.shape[0] > 1:
        wav = wav.mean(dim=0, keepdim=True)

    # Resample only when a target rate is requested and differs.
    if target_sr is not None and sr != target_sr:
        resampler = torchaudio.transforms.Resample(orig_freq=sr, new_freq=target_sr)
        wav = resampler(wav)
        sr = target_sr

    # Crop after resampling so max_duration_s is measured at the final rate.
    wav = crop_center(wav, sr, max_duration_s)

    # Ensure float32 on the requested device.
    wav = wav.to(device=device, dtype=torch.float32)
    return wav, sr


# ===============================
# Model caches (no re-download)
# ===============================

# Loaded SpeechBrain classifiers keyed by (source, device); populated lazily
# by get_speechbrain_classifier().
_speechbrain_cache: Dict[Tuple[str, str], Any] = {}
# Loaded Hugging Face {"feature_extractor": ..., "model": ...} entries keyed
# by (model_name, device); populated lazily by get_hf_model().
_hf_cache: Dict[Tuple[str, str], Dict[str, Any]] = {}


def get_speechbrain_classifier(source: str, device: str = "cpu"):
    """Return a SpeechBrain EncoderClassifier, loading it at most once.

    Classifiers are memoized in the module-level ``_speechbrain_cache`` under
    the key (source, device), so repeated calls with the same pair reuse the
    already-constructed object.
    """
    from speechbrain.inference.speaker import EncoderClassifier

    cache_key = (source, device)
    if cache_key in _speechbrain_cache:
        return _speechbrain_cache[cache_key]

    # First request for this (source, device): build and remember it.
    classifier = EncoderClassifier.from_hparams(
        source=source,
        run_opts={"device": device},
    )
    _speechbrain_cache[cache_key] = classifier
    return classifier


def get_hf_model(
    model_name: str,
    feature_extractor_cls,
    model_cls,
    device: str = "cpu",
):
    """Return (feature_extractor, model) for a HF checkpoint, loading once.

    Entries live in the module-level ``_hf_cache`` keyed by
    (model_name, device). On a miss, both classes are instantiated via
    ``from_pretrained(model_name)``, the model is moved to ``device`` and put
    in eval mode.

    NOTE: the key does not include the classes, so asking for the same
    model_name/device with different classes returns the first pair loaded.
    """
    cache_key = (model_name, device)
    entry = _hf_cache.get(cache_key)
    if entry is None:
        extractor = feature_extractor_cls.from_pretrained(model_name)
        net = model_cls.from_pretrained(model_name).to(device)
        net.eval()
        entry = {"feature_extractor": extractor, "model": net}
        _hf_cache[cache_key] = entry
    return entry["feature_extractor"], entry["model"]




# ----- SpeechBrain models -----

def ecapa_embedding(path: str, device: str = "cpu", max_duration_s: Optional[float] = 5.0) -> np.ndarray:
    """Embed one audio file with speechbrain/spkrec-ecapa-voxceleb.

    The audio is loaded at 16 kHz (center-cropped to max_duration_s), passed
    through the cached classifier's encode_batch, and the result is squeezed
    of singleton dimensions and returned as a NumPy array.
    """
    encoder = get_speechbrain_classifier("speechbrain/spkrec-ecapa-voxceleb", device)
    waveform, _ = load_wave(
        path,
        target_sr=16000,
        device=device,
        max_duration_s=max_duration_s,
    )
    encoded = encoder.encode_batch(waveform)
    return encoded.squeeze().detach().cpu().numpy()


def resnet_embedding(path: str, device: str = "cpu", max_duration_s: Optional[float] = 5.0) -> np.ndarray:
    """Embed one audio file with speechbrain/spkrec-resnet-voxceleb.

    Same pipeline as the other SpeechBrain helpers: load at 16 kHz with an
    optional center crop, encode_batch, squeeze, convert to NumPy.
    """
    encoder = get_speechbrain_classifier("speechbrain/spkrec-resnet-voxceleb", device)
    waveform, _ = load_wave(
        path,
        target_sr=16000,
        device=device,
        max_duration_s=max_duration_s,
    )
    encoded = encoder.encode_batch(waveform)
    squeezed = encoded.squeeze()
    return squeezed.detach().cpu().numpy()


def xvect_embedding(path: str, device: str = "cpu", max_duration_s: Optional[float] = 5.0) -> np.ndarray:
    """Embed one audio file with speechbrain/spkrec-xvect-voxceleb.

    Loads the audio at 16 kHz (optionally center-cropped), runs the cached
    classifier's encode_batch and returns the squeezed embedding as NumPy.
    """
    encoder = get_speechbrain_classifier("speechbrain/spkrec-xvect-voxceleb", device)
    waveform, _ = load_wave(
        path,
        target_sr=16000,
        device=device,
        max_duration_s=max_duration_s,
    )
    encoded = encoder.encode_batch(waveform)
    squeezed = encoded.squeeze()
    return squeezed.detach().cpu().numpy()


# ----- HF x-vector SV models -----

def wavlm_sv_embedding(path: str, device: str = "cpu", max_duration_s: Optional[float] = 5.0) -> np.ndarray:
    """
    Speaker embedding from microsoft/wavlm-base-plus-sv (WavLMForXVector).

    Audio is loaded at 16 kHz, run through the checkpoint's feature
    extractor, and the model output's ``embeddings`` field is returned as a
    squeezed NumPy array.
    """
    from transformers import Wav2Vec2FeatureExtractor, WavLMForXVector

    extractor, net = get_hf_model(
        "microsoft/wavlm-base-plus-sv",
        Wav2Vec2FeatureExtractor,
        WavLMForXVector,
        device,
    )

    waveform, _ = load_wave(path, target_sr=16000, device=device, max_duration_s=max_duration_s)
    # The feature extractor is fed a NumPy array, so go via CPU.
    samples = waveform.squeeze().cpu().numpy()

    batch = extractor([samples], sampling_rate=16000, padding=True, return_tensors="pt")
    batch = batch.to(device)

    with torch.no_grad():
        speaker_vec = net(**batch).embeddings

    return speaker_vec.squeeze().cpu().numpy()


def unispeech_sv_embedding(path: str, device: str = "cpu", max_duration_s: Optional[float] = 5.0) -> np.ndarray:
    """
    Speaker embedding from microsoft/unispeech-sat-base-plus-sv
    (UniSpeechSatForXVector).

    Audio is loaded at 16 kHz, run through the checkpoint's feature
    extractor, and the model output's ``embeddings`` field is returned as a
    squeezed NumPy array.
    """
    from transformers import Wav2Vec2FeatureExtractor, UniSpeechSatForXVector

    extractor, net = get_hf_model(
        "microsoft/unispeech-sat-base-plus-sv",
        Wav2Vec2FeatureExtractor,
        UniSpeechSatForXVector,
        device,
    )

    waveform, _ = load_wave(path, target_sr=16000, device=device, max_duration_s=max_duration_s)
    # The feature extractor is fed a NumPy array, so go via CPU.
    samples = waveform.squeeze().cpu().numpy()

    batch = extractor([samples], sampling_rate=16000, padding=True, return_tensors="pt")
    batch = batch.to(device)

    with torch.no_grad():
        speaker_vec = net(**batch).embeddings

    return speaker_vec.squeeze().cpu().numpy()


def wav2vec2_superb_embedding(path: str, device: str = "cpu", max_duration_s: Optional[float] = 5.0) -> np.ndarray:
    """
    Speaker embedding from anton-l/wav2vec2-base-superb-sv
    (Wav2Vec2ForXVector).

    Audio is loaded at 16 kHz, run through the checkpoint's feature
    extractor, and the model output's ``embeddings`` field is returned as a
    squeezed NumPy array.
    """
    from transformers import Wav2Vec2FeatureExtractor, Wav2Vec2ForXVector

    extractor, net = get_hf_model(
        "anton-l/wav2vec2-base-superb-sv",
        Wav2Vec2FeatureExtractor,
        Wav2Vec2ForXVector,
        device,
    )

    waveform, _ = load_wave(path, target_sr=16000, device=device, max_duration_s=max_duration_s)
    # The feature extractor is fed a NumPy array, so go via CPU.
    samples = waveform.squeeze().cpu().numpy()

    batch = extractor([samples], sampling_rate=16000, padding=True, return_tensors="pt")
    batch = batch.to(device)

    with torch.no_grad():
        speaker_vec = net(**batch).embeddings

    return speaker_vec.squeeze().cpu().numpy()


# ----- HF generic encoders (pooled) -----

def wavlm_base_pooled_embedding(path: str, device: str = "cpu", max_duration_s: Optional[float] = 5.0) -> np.ndarray:
    """
    Utterance embedding from microsoft/wavlm-base used as a plain encoder.

    The model's last_hidden_state is averaged over dim 1 (the time axis per
    the [B, T, C] layout) and the result is returned as a squeezed NumPy
    array.
    """
    from transformers import AutoFeatureExtractor, WavLMModel

    extractor, net = get_hf_model(
        "microsoft/wavlm-base",
        AutoFeatureExtractor,
        WavLMModel,
        device,
    )

    waveform, _ = load_wave(path, target_sr=16000, device=device, max_duration_s=max_duration_s)
    # The feature extractor is fed a NumPy array, so go via CPU.
    samples = waveform.squeeze().cpu().numpy()

    batch = extractor([samples], sampling_rate=16000, padding=True, return_tensors="pt")
    batch = batch.to(device)

    with torch.no_grad():
        frames = net(**batch).last_hidden_state  # [B, T, C]
        pooled = frames.mean(dim=1)              # [B, C]

    return pooled.squeeze().cpu().numpy()


def wav2vec2_base_pooled_embedding(path: str, device: str = "cpu", max_duration_s: Optional[float] = 5.0) -> np.ndarray:
    """
    Utterance embedding from facebook/wav2vec2-base used as a plain encoder.

    The model's last_hidden_state is averaged over dim 1 (the time axis per
    the [B, T, C] layout) and the result is returned as a squeezed NumPy
    array.
    """
    from transformers import AutoFeatureExtractor, Wav2Vec2Model

    extractor, net = get_hf_model(
        "facebook/wav2vec2-base",
        AutoFeatureExtractor,
        Wav2Vec2Model,
        device,
    )

    waveform, _ = load_wave(path, target_sr=16000, device=device, max_duration_s=max_duration_s)
    # The feature extractor is fed a NumPy array, so go via CPU.
    samples = waveform.squeeze().cpu().numpy()

    batch = extractor([samples], sampling_rate=16000, padding=True, return_tensors="pt")
    batch = batch.to(device)

    with torch.no_grad():
        frames = net(**batch).last_hidden_state  # [B, T, C]
        pooled = frames.mean(dim=1)              # [B, C]

    return pooled.squeeze().cpu().numpy()




def run_benchmark(
    audio1: str,
    audio2: str,
    device: Optional[str] = None,
    threshold: float = 0.6,
    max_duration_s: Optional[float] = 5.0,
) -> List[Dict[str, Any]]:
    """
    Compare two audio files with every speaker model defined in this file.

    For each model the two files are embedded, the embeddings are
    L2-normalized, and cosine similarity, L2 distance and the raw norms are
    recorded together with a same/different verdict. A table and a majority
    verdict are printed, and the per-model result dicts are returned.

    device: "cuda"/"cpu"; when None, CUDA is used if available.
    threshold: cosine threshold for same/different decision.
    max_duration_s: audio is center-cropped to this many seconds before embedding.

    Raises FileNotFoundError if either audio path does not exist. Failures
    inside an individual model are not raised; they become a row with
    verdict "error" and the exception repr in "extra".
    """
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"

    # Run summary, followed by one blank line.
    for banner in (
        f"Using device: {device}",
        f"Audio1: {audio1}",
        f"Audio2: {audio2}",
        f"Decision threshold: {threshold}",
        f"Max duration (crop): {max_duration_s}s",
        "",
    ):
        print(banner)

    for label, candidate in (("audio1", audio1), ("audio2", audio2)):
        if not os.path.exists(candidate):
            raise FileNotFoundError(f"{label} not found: {candidate}")

    results: List[Dict[str, Any]] = []

    def evaluate(model_label: str, embed, family: str):
        """Embed both files with one model and append its result row."""
        try:
            first = embed(audio1, device=device, max_duration_s=max_duration_s)
            second = embed(audio2, device=device, max_duration_s=max_duration_s)

            # Raw norms (epsilon keeps the division below safe).
            first_norm = float(np.linalg.norm(first) + 1e-6)
            second_norm = float(np.linalg.norm(second) + 1e-6)

            first_unit = first / first_norm
            second_unit = second / second_norm

            similarity = float(
                np.dot(first_unit.astype(np.float32), second_unit.astype(np.float32))
            )
            distance = float(np.linalg.norm(first_unit - second_unit))

            record = {
                "model": model_label,
                "type": family,
                "cosine": similarity,
                "l2dist": distance,
                "norm1": first_norm,
                "norm2": second_norm,
                "verdict": verdict_from_cosine(similarity, threshold=threshold),
                "extra": "",
            }
        except Exception as err:
            # Best effort: one failing model must not abort the benchmark.
            record = {
                "model": model_label,
                "type": family,
                "cosine": None,
                "l2dist": None,
                "norm1": None,
                "norm2": None,
                "verdict": "error",
                "extra": f"ERROR: {err!r}",
            }
        results.append(record)

    model_table = (
        # SpeechBrain models
        ("ecapa-voxceleb", ecapa_embedding, "speechbrain-sv"),
        ("resnet-voxceleb", resnet_embedding, "speechbrain-sv"),
        ("xvect-voxceleb", xvect_embedding, "speechbrain-sv"),
        # HF x-vector SV models
        ("wavlm-base-plus-sv", wavlm_sv_embedding, "hf-xvector-sv"),
        ("unispeech-sat-plus-sv", unispeech_sv_embedding, "hf-xvector-sv"),
        ("wav2vec2-superb-sv", wav2vec2_superb_embedding, "hf-xvector-sv"),
        # HF generic encoders (pooled)
        ("wavlm-base-pooled", wavlm_base_pooled_embedding, "hf-generic-pooled"),
        ("wav2vec2-base-pooled", wav2vec2_base_pooled_embedding, "hf-generic-pooled"),
    )
    for model_label, embed, family in model_table:
        evaluate(model_label, embed, family)

    def as_text(value, spec: str) -> str:
        """Format floats with ``spec``; anything else (None) prints as 'None'."""
        return format(value, spec) if isinstance(value, float) else "None"

    # Fixed-width columns; the trailing "Extra" column is free-form.
    widths = (24, 20, 10, 10, 10, 10, 10)
    titles = ("Model", "Kind", "Cosine", "L2dist", "Norm1", "Norm2", "Verdict")

    print("Results:\n")
    print(" ".join(f"{title:{width}}" for title, width in zip(titles, widths)) + " Extra")
    print("-" * 130)
    for row in results:
        cells = (
            row["model"],
            row["type"],
            as_text(row["cosine"], ".4f"),
            as_text(row["l2dist"], ".4f"),
            as_text(row["norm1"], ".2f"),
            as_text(row["norm2"], ".2f"),
            row["verdict"],
        )
        line = " ".join(f"{cell:{width}}" for cell, width in zip(cells, widths))
        print(f"{line} {row['extra']}")

    # Majority vote over the models that produced a score.
    scored = [row for row in results if isinstance(row["cosine"], float)]
    same_count = sum(row["verdict"] == "same" for row in scored)
    diff_count = sum(row["verdict"] == "different" for row in scored)

    if scored:
        print("\n--- Overall ---")
        if same_count == diff_count:
            overall = "UNCERTAIN / TIE"
        elif same_count > diff_count:
            overall = "SAME SPEAKER"
        else:
            overall = "DIFFERENT SPEAKERS"

        print(
            f"Majority verdict (@threshold={threshold}): {overall} "
            f"(same={same_count}, different={diff_count}, total_valid={len(scored)})"
        )

    return results


INFO:speechbrain.utils.fetching:Fetch hyperparams.yaml: Fetching from HuggingFace Hub 'speechbrain/spkrec-ecapa-voxceleb' if not cached


Using device: cpu
Loading SpeechBrain models...


DEBUG:speechbrain.utils.fetching:Fetch: Local file found, creating symlink '/root/.cache/huggingface/hub/models--speechbrain--spkrec-ecapa-voxceleb/snapshots/0f99f2d0ebe89ac095bcc5903c4dd8f72b367286/hyperparams.yaml' -> '/content/pretrained_models/ecapa/hyperparams.yaml'
INFO:speechbrain.utils.fetching:Fetch custom.py: Fetching from HuggingFace Hub 'speechbrain/spkrec-ecapa-voxceleb' if not cached
DEBUG:speechbrain.utils.parameter_transfer:Collecting files (or symlinks) for pretraining in pretrained_models/ecapa.
INFO:speechbrain.utils.fetching:Fetch embedding_model.ckpt: Fetching from HuggingFace Hub 'speechbrain/spkrec-ecapa-voxceleb' if not cached
DEBUG:speechbrain.utils.fetching:Fetch: Local file found, creating symlink '/root/.cache/huggingface/hub/models--speechbrain--spkrec-ecapa-voxceleb/snapshots/0f99f2d0ebe89ac095bcc5903c4dd8f72b367286/embedding_model.ckpt' -> '/content/pretrained_models/ecapa/embedding_model.ckpt'
DEBUG:speechbrain.utils.parameter_transfer:Set local path in 

Loading HF pooled models (WavLM, Wav2Vec2)...




Loading 3D-Speaker pipelines (ERes2Net, ERes2NetV2, CAMP++)...


2025-11-16 10:27:50,291 - modelscope - INFO - initiate model from /root/.cache/modelscope/hub/iic/speech_eres2net_base_sv_zh-cn_3dspeaker_16k
2025-11-16 10:27:50,293 - modelscope - INFO - initiate model from location /root/.cache/modelscope/hub/iic/speech_eres2net_base_sv_zh-cn_3dspeaker_16k.
2025-11-16 10:27:50,295 - modelscope - INFO - initialize model from /root/.cache/modelscope/hub/iic/speech_eres2net_base_sv_zh-cn_3dspeaker_16k
2025-11-16 10:27:50,298 - modelscope - INFO - cuda is not available, using cpu instead.
2025-11-16 10:27:50,659 - modelscope - INFO - cuda is not available, using cpu instead.
2025-11-16 10:27:53,139 - modelscope - INFO - initiate model from /root/.cache/modelscope/hub/iic/speech_eres2netv2_sv_zh-cn_16k-common
2025-11-16 10:27:53,139 - modelscope - INFO - initiate model from location /root/.cache/modelscope/hub/iic/speech_eres2netv2_sv_zh-cn_16k-common.
2025-11-16 10:27:53,142 - modelscope - INFO - initialize model from /root/.cache/modelscope/hub/iic/spee