In [1]:
from dashscope.audio.asr import *
import pyaudiowpatch as pyaudio
import numpy as np


def getDefaultSpeakers(mic: pyaudio.PyAudio, info = True):
    """
    获取默认的系统音频输出的回环设备
    Args:
        mic (pyaudio.PyAudio): pyaudio对象
        info (bool, optional): 是否打印设备信息. Defaults to True.

    Returns:
        dict: 统音频输出的回环设备
    """
    try:
        WASAPI_info = mic.get_host_api_info_by_type(pyaudio.paWASAPI)
    except OSError:
        print("Looks like WASAPI is not available on the system. Exiting...")
        exit()

    default_speaker = mic.get_device_info_by_index(WASAPI_info["defaultOutputDevice"])
    if(info): print("wasapi_info:\n", WASAPI_info, "\n")
    if(info): print("default_speaker:\n", default_speaker, "\n")

    if not default_speaker["isLoopbackDevice"]:
        for loopback in mic.get_loopback_device_info_generator():
            if default_speaker["name"] in loopback["name"]:
                default_speaker = loopback
                if(info): print("Using loopback device:\n", default_speaker, "\n")
                break
        else:
            print("Default loopback output device not found.")
            print("Run `python -m pyaudiowpatch` to check available devices.")
            print("Exiting...")
            exit()
            
    if(info): print(f"Recording Device: #{default_speaker['index']} {default_speaker['name']}")
    return default_speaker


class Callback(TranslationRecognizerCallback):
    """
    语音大模型流式传输回调对象
    """
    def __init__(self):
        super().__init__()
        self.usage = 0
        self.sentences = []
        self.translations = []
    
    def on_open(self) -> None:
        print("\n流式翻译开始...\n")

    def on_close(self) -> None:
        print(f"\nTokens消耗：{self.usage}")
        print(f"流式翻译结束...\n")
        for i in range(len(self.sentences)):
            print(f"\n{self.sentences[i]}\n{self.translations[i]}\n")

    def on_event(
        self,
        request_id,
        transcription_result: TranscriptionResult,
        translation_result: TranslationResult,
        usage
    ) -> None:
        if transcription_result is not None:
            id = transcription_result.sentence_id
            text = transcription_result.text
            if transcription_result.stash is not None:
                stash = transcription_result.stash.text
            else:
                stash = ""
            print(f"#{id}: {text}{stash}")
            if usage: self.sentences.append(text)
        
        if translation_result is not None:
            lang = translation_result.get_language_list()[0]
            text = translation_result.get_translation(lang).text
            if translation_result.get_translation(lang).stash is not None:
                stash = translation_result.get_translation(lang).stash.text
            else:
                stash = ""
            print(f"#{lang}: {text}{stash}")
            if usage: self.translations.append(text)
        
        if usage: self.usage += usage['duration']

In [2]:
mic = pyaudio.PyAudio()
default_speaker = getDefaultSpeakers(mic, False)

SAMP_WIDTH = pyaudio.get_sample_size(pyaudio.paInt16)
FORMAT = pyaudio.paInt16
CHANNELS = default_speaker["maxInputChannels"]
RATE = int(default_speaker["defaultSampleRate"])
CHUNK = RATE // 10
INDEX = default_speaker["index"]

dev_info = f"""
采样输入设备：
    - 序号：{default_speaker['index']}
    - 名称：{default_speaker['name']}
    - 最大输入通道数：{default_speaker['maxInputChannels']}
    - 默认低输入延迟：{default_speaker['defaultLowInputLatency']}s
    - 默认高输入延迟：{default_speaker['defaultHighInputLatency']}s
    - 默认采样率：{default_speaker['defaultSampleRate']}Hz
    - 是否回环设备：{default_speaker['isLoopbackDevice']}

音频样本块大小：{CHUNK}
样本位宽：{SAMP_WIDTH}
音频数据格式：{FORMAT}
音频通道数：{CHANNELS}
音频采样率：{RATE}
"""
print(dev_info)


采样输入设备：
    - 序号：37
    - 名称：耳机 (HUAWEI FreeLace 活力版) [Loopback]
    - 最大输入通道数：2
    - 默认低输入延迟：0.003s
    - 默认高输入延迟：0.01s
    - 默认采样率：44100.0Hz
    - 是否回环设备：True

音频样本块大小：4410
样本位宽：2
音频数据格式：8
音频通道数：2
音频采样率：44100



In [5]:
RECORD_SECONDS = 20 # 监听时长(s)

stream = mic.open(
    format = FORMAT,
    channels = CHANNELS,
    rate = RATE,
    input = True,
    input_device_index = INDEX
)
translator = TranslationRecognizerRealtime(
    model = "gummy-realtime-v1",
    format = "pcm",
    sample_rate = RATE,
    transcription_enabled = True,
    translation_enabled = True,
    source_language = "ja",
    translation_target_languages = ["zh"],
    callback = Callback()
)
translator.start()

for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
    data = stream.read(CHUNK)
    data_np = np.frombuffer(data, dtype=np.int16)
    data_np_r = data_np.reshape(-1, CHANNELS)
    print(data_np_r.shape)
    mono_data = np.mean(data_np_r.astype(np.float32), axis=1)
    mono_data = mono_data.astype(np.int16)
    mono_data_bytes = mono_data.tobytes()
    translator.send_audio_frame(mono_data_bytes)

translator.stop()
stream.stop_stream()
stream.close()


流式翻译开始...

(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)
(4410, 2)


KeyboardInterrupt: 