In [1]:
import os
from dotenv import load_dotenv
from io import BytesIO
import requests
from elevenlabs.client import ElevenLabs

In [None]:
# example.py
import os
from dotenv import load_dotenv
from io import BytesIO
import requests
from elevenlabs.client import ElevenLabs

from database.models import User

load_dotenv()

elevenlabs = ElevenLabs(
  api_key="
)

audio_path = "backend/data/noah_and_k_data.wav"

with open(audio_path, "rb") as f:
    transcription = elevenlabs.speech_to_text.convert(
        file=f,
        model_id="scribe_v2", # Model to use
        tag_audio_events=True, # Tag audio events like laughter, applause, etc.
        language_code="eng", # Language of the audio file. If set to None, the model will detect the language automatically.
        diarize=True, # Whether to annotate who is speaking
    )


In [3]:
print(transcription)

language_code='eng' language_probability=1.0 text="Christiana, how's your day been? It's been amazing. How's your day? I'm so tired. [chuckles] Oh, no! I wanna go to bed. Me too. It is what it is. Thank you." words=[SpeechToTextWordResponseModel(text='Christiana,', start=0.66, end=1.319, type='word', speaker_id='speaker_0', logprob=0.0, characters=None), SpeechToTextWordResponseModel(text=' ', start=1.319, end=1.339, type='spacing', speaker_id='speaker_0', logprob=0.0, characters=None), SpeechToTextWordResponseModel(text="how's", start=1.339, end=1.479, type='word', speaker_id='speaker_0', logprob=0.0, characters=None), SpeechToTextWordResponseModel(text=' ', start=1.479, end=1.499, type='spacing', speaker_id='speaker_0', logprob=0.0, characters=None), SpeechToTextWordResponseModel(text='your', start=1.5, end=1.559, type='word', speaker_id='speaker_0', logprob=0.0, characters=None), SpeechToTextWordResponseModel(text=' ', start=1.559, end=1.579, type='spacing', speaker_id='speaker_0', 

In [4]:

from typing import Literal
from pydantic import BaseModel, Field, computed_field

TokenType = Literal["word", "spacing", "audio_event"]

class Token(BaseModel):
    text: str
    start: float
    end: float
    type: TokenType
    speaker_id: str | None = None

class Utterance(BaseModel):
    speaker_id: str
    text: str = Field(..., description="Concatenated text for this utterance.")
    start: float
    end: float

    @computed_field  # pydantic v2
    @property
    def duration(self) -> float:
        return max(0.0, self.end - self.start)


In [5]:
from collections.abc import Iterable


def tokens_to_utterances(
    tokens: Iterable[Token],
    *,
    break_on_silence_s: float | None = 1.2,
    include_audio_events: bool = False,
) -> list[Utterance]:
    out: list[Utterance] = []

    cur_speaker: str | None = None
    cur_parts: list[str] = []
    cur_start: float | None = None
    cur_end: float | None = None
    last_end: float | None = None

    def flush() -> None:
        nonlocal cur_speaker, cur_parts, cur_start, cur_end
        if cur_speaker is None:
            return
        text = "".join(cur_parts).strip()
        if text:
            out.append(Utterance(
                speaker_id=cur_speaker,
                text=text,
                start=float(cur_start or 0.0),
                end=float(cur_end or (cur_start or 0.0)),
            ))
        cur_speaker = None
        cur_parts = []
        cur_start = None
        cur_end = None

    for t in tokens:
        spk = t.speaker_id
        if not spk:
            # drop un-attributed tokens
            last_end = t.end
            continue

        if t.type == "audio_event" and not include_audio_events:
            last_end = t.end
            continue

        # optional silence boundary split
        if (
            break_on_silence_s is not None
            and last_end is not None
            and (t.start - last_end) >= break_on_silence_s
        ):
            flush()

        # speaker change split
        if cur_speaker is not None and spk != cur_speaker:
            flush()

        # start new utterance if needed
        if cur_speaker is None:
            cur_speaker = spk
            cur_start = t.start

        cur_parts.append(t.text)
        cur_end = t.end
        last_end = t.end

    flush()
    return out


In [6]:
def eleven_word_to_token(w) -> Token:
    return Token(
        text=w.text,
        start=float(w.start),
        end=float(w.end),
        type=w.type,              # "word" / "spacing" / "audio_event"
        speaker_id=w.speaker_id,  # "speaker_0", "speaker_1"
    )

words = transcription.words
tokens = [eleven_word_to_token(w) for w in words]
utterances = tokens_to_utterances(tokens, break_on_silence_s=1.2, include_audio_events=False)

# utterances is List[Utterance], each has .duration computed


In [7]:
utterances

[Utterance(speaker_id='speaker_0', text="Christiana, how's your day been?", start=0.66, end=2.699, duration=2.0389999999999997),
 Utterance(speaker_id='speaker_1', text="It's been amazing. How's your day?", start=2.7, end=4.98, duration=2.2800000000000002),
 Utterance(speaker_id='speaker_0', text="I'm so tired.", start=4.98, end=6.0, duration=1.0199999999999996),
 Utterance(speaker_id='speaker_1', text='Oh, no!', start=6.0, end=7.199, duration=1.1989999999999998),
 Utterance(speaker_id='speaker_0', text='I wanna go to bed.', start=7.199, end=8.439, duration=1.2400000000000002),
 Utterance(speaker_id='speaker_1', text='Me too.', start=8.439, end=9.439, duration=1.0),
 Utterance(speaker_id='speaker_0', text='It is what it is. Thank you.', start=9.439, end=11.42, duration=1.9809999999999999)]

In [None]:
from __future__ import annotations

import os
import tempfile
from collections import defaultdict
from pathlib import Path

from pydantic import BaseModel, Field, computed_field

import torch
from pydub import AudioSegment
from speechbrain.inference.speaker import SpeakerRecognition
from speechbrain.dataio import audio_io  # type: ignore


# ---------- Your models ----------
from typing import Literal

# ---------- SpeechBrain verifier ----------
verifier: SpeakerRecognition = SpeakerRecognition.from_hparams(
    source="speechbrain/spkrec-ecapa-voxceleb",
    savedir="pretrained_models/spkrec-ecapa",
)  # type: ignore

def to_mono_batch(wav: torch.Tensor) -> torch.Tensor:
    """
    Accepts wav shaped [T], [C, T], or [B, T].
    Returns [1, T] mono batch.
    """
    if wav.ndim == 2:
        # Treat as channels-first if C is small (like 2 for stereo)
        if wav.shape[0] <= 8:
            wav = wav.mean(dim=0)  # [T]
        # else assume already [B, T] and leave it
    elif wav.ndim != 1:
        raise ValueError(f"Unexpected wav shape: {wav.shape}")

    if wav.ndim == 1:
        wav = wav.unsqueeze(0)  # [1, T]

    return wav

def _clip_utterance_to_temp_wav(
    full_audio: AudioSegment,
    start_s: float,
    end_s: float,
) -> str:
    """
    Export [start_s, end_s] from full_audio to a temporary WAV file, return path.
    """
    start_ms = int(max(0.0, start_s) * 1000)
    end_ms = int(max(0.0, end_s) * 1000)
    seg = full_audio[start_ms:end_ms]

    # Guard: avoid tiny segments that produce unstable speaker decisions
    if len(seg) < 300:  # <300ms
        raise ValueError("segment too short")

    tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
    tmp.close()
    seg.export(tmp.name, format="wav")
    return tmp.name


def _score_paths_with_verify_batch(ref_path: str, utt_path: str) -> float:
    signal_ref, _ = audio_io.load(ref_path)
    signal_utt, _ = audio_io.load(utt_path)
    signal_ref = to_mono_batch(signal_ref)
    signal_utt = to_mono_batch(signal_utt)
    # print(signal_ref.shape)
    # print(signal_utt.shape)

    with torch.inference_mode():
        score, prediction = verifier.verify_batch(signal_ref, signal_utt)

    # score can be:
    # - scalar tensor: [[x]] or [x]
    # - 2-element tensor: [[a, b]] or [a, b] (two-class)
    s = score.detach().cpu().squeeze()

    if s.numel() == 1:
        return float(s.item())

    if s.numel() == 2:
        # We need to decide which element corresponds to "same speaker".
        # SpeechBrain's prediction is the authoritative label; pick the score matching it.
        # prediction is typically [[0]] or [[1]] (or shape that squeezes to scalar).
        pred = int(prediction.detach().cpu().squeeze().item())

        # If pred==1 means "same speaker", use s[1], else use s[0].
        # If SpeechBrain uses the opposite convention in your install, this still ranks correctly
        # across utterances for the same reference because pred is derived from these scores.
        return float(s[pred].item())

    # Unexpected shape: fall back to mean (still provides a sortable scalar)
    return float(s.float().mean().item())



def score_speakers_against_reference(
    *,
    full_audio_path: str,
    utterances: list[Utterance],
    reference_sample_path: str,
    min_utt_s: float = 0.6,
    max_utts_per_speaker: int = 15,
) -> tuple[str, list[tuple[str, float]]]:
    """
    Compute a score for each diarized speaker_id vs the reference sample.

    Returns:
      best_speaker_id,
      scores_sorted_desc = [(speaker_id, mean_score), ...]

    Scoring strategy:
      - clip N utterances per speaker from full audio
      - score each clip vs reference using verifier.verify_batch
      - average scores per speaker
    """
    full_audio = AudioSegment.from_file(full_audio_path)

    # group utterances by diarized speaker_id
    by_speaker: dict[str, list[Utterance]] = defaultdict(list)
    for u in utterances:
        if u.duration >= min_utt_s:
            by_speaker[u.speaker_id].append(u)

    tmp_paths: list[str] = []
    speaker_scores: list[tuple[str, float]] = []

    try:
        for speaker_id, utts in by_speaker.items():
            utts = utts[:max_utts_per_speaker]

            scores: list[float] = []
            for u in utts:
                try:
                    clip_path = _clip_utterance_to_temp_wav(full_audio, u.start, u.end)
                    tmp_paths.append(clip_path)
                    scores.append(_score_paths_with_verify_batch(reference_sample_path, clip_path))
                except ValueError:
                    continue

            if not scores:
                continue

            mean_score = sum(scores) / len(scores)
            speaker_scores.append((speaker_id, mean_score))

        speaker_scores.sort(key=lambda x: x[1], reverse=True)

        if not speaker_scores:
            raise RuntimeError("No usable utterances to score (all too short or failed to load).")

        best_speaker_id = speaker_scores[0][0]
        return best_speaker_id, speaker_scores

    finally:
        for p in tmp_paths:
            try:
                Path(p).unlink(missing_ok=True)
            except Exception:
                pass

In [21]:


best_label, scores = score_speakers_against_reference(
    full_audio_path="./backend/data/noah_and_k_data.wav",
    utterances=utterances,
    reference_sample_path="./backend/data/noah_audio_sample.wav",
)

print("Best label:", best_label)
print("Scores:", scores)

torch.Size([1, 229440])
torch.Size([1, 97872])
torch.Size([1, 229440])
torch.Size([1, 48960])
torch.Size([1, 229440])
torch.Size([1, 59520])
torch.Size([1, 229440])
torch.Size([1, 95088])
torch.Size([1, 229440])
torch.Size([1, 109440])
torch.Size([1, 229440])
torch.Size([1, 57552])
torch.Size([1, 229440])
torch.Size([1, 48000])
Best label: speaker_0
Scores: [('speaker_0', 0.7524326890707016), ('speaker_1', 0.41520418723424274)]
