In [None]:
import torch
import torchaudio
import os
import pickle
import numpy as np
from transformers import AutoModel, AutoProcessor
from scipy.spatial.distance import cosine

import librosa
import soundfile as sf

# NOTE(review): presumably an audio sample rate in Hz; none of the cells shown
# here read this value — confirm it is still needed.
fs = 48000

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# Load model and processor
MODEL_NAME = "MERaLiON/MERaLiON-SpeechEncoder-v1"

# Load on CPU first: if the transfer to the GPU fails part-way, the module can
# still be moved back to the CPU instead of leaving the cell in an error state.
model = AutoModel.from_pretrained(MODEL_NAME)
processor = AutoProcessor.from_pretrained(MODEL_NAME)

device = "cuda" if torch.cuda.is_available() else "cpu"
if device == "cuda":
    try:
        model = model.to(device)
    except torch.cuda.OutOfMemoryError:
        # The GPU is too small (or already occupied) for this model: roll back
        # to the CPU and release whatever was allocated during the attempt.
        model = model.to("cpu")
        torch.cuda.empty_cache()
        device = "cpu"
        print("CUDA out of memory while loading the model; falling back to CPU.")

A new version of the following files was downloaded from https://huggingface.co/MERaLiON/MERaLiON-SpeechEncoder-v1:
- configuration_bestrq_conformer.py
. Make sure to double-check they do not contain any added malicious code. To avoid downloading new versions of the code file, you can pin a revision.
A new version of the following files was downloaded from https://huggingface.co/MERaLiON/MERaLiON-SpeechEncoder-v1:
- modeling_bestrq_conformer.py
. Make sure to double-check they do not contain any added malicious code. To avoid downloading new versions of the code file, you can pin a revision.


OutOfMemoryError: CUDA out of memory. Tried to allocate 16.00 MiB. GPU 0 has a total capacity of 1.95 GiB of which 15.62 MiB is free. Including non-PyTorch memory, this process has 1.93 GiB memory in use. Of the allocated memory 1.88 GiB is allocated by PyTorch, and 9.36 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)

In [None]:
# Paths and matching threshold
KNOWN_SPEAKERS_DIR = "known_speakers"  # Folder containing known speaker recordings
DB_FILE = "speaker_embeddings.pkl"  # Pickle file holding the enrolled speaker embeddings
# Maximum cosine *distance* (as returned by scipy's `cosine`, lower = closer)
# below which the best candidate is accepted as a known speaker.
THRESHOLD = 0.4

In [None]:
# Extract a speaker embedding from an audio file
def extract_embedding(audio_path):
    """Compute one embedding vector for a single audio file.

    The file is loaded with ``torchaudio.load``, down-mixed to mono, resampled
    to the sampling rate declared by the processor (when it declares one), and
    passed through the model. The model's ``last_hidden_state`` is then averaged
    over dim 1 to obtain a single vector.

    Args:
        audio_path: Path to an audio file readable by ``torchaudio.load``.

    Returns:
        numpy.ndarray: The averaged hidden state, moved to the CPU and squeezed.
    """
    # torchaudio.load returns a 2-D (channels, time) tensor by default, so no
    # extra reshaping is needed after the mono down-mix below.
    waveform, sample_rate = torchaudio.load(audio_path)

    # If the waveform has more than one channel (e.g., stereo), convert it to mono
    if waveform.shape[0] > 1:
        waveform = waveform.mean(dim=0, keepdim=True)

    # Resample to the rate the processor declares, if any. Without this, files
    # recorded at another rate are either rejected by the processor or embedded
    # at the wrong rate. When no rate is declared the audio is passed unchanged.
    feature_extractor = getattr(processor, "feature_extractor", processor)
    target_rate = getattr(feature_extractor, "sampling_rate", None)
    if target_rate and sample_rate != target_rate:
        waveform = torchaudio.functional.resample(waveform, sample_rate, int(target_rate))
        sample_rate = int(target_rate)

    # Process audio with the correct input arguments
    inputs = processor(waveform, sampling_rate=sample_rate, return_tensors="pt")

    # Explicitly set input_lengths
    # NOTE(review): the length is expressed in waveform samples; confirm the
    # model expects samples here rather than a count of feature frames.
    input_lengths = torch.tensor([waveform.shape[1]])

    # Run inference without tracking gradients
    with torch.no_grad():
        embedding = model(inputs["input_values"].to(device), input_lengths=input_lengths.to(device))

    # Take the mean of hidden states to get a single embedding vector
    return embedding.last_hidden_state.mean(dim=1).cpu().numpy().squeeze()


In [None]:
# Step 1: Enroll known speakers
def enroll_speakers():
    """Build the speaker database from the recordings in KNOWN_SPEAKERS_DIR.

    Every file in KNOWN_SPEAKERS_DIR whose extension is ``.wav`` (in any letter
    case) is embedded with ``extract_embedding`` and stored under the file name
    without its extension, which is assumed to be the speaker's name. The
    resulting mapping is pickled to DB_FILE, replacing any previous database,
    and the number of enrolled speakers is printed.

    Raises:
        FileNotFoundError: If KNOWN_SPEAKERS_DIR does not exist.
    """
    speaker_db = {}
    # os.listdir returns entries in arbitrary order; sorting makes the result
    # reproducible when two files map to the same speaker name.
    for file_name in sorted(os.listdir(KNOWN_SPEAKERS_DIR)):
        speaker_name, extension = os.path.splitext(file_name)  # Assuming filenames are speaker names
        # Compare case-insensitively so that e.g. "alice.WAV" is not silently skipped.
        if extension.lower() != ".wav":
            continue
        audio_path = os.path.join(KNOWN_SPEAKERS_DIR, file_name)
        speaker_db[speaker_name] = extract_embedding(audio_path)

    with open(DB_FILE, "wb") as f:
        pickle.dump(speaker_db, f)
    print(f"Enrolled {len(speaker_db)} speakers.")

In [None]:
# Step 2: Identify speaker from new recording
def identify_speaker(new_audio_path):
    """Match a recording against the enrolled speakers.

    The recording is embedded with ``extract_embedding`` and compared with every
    enrolled embedding using scipy's cosine distance (lower means closer). The
    closest speaker is accepted only if its distance is below THRESHOLD.

    Args:
        new_audio_path: Path of the recording to identify.

    Returns:
        The name of the matched speaker, or None when the database file is
        missing or no enrolled speaker is closer than THRESHOLD.
    """
    database_missing = not os.path.exists(DB_FILE)
    if database_missing:
        print("No enrolled speakers found. Run enroll_speakers() first.")
        return None

    # SECURITY: unpickling executes arbitrary code from the file; DB_FILE must
    # only ever be a file written by enroll_speakers() on a trusted machine.
    with open(DB_FILE, "rb") as db_handle:
        enrolled = pickle.load(db_handle)

    query_vector = extract_embedding(new_audio_path)

    # Linear scan for the enrolled speaker at the smallest cosine distance.
    closest_name, closest_distance = None, float("inf")
    for name in enrolled:
        distance = cosine(query_vector, enrolled[name])
        if distance < closest_distance:
            closest_name, closest_distance = name, distance

    # Reject the candidate when even the closest one is not close enough.
    if closest_distance >= THRESHOLD:
        print("Unknown speaker detected.")
        return None

    print(f"Speaker identified: {closest_name} (score: {closest_distance:.3f})")
    return closest_name

# Example usage:
# enroll_speakers()  # Run once to enroll known speakers
# identify_speaker("new_audio.wav")  # Identify a new speaker
