In [1]:
import torch
from speechbrain.pretrained import EncoderClassifier
from scipy.spatial.distance import cosine

# Load the pretrained speaker-embedding model
speaker_model = EncoderClassifier.from_hparams(source="speechbrain/spkrec-ecapa-voxceleb")

# Embedding vectors of registered speakers, keyed by speaker name
# (populated by register_speaker).
registered_speakers = {}

def register_speaker(audio_path, speaker_name):
    """Enroll a speaker by storing the embedding of a reference recording.

    Args:
        audio_path: Path of the audio file; it is read through ``load_audio``.
        speaker_name: Key under which the embedding is stored in
            ``registered_speakers``. An existing entry with the same name is
            overwritten.
    """
    # Add the batch dimension on the tensor itself. Wrapping the NumPy array
    # in a Python list first sends torch down its slow list-conversion path.
    audio_signal = torch.as_tensor(load_audio(audio_path)).unsqueeze(0)

    embeddings = speaker_model.encode_batch(audio_signal)
    # .cpu() is a no-op on CPU and keeps .numpy() valid if the model runs on GPU.
    embedding_vector = embeddings.squeeze().detach().cpu().numpy()
    registered_speakers[speaker_name] = embedding_vector
    print(f"Speaker '{speaker_name}' registered successfully!")


INFO:speechbrain.utils.quirks:Applied quirks (see `speechbrain.utils.quirks`): [allow_tf32, disable_jit_profiling]
INFO:speechbrain.utils.quirks:Excluded quirks specified by the `SB_DISABLE_QUIRKS` environment (comma-separated list): []
  from speechbrain.pretrained import EncoderClassifier
INFO:speechbrain.utils.fetching:Fetch hyperparams.yaml: Fetching from HuggingFace Hub 'speechbrain/spkrec-ecapa-voxceleb' if not cached
INFO:speechbrain.utils.fetching:Fetch custom.py: Fetching from HuggingFace Hub 'speechbrain/spkrec-ecapa-voxceleb' if not cached
  wrapped_fwd = torch.cuda.amp.custom_fwd(fwd, cast_inputs=cast_inputs)
INFO:speechbrain.utils.fetching:Fetch embedding_model.ckpt: Fetching from HuggingFace Hub 'speechbrain/spkrec-ecapa-voxceleb' if not cached
INFO:speechbrain.utils.fetching:Fetch mean_var_norm_emb.ckpt: Fetching from HuggingFace Hub 'speechbrain/spkrec-ecapa-voxceleb' if not cached
INFO:speechbrain.utils.fetching:Fetch classifier.ckpt: Fetching from HuggingFace Hub 'spe

In [2]:
def verify_speaker(audio_path, threshold=0.4):
    """Identify which registered speaker, if any, an audio file belongs to.

    Args:
        audio_path: Path of the audio file; it is read through ``load_audio``.
        threshold: Minimum cosine similarity the best match must reach.

    Returns:
        The name of the best-matching registered speaker, or ``None`` when
        nobody is registered or the best similarity is below ``threshold``.
    """
    if not registered_speakers:
        print("No registered speakers found. Please register speakers first.")
        return None

    # Embed the probe audio. The batch dimension is added on the tensor itself
    # because wrapping the NumPy array in a list hits torch's slow conversion path.
    audio_signal = torch.as_tensor(load_audio(audio_path)).unsqueeze(0)
    test_embeddings = speaker_model.encode_batch(audio_signal).squeeze().detach().cpu().numpy()

    # Score the probe against every registered speaker and keep the best one.
    best_match = None
    best_score = float("-inf")
    for speaker_name, embedding_vector in registered_speakers.items():
        # scipy's `cosine` is a distance, so 1 - distance is the similarity.
        similarity = 1 - cosine(test_embeddings, embedding_vector)
        print(f"Similarity score with '{speaker_name}': {similarity}")

        if similarity > best_score:
            best_score = similarity
            best_match = speaker_name

    # Accept the best match only if it clears the threshold.
    if best_score >= threshold:
        print(f"Voice is from '{best_match}', welcome back!")
        return best_match

    print("Voice not recognized.")
    return None


In [3]:
import torchaudio

def load_audio(audio_path):
    """Load an audio file as a mono 16 kHz NumPy waveform.

    Args:
        audio_path: Path handed to ``torchaudio.load``.

    Returns:
        A 1-D NumPy array of samples at 16 kHz.
    """
    waveform, sample_rate = torchaudio.load(audio_path)

    # torchaudio returns (channels, frames). Averaging over the channel axis
    # leaves mono files unchanged and downmixes multi-channel files, so the
    # result is always 1-D (a plain squeeze() would keep stereo files 2-D).
    if waveform.dim() > 1:
        waveform = waveform.mean(dim=0)

    if sample_rate != 16000:
        resample_transform = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)
        waveform = resample_transform(waveform)
    return waveform.numpy()


In [4]:
import soundfile as sf
import tempfile
import numpy as np

def save_to_temp_wav(audio_data, sample_rate=16000):
    """
    Write raw 16-bit PCM bytes to a temporary mono WAV file.

    Args:
        audio_data (bytes): Raw audio, interpreted as int16 samples.
        sample_rate (int): Sample rate written to the file, 16 kHz by default.
    Returns:
        str: Path of the temporary WAV file. It is created with
            ``delete=False``, so the caller is responsible for removing it.
    Raises:
        ValueError: If ``audio_data`` cannot be read as int16 samples
            (for example an odd number of bytes).
    """
    import os  # local import: only needed to clean up after a failed write

    # Decode first so malformed input fails before any file is created.
    audio_array = np.frombuffer(audio_data, dtype=np.int16)

    # Reserve a file name; close the handle so soundfile can reopen the path.
    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
    temp_file.close()

    try:
        with sf.SoundFile(temp_file.name, mode="w", samplerate=sample_rate, channels=1, subtype="PCM_16") as f:
            f.write(audio_array)
    except Exception:
        # Do not leave a half-written temp file behind when writing fails.
        try:
            os.remove(temp_file.name)
        except OSError:
            pass
        raise

    return temp_file.name


In [5]:
audio_cache = []  # Module-level buffer of raw audio chunks awaiting verification

def verify_realtime_speaker(audio_chunk, speaker_name, threshold=0.1, min_duration=3.0, adaptive=True):
    """
    Verify a claimed speaker from streamed audio, buffering chunks until
    enough audio has accumulated.

    Args:
        audio_chunk (bytes): Raw audio chunk; the buffer is treated as 16 kHz
            int16 PCM when computing its duration and decoding it.
        speaker_name (str): Name of the registered speaker to verify against.
        threshold (float): Similarity the audio must exceed to be accepted.
        min_duration (float): Seconds of buffered audio required before a
            verification attempt is made.
        adaptive (bool): Enables the threshold adjustment step (see the note
            in the body).

    Returns:
        bool: True when the buffered audio matches ``speaker_name``. False when
        the speaker is unknown, not enough audio is buffered yet, embedding
        extraction fails, or the similarity is too low.
    """
    global audio_cache

    if speaker_name not in registered_speakers:
        print(f"Error: Speaker '{speaker_name}' not registered.")
        return False

    audio_cache.append(audio_chunk)
    total_audio = b"".join(audio_cache)
    # bytes / (samples per second * bytes per int16 sample) = seconds buffered
    duration = len(total_audio) / (16000 * 2)

    if duration < min_duration:
        return False  # Not enough audio accumulated yet

    print("Performing speaker verification with accumulated audio...")
    try:
        pcm = np.frombuffer(total_audio, dtype=np.int16)
        # Scale int16 PCM to floats in [-1, 1], the range torchaudio.load
        # returns by default and therefore the range enrolled embeddings
        # (computed via load_audio) were extracted from.
        waveform = torch.as_tensor(pcm.astype(np.float32) / 32768.0).unsqueeze(0)
        embeddings = speaker_model.encode_batch(waveform)
        embeddings = embeddings.squeeze().detach().cpu().numpy()
    except Exception as e:
        # The buffer is intentionally kept so the next chunk can retry.
        print(f"Error during embedding extraction: {e}")
        return False

    similarity = 1 - cosine(embeddings, registered_speakers[speaker_name])
    print(f"Similarity score with '{speaker_name}': {similarity}")

    # Threshold adjustment.
    # NOTE(review): max(threshold, similarity - 0.1) is always either
    # `threshold` or a value below `similarity`, so with adaptive=True the
    # result is still equivalent to `similarity > threshold`. Confirm the
    # intended adaptive rule.
    adjusted_threshold = threshold
    if adaptive:
        adjusted_threshold = max(threshold, similarity - 0.1)

    audio_cache = []  # Reset the buffer after a verification attempt
    return similarity > adjusted_threshold


In [6]:
import soundfile as sf
import numpy as np

def save_realtime_audio(audio_cache, sample_rate=16000, output_path="/Users/7one/Documents/Work/mangoesai/livekit_paddle/realtime_audio_debug.wav"):
    """
    Dump buffered realtime audio to a WAV file.

    NOTE: a later cell defines another function with this same name; when the
    notebook is run top to bottom that later definition replaces this one.

    Args:
        audio_cache (list): Buffered audio chunks, each a bytes object.
        sample_rate (int): Sample rate of the audio, 16 kHz by default.
        output_path (str): Destination path of the WAV file.
    """
    # Concatenate the chunks and view the bytes as int16 samples.
    samples = np.frombuffer(b"".join(audio_cache), dtype=np.int16)

    # Write 16-bit PCM and report where it went.
    sf.write(output_path, samples, samplerate=sample_rate, subtype="PCM_16")
    print(f"Realtime audio saved to: {output_path}")


In [8]:
from transformers.pipelines.audio_utils import ffmpeg_microphone_live
import soundfile as sf
import numpy as np
import os
import time
from datetime import datetime

# Helper that saves the buffered realtime audio
def save_realtime_audio(audio_cache, sample_rate=16000, output_path="realtime_audio_debug.wav"):
    """
    Persist buffered realtime audio as a 16-bit PCM WAV file.

    Args:
        audio_cache (list): Buffered audio chunks, each a bytes object.
        sample_rate (int): Sample rate of the audio, 16 kHz by default.
        output_path (str): Destination path of the WAV file.
    """
    pcm_bytes = b"".join(audio_cache)
    sf.write(
        output_path,
        np.frombuffer(pcm_bytes, dtype=np.int16),  # bytes viewed as int16 samples
        samplerate=sample_rate,
        subtype="PCM_16",
    )
    print(f"Realtime audio saved to: {output_path}")


# Main routine: record from the microphone and verify the speaker in real time
def transcribe_with_speaker_verification(
    chunk_length_s=10.0,
    stream_chunk_s=1.0,
    max_duration=30,
    threshold=0.4,
    output_path="/Users/7one/Documents/Work/mangoesai/livekit_paddle/realtime_audio_debug.wav",
):
    """
    Listen on the microphone until a registered speaker is recognised.

    Each chunk yielded by ``ffmpeg_microphone_live`` is embedded once and
    compared against every entry in ``registered_speakers``. Listening stops
    as soon as the best similarity reaches ``threshold``, when
    ``max_duration`` seconds have elapsed, or on KeyboardInterrupt. The
    captured audio is always written to ``output_path`` for debugging.

    Args:
        chunk_length_s (float): Forwarded to ``ffmpeg_microphone_live``.
        stream_chunk_s (float): Forwarded to ``ffmpeg_microphone_live``.
        max_duration (float): Maximum listening time in seconds.
        threshold (float): Minimum cosine similarity for a match.
        output_path (str): Where the debug recording is saved.

    Returns:
        The matched speaker name, or ``None`` if nobody was identified.
    """
    # max() over an empty score table would raise, so bail out early.
    if not registered_speakers:
        print("No registered speakers found. Please register speakers first.")
        return None

    mic = ffmpeg_microphone_live(
        sampling_rate=16000,
        chunk_length_s=chunk_length_s,
        stream_chunk_s=stream_chunk_s,
    )

    print("Listening for speaker ...")
    audio_cache = []  # int16 PCM chunks kept for the debug recording
    identified_user = None
    start_time = time.time()

    try:
        for i, audio_chunk in enumerate(mic):
            elapsed_time = time.time() - start_time
            if elapsed_time >= max_duration:
                print("Reached max duration, stopping recording.")
                break

            print(f"Processing chunk {i + 1} (elapsed time: {elapsed_time:.2f}s)...")
            raw_audio = audio_chunk["raw"]

            # Normalise the chunk to int16 PCM bytes.
            if isinstance(raw_audio, np.ndarray):
                if np.issubdtype(raw_audio.dtype, np.floating):
                    # Clip before casting: a sample of 1.0 scales to 32768,
                    # which does not fit in int16 and would wrap around.
                    raw_audio = np.clip(raw_audio * 32768, -32768, 32767).astype(np.int16)
                raw_audio = raw_audio.tobytes()

            # NOTE(review): chunks are stored exactly as yielded; if the
            # microphone stream yields overlapping partial chunks the debug
            # recording will contain repeated audio. TODO confirm.
            audio_cache.append(raw_audio)

            pcm = np.frombuffer(raw_audio, dtype=np.int16)
            if pcm.size == 0:
                continue  # nothing to embed in an empty chunk

            # Embed the chunk once (it does not depend on the enrolled speaker).
            # Samples are scaled to [-1, 1] floats, the range torchaudio.load
            # returns by default for the enrolled audio.
            waveform = torch.as_tensor(pcm.astype(np.float32) / 32768.0).unsqueeze(0)
            test_embeddings = speaker_model.encode_batch(waveform).squeeze().detach().cpu().numpy()

            similarity_scores = {
                speaker_name: 1 - cosine(test_embeddings, embedding_vector)
                for speaker_name, embedding_vector in registered_speakers.items()
            }

            # Pick the most similar registered speaker.
            best_match = max(similarity_scores, key=similarity_scores.get)
            best_score = similarity_scores[best_match]
            print(f"Similarity score with '{best_match}': {best_score}")

            if best_score >= threshold:
                print(f"Voice is from '{best_match}', welcome back, Say 'Hi ZZX'!")
                identified_user = best_match
                break
            else:
                print("Voice not recognized. Continuing to listen...")

    except KeyboardInterrupt:
        print("Recording interrupted by user.")

    finally:
        # Save the captured audio whether or not verification succeeded.
        # No `return` here: returning from `finally` would silently discard
        # any exception raised while processing.
        if audio_cache:
            save_realtime_audio(audio_cache, sample_rate=16000, output_path=output_path)

    if identified_user:
        return identified_user

    print("No speaker identified.")
    return None


SyntaxError: invalid syntax. Perhaps you forgot a comma? (3529796771.py, line 77)

In [None]:
# Enroll two speakers from reference recordings.
# NOTE: these are absolute paths on the author's machine; adjust them before running elsewhere.
register_speaker("/Users/7one/Documents/Work/mangoesai/livekit_paddle/99.wav", "Dengfeng")
register_speaker("/Users/7one/Documents/Work/mangoesai/livekit_paddle/amit.wav", "Amit")


In [None]:
# Listen for up to 30 s and greet whichever registered speaker is recognised.
identified_user = transcribe_with_speaker_verification(
    chunk_length_s=10.0, stream_chunk_s=1.0, max_duration=30, threshold=0.4
)
print(f"Hello, {identified_user}!" if identified_user else "No speaker recognized.")

In [None]:
from transformers import pipeline
import torch

# Use the first CUDA GPU when one is available, otherwise fall back to the CPU.
device = "cuda:0" if torch.cuda.is_available() else "cpu"

# Audio-classification pipeline; launch_fn uses it to detect the wake word.
classifier = pipeline(
    "audio-classification", model="MIT/ast-finetuned-speech-commands-v2", device=device
)

In [None]:
# Display the classifier's id -> label mapping (the labels launch_fn accepts as wake words).
classifier.model.config.id2label

In [12]:
from transformers.pipelines.audio_utils import ffmpeg_microphone_live


def launch_fn(
    wake_word="marvin",
    prob_threshold=0.5,
    chunk_length_s=2.0,
    stream_chunk_s=0.25,
    debug=False,
):
    """Block on the microphone until the wake word is detected.

    Args:
        wake_word: Label to wait for; it must be one of the classifier's labels.
        prob_threshold: Score the wake-word prediction has to exceed.
        chunk_length_s: Forwarded to ``ffmpeg_microphone_live``.
        stream_chunk_s: Forwarded to ``ffmpeg_microphone_live``.
        debug: When true, print the top prediction for every chunk.

    Returns:
        ``True`` once the top prediction is the wake word with a score above
        ``prob_threshold``. If the stream ends first, the function falls
        through and implicitly returns ``None``.

    Raises:
        ValueError: If ``wake_word`` is not a label known to the classifier.
    """
    known_labels = classifier.model.config.label2id
    if wake_word not in known_labels:
        raise ValueError(
            f"Wake word {wake_word} not in set of valid class labels, pick a wake word in the set {classifier.model.config.label2id.keys()}."
        )

    # Record at the sampling rate the classifier's feature extractor expects.
    mic = ffmpeg_microphone_live(
        sampling_rate=classifier.feature_extractor.sampling_rate,
        chunk_length_s=chunk_length_s,
        stream_chunk_s=stream_chunk_s,
    )

    print("Listening for wake word...")
    for predictions in classifier(mic):
        top = predictions[0]  # only the first prediction of each chunk is inspected
        if debug:
            print(top)
        if top["label"] == wake_word and top["score"] > prob_threshold:
            return True

In [None]:
# Wait for the default wake word, printing the top prediction of every chunk.
launch_fn(debug=True)

In [None]:
# Speech-recognition pipeline used by transcribe(); runs on the device selected earlier.
transcriber = pipeline(
    "automatic-speech-recognition", model="openai/whisper-small.en", device=device
)

In [15]:
import sys


def transcribe(chunk_length_s=10.0, stream_chunk_s=2.0):
    """Transcribe live microphone audio and return the final text.

    Intermediate transcriptions are printed in place on a single console
    line. The loop ends at the first result whose ``partial`` flag is falsy.

    Args:
        chunk_length_s: Forwarded to ``ffmpeg_microphone_live``.
        stream_chunk_s: Forwarded to ``ffmpeg_microphone_live``.

    Returns:
        The ``"text"`` field of the last result received.
    """
    # Record at the sampling rate the transcriber's feature extractor expects.
    mic = ffmpeg_microphone_live(
        sampling_rate=transcriber.feature_extractor.sampling_rate,
        chunk_length_s=chunk_length_s,
        stream_chunk_s=stream_chunk_s,
    )

    print("Start speaking...")
    results = transcriber(mic, generate_kwargs={"max_new_tokens": 256})
    for result in results:
        sys.stdout.write("\033[K")  # ANSI "erase to end of line" before reprinting
        print(result["text"], end="\r")
        still_partial = result["partial"][0]
        if not still_partial:
            break

    return result["text"]

In [None]:
# Try the live transcription with its default chunk settings.
transcribe()

In [17]:
from huggingface_hub import HfFolder
import requests


def query(text, model_id="tiiuae/falcon-7b-instruct", timeout=60):
    """Send a prompt to the Hugging Face Inference API and return the completion.

    Args:
        text: Prompt sent as the ``inputs`` field of the request.
        model_id: Model to query on the Inference API.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The generated text. When the response echoes the prompt, the prompt
        and the single character after it are stripped.

    Raises:
        requests.HTTPError: If the API answers with an error status code.
        requests.Timeout: If no response arrives within ``timeout`` seconds.
        RuntimeError: If the response body is not the expected
            ``[{"generated_text": ...}]`` structure.
    """
    api_url = f"https://api-inference.huggingface.co/models/{model_id}"
    headers = {"Authorization": f"Bearer {HfFolder().get_token()}"}
    payload = {"inputs": text}

    print(f"Querying...: {text}")
    # Without a timeout a stalled connection would block the notebook forever.
    response = requests.post(api_url, headers=headers, json=payload, timeout=timeout)
    response.raise_for_status()

    result = response.json()
    # Error payloads are not a list of generations; fail with a readable
    # message instead of a KeyError/TypeError from blind indexing.
    if not (
        isinstance(result, list)
        and result
        and isinstance(result[0], dict)
        and "generated_text" in result[0]
    ):
        raise RuntimeError(f"Unexpected response from inference API: {result!r}")

    generated = result[0]["generated_text"]
    # Only strip the prompt when it is really echoed back; otherwise slicing
    # would cut off the beginning of the answer.
    if generated.startswith(text):
        generated = generated[len(text) + 1 :]
    return generated

In [None]:
# Smoke-test the LLM query helper with a sample prompt.
query("What does Hugging Face do?")

In [None]:
from transformers import SpeechT5Processor, SpeechT5ForTextToSpeech, SpeechT5HifiGan

# Processor that turns text into model inputs for SpeechT5 (used by synthesise).
processor = SpeechT5Processor.from_pretrained("microsoft/speecht5_tts")

# Text-to-speech model and its HiFi-GAN vocoder, both moved to the selected device.
model = SpeechT5ForTextToSpeech.from_pretrained("microsoft/speecht5_tts").to(device)
vocoder = SpeechT5HifiGan.from_pretrained("microsoft/speecht5_hifigan").to(device)

In [None]:
from datasets import load_dataset

# Load the validation split of the x-vector dataset.
embeddings_dataset = load_dataset("Matthijs/cmu-arctic-xvectors", split="validation")
# Take the x-vector stored at index 7306 and add a leading batch dimension.
speaker_embeddings = torch.tensor(embeddings_dataset[7306]["xvector"]).unsqueeze(0)

In [21]:
def synthesise(text):
    """Generate speech for ``text`` and return the result moved to the CPU.

    Uses the module-level ``processor``, ``model``, ``vocoder`` and
    ``speaker_embeddings``; the inputs are moved to ``device`` first.
    """
    input_ids = processor(text=text, return_tensors="pt")["input_ids"].to(device)
    voice = speaker_embeddings.to(device)
    waveform = model.generate_speech(input_ids, voice, vocoder=vocoder)
    return waveform.cpu()

In [None]:
from IPython.display import Audio

# Synthesise a sample sentence to check the text-to-speech setup.
audio = synthesise(
    "Hugging Face is a company that provides natural language processing and machine learning tools for developers."
)

# Render an audio player for the result at a 16 kHz playback rate.
Audio(audio, rate=16000)

In [None]:
from IPython.display import Audio

# End-to-end assistant: verify speaker -> wake word -> transcribe -> LLM -> speech.
# Each failed step raises SystemExit, which stops this cell. Calling exit() in a
# notebook instead asks Jupyter to shut the kernel down, which would throw away
# the models loaded above.

# Step 1: speaker verification.
# transcribe_with_speaker_verification checks against every registered speaker
# and has no `speaker_name` parameter, so its result must be captured here.
identified_user = transcribe_with_speaker_verification(
    chunk_length_s=10.0,
    stream_chunk_s=1.0,
    max_duration=10,
)

if not identified_user:
    print("Speaker verification failed or no audio recorded. Exiting...")
    raise SystemExit

# Step 2: wake-word detection
if launch_fn():
    print("Wake word detected. Proceeding to transcription...")
else:
    print("No wake word detected. Exiting...")
    raise SystemExit

# Step 3: live speech transcription
transcription = transcribe()
if not transcription:
    print("No transcription available. Exiting...")
    raise SystemExit

# Step 4: ask the LLM for a reply
response = query(transcription)
if not response:
    print("No response from LLM. Exiting...")
    raise SystemExit

# Step 5: turn the LLM reply into speech
audio = synthesise(response)

# Step 6: play the synthesised speech
Audio(audio, rate=16000, autoplay=True)
