In [11]:
import os
import sys
import time
import threading
import queue
import logging
import traceback
import tempfile
import subprocess
import shutil
from typing import Dict, List, Generator, Iterator, Optional
import numpy as np
import torch
import ffmpeg
import librosa
import soundfile as sf
from faster_whisper import WhisperModel

创建定义faster-whisper音转文类

In [12]:
# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("FasterWhisperTranscribe")

class FasterWhisperTranscribe:
    def __init__(self, 
                 whisper_model: str = "large-v3", 
                 device: str = "cuda",
                 compute_type: str = "int8",
                 chunk_size: int = 30,  # 单位：秒
                 max_file_size: int = 1 * 1024 * 1024,  # 1MB
                 vad_threshold: float = 0.5,
                 beam_size: int = 5,
                 best_of: int = 5,
                 temperature: float = 0.0,
                 compression_ratio_threshold: float = 2.4,
                 log_prob_threshold: float = -1.0,
                 no_speech_threshold: float = 0.6,
                 condition_on_previous_text: bool = True,
                 initial_prompt: str = None,
                 word_timestamps: bool = False,
                 debug: bool = False):
        # 设置调试模式
        self.debug = debug
        if debug:
            logger.setLevel(logging.DEBUG)
        else:
            logger.setLevel(logging.INFO)
            
        # 初始化Whisper模型
        compute_device = device if (device == "cuda" and torch.cuda.is_available()) else "cpu"
        logger.debug(f"使用设备: {compute_device}, 计算类型: {compute_type}")
        
        try:
            self.model = WhisperModel(whisper_model, device=compute_device, compute_type=compute_type)
            logger.debug(f"成功加载Whisper模型: {whisper_model}")
        except Exception as e:
            logger.error(f"加载Whisper模型失败: {str(e)}")
            raise RuntimeError(f"无法加载Whisper模型: {str(e)}")
        
        # 配置参数
        self.chunk_size = chunk_size
        self.max_file_size = max_file_size
        self.vad_threshold = vad_threshold
        
        # 转写参数
        self.beam_size = beam_size
        self.best_of = best_of
        self.temperature = temperature
        self.compression_ratio_threshold = compression_ratio_threshold
        self.log_prob_threshold = log_prob_threshold
        self.no_speech_threshold = no_speech_threshold
        self.condition_on_previous_text = condition_on_previous_text
        self.initial_prompt = initial_prompt
        self.word_timestamps = word_timestamps
        
        logger.debug(f"初始化参数 - 分块大小: {chunk_size}秒, 最大文件大小: {max_file_size/1024/1024}MB, VAD阈值: {vad_threshold}")
        logger.debug(f"转写参数 - 束搜索大小: {beam_size}, 采样数量: {best_of}, 温度: {temperature}")

    def transcribe_file(self, audio_path: str, language: str = "zh") -> Generator[Dict, None, None]:
        try:
            # 检查文件是否存在
            if not os.path.exists(audio_path):
                logger.error(f"文件不存在: {audio_path}")
                raise FileNotFoundError(f"文件不存在: {audio_path}")
            
            # 检查文件大小
            file_size = os.path.getsize(audio_path)
            logger.debug(f"文件大小: {file_size/1024/1024:.2f}MB")
            
            # 如果文件较小，直接处理
            if file_size <= self.max_file_size:
                logger.debug(f"文件大小小于阈值，直接处理")
                yield from self._transcribe_audio_direct(audio_path, language)
            else:
                # 对大文件进行分块处理
                logger.debug(f"文件大小超过阈值，进行分块处理")
                yield from self._transcribe_audio_chunked(audio_path, language)
            
        except Exception as e:
            logger.error(f"音频转写异常: {str(e)}")
            if self.debug:
                logger.error(traceback.format_exc())
            raise

    def transcribe_file_direct(self, audio_path: str, language: str = "zh") -> Generator[Dict, None, None]:
        try:
            # 检查文件是否存在
            if not os.path.exists(audio_path):
                logger.error(f"文件不存在: {audio_path}")
                raise FileNotFoundError(f"文件不存在: {audio_path}")
            
            yield from self._transcribe_audio_direct(audio_path, language)
            
        except Exception as e:
            logger.error(f"音频直接转写异常: {str(e)}")
            if self.debug:
                logger.error(traceback.format_exc())
            raise

    def transcribe_file_chunked(self, audio_path: str, language: str = "zh") -> Generator[Dict, None, None]:
        try:
            # 检查文件是否存在
            if not os.path.exists(audio_path):
                logger.error(f"文件不存在: {audio_path}")
                raise FileNotFoundError(f"文件不存在: {audio_path}")
            
            yield from self._transcribe_audio_chunked(audio_path, language)
            
        except Exception as e:
            logger.error(f"音频分块转写异常: {str(e)}")
            if self.debug:
                logger.error(traceback.format_exc())
            raise

    def _transcribe_audio_direct(self, audio_path: str, language: str) -> Generator[Dict, None, None]:
        logger.debug(f"开始直接转写: {audio_path}")
        
        try:
            segments, info = self.model.transcribe(
                audio_path, 
                language=language,
                beam_size=self.beam_size,
                best_of=self.best_of,
                temperature=self.temperature,
                compression_ratio_threshold=self.compression_ratio_threshold,
                log_prob_threshold=self.log_prob_threshold,
                no_speech_threshold=self.no_speech_threshold,
                condition_on_previous_text=self.condition_on_previous_text,
                initial_prompt=self.initial_prompt,
                word_timestamps=self.word_timestamps,
                without_timestamps=False
            )
            
            detected_language = info.language if hasattr(info, 'language') else language
            logger.debug(f"检测到语言: {detected_language}")
            
            # 处理结果
            for segment in segments:
                segment_dict = {
                    "text": segment.text,
                    "start": segment.start,
                    "end": segment.end,
                    "language": detected_language
                }
                
                # 如果启用了单词级时间戳，添加单词信息
                if self.word_timestamps and hasattr(segment, 'words') and segment.words:
                    segment_dict["words"] = [
                        {"word": word.word, "start": word.start, "end": word.end, "probability": word.probability}
                        for word in segment.words
                    ]
                
                logger.debug(f"转写片段: {segment.start:.2f}s - {segment.end:.2f}s")
                yield segment_dict
                
        except Exception as e:
            logger.error(f"直接转写处理异常: {str(e)}")
            if self.debug:
                logger.error(traceback.format_exc())
            raise

    def _transcribe_audio_chunked(self, audio_path: str, language: str) -> Generator[Dict, None, None]:
        """分块处理大型音频文件"""
        logger.debug(f"开始分块转写: {audio_path}")
        
        try:
            # 加载音频文件信息
            y, sr = librosa.load(audio_path, sr=None)
            duration = len(y) / sr
            
            logger.debug(f"音频时长: {duration:.2f}秒, 采样率: {sr}Hz")
            logger.debug(f"分块大小: {self.chunk_size}秒")
            
            detected_language = None
            
            # 分块处理
            for start_time in range(0, int(duration), self.chunk_size):
                end_time = min(start_time + self.chunk_size, duration)
                logger.debug(f"处理时间段: {start_time}s - {end_time}s")
                
                # 提取音频片段
                start_sample = int(start_time * sr)
                end_sample = int(end_time * sr)
                chunk = y[start_sample:end_sample]
                
                # 保存临时文件
                with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_file:
                    temp_path = temp_file.name
                    sf.write(temp_path, chunk, sr)
                
                try:
                    # 处理该片段
                    segments, info = self.model.transcribe(
                        temp_path,
                        language=language,
                        beam_size=self.beam_size,
                        best_of=self.best_of,
                        temperature=self.temperature,
                        compression_ratio_threshold=self.compression_ratio_threshold,
                        log_prob_threshold=self.log_prob_threshold,
                        no_speech_threshold=self.no_speech_threshold,
                        condition_on_previous_text=self.condition_on_previous_text,
                        initial_prompt=self.initial_prompt,
                        word_timestamps=self.word_timestamps,
                        without_timestamps=False
                    )
                    
                    # 记录检测到的语言（使用第一个块的语言）
                    if detected_language is None and hasattr(info, 'language'):
                        detected_language = info.language
                        logger.debug(f"检测到语言: {detected_language}")
                    
                    # 处理结果
                    for segment in segments:
                        segment_dict = {
                            "text": segment.text,
                            "start": segment.start + start_time,  # 调整时间戳
                            "end": segment.end + start_time,      # 调整时间戳
                            "language": detected_language or language
                        }
                        
                        # 如果启用了单词级时间戳，添加单词信息并调整时间戳
                        if self.word_timestamps and hasattr(segment, 'words') and segment.words:
                            segment_dict["words"] = [
                                {
                                    "word": word.word, 
                                    "start": word.start + start_time, 
                                    "end": word.end + start_time, 
                                    "probability": word.probability
                                }
                                for word in segment.words
                            ]
                        
                        logger.debug(f"转写片段: {segment_dict['start']:.2f}s - {segment_dict['end']:.2f}s")
                        yield segment_dict
                
                finally:
                    # 删除临时文件
                    try:
                        os.remove(temp_path)
                    except Exception as e:
                        logger.warning(f"删除临时文件失败: {str(e)}")
            
        except Exception as e:
            logger.error(f"分块处理异常: {str(e)}")
            if self.debug:
                logger.error(traceback.format_exc())
            raise

创建对应的音频流的处理分析类

In [13]:
class AudioStreamProcessor:
    """音频流处理器"""
    
    def __init__(self, 
                 stream_url: str,
                 segment_duration: int = 10,
                 sample_rate: int = 16000,
                 channels: int = 1,
                 output_format: str = "wav",
                 debug: bool = False,
                 debug_dir: str = "./debug_audio",
                 log_level: int = logging.INFO,
                 max_retries: int = 3,
                 retry_delay: int = 5,
                 ffmpeg_loglevel: str = "warning"):
        # 基本配置
        self.stream_url = stream_url
        self.segment_duration = segment_duration
        self.sample_rate = sample_rate
        self.channels = channels
        self.output_format = output_format.lower()
        
        # 确保输出格式有效
        if self.output_format not in ["wav", "flac"]:
            raise ValueError("输出格式必须是 'wav' 或 'flac'")
        
        # 调试配置
        self.debug = debug
        self.debug_dir = debug_dir
        if self.debug and not os.path.exists(self.debug_dir):
            os.makedirs(self.debug_dir, exist_ok=True)
        
        # 临时目录配置
        self.temp_dir = tempfile.mkdtemp(prefix="audio_stream_")
        
        # 重试配置
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.ffmpeg_loglevel = ffmpeg_loglevel
        
        # 设置日志
        self.logger = self._setup_logger(log_level)
        
        # 运行状态
        self.is_running = False
        self.is_paused = False
        self.stop_event = threading.Event()
        self.pause_event = threading.Event()
        self.segment_queue = queue.Queue()
        self.segment_index = 0
        self.ffmpeg_process = None
        
        # 检查ffmpeg是否可用
        self._check_ffmpeg()
        
        self.logger.info(f"初始化音频流处理器: {stream_url}")
        self.logger.info(f"音频格式: {channels}通道, {sample_rate}Hz, 输出格式: {output_format}")

    def _setup_logger(self, log_level: int) -> logging.Logger:
        """设置日志记录器"""
        logger = logging.getLogger("AudioStreamProcessor")
        logger.setLevel(log_level)
        
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        
        return logger
    
    def _check_ffmpeg(self):
        """检查ffmpeg是否可用"""
        try:
            subprocess.run(["ffmpeg", "-version"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            self.logger.debug("ffmpeg可用")
        except Exception as e:
            self.logger.error("ffmpeg不可用，请确保已安装ffmpeg并添加到PATH中")
            raise RuntimeError("ffmpeg不可用") from e

    def start(self) -> Iterator[str]:
        if self.is_running:
            self.logger.warning("处理器已在运行中")
            return
        
        self.is_running = True
        self.is_paused = False
        self.stop_event.clear()
        self.pause_event.clear()
        
        process_thread = threading.Thread(target=self._process_stream)
        process_thread.daemon = True
        process_thread.start()
        
        try:
            while not self.stop_event.is_set() or not self.segment_queue.empty():
                try:
                    segment_file = self.segment_queue.get(timeout=1)
                    if segment_file is None:
                        break
                    yield segment_file
                except queue.Empty:
                    if not process_thread.is_alive() and self.is_running:
                        self.logger.warning("处理线程已结束，但处理器仍在运行状态")
                        break
                    continue
                
        except KeyboardInterrupt:
            self.logger.info("用户中断，停止处理...")
            self.stop()
        
        finally:
            if process_thread.is_alive():
                process_thread.join(timeout=5)
            self.is_running = False
            self.logger.info("音频流处理已完成")

    def pause(self):
        """暂停处理音频流"""
        if not self.is_running:
            self.logger.warning("处理器未运行，无法暂停")
            return
        
        if self.is_paused:
            self.logger.warning("处理器已经处于暂停状态")
            return
        
        self.logger.info("暂停音频流处理")
        self.is_paused = True
        self.pause_event.set()
        
        # 如果ffmpeg进程正在运行，终止它
        if self.ffmpeg_process and self.ffmpeg_process.poll() is None:
            self.ffmpeg_process.terminate()
            self.ffmpeg_process.wait(timeout=5)
            self.ffmpeg_process = None

    def resume(self):
        """恢复处理音频流"""
        if not self.is_running:
            self.logger.warning("处理器未运行，无法恢复")
            return
        
        if not self.is_paused:
            self.logger.warning("处理器未处于暂停状态")
            return
        
        self.logger.info("恢复音频流处理")
        self.is_paused = False
        self.pause_event.clear()

    def stop(self):
        """停止处理音频流"""
        if not self.is_running:
            self.logger.warning("处理器未运行")
            return
        
        self.logger.info("停止音频流处理")
        self.stop_event.set()
        self.pause_event.clear()
        
        # 如果ffmpeg进程正在运行，终止它
        if self.ffmpeg_process and self.ffmpeg_process.poll() is None:
            self.ffmpeg_process.terminate()
            self.ffmpeg_process.wait(timeout=5)
            self.ffmpeg_process = None
        
        # 确保队列中有一个None，以便迭代器结束
        self.segment_queue.put(None)

    def _process_stream(self):
        """处理音频流的主线程"""
        self.logger.info(f"开始处理音频流: {self.stream_url}")
        
        retry_count = 0
        while not self.stop_event.is_set() and retry_count <= self.max_retries:
            try:
                if self.is_paused:
                    self.logger.info("处理器已暂停，等待恢复...")
                    while self.is_paused and not self.stop_event.is_set():
                        time.sleep(0.5)
                    if self.stop_event.is_set():
                        break
                    self.logger.info("处理器已恢复")
                
                # 创建分段目录
                segment_dir = os.path.join(self.temp_dir, f"segments_{int(time.time())}")
                os.makedirs(segment_dir, exist_ok=True)
                
                # 构建ffmpeg命令
                segment_pattern = os.path.join(segment_dir, f"segment_%06d.{self.output_format}")
                
                # 使用ffmpeg-python库构建命令
                stream = ffmpeg.input(self.stream_url, loglevel=self.ffmpeg_loglevel)
                stream = ffmpeg.output(
                    stream,
                    segment_pattern,
                    ac=self.channels,
                    ar=self.sample_rate,
                    segment_time=self.segment_duration,
                    f='segment'
                )
                
                # 启动ffmpeg进程
                self.logger.info(f"启动ffmpeg进程，分段模式，每段{self.segment_duration}秒")
                self.ffmpeg_process = ffmpeg.run_async(stream, pipe_stdout=True, pipe_stderr=True)
                
                # 启动文件监控线程
                monitor_thread = threading.Thread(
                    target=self._monitor_segments, 
                    args=(segment_dir,)
                )
                monitor_thread.daemon = True
                monitor_thread.start()
                
                # 等待ffmpeg进程结束
                returncode = self.ffmpeg_process.wait()
                
                if returncode != 0 and not self.stop_event.is_set() and not self.is_paused:
                    stderr = self.ffmpeg_process.stderr.read().decode('utf-8', errors='ignore')
                    self.logger.error(f"ffmpeg进程异常退出，返回码: {returncode}")
                    self.logger.error(f"ffmpeg错误输出: {stderr}")
                    raise Exception(f"ffmpeg进程异常退出，返回码: {returncode}")
                
                # 等待监控线程结束
                monitor_thread.join(timeout=5)
                
                # 如果是正常结束，不再重试
                if not self.stop_event.is_set() and not self.is_paused:
                    self.logger.info("音频流已结束")
                break
                
            except Exception as e:
                if self.stop_event.is_set() or self.is_paused:
                    break
                
                retry_count += 1
                self.logger.error(f"处理流失败 (尝试 {retry_count}/{self.max_retries}): {str(e)}")
                
                if retry_count <= self.max_retries:
                    self.logger.info(f"将在 {self.retry_delay} 秒后重试...")
                    time.sleep(self.retry_delay)
                else:
                    self.logger.error(f"达到最大重试次数 ({self.max_retries})，放弃处理")
        
        # 确保队列中有一个None，以便迭代器结束
        self.segment_queue.put(None)
        self.logger.info("处理线程已结束")

    def _monitor_segments(self, segment_dir: str):
        """监控分段目录，将新生成的分段文件添加到队列中"""
        self.logger.info(f"开始监控分段目录: {segment_dir}")
        
        processed_files = set()
        
        while not self.stop_event.is_set() and not self.is_paused:
            try:
                # 获取目录中的所有文件
                files = sorted([f for f in os.listdir(segment_dir) 
                               if f.endswith(f".{self.output_format}")])
                
                # 处理新文件
                for file in files:
                    file_path = os.path.join(segment_dir, file)
                    
                    # 如果文件已处理，跳过
                    if file_path in processed_files:
                        continue
                    
                    # 等待文件写入完成
                    if self._is_file_ready(file_path):
                        # 处理文件
                        output_path = self._process_segment(file_path)
                        if output_path:
                            self.segment_queue.put(output_path)
                            processed_files.add(file_path)
                
                # 检查ffmpeg进程是否仍在运行
                if self.ffmpeg_process and self.ffmpeg_process.poll() is not None:
                    # 处理最后可能的文件
                    for file in sorted([f for f in os.listdir(segment_dir) 
                                      if f.endswith(f".{self.output_format}")]):
                        file_path = os.path.join(segment_dir, file)
                        if file_path not in processed_files and self._is_file_ready(file_path):
                            output_path = self._process_segment(file_path)
                            if output_path:
                                self.segment_queue.put(output_path)
                                processed_files.add(file_path)
                    break
                
                time.sleep(0.1)  # 短暂休眠，避免CPU占用过高
                
            except Exception as e:
                self.logger.error(f"监控分段目录时出错: {str(e)}")
                self.logger.error(traceback.format_exc())
                time.sleep(1)  # 出错时稍长的休眠
        
        self.logger.info(f"停止监控分段目录: {segment_dir}")

    def _is_file_ready(self, file_path: str) -> bool:
        if not os.path.exists(file_path):
            return False
        
        try:
            # 尝试获取文件大小
            size1 = os.path.getsize(file_path)
            time.sleep(0.1)  # 等待一小段时间
            size2 = os.path.getsize(file_path)
            
            # 如果文件大小没有变化，认为文件已经写入完成
            return size1 == size2 and size1 > 0
        except Exception:
            return False

    def _process_segment(self, file_path: str) -> Optional[str]:
        try:
            # 生成输出文件路径
            output_path = os.path.join(
                self.temp_dir, 
                f"processed_segment_{self.segment_index:06d}.{self.output_format}"
            )
            
            # 使用ffmpeg确保音频格式正确
            stream = ffmpeg.input(file_path)
            stream = ffmpeg.output(
                stream,
                output_path,
                format=self.output_format,
                ac=self.channels,
                ar=self.sample_rate,
                loglevel=self.ffmpeg_loglevel
            )
            ffmpeg.run(stream, quiet=True, overwrite_output=True)
            
            # 如果启用调试模式，也保存一份到调试目录
            if self.debug:
                debug_path = os.path.join(
                    self.debug_dir, 
                    f"segment_{self.segment_index:06d}.{self.output_format}"
                )
                shutil.copy2(output_path, debug_path)
            
            self.logger.info(f"处理分段 {self.segment_index}: {output_path}")
            self.segment_index += 1
            
            return output_path
            
        except Exception as e:
            self.logger.error(f"处理分段失败: {str(e)}")
            self.logger.error(traceback.format_exc())
            return None

    def cleanup(self):
        """清理临时文件"""
        try:
            if os.path.exists(self.temp_dir):
                shutil.rmtree(self.temp_dir)
                self.logger.info(f"已清理临时目录: {self.temp_dir}")
        except Exception as e:
            self.logger.error(f"清理临时文件失败: {str(e)}")

    def __del__(self):
        """析构函数，确保资源被释放"""
        self.stop()
        self.cleanup()

In [17]:
def main():
    # 直接定义参数变量
    # 音频流参数
    stream_url = "http://localhost:6006/stream"  # 音频流URL (支持http/https/rtsp/rtmp)
    segment_duration = 10  # 每个音频段的持续时间（秒）
    sample_rate = 16000  # 采样率（Hz）
    channels = 1  # 音频通道数
    output_format = "wav"  # 输出格式 (wav/flac)
    
    # Whisper模型参数
    whisper_model = "/root/autodl-tmp/faster-whisper-large-v3-zh"  # whisper模型路径或预训练模型名称
    language = "zh"  # 转写语言代码
    device = "cuda" if torch.cuda.is_available() else "cpu"  # 计算设备
    compute_type = "int8"  # 计算类型
    
    # 其他参数
    debug = False  # 启用调试模式
    max_queue_size = 10  # 最大队列大小
    
    # 设置日志级别
    if debug:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)
    
    try:
        # 初始化音频流处理器
        logger.info(f"初始化音频流处理器: {stream_url}")
        audio_processor = AudioStreamProcessor(
            stream_url=stream_url,
            segment_duration=segment_duration,
            sample_rate=sample_rate,
            channels=channels,
            output_format=output_format,
            debug=debug
        )
        
        # 初始化Whisper转写器
        logger.info(f"初始化Whisper转写器: {whisper_model}")
        transcriber = FasterWhisperTranscribe(
            whisper_model=whisper_model,
            device=device,
            compute_type=compute_type,
            debug=debug
        )
        
        # 初始化队列和线程控制
        segment_queue = queue.Queue(maxsize=max_queue_size)
        stop_event = threading.Event()
        
        # 转写结果
        segments = []
        current_segment_index = 0
        
        # 定义音频流处理线程
        def stream_worker():
            try:
                logger.info("启动音频流处理线程")
                
                # 获取音频段迭代器
                segment_generator = audio_processor.start()
                
                # 处理音频段
                for segment_file in segment_generator:
                    if stop_event.is_set():
                        break
                    
                    try:
                        # 将音频段放入队列
                        logger.debug(f"将音频段放入队列: {segment_file}")
                        segment_queue.put(segment_file, timeout=1)
                    except queue.Full:
                        logger.warning("音频段队列已满，丢弃当前段")
                
                logger.info("音频流处理线程已结束")
                
            except Exception as e:
                logger.error(f"音频流处理线程异常: {str(e)}")
                if debug:
                    logger.error(traceback.format_exc())
                stop_event.set()
        
        # 定义转写线程
        def transcribe_worker():
            nonlocal current_segment_index
            
            try:
                logger.info("启动转写线程")
                
                while not stop_event.is_set():
                    try:
                        # 从队列获取音频段
                        segment_file = segment_queue.get(timeout=1)
                        
                        if segment_file is None:
                            continue
                        
                        # 转写音频段
                        logger.debug(f"转写音频段: {segment_file}")
                        
                        # 记录开始时间
                        start_time = time.time()
                        
                        # 转写音频段
                        segment_results = list(transcriber.transcribe_file_direct(segment_file, language))
                        
                        # 记录结束时间
                        end_time = time.time()
                        processing_time = end_time - start_time
                        
                        # 如果没有结果，跳过
                        if not segment_results:
                            logger.debug(f"音频段无转写结果: {segment_file}")
                            segment_queue.task_done()
                            continue
                        
                        # 处理转写结果
                        for result in segment_results:
                            # 添加段索引
                            result["segment_index"] = current_segment_index
                            result["segment_file"] = segment_file
                            result["processing_time"] = processing_time
                            
                            # 添加到结果列表
                            segments.append(result)
                            
                            # 输出转写结果
                            print(f"[{result['start']:.2f}s - {result['end']:.2f}s]: {result['text']}")
                        
                        # 更新段索引
                        current_segment_index += 1
                        
                        # 标记任务完成
                        segment_queue.task_done()
                        
                    except queue.Empty:
                        continue
                    except Exception as e:
                        logger.error(f"转写音频段异常: {str(e)}")
                        if debug:
                            logger.error(traceback.format_exc())
                
                logger.info("转写线程已结束")
                
            except Exception as e:
                logger.error(f"转写线程异常: {str(e)}")
                if debug:
                    logger.error(traceback.format_exc())
                stop_event.set()
        
        # 启动线程
        stream_thread = threading.Thread(target=stream_worker)
        stream_thread.daemon = True
        stream_thread.start()
        
        transcribe_thread = threading.Thread(target=transcribe_worker)
        transcribe_thread.daemon = True
        transcribe_thread.start()
        
        print(f"开始转写音频流: {stream_url}")
        print("按Ctrl+C停止转写")
        
        # 主循环
        try:
            while True:
                time.sleep(0.1)  # 短暂休眠，避免CPU占用过高
                
        except KeyboardInterrupt:
            print("\n用户中断，停止转写...")
        
        finally:
            # 停止处理
            stop_event.set()
            
            # 停止音频处理器
            audio_processor.stop()
            
            # 等待线程结束
            if stream_thread.is_alive():
                stream_thread.join(timeout=5)
            
            if transcribe_thread.is_alive():
                transcribe_thread.join(timeout=5)
            
            # 输出完整转写结果
            print("\n完整转写结果:")
            full_text = " ".join([segment["text"] for segment in segments])
            print(full_text)
            
            # 清理资源
            audio_processor.cleanup()
            
            print("转写已完成，资源已清理")
        
    except Exception as e:
        print(f"处理异常: {str(e)}")
        if debug:
            traceback.print_exc()


if __name__ == "__main__":
    main()


2025-03-02 13:23:21,354 - FasterWhisperTranscribe - INFO - 初始化音频流处理器: http://localhost:6006/stream
2025-03-02 13:23:21,411 - AudioStreamProcessor - INFO - 初始化音频流处理器: http://localhost:6006/stream
2025-03-02 13:23:21,411 - AudioStreamProcessor - INFO - 初始化音频流处理器: http://localhost:6006/stream
2025-03-02 13:23:21,412 - AudioStreamProcessor - INFO - 音频格式: 1通道, 16000Hz, 输出格式: wav
2025-03-02 13:23:21,412 - AudioStreamProcessor - INFO - 音频格式: 1通道, 16000Hz, 输出格式: wav
2025-03-02 13:23:21,413 - FasterWhisperTranscribe - INFO - 初始化Whisper转写器: /root/autodl-tmp/faster-whisper-large-v3-zh
2025-03-02 13:23:26,560 - FasterWhisperTranscribe - INFO - 启动音频流处理线程
2025-03-02 13:23:26,561 - FasterWhisperTranscribe - INFO - 启动转写线程
2025-03-02 13:23:26,563 - AudioStreamProcessor - INFO - 开始处理音频流: http://localhost:6006/stream
2025-03-02 13:23:26,563 - AudioStreamProcessor - INFO - 开始处理音频流: http://localhost:6006/stream
2025-03-02 13:23:26,565 - AudioStreamProcessor - INFO - 启动ffmpeg进程，分段模式，每段10秒
2025-03-02 13:23:2

开始转写音频流: http://localhost:6006/stream
按Ctrl+C停止转写


2025-03-02 13:23:31,469 - AudioStreamProcessor - INFO - 处理分段 0: /tmp/audio_stream_n80oceu0/processed_segment_000000.wav
2025-03-02 13:23:31,469 - AudioStreamProcessor - INFO - 处理分段 0: /tmp/audio_stream_n80oceu0/processed_segment_000000.wav
2025-03-02 13:23:31,653 - faster_whisper - INFO - Processing audio with duration 00:08.190


[0.00s - 7.56s]: 放暂停继续开始放暂停


2025-03-02 13:23:41,458 - AudioStreamProcessor - INFO - 处理分段 1: /tmp/audio_stream_n80oceu0/processed_segment_000001.wav
2025-03-02 13:23:41,458 - AudioStreamProcessor - INFO - 处理分段 1: /tmp/audio_stream_n80oceu0/processed_segment_000001.wav
2025-03-02 13:23:41,625 - faster_whisper - INFO - Processing audio with duration 00:08.190


[0.00s - 8.24s]: 暂停继续开始暂停继续


2025-03-02 13:23:51,343 - AudioStreamProcessor - INFO - 处理分段 2: /tmp/audio_stream_n80oceu0/processed_segment_000002.wav
2025-03-02 13:23:51,343 - AudioStreamProcessor - INFO - 处理分段 2: /tmp/audio_stream_n80oceu0/processed_segment_000002.wav
2025-03-02 13:23:51,513 - faster_whisper - INFO - Processing audio with duration 00:08.190


[0.00s - 8.18s]: 放暂停继续开始放暂停继续


2025-03-02 13:24:01,324 - AudioStreamProcessor - INFO - 处理分段 3: /tmp/audio_stream_n80oceu0/processed_segment_000003.wav
2025-03-02 13:24:01,324 - AudioStreamProcessor - INFO - 处理分段 3: /tmp/audio_stream_n80oceu0/processed_segment_000003.wav
2025-03-02 13:24:01,473 - faster_whisper - INFO - Processing audio with duration 00:08.190


[0.00s - 8.12s]: 暂停继续开始放暂停继续


2025-03-02 13:24:04,027 - AudioStreamProcessor - INFO - 停止音频流处理
2025-03-02 13:24:04,027 - AudioStreamProcessor - INFO - 停止音频流处理
2025-03-02 13:24:04,085 - FasterWhisperTranscribe - INFO - 转写线程已结束
2025-03-02 13:24:04,090 - AudioStreamProcessor - INFO - 停止监控分段目录: /tmp/audio_stream_n80oceu0/segments_1740893006
2025-03-02 13:24:04,090 - AudioStreamProcessor - INFO - 停止监控分段目录: /tmp/audio_stream_n80oceu0/segments_1740893006
2025-03-02 13:24:04,093 - AudioStreamProcessor - INFO - 处理线程已结束
2025-03-02 13:24:04,093 - AudioStreamProcessor - INFO - 处理线程已结束
2025-03-02 13:24:04,095 - AudioStreamProcessor - INFO - 音频流处理已完成
2025-03-02 13:24:04,095 - AudioStreamProcessor - INFO - 音频流处理已完成
2025-03-02 13:24:04,097 - FasterWhisperTranscribe - INFO - 音频流处理线程已结束
2025-03-02 13:24:04,100 - AudioStreamProcessor - INFO - 已清理临时目录: /tmp/audio_stream_n80oceu0
2025-03-02 13:24:04,100 - AudioStreamProcessor - INFO - 已清理临时目录: /tmp/audio_stream_n80oceu0



用户中断，停止转写...

完整转写结果:
放暂停继续开始放暂停 暂停继续开始暂停继续 放暂停继续开始放暂停继续 暂停继续开始放暂停继续
转写已完成，资源已清理
