## 一 配置

In [None]:
# ===================================================================
# 1. 环境检查与库安装 (在终端已完成，此处仅作记录)
# ===================================================================
# 以下命令已在 Conda 虚拟环境 anisub 中执行完毕，无需在此重新运行。
# !conda install python=3.10 ffmpeg pytorch torchvision torchaudio pytorch-cuda=12.1 -c pytorch -c nvidia -c conda-forge
# !conda install -c conda-forge demucs requests pydub ipywidgets jupyter ipykernel
# !pip install openai-whisper
# !pip install git+https://github.com/snakers4/silero-vad.git

# ===================================================================
# 2. 导入所有必需的库
# ===================================================================
import os
import torch
import whisper
import requests
import json
import subprocess
import demucs
import logging
from pathlib import Path
from pydub import AudioSegment
from IPython.display import Audio, display
import sys 

# ===================================================================
# 3. 日志配置
# ===================================================================
# 配置日志记录器，方便在长时任务中追踪进度和排查问题
logging.basicConfig(
    level=logging.INFO, 
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# ===================================================================
# 4. 全局变量与路径配置
# ===================================================================
logging.info("--- 初始化配置 ---")

# --- 核心目录配置 ---
# 输入目录：存放所有待处理的视频文件
INPUT_DIR = Path("GrandBlue")
# 工作目录：所有中间文件和最终产物都将保存在这里
WORKSPACE_DIR = Path("workspace")
TEMP_DIR = WORKSPACE_DIR / "temp"
OUTPUT_DIR = WORKSPACE_DIR / "output"

# 注意：由于现在是批量处理，具体到每个视频的文件路径（如 full_audio.wav, vocals.wav 等）
# 将在后续的处理循环中根据当前视频动态生成，因此不在全局预先定义。

# --- API 密钥 ---
# !!! 重要: 请在此处填入您的 DeepSeek API 密钥 !!!
DEEPSEEK_API_KEY = "sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
os.environ["HUGGING_FACE_HUB_TOKEN"] = "hf-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
# ===================================================================
# 5. 初始化环境与发现输入文件
# ===================================================================
# --- 创建工作目录 ---
logging.info("正在创建所需的工作目录...")
INPUT_DIR.mkdir(exist_ok=True) # 确保输入目录也存在
WORKSPACE_DIR.mkdir(exist_ok=True)
TEMP_DIR.mkdir(exist_ok=True)
OUTPUT_DIR.mkdir(exist_ok=True)
logging.info(f" > 工作目录已确认: {WORKSPACE_DIR.resolve()}")

# --- 发现所有视频文件 ---
logging.info(f"正在从 '{INPUT_DIR}' 目录中搜索视频文件...")
supported_extensions = ['.mp4', '.mkv', '.avi', '.mov', '.webm', '.flv']
video_files = []
for ext in supported_extensions:
    video_files.extend(INPUT_DIR.glob(f"*{ext}"))

if not video_files:
    logging.warning(f"在 '{INPUT_DIR}' 目录中未找到任何支持的视频文件。")
    logging.warning("请将您的 .mp4, .mkv 等文件放入该目录。")
else:
    logging.info(f"成功发现 {len(video_files)} 个视频文件待处理:")
    for video_path in video_files:
        logging.info(f"  - {video_path.name}")

# --- 检查并设置计算设备 (CPU/GPU) ---
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
logging.info(f" > PyTorch 将使用设备: {DEVICE.upper()}")
if DEVICE == "cuda":
    logging.info(f" > GPU 名称: {torch.cuda.get_device_name(0)}")

# --- 检查API密钥 ---
if "sk-xxxxxxxx" in DEEPSEEK_API_KEY:
    logging.warning(" > 注意: DeepSeek API 密钥似乎是占位符，请记得在后续步骤前填充。")
else:
    logging.info(" > DeepSeek API 密钥已配置。")
    
logging.info("--- 配置完成，环境已就绪 ---")

# ===================================================================
# 流程变更重要提示
# ===================================================================
# 从下一个单元格开始，您的所有代码（步骤1到步骤9）
# 都应该被包含在一个 for 循环中，像这样：
#
# for video_path in video_files:
#     logging.info(f"===== 开始处理视频: {video_path.name} =====")
#     
#     # --- 在这里根据 video_path 动态定义该视频的文件路径 ---
#     video_stem = video_path.stem
#     full_audio_path = TEMP_DIR / f"{video_stem}_full_audio.wav"
#     vocals_path = TEMP_DIR / "htdemucs" / video_stem / "vocals.wav"
#     # ... 其他路径 ...
#
#     # --- 然后执行您的步骤 1: 提取音频 ---
#     # extract_audio(video_path, full_audio_path)
#     
#     # --- 接着执行步骤 2, 3, 4... ---
#     # ...
#
#     logging.info(f"===== 视频 {video_path.name} 处理完成 =====")
#
# ===================================================================

2025-07-15 01:24:41 - INFO - --- 初始化配置 ---
2025-07-15 01:24:41 - INFO - 正在创建所需的工作目录...
2025-07-15 01:24:41 - INFO -  > 工作目录已确认: /root/autodl-tmp/Sub_Gen/workspace
2025-07-15 01:24:41 - INFO - 正在从 'GrandBlue' 目录中搜索视频文件...
2025-07-15 01:24:41 - INFO - 成功发现 12 个视频文件待处理:
2025-07-15 01:24:41 - INFO -   - S02E01.mp4
2025-07-15 01:24:41 - INFO -   - S02E02.mp4
2025-07-15 01:24:41 - INFO -   - S02E03.mp4
2025-07-15 01:24:41 - INFO -   - S02E04.mp4
2025-07-15 01:24:41 - INFO -   - S02E05.mp4
2025-07-15 01:24:41 - INFO -   - S02E06.mp4
2025-07-15 01:24:41 - INFO -   - S02E07.mp4
2025-07-15 01:24:41 - INFO -   - S02E08.mp4
2025-07-15 01:24:41 - INFO -   - S02E09.mp4
2025-07-15 01:24:41 - INFO -   - S02E10.mp4
2025-07-15 01:24:41 - INFO -   - S02E11.mp4
2025-07-15 01:24:41 - INFO -   - S02E12.mp4
2025-07-15 01:24:41 - INFO -  > PyTorch 将使用设备: CUDA
2025-07-15 01:24:41 - INFO -  > GPU 名称: NVIDIA GeForce RTX 3090
2025-07-15 01:24:41 - INFO -  > DeepSeek API 密钥已配置。
2025-07-15 01:24:41 - INFO - --- 配置完

## 二 音频预处理

In [2]:
# ===================================================================
# 步骤 1, 2, 和 2.5 的功能函数 (已优化)
# ===================================================================

def extract_audio(video_path: Path, audio_path: Path):
    """步骤1：从视频提取高质量音轨 (44.1kHz, 立体声) 以便Demucs处理。"""
    if audio_path.exists():
        logging.info(f"高质量音频文件已存在，跳过提取: {audio_path.name}")
        return True
    logging.info(f"为Demucs提取高质量音频: '{video_path.name}' -> '{audio_path.name}'")
    command = [
        "ffmpeg", "-i", str(video_path), 
        "-vn", "-acodec", "pcm_s16le", "-ar", "44100", "-ac", "2", 
        str(audio_path), "-y"
    ]
    try:
        subprocess.run(command, check=True, capture_output=True, text=True)
        logging.info(f"高质量音轨提取成功。大小: {audio_path.stat().st_size / 1e6:.2f} MB")
        return True
    except subprocess.CalledProcessError as e:
        logging.error(f"提取 '{video_path.name}' 的音频时出错:\n{e.stderr}")
        return False

def separate_vocals(full_audio_path: Path, output_dir: Path, device: str):
    """步骤2：使用Demucs分离人声，输入为高质量音频。"""
    audio_stem = full_audio_path.stem.replace('.wav', '') # 确保stem干净
    vocals_path = output_dir / "htdemucs" / audio_stem / "vocals.wav"
    if vocals_path.exists():
        logging.info(f"高质量人声文件已存在，跳过分离: {vocals_path.name}")
        return vocals_path
    logging.info(f"开始使用 Demucs 分离人声: {full_audio_path.name}")
    demucs_command = [
        sys.executable, "-m", "demucs.separate",
        "-n", "htdemucs", "--two-stems", "vocals",
        "-d", device, "--out", str(output_dir), str(full_audio_path)
    ]
    try:
        subprocess.run(demucs_command, check=True)
        logging.info(f"人声分离完成。高质量人声文件: {vocals_path}")
        return vocals_path
    except subprocess.CalledProcessError as e:
        logging.error(f"Demucs 处理 '{full_audio_path.name}' 时失败。")
        return None

def optimize_audio_for_transcription(raw_vocals_path: Path, temp_dir: Path):
    """
    步骤2.5 (新增): 将高质量人声WAV转换为轻量级版本 (16kHz, 单声道)。
    这会极大减小文件大小，且是VAD和Whisper的最佳格式。
    """
    # e.g., "S02E01.wav" -> "S02E01_16k_mono.wav"
    optimized_path = temp_dir / f"{raw_vocals_path.stem}_16k_mono.wav"
    
    if optimized_path.exists():
        logging.info(f"优化后的人声文件已存在，跳过转换: {optimized_path.name}")
        return optimized_path

    logging.info(f"开始优化人声文件用于转录: {raw_vocals_path.name}")
    logging.info(f"原始大小: {raw_vocals_path.stat().st_size / 1e6:.2f} MB")
    
    command = [
        "ffmpeg", "-i", str(raw_vocals_path),
        "-ar", "16000",          # 采样率降至 16kHz
        "-ac", "1",              # 通道转为单声道
        "-c:a", "pcm_s16le",     # 保持为WAV编码
        str(optimized_path),
        "-y"
    ]
    try:
        subprocess.run(command, check=True, capture_output=True, text=True)
        logging.info(f"优化完成！新文件: {optimized_path.name}")
        logging.info(f"优化后大小: {optimized_path.stat().st_size / 1e6:.2f} MB")
        return optimized_path
    except subprocess.CalledProcessError as e:
        logging.error(f"优化人声文件时出错:\n{e.stderr}")
        return None

# ===================================================================
# 测试区 (已更新流程)
# ===================================================================
logging.info("--- [测试开始] 运行优化后的步骤1, 2, 和 2.5 ---")

if not video_files:
    logging.warning("未找到任何视频文件，无法执行测试。")
else:
    test_video_path = video_files[0]
    # 从 "S02E01.mp4" 得到 "S02E01"
    test_video_stem = test_video_path.stem
    logging.info(f"将使用第一个视频进行测试: '{test_video_path.name}'")
    
    # 定义测试文件路径
    # 完整的高质量音频，用于Demucs输入
    test_full_audio_path = TEMP_DIR / f"{test_video_stem}.wav"

    # --- 步骤 1 ---
    if extract_audio(test_video_path, test_full_audio_path):
        # --- 步骤 2 ---
        raw_vocals_path = separate_vocals(test_full_audio_path, TEMP_DIR, DEVICE)
        
        if raw_vocals_path:
            # --- 新增步骤 2.5: 优化音频 ---
            optimized_vocals_path = optimize_audio_for_transcription(raw_vocals_path, TEMP_DIR)

            if optimized_vocals_path:
                logging.info("--- [测试成功] ---")
                logging.info("所有阶段（提取、分离、优化）均按预期工作。")
                logging.info(f"后续的 VAD 和 Whisper 步骤将使用这个优化文件: {optimized_vocals_path.name}")
                # logging.info("下方是优化后的人声预览:")
                # display(Audio(optimized_vocals_path, autoplay=False))
            else:
                 logging.error("--- [测试失败] 在音频优化步骤 ---")
        else:
            logging.error("--- [测试失败] 在人声分离步骤 ---")
    else:
        logging.error("--- [测试失败] 在音频提取步骤 ---")

2025-07-15 01:24:41 - INFO - --- [测试开始] 运行优化后的步骤1, 2, 和 2.5 ---
2025-07-15 01:24:41 - INFO - 将使用第一个视频进行测试: 'S02E01.mp4'
2025-07-15 01:24:41 - INFO - 高质量音频文件已存在，跳过提取: S02E01.wav
2025-07-15 01:24:41 - INFO - 高质量人声文件已存在，跳过分离: vocals.wav
2025-07-15 01:24:41 - INFO - 优化后的人声文件已存在，跳过转换: vocals_16k_mono.wav
2025-07-15 01:24:41 - INFO - --- [测试成功] ---
2025-07-15 01:24:41 - INFO - 所有阶段（提取、分离、优化）均按预期工作。
2025-07-15 01:24:41 - INFO - 后续的 VAD 和 Whisper 步骤将使用这个优化文件: vocals_16k_mono.wav


## 三 VAD 静音检测

In [3]:
# ===================================================================
# 3. 导入 VAD、音频处理和绘图所需的库
# ===================================================================
import torch
import librosa
import numpy as np
import matplotlib.pyplot as plt
import logging
import json
import traceback
from pathlib import Path
from pydub import AudioSegment # 新增 pydub 导入
from IPython.display import display


import subprocess
import os

result = subprocess.run('bash -c "source /etc/network_turbo && env | grep proxy"', shell=True, capture_output=True, text=True)
output = result.stdout
for line in output.splitlines():
    if '=' in line:
        var, value = line.split('=', 1)
        os.environ[var] = value
# ===================================================================
# 步骤 3 的功能函数 (已更新)
# ===================================================================

def detect_speech_segments(audio_path: Path):
    """
    【修正版】在音频上运行 Silero VAD 以检测语音片段。
    使用经过参数搜索优化的最终参数。
    """
    SAMPLING_RATE = 16000 # VAD 固定使用 16kHz
    logging.info("正在加载 Silero VAD 模型...")
    try:
        model, utils = torch.hub.load(
            repo_or_dir='snakers4/silero-vad', model='silero_vad', force_reload=False
        )
    except Exception as e:
        logging.error(f"加载 Silero VAD 模型失败: {e}")
        return None, None, None, None

    (get_speech_timestamps, _, read_audio, _, _) = utils
    
    logging.info(f"正在读取音频文件用于 VAD: {audio_path.name}")
    wav_tensor = read_audio(str(audio_path), sampling_rate=SAMPLING_RATE)
    
    logging.info("开始使用 Silero VAD (优化参数) 检测语音时间戳...")
    # --- 使用最终确定的优化参数 ---
    speech_timestamps_samples = get_speech_timestamps(
        wav_tensor, model, sampling_rate=SAMPLING_RATE,
        threshold=0.125, 
        min_silence_duration_ms=120,
        min_speech_duration_ms=80, 
        speech_pad_ms=220
    )
    
    # --- 将采样点转换为毫秒 ---
    speech_timestamps_ms = []
    for segment in speech_timestamps_samples:
        start_ms = round(segment['start'] / (SAMPLING_RATE / 1000))
        end_ms = round(segment['end'] / (SAMPLING_RATE / 1000))
        speech_timestamps_ms.append({'start': start_ms, 'end': end_ms})
        
    logging.info(f"VAD 检测到 {len(speech_timestamps_samples)} 个语音片段。")
    return speech_timestamps_samples, speech_timestamps_ms, model, utils

def cut_audio_by_timestamps(original_audio_path: Path, timestamps_ms: list, output_chunk_dir: Path):
    """
    【新增】根据VAD时间戳将音频切割成小块，并使用描述性名称保存。

    参数:
        original_audio_path (Path): 待切割的源音频文件路径 (16kHz, 单声道 .wav)。
        timestamps_ms (list): 包含 {'start': ms, 'end': ms} 的列表。
        output_chunk_dir (Path): 保存切割后音频块的目标目录。

    返回:
        Path: 创建的音频块目录路径。
    """
    logging.info(f"开始根据VAD时间戳切割音频: {original_audio_path.name}")
    
    # 确保输出目录存在
    output_chunk_dir.mkdir(parents=True, exist_ok=True)
    
    # 使用 pydub 加载源音频
    try:
        source_audio = AudioSegment.from_wav(original_audio_path)
    except Exception as e:
        logging.error(f"使用pydub加载音频文件失败: {e}")
        return None

    # 获取视频的基本名称，用于文件命名
    video_stem = original_audio_path.stem.replace('_16k_mono', '')

    for i, ts in enumerate(timestamps_ms):
        start_ms = ts['start']
        end_ms = ts['end']
        
        # 切割音频
        audio_chunk = source_audio[start_ms:end_ms]
        
        # 构造具有高区分度的文件名
        # 格式: S02E01_chunk_0001_2530ms_4710ms.wav
        chunk_filename = f"{video_stem}_chunk_{i+1:04d}_{start_ms}ms_{end_ms}ms.wav"
        chunk_path = output_chunk_dir / chunk_filename
        
        # 导出音频块
        audio_chunk.export(chunk_path, format="wav")
        
    logging.info(f"成功切割并保存了 {len(timestamps_ms)} 个音频片段到目录: {output_chunk_dir}")
    return output_chunk_dir

def plot_vad_results(audio_path: Path, speech_timestamps_samples: list):
    """
    【无变动】使用采样点时间戳进行绘图，绘图文本为英文。
    """
    # ... 此函数代码保持不变 ...
    logging.info("正在生成 VAD 结果的可视化图表...")
    SAMPLING_RATE = 16000
    try:
        waveform, sr = librosa.load(str(audio_path), sr=SAMPLING_RATE)
        time_axis = np.linspace(0, len(waveform) / sr, num=len(waveform))
        vad_mask = np.zeros_like(waveform, dtype=np.int8)
        for segment in speech_timestamps_samples:
            vad_mask[segment['start']:segment['end']] = 1
        plt.style.use('seaborn-v0_8-whitegrid')
        fig, ax = plt.subplots(figsize=(200, 6), dpi=110)
        ax.plot(time_axis, waveform, label='Audio Waveform', color='royalblue', linewidth=0.7)
        y_min, y_max = ax.get_ylim()
        ax.fill_between(time_axis, y_min, y_max, where=vad_mask==1, color='crimson', alpha=0.4, label='Voice Activity (VAD)')
        ax.set_title(f"VAD Speech Activity Result: {audio_path.name}", fontsize=16)
        ax.set_xlabel("Time (seconds)"); ax.set_ylabel("Amplitude")
        ax.set_xlim(0, len(waveform) / sr); ax.legend(loc='upper right')
        plt.tight_layout(); plt.show()
    except Exception as e:
        logging.error(f"生成 VAD 可视化图表时出错: {e}\n{traceback.format_exc()}")

# ===================================================================
# 测试区
# ===================================================================
logging.info("--- [测试开始] 步骤3：语音活动检测 (VAD) 与音频切割 ---")

if 'optimized_vocals_path' in locals() and optimized_vocals_path.exists():
    logging.info(f"将对文件进行 VAD 处理: {optimized_vocals_path.name}")
    
    # --- 步骤 3.1: 运行 VAD 并获取两种格式的时间戳 ---
    speech_timestamps_samples, speech_timestamps_ms, vad_model, vad_utils = detect_speech_segments(optimized_vocals_path)
    
    if speech_timestamps_ms:
        logging.info("--- [VAD成功] VAD 处理完成 ---")
        logging.info("前5个检测到的语音片段 (单位: 毫秒):")
        for ts in speech_timestamps_ms[:5]:
            logging.info(f"  - Start: {ts['start']:>7d} ms, End: {ts['end']:>7d} ms")
        
        # --- 步骤 3.2: 将【毫秒】时间戳保存到 JSON 文件 (用于存档和调试) ---
        video_stem = optimized_vocals_path.stem.replace('_16k_mono', '')
        vad_results_path = TEMP_DIR / f"{video_stem}_vad_timestamps_ms.json"
        logging.info(f"正在将 VAD 结果 (毫秒) 保存到文件: {vad_results_path.name}")
        with open(vad_results_path, 'w', encoding='utf-8') as f:
            json.dump(speech_timestamps_ms, f, indent=2)

        # --- 步骤 3.3 (新增): 根据时间戳切割音频 ---
        # 定义切割后音频块的专用输出目录
        vad_chunks_dir = TEMP_DIR / f"{video_stem}_vad_chunks"
        # 调用新函数执行切割
        cut_audio_by_timestamps(optimized_vocals_path, speech_timestamps_ms, vad_chunks_dir)
            
        # --- 步骤 3.4: 使用【采样点】时间戳进行精确绘图 (可选，用于可视化检查) ---
        # plot_vad_results(optimized_vocals_path, speech_timestamps_samples)

    else:
        logging.warning("--- [测试警告] VAD 未能检测到任何语音片段 ---")
else:
    logging.error("--- [测试失败] 找不到上一步生成的 'optimized_vocals_path' 文件。")
    logging.error("请确保上一个单元格已成功运行。")

2025-07-15 01:24:42 - INFO - --- [测试开始] 步骤3：语音活动检测 (VAD) 与音频切割 ---
2025-07-15 01:24:42 - INFO - 将对文件进行 VAD 处理: vocals_16k_mono.wav
2025-07-15 01:24:42 - INFO - 正在加载 Silero VAD 模型...
Using cache found in /root/.cache/torch/hub/snakers4_silero-vad_master
2025-07-15 01:24:43 - INFO - 正在读取音频文件用于 VAD: vocals_16k_mono.wav
2025-07-15 01:24:43 - INFO - 开始使用 Silero VAD (优化参数) 检测语音时间戳...
2025-07-15 01:24:59 - INFO - VAD 检测到 393 个语音片段。
2025-07-15 01:25:00 - INFO - --- [VAD成功] VAD 处理完成 ---
2025-07-15 01:25:00 - INFO - 前5个检测到的语音片段 (单位: 毫秒):
2025-07-15 01:25:00 - INFO -   - Start:     868 ms, End:    1724 ms
2025-07-15 01:25:00 - INFO -   - Start:   47236 ms, End:   49724 ms
2025-07-15 01:25:00 - INFO -   - Start:   51844 ms, End:   52636 ms
2025-07-15 01:25:00 - INFO -   - Start:   55460 ms, End:   57084 ms
2025-07-15 01:25:00 - INFO -   - Start:   57828 ms, End:   58876 ms
2025-07-15 01:25:00 - INFO - 正在将 VAD 结果 (毫秒) 保存到文件: vocals_vad_timestamps_ms.json
2025-07-15 01:25:00 - INFO - 开始根据VAD时间戳切割音频:

## 三.五 参数空间搜索

In [4]:
# # ===================================================================
# # 6. 导入所需库 (已移除 concurrent.futures 和 os)
# # ===================================================================
# import logging
# from pathlib import Path
# import librosa
# import json
# import torch
# import re
# import numpy as np

# # ===================================================================
# # 辅助函数：解析SRT文件和计算IoU得分 (保持不变)
# # ===================================================================

# def _srt_time_to_ms(time_str: str) -> int:
#     """将 SRT 时间戳字符串 (HH:MM:SS,ms) 转换为总毫秒数。"""
#     parts = re.split(r'[:,]', time_str)
#     h, m, s, ms = map(int, parts)
#     return h * 3600000 + m * 60000 + s * 1000 + ms

# def parse_srt_file(srt_path: Path) -> list:
#     """解析SRT文件，返回一个包含语音片段起始和结束时间（毫秒）的列表。"""
#     if not srt_path.exists():
#         logging.error(f"SRT 文件未找到: {srt_path}")
#         return []
    
#     segments = []
#     try:
#         with open(srt_path, 'r', encoding='utf-8-sig') as f:
#             content = f.read()
        
#         pattern = re.compile(r'\d+\n(\d{2}:\d{2}:\d{2},\d{3}) --> (\d{2}:\d{2}:\d{2},\d{3})')
#         matches = pattern.finditer(content)

#         for match in matches:
#             start_str, end_str = match.groups()
#             segments.append({
#                 'start': _srt_time_to_ms(start_str),
#                 'end': _srt_time_to_ms(end_str)
#             })
#         logging.info(f"成功从 '{srt_path.name}' 解析出 {len(segments)} 个字幕片段。")
#     except Exception as e:
#         logging.error(f"解析 SRT 文件 '{srt_path.name}' 时出错: {e}")
#         return []
        
#     return segments

# def calculate_iou_score(ground_truth_segs: list, vad_segs: list, total_duration_ms: int) -> float:
#     """通过创建时间轴掩码来计算两组时间片段的 IoU (Intersection over Union) 得分。"""
#     gt_timeline = np.zeros(total_duration_ms, dtype=bool)
#     vad_timeline = np.zeros(total_duration_ms, dtype=bool)

#     for seg in ground_truth_segs:
#         gt_timeline[seg['start']:seg['end']] = True
    
#     for seg in vad_segs:
#         vad_timeline[seg['start']:seg['end']] = True
        
#     intersection = np.sum(gt_timeline & vad_timeline)
#     union = np.sum(gt_timeline | vad_timeline)

#     if union == 0:
#         return 1.0

#     return intersection / union

# # ===================================================================
# # 定义参数空间和基准参数 (保持不变)
# # ===================================================================
# param_grid_single_test = {
#     'threshold': [ 0.125, 0.1,0.15, 0.175, 0.20],
#     'min_silence_duration_ms': [120, 140, 150, 160, 180],
#     'min_speech_duration_ms': [60, 80, 100],
#     'speech_pad_ms': [ 150, 180,200,220,250]
# }
# base_params = {
#     'threshold': 0.125, 'min_silence_duration_ms': 120,
#     'min_speech_duration_ms': 80, 'speech_pad_ms': 220
# }

# # ===================================================================
# # 单进程串行测试主程序 (已移除多进程逻辑)
# # ===================================================================
# def run_serial_vad_testing(audio_for_tuning_path: Path, srt_ground_truth_path: Path, 
#                            vad_model, vad_utils):
#     """
#     【单进程串行版】主测试函数。
#     """
#     logging.info("--- [开始] VAD参数单变量影响测试 (单进程串行版) ---")
#     print(f"将使用以下基准参数进行对照: {base_params}")
    
#     # --- 步骤1: 检查并加载数据 ---
#     if not audio_for_tuning_path.exists() or not srt_ground_truth_path.exists():
#         logging.error("音频或SRT文件路径无效，测试中止。")
#         return

#     ground_truth_segments = parse_srt_file(srt_ground_truth_path)
#     if not ground_truth_segments:
#         logging.error("无法加载SRT文件，测试中止。")
#         return

#     duration_sec = librosa.get_duration(path=str(audio_for_tuning_path))
#     audio_duration_ms = int(duration_sec * 1000)
    
#     (get_speech_timestamps, _, read_audio, _, _) = vad_utils
#     wav_tensor = read_audio(str(audio_for_tuning_path), sampling_rate=16000)
    
#     all_single_test_results = {}
    
#     # --- 步骤2: 循环测试每个参数 ---
#     for param_key, param_values in param_grid_single_test.items():
#         print(f"\n--- 正在测试参数: '{param_key}' ---")
        
#         current_param_results = []
        
#         # --- 步骤3: 使用简单的 for 循环串行执行 ---
#         for value in param_values:
#             # 准备本次测试的参数
#             test_params = base_params.copy()
#             test_params[param_key] = value
            
#             # 执行VAD检测
#             speech_timestamps_samples = get_speech_timestamps(
#                 wav_tensor, vad_model, sampling_rate=16000, **test_params
#             )
            
#             # 计算结果
#             vad_segments_ms = [
#                 {'start': round(s['start'] / 16), 'end': round(s['end'] / 16)} 
#                 for s in speech_timestamps_samples
#             ]
#             iou_score = calculate_iou_score(
#                 ground_truth_segments, vad_segments_ms, audio_duration_ms
#             )
            
#             # 打印单次结果
#             num_srt_segs = len(ground_truth_segments)
#             segs = len(vad_segments_ms)
#             print(f"  > 当 {param_key:<25} = {value:<5} | IoU得分: {iou_score:.4f} (片段数: {segs} / 标准: {num_srt_segs})")

#             # 保存单次结果
#             current_param_results.append({
#                 'params': test_params, 'iou_score': iou_score,
#                 'num_vad_segments': segs
#             })

#         all_single_test_results[param_key] = current_param_results
#         print("-" * 75)

#     # --- 步骤4: 保存最终结果 ---
#     results_path = WORKSPACE_DIR / "vad_serial_tuning_results.json"
#     with open(results_path, 'w', encoding='utf-8') as f:
#         json.dump(all_single_test_results, f, indent=4, ensure_ascii=False)
        
#     logging.info(f"--- [完成] 所有串行测试结果已保存到: {results_path} ---")


# # ===================================================================
# # --- 调用主函数 (与原始版本相同) ---
# # ===================================================================
# # 1. 定义全局变量 (这些是在其他单元格中创建的)
# SRT_GROUND_TRUTH_PATH = Path("E01_cn.srt") 

# # 2. 检查全局变量是否存在，再调用函数
# if 'optimized_vocals_path' in locals() and 'vad_model' in locals() and 'vad_utils' in locals():
#     # 3. 将全局变量作为参数，传入函数中
#     run_serial_vad_testing(
#         audio_for_tuning_path=optimized_vocals_path,
#         srt_ground_truth_path=SRT_GROUND_TRUTH_PATH,
#         vad_model=vad_model,
#         vad_utils=vad_utils
#     )
# else:
#     logging.error("无法启动测试：一个或多个必需的全局变量 (optimized_vocals_path, vad_model, vad_utils) 未定义。请确保之前的单元格已成功运行。")

## 四 ASR&原文字幕生成

#### 4.1 使用VAD切分的音频进行识别(首选)

In [12]:
# ===================================================================
# 4. 导入 ASR 和字幕生成所需的库 (已切换到 OpenAI-Whisper)
# ===================================================================
# from faster_whisper import WhisperModel  <- 已移除
import whisper  # <--- 使用官方的 whisper 库
import logging
from pathlib import Path
from datetime import timedelta
import torch
import gc
import os

# ===================================================================
# 步骤 4 的功能函数 (已适配 OpenAI-Whisper)
# ===================================================================

def load_original_whisper_model():
    """
    【新】加载官方的 OpenAI-Whisper 模型。
    它会自动处理模型的下载和缓存。
    """
    # 'large-v3' 是当前最先进的模型
    model_name = "large-v3" 
    device = "cuda" if torch.cuda.is_available() else "cpu"
    
    logging.info(f"正在加载官方 Whisper 模型: '{model_name}'")
    logging.info(f"将使用设备: {device}")
    
    try:
        # load_model 会自动下载并缓存模型，非常稳定
        model = whisper.load_model(model_name, device=device)
        logging.info("✅ 官方 Whisper 模型加载成功。")
        return model
    except Exception as e:
        logging.error(f"❌ 加载官方 Whisper 模型失败: {e}", exc_info=True)
        return None

def format_time_for_srt(milliseconds: int) -> str:
    """将毫秒转换为 SRT 的 'HH:MM:SS,ms' 格式。(此函数无需改变)"""
    td = timedelta(milliseconds=milliseconds)
    hours, remainder = divmod(td.seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    millis = td.microseconds // 1000
    return f"{hours:02d}:{minutes:02d}:{seconds:02d},{millis:03d}"

def transcribe_audio_chunks_original(whisper_model, audio_chunks_dir: Path):
    """
    【新】使用官方 Whisper 模型，通过循环逐个处理音频文件。
    同样加入了内存管理和错误捕获。
    """
    logging.info(f"准备从目录 '{audio_chunks_dir.name}' 中逐个识别音频片段...")
    
    chunk_paths = sorted(audio_chunks_dir.glob("*.wav"))
    if not chunk_paths:
        logging.warning(f"在指定目录中未找到任何 .wav 音频片段。")
        return []

    logging.info(f"找到 {len(chunk_paths)} 个音频片段，开始逐个进行语音识别...")
    
    transcription_results = []
    total_chunks = len(chunk_paths)
    
    for i, chunk_path in enumerate(chunk_paths):
        try:
            # logging.info(f"--> 正在处理片段 [{i+1}/{total_chunks}]: {chunk_path.name}")
            
            chunk_path_str = str(chunk_path)
            
            # --- 【核心修改】调用官方 whisper 的 transcribe 方法 ---
            # 它返回的是一个包含所有信息的字典，而不是生成器
            result = whisper_model.transcribe(
                chunk_path_str,
                language="ja",      # 指定语言
                fp16=torch.cuda.is_available() # 在GPU上自动使用fp16
            )
            
            # 从结果字典中获取识别出的文本
            text = result["text"].strip()

            if text:
                try:
                    # 从文件名解析时间戳的逻辑保持不变
                    parts = chunk_path.stem.split('_')
                    start_ms = int(parts[-2].replace('ms', ''))
                    end_ms = int(parts[-1].replace('ms', ''))
                    
                    transcription_results.append({
                        "start_ms": start_ms,
                        "end_ms": end_ms,
                        "text": text
                    })
                except (IndexError, ValueError) as e:
                    logging.warning(f"无法从文件名 {chunk_path.name} 解析时间戳: {e}")

        except Exception as e:
            logging.error(f"❌ 处理文件 {chunk_path.name} 时发生严重错误，已跳过。", exc_info=True)
        
        finally:
            # 内存管理措施依然保留，这是一个好习惯
            gc.collect() 
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

    logging.info(f"✅ 完成所有片段的识别，共获得 {len(transcription_results)} 条有效转录。")
    return transcription_results
    
def generate_srt_from_transcriptions(transcriptions: list, srt_output_path: Path):
    """
    将转录结果列表生成为标准的 SRT 字幕文件。(修正版)
    直接逐行写入文件，以彻底避免字符串转义问题。
    """
    logging.info(f"正在生成 SRT 字幕文件到: {srt_output_path}")
    transcriptions.sort(key=lambda x: x['start_ms'])
    
    try:
        with open(srt_output_path, 'w', encoding='utf-8') as f:
            for i, res in enumerate(transcriptions, start=1):
                start_time_str = format_time_for_srt(res['start_ms'])
                end_time_str = format_time_for_srt(res['end_ms'])
                text = res['text']
                
                # 逐行写入文件，确保换行符被正确处理
                f.write(f"{i}\n")
                f.write(f"{start_time_str} --> {end_time_str}\n")
                # 在文本末尾写入两个换行符，一个用于文本本身换行，一个用于和下一个字幕块隔开
                f.write(f"{text}\n\n")
                
        logging.info(f"✅ 日语 SRT 字幕文件已成功保存。")
    except Exception as e:
        logging.error(f"❌ 保存 SRT 文件时出错: {e}", exc_info=True)

# ===================================================================
# 测试区 (已适配新函数)
# ===================================================================
logging.info("--- [测试开始] 步骤4：ASR语音识别 (使用 OpenAI-Whisper) ---")

if 'vad_chunks_dir' in locals() and vad_chunks_dir.exists():
    
    # 【修改】调用新的模型加载函数
    whisper_model = load_original_whisper_model()
    
    if whisper_model:
        # 【修改】调用新的转录函数
        transcription_data = transcribe_audio_chunks_original(whisper_model, vad_chunks_dir)
        
        if transcription_data:
            video_stem = vad_chunks_dir.name.replace('_vad_chunks', '')
            japanese_srt_path = OUTPUT_DIR / f"{video_stem}_ja.srt"
            
            generate_srt_from_transcriptions(transcription_data, japanese_srt_path)
            
            logging.info("\n --- [测试成功] 步骤4全部完成 ---")
            logging.info("生成的日语字幕文件预览 (前5条):")
            for item in transcription_data[:5]:
                start = format_time_for_srt(item['start_ms'])
                end = format_time_for_srt(item['end_ms'])
                print(f"[{start} --> {end}] {item['text']}")
        else:
            logging.warning("--- [测试警告] ASR 未能从音频片段中识别出任何文本。---")
else:
    logging.error("--- [测试失败] 找不到上一步生成的 'vad_chunks_dir' 目录。")
    logging.error("请确保上一个单元格已成功运行。")

2025-07-15 01:47:09 - INFO - --- [测试开始] 步骤4：ASR语音识别 (使用 OpenAI-Whisper) ---
2025-07-15 01:47:09 - INFO - 正在加载官方 Whisper 模型: 'large-v3'
2025-07-15 01:47:09 - INFO - 将使用设备: cuda
2025-07-15 01:47:25 - INFO - ✅ 官方 Whisper 模型加载成功。
2025-07-15 01:47:25 - INFO - 准备从目录 'vocals_vad_chunks' 中逐个识别音频片段...
2025-07-15 01:47:25 - INFO - 找到 393 个音频片段，开始逐个进行语音识别...
2025-07-15 01:52:01 - INFO - ✅ 完成所有片段的识别，共获得 376 条有效转录。
2025-07-15 01:52:01 - INFO - 正在生成 SRT 字幕文件到: workspace/output/vocals_ja.srt
2025-07-15 01:52:01 - INFO - ✅ 日语 SRT 字幕文件已成功保存。
2025-07-15 01:52:01 - INFO - 
 --- [测试成功] 步骤4全部完成 ---
2025-07-15 01:52:01 - INFO - 生成的日语字幕文件预览 (前5条):


[00:00:00,868 --> 00:00:01,724] ご視聴ありがとうございました
[00:00:47,236 --> 00:00:49,724] やっぱり海はいいなあ
[00:00:51,844 --> 00:00:52,636] おっ
[00:00:55,460 --> 00:00:57,084] しおりちゃんから
[00:00:57,828 --> 00:00:58,876] しおり?


#### 4.x batch加速版（并非加速，弃用）

In [None]:
# # ===================================================================
# # 4. 导入 ASR 和字幕生成所需的库
# # ===================================================================
# import logging
# from pathlib import Path
# from datetime import timedelta
# import torch
# import gc
# import os
# from transformers import pipeline
# from transformers.utils import is_flash_attn_2_available # 引入 Flash Attention 2 的检查工具
# import warnings
# warnings.simplefilter(action='ignore', category=FutureWarning)
# # ===================================================================
# # 步骤 4 的功能函数
# # ===================================================================

# def load_optimized_whisper_pipeline():
#     """
#     加载经过 Flash Attention 2 优化的 Hugging Face ASR pipeline。
#     """
#     model_name = "openai/whisper-large-v3"
#     device = "cuda:0" if torch.cuda.is_available() else "cpu"
#     torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

#     logging.info(f"正在加载优化的 ASR pipeline: '{model_name}'")
#     logging.info(f"将使用设备: {device}, 计算类型: {torch_dtype}")

#     if is_flash_attn_2_available():
#         attn_implementation = "flash_attention_2"
#         logging.info("✅ Flash Attention 2 可用，将启用以获得最大加速！")
#     else:
#         attn_implementation = "sdpa" # Scaled Dot Product Attention，PyTorch 2.0+ 的内置高效实现
#         logging.info("⚠️ Flash Attention 2 不可用，将使用 PyTorch 内置的 SDPA。性能依然很好。")
        
#     try:
#         # 直接使用 Hugging Face pipeline，并传入优化参数
#         asr_pipeline = pipeline(
#             "automatic-speech-recognition",
#             model=model_name,
#             torch_dtype=torch_dtype,
#             device=device,
#             model_kwargs={"attn_implementation": attn_implementation}
#         )
#         logging.info("✅ 优化后的 ASR pipeline 加载成功。")
#         return asr_pipeline
#     except Exception as e:
#         logging.error(f"❌ 加载 ASR pipeline 失败: {e}", exc_info=True)
#         return None

# def format_time_for_srt(milliseconds: int) -> str:
#     """将毫秒转换为 SRT 的 'HH:MM:SS,ms' 格式。"""
#     td = timedelta(milliseconds=milliseconds)
#     hours, remainder = divmod(td.seconds, 3600)
#     minutes, seconds = divmod(remainder, 60)
#     millis = td.microseconds // 1000
#     return f"{hours:02d}:{minutes:02d}:{seconds:02d},{millis:03d}"

# def transcribe_chunks_with_pipeline(asr_pipeline, audio_chunks_dir: Path, batch_size: int):
#     """
#     使用优化后的 pipeline 和批处理功能高效转录所有音频块。
#     """
#     logging.info(f"准备从目录 '{audio_chunks_dir.name}' 中进行批处理识别...")
    
#     chunk_paths = [str(p) for p in sorted(audio_chunks_dir.glob("*.wav"))]
#     if not chunk_paths:
#         logging.warning(f"在指定目录中未找到任何 .wav 音频片段。")
#         return []

#     logging.info(f"找到 {len(chunk_paths)} 个音频片段，开始使用 batch_size={batch_size} 进行批处理...")
    
#     transcription_results = []
    
#     try:
#         # --- 直接调用 pipeline，它原生支持批处理 ---
#         # `pipeline` 可以直接处理一个文件路径列表
#         outputs = asr_pipeline(
#             chunk_paths,
#             chunk_length_s=10, # 这是 whisper 的标准块长度
#             batch_size=batch_size,
#             generate_kwargs={"language": "japanese"} # 指定语言
#         )
        
#         logging.info(f"✅ 所有批次处理完成。正在整理 {len(outputs)} 条结果...")

#         # 将结果与原始文件路径对应起来，以提取时间戳
#         for i, output in enumerate(outputs):
#             text = output["text"].strip()
#             if text:
#                 try:
#                     chunk_path = Path(chunk_paths[i])
#                     parts = chunk_path.stem.split('_')
#                     start_ms = int(parts[-2].replace('ms', ''))
#                     end_ms = int(parts[-1].replace('ms', ''))
                    
#                     transcription_results.append({
#                         "start_ms": start_ms,
#                         "end_ms": end_ms,
#                         "text": text
#                     })
#                 except (IndexError, ValueError) as e:
#                     logging.warning(f"无法从文件名 {chunk_path.name} 解析时间戳: {e}")

#     except Exception as e:
#         logging.error(f"❌ 在批处理过程中发生严重错误。", exc_info=True)
    
#     finally:
#         gc.collect()
#         if torch.cuda.is_available():
#             torch.cuda.empty_cache()

#     logging.info(f"成功整理出 {len(transcription_results)} 条有效转录。")
#     return transcription_results

# def generate_srt_from_transcriptions(transcriptions: list, srt_output_path: Path):
#     """将转录结果列表生成为标准的 SRT 字幕文件。"""
#     logging.info(f"正在生成 SRT 字幕文件到: {srt_output_path}")
#     srt_content = []
#     transcriptions.sort(key=lambda x: x['start_ms'])
    
#     for i, res in enumerate(transcriptions, start=1):
#         start_time_str = format_time_for_srt(res['start_ms'])
#         end_time_str = format_time_for_srt(res['end_ms'])
#         text = res['text']
#         srt_block = f"{i}\\n{start_time_str} --> {end_time_str}\\n{text}\\n"
#         srt_content.append(srt_block)
        
#     final_srt = "\\n".join(srt_content)
    
#     try:
#         with open(srt_output_path, 'w', encoding='utf-8') as f:
#             f.write(final_srt)
#         logging.info(f"✅ 日语 SRT 字幕文件已成功保存。")
#     except Exception as e:
#         logging.error(f"❌ 保存 SRT 文件时出错: {e}", exc_info=True)


# # ===================================================================
# # 测试区 
# # ===================================================================
# logging.info("--- [测试开始] 步骤4：ASR语音识别---")

# if 'vad_chunks_dir' in locals() and vad_chunks_dir.exists():
    
#     # 加载我们新的、优化过的 pipeline
#     asr_pipeline = load_optimized_whisper_pipeline()
    
#     if asr_pipeline:
#         # --- 定义批处理大小 ---
#         # 对于 3090 (24GB VRAM)，Flash Attention 2 更节省显存，可以尝试更大的批次
#         BATCH_SIZE = 8
        
#         transcription_data = transcribe_chunks_with_pipeline(asr_pipeline, vad_chunks_dir, BATCH_SIZE)
        
#         if transcription_data:
#             video_stem = vad_chunks_dir.name.replace('_vad_chunks', '')
#             japanese_srt_path = OUTPUT_DIR / f"{video_stem}_ja.srt"
            
#             generate_srt_from_transcriptions(transcription_data, japanese_srt_path)
            
#             logging.info("\\n--- [测试成功] 步骤4全部完成 ---")
#             logging.info("生成的日语字幕文件预览 (前5条):")
#             for item in transcription_data[:5]:
#                 start = format_time_for_srt(item['start_ms'])
#                 end = format_time_for_srt(item['end_ms'])
#                 print(f"[{start} --> {end}] {item['text']}")
#         else:
#             logging.warning("--- [测试警告] ASR 未能从音频片段中识别出任何文本。---")
# else:
#     logging.error("--- [测试失败] 找不到上一步生成的 'vad_chunks_dir' 目录。")
#     logging.error("请确保上一个单元格已成功运行。")

#### 4.2 混合策略(flag,To be improved)

In [None]:
# ===================================================================
# 4. 导入 ASR 和字幕生成所需的库 (终极混合策略版)
# ===================================================================
import whisper
import logging
from pathlib import Path
from datetime import timedelta
import torch
import gc
import os
import json # 需要 json 库来加载 VAD 时间戳

def load_original_whisper_model():
    """
    【新】加载官方的 OpenAI-Whisper 模型。
    它会自动处理模型的下载和缓存。
    """
    # 'large-v3' 是当前最先进的模型
    model_name = "large-v3" 
    device = "cuda" if torch.cuda.is_available() else "cpu"
    
    logging.info(f"正在加载官方 Whisper 模型: '{model_name}'")
    logging.info(f"将使用设备: {device}")
    
    try:
        # load_model 会自动下载并缓存模型，非常稳定
        model = whisper.load_model(model_name, device=device)
        logging.info("✅ 官方 Whisper 模型加载成功。")
        return model
    except Exception as e:
        logging.error(f"❌ 加载官方 Whisper 模型失败: {e}", exc_info=True)
        return None

def format_time_for_srt(milliseconds: int) -> str:
    """将毫秒转换为 SRT 的 'HH:MM:SS,ms' 格式。(此函数无需改变)"""
    td = timedelta(milliseconds=milliseconds)
    hours, remainder = divmod(td.seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    millis = td.microseconds // 1000
    return f"{hours:02d}:{minutes:02d}:{seconds:02d},{millis:03d}"

# ===================================================================
# 【核心】新的功能函数
# ===================================================================
def transcribe_long_audio_with_vad_guidance(whisper_model, long_audio_path: Path, vad_timestamps_path: Path):
    """
    【新】使用 Whisper 处理长音频，并用外部 VAD 的结果来过滤和校准。
    """
    logging.info(f"准备使用 Whisper 处理长音频: {long_audio_path.name}")
    logging.info(f"并使用 VAD 时间戳进行指导: {vad_timestamps_path.name}")

    if not long_audio_path.exists() or not vad_timestamps_path.exists():
        logging.error(f"音频文件或 VAD 时间戳文件不存在。")
        return []

    # 1. 加载我们精确的 Silero-VAD 时间戳 (单位：秒)
    with open(vad_timestamps_path, 'r') as f:
        # 将毫秒转换为秒
        vad_timestamps = [{'start': ts['start']/1000.0, 'end': ts['end']/1000.0} for ts in json.load(f)]

    # 2. 调用 Whisper 进行转录，并设置宽松的参数
    logging.info("开始使用 Whisper 进行初步转录，参数已设置为宽松模式...")
    try:
        # no_speech_threshold: 设低一点，让 Whisper 更不容易将音频判断为无语音
        # condition_on_previous_text: 设为 False 可能有助于减少长静音后的重复或幻觉
        result = whisper_model.transcribe(
            str(long_audio_path),
            language="ja",
            fp16=torch.cuda.is_available(),
            no_speech_threshold=0.5, # 默认是 0.6，稍微降低
            condition_on_previous_text=False 
        )
        whisper_segments = result['segments']
        logging.info(f"Whisper 初步转录完成，得到 {len(whisper_segments)} 个片段。")

    except Exception as e:
        logging.error(f"❌ Whisper 转录过程中发生严重错误。", exc_info=True)
        return []

    # 3. 【关键】用 VAD 时间戳过滤和对齐 Whisper 的结果
    final_segments = []
    logging.info("开始使用 Silero-VAD 时间戳对 Whisper 结果进行校准...")

    for whisper_seg in whisper_segments:
        # 检查这个 Whisper 片段的时间范围是否与任何 VAD 片段有重叠
        is_valid_segment = False
        for vad_ts in vad_timestamps:
            # 计算重叠部分
            overlap_start = max(whisper_seg['start'], vad_ts['start'])
            overlap_end = min(whisper_seg['end'], vad_ts['end'])
            
            # 如果重叠时长大于 0 (或一个很小的阈值)，就认为这个片段是有效的
            if overlap_end > overlap_start:
                is_valid_segment = True
                break # 只要跟一个 VAD 片段有重叠就行
        
        if is_valid_segment:
            final_segments.append(whisper_seg)
        else:
            logging.warning(f"丢弃 Whisper 片段 (可能为幻觉): [{whisper_seg['start']:.2f}s -> {whisper_seg['end']:.2f}s] {whisper_seg['text']}")

    logging.info(f"校准完成。最终保留 {len(final_segments)} / {len(whisper_segments)} 个片段。")
    
    # 清理内存
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
        
    return final_segments

def generate_srt_from_transcriptions(transcriptions: list, srt_output_path: Path):
    """将转录结果列表生成为标准的 SRT 字幕文件。(此函数无需改变)"""
    logging.info(f"正在生成 SRT 字幕文件到: {srt_output_path}")
    srt_content = []
    transcriptions.sort(key=lambda x: x['start_ms'])
    
    for i, res in enumerate(transcriptions, start=1):
        start_time_str = format_time_for_srt(res['start_ms'])
        end_time_str = format_time_for_srt(res['end_ms'])
        text = res['text']
        srt_block = f"{i}\\n{start_time_str} --> {end_time_str}\\n{text}\\n"
        srt_content.append(srt_block)
        
    final_srt = "\\n".join(srt_content)
    
    try:
        with open(srt_output_path, 'w', encoding='utf-8') as f:
            f.write(final_srt)
        logging.info(f"✅ 日语 SRT 字幕文件已成功保存。")
    except Exception as e:
        logging.error(f"❌ 保存 SRT 文件时出错: {e}", exc_info=True)

# ===================================================================
# 测试区 (已适配混合策略)
# ===================================================================
logging.info("--- [测试开始] 步骤4：ASR语音识别 (使用终极混合策略) ---")

try:
    video_stem = "vocals" # 请根据实际情况修改
    # 需要两个输入文件：
    vocals_16k_mono_path = TEMP_DIR / f"{video_stem}_16k_mono.wav"
    vad_timestamps_path = TEMP_DIR / f"{video_stem}_vad_timestamps_ms.json"

    if vocals_16k_mono_path.exists() and vad_timestamps_path.exists():
        whisper_model = load_original_whisper_model()
        
        if whisper_model:
            # 调用我们新的混合策略函数
            final_transcription_segments = transcribe_long_audio_with_vad_guidance(
                whisper_model, 
                vocals_16k_mono_path, 
                vad_timestamps_path
            )
            
            if final_transcription_segments:
                japanese_srt_path = OUTPUT_DIR / f"{video_stem}_ja.srt"
                generate_srt_from_segments(final_transcription_segments, japanese_srt_path)
                
                logging.info("\\n--- [测试成功] 步骤4全部完成 ---")
                logging.info("生成的日语字幕文件预览 (前5条):")
                for item in transcription_data[:5]:
                    start = format_time_for_srt(item['start_ms'])
                    end = format_time_for_srt(item['end_ms'])
                    print(f"[{start} --> {end}] {item['text']}")
            else:
                logging.warning("--- [测试警告] ASR 未能从音频中识别出任何有效文本。---")
    else:
        logging.error(f"--- [测试失败] 缺少所需文件: {vocals_16k_mono_path} 或 {vad_timestamps_path}")

except NameError as e:
    logging.error(f"--- [测试失败] 变量未定义: {e}。请确保前面的单元格已成功运行。")

#### 4.3 纯whisper(切分不准确)

In [None]:
# ===================================================================
# 4. 导入 ASR 和字幕生成所需的库 (已切换到 OpenAI-Whisper，简化版)
# ===================================================================
import whisper
import logging
from pathlib import Path
from datetime import timedelta
import torch
import gc
import os

# ===================================================================
# 步骤 4 的功能函数 (已适配 OpenAI-Whisper，简化版)
# ===================================================================

def load_original_whisper_model():
    """
    加载官方的 OpenAI-Whisper 模型。
    它会自动处理模型的下载和缓存。
    """
    model_name = "large-v3" 
    device = "cuda" if torch.cuda.is_available() else "cpu"
    
    logging.info(f"正在加载官方 Whisper 模型: '{model_name}'")
    logging.info(f"将使用设备: {device}")
    
    try:
        model = whisper.load_model(model_name, device=device)
        logging.info("✅ 官方 Whisper 模型加载成功。")
        return model
    except Exception as e:
        logging.error(f"❌ 加载官方 Whisper 模型失败: {e}", exc_info=True)
        return None

def format_time_for_srt(seconds: float) -> str:
    """将秒数转换为 SRT 的 'HH:MM:SS,ms' 格式。"""
    td = timedelta(seconds=seconds)
    # 使用 total_seconds() 来正确处理大于一天的情况（虽然这里不需要）
    # 但更简洁的写法是直接从 timedelta 对象中提取
    millis = td.microseconds // 1000
    total_seconds = int(td.total_seconds())
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d},{millis:03d}"

def transcribe_long_audio_with_whisper(whisper_model, long_audio_path: Path):
    """
    【新】使用官方 Whisper 模型，直接处理一个长音频文件。
    Whisper 会在内部自动分块并返回带时间戳的片段。
    """
    logging.info(f"准备使用 Whisper 直接处理长音频文件: {long_audio_path.name}")
    
    if not long_audio_path.exists():
        logging.error(f"音频文件不存在: {long_audio_path}")
        return None

    try:
        logging.info("开始使用 Whisper 进行转录，参数已设置为宽松模式...")
        # no_speech_threshold: 设低一点，让 Whisper 更不容易将音频判断为无语音
        # condition_on_previous_text: 设为 False 可能有助于减少长静音后的重复或幻觉
        result = whisper_model.transcribe(
            str(long_audio_path),
            language="ja",
            fp16=torch.cuda.is_available(),
            no_speech_threshold=0.5,
            condition_on_previous_text=False
        )
        
        logging.info(f"✅ 长音频转录完成，获取到 {len(result['segments'])} 个字幕片段。")
        return result['segments']

    except Exception as e:
        logging.error(f"❌ 处理长音频文件时发生严重错误。", exc_info=True)
        return None
    
def generate_srt_from_segments(segments: list, srt_output_path: Path):
    """根据 Whisper 返回的 segments 列表生成 SRT 文件。"""
    logging.info(f"正在根据 {len(segments)} 个片段生成 SRT 文件到: {srt_output_path}")
    with open(srt_output_path, 'w', encoding='utf-8') as f:
        for i, seg in enumerate(segments, start=1):
            start_time = seg['start']
            end_time = seg['end']
            text = seg['text'].strip()
            
            # 直接使用我们修改过的 format 函数
            start_srt = format_time_for_srt(start_time)
            end_srt = format_time_for_srt(end_time)
            
            f.write(f"{i}\n")
            f.write(f"{start_srt} --> {end_srt}\n")
            f.write(f"{text}\n\n")
    logging.info(f"✅ SRT 字幕文件已成功保存。")

# ===================================================================
# 测试区 (已适配最终简化流程)
# ===================================================================
logging.info("--- [测试开始] 步骤4：ASR语音识别 (使用 Whisper 原生分割) ---")

try:
    # 假设这是您在前几个单元格中定义的变量
    video_stem = "vocals" 
    vocals_16k_mono_path = TEMP_DIR / f"{video_stem}_16k_mono.wav"
    
    if vocals_16k_mono_path.exists():
        whisper_model = load_original_whisper_model()
        
        if whisper_model:
            # 直接处理这个长音频文件
            transcription_segments = transcribe_long_audio_with_whisper(whisper_model, vocals_16k_mono_path)
            
            if transcription_segments is not None:
                japanese_srt_path = OUTPUT_DIR / f"{video_stem}_ja.srt"
                generate_srt_from_segments(transcription_segments, japanese_srt_path)
                
                logging.info("\\n--- [测试成功] 步骤4全部完成 ---")
                logging.info("生成的日语字幕文件预览 (前5条):")
                for item in transcription_segments[:5]:
                    start = item['start']
                    end = item['end']
                    text = item['text']
                    print(f"[{start:.2f}s --> {end:.2f}s] {text}")
            else:
                logging.warning("--- [测试警告] ASR 未能从音频中识别出任何文本或处理失败。---")
            
            # 清理模型，释放显存
            del whisper_model
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

    else:
        logging.error(f"--- [测试失败] 找不到优化后的人声音频文件: {vocals_16k_mono_path}")

except NameError as e:
    logging.error(f"--- [测试失败] 变量未定义: {e}。请确保前面的单元格已成功运行。")

## 五 翻译

In [15]:
# ===================================================================
# 5. 导入翻译和SRT处理所需的库
# ===================================================================
import re
import requests
import json
import logging
from pathlib import Path
import time

# ===================================================================
# 步骤 5 的功能函数 (已优化为批量翻译)
# ===================================================================

# PROMPT_TEMPLATE 在此定义，使用上面优化后的版本
PROMPT_TEMPLATE = """
# 角色扮演
你是一位专业的、深谙日本动漫文化的日语字幕翻译家。你尤其擅长翻译风格夸张、充满笑点、颜艺丰富和大学生日常吐槽的喜剧动漫，比如《碧蓝之海》。

# 核心任务
你的任务是将我提供的日语字幕文本，逐行翻译成自然、地道、且极具表现力的简体中文。

# 术语表 (Glossary) - 必须严格遵守
以下是本作品《碧蓝之海》(Grand Blue)中的固定翻译，任何情况下都不能更改：
- **核心地点和团体:**
  - Grand Blue: Grand Blue (潜水商店名)
  - PaB (Peek a Boo): PaB (潜水社团)
  - 伊豆大学: 伊豆大学
- **主要角色:**
  - 北原伊織 (きたはら いおり): 北原伊织
  - 古手川千紗 (こてがわ ちさ): 古手川千纱 (注意是“纱”不是“奈”)
  - 古手川奈々華 (こてがわ ななか): 古手川奈奈华
  - 今村耕平 (いまむら こうへい): 今村耕平
  - 浜岡梓 (はまおか あずさ): 滨冈梓
  - 吉原愛菜 (よしわら あいな): 吉原爱菜
  - 寿竜次郎 (ことぶき りゅうじろう): 寿龙次郎
  - 時田信治 (ときた しんじ): 时田信治
- **关键物品与活动:**
  - スクーバダイビング: 水肺潜水
  - ウーロン茶: 乌龙茶 (核心梗！详见下方指南)
  - 水: 水 (核心梗！详见下方指南)
- **特定语气词/口头禅:**
  - ウェイ: 根据语境灵活翻译为“哟—!”、“Wassup!”、“燥起来!”等，体现年轻人聚会时的起哄感，避免生硬的“Wao”。
  - チェイサー: (在酒桌上) 解围酒，请勿翻译成“追赶者”。

# 翻译风格与核心梗指南
1.  **忠于原作搞笑精神**: 这是最高原则！翻译必须能传达出原作的喜剧效果和“有病”感。允许使用贴切的网络流行语、俏皮话和吐槽风格的句子，但要避免过度使用而显得尴尬。
2.  **“生命之水”梗**:
    - 当角色一本正经地谈论“乌龙茶”或“水”，但情景明显是喝酒时（例如，能点燃），必须在译文中体现出这个笑点。
    - 优先方案：直接翻译成“乌龙茶”或“水”，但在后面用括号或注释点明，如：**乌龙茶（可燃）**、**水（生命之水）**。
    - 备选方案：用引号强调，如：“乌龙茶”、“水”。
3.  **口语化与颜艺匹配**: 角色都是大学生，对话风格随意。请使用自然的现代汉语口语。当画面出现夸张的“颜艺”时，译文的语气和用词也要足够夸张，以达到“音画同步”的喜剧效果。
4.  **处理ASR错误**: 输入的日文由ASR生成，可能存在错误。例如，遇到长段重复的无意义字符（如 `あああああ`），请理解其为语气词并恰当截断，如翻译为“啊——！”即可，无需逐字翻译。遇到明显的识别错误，请基于上下文进行合理修正。

# 输出格式要求
请严格按照以下JSON格式输出，不要添加任何额外的解释、注释或markdown标记。
- 格式: 一个JSON数组，每个对象包含 "id" (从1开始的原始行号) 和 "translation" (翻译后的文本)。
- 示例输入: `1: こんにちは\\n2: これはテストです`
- 示例输出: `[{{ "id": 1, "translation": "你好" }}, {{ "id": 2, "translation": "这是一个测试" }}]`

# 待翻译内容
现在，请翻译以下内容：
{BATCH_CONTENT}
"""
def parse_srt_file(srt_path: Path):
    """解析SRT文件，返回包含{'index', 'time', 'text'}的字典列表。"""
    if not srt_path.exists():
        logging.error(f"SRT文件未找到: {srt_path}")
        return []
    with open(srt_path, 'r', encoding='utf-8') as f:
        content = f.read()
    pattern = re.compile(r'(\d+)\n([\d:,]+ --> [\d:,]+)\n(.*?)\n\n', re.DOTALL)
    matches = pattern.findall(content)
    segments = [{'index': m[0], 'time': m[1], 'text': m[2].strip()} for m in matches]
    logging.info(f"成功从 '{srt_path.name}' 解析出 {len(segments)} 个字幕条目。")
    return segments

def translate_batch_deepseek(batch_content: str, api_key: str):
    """使用 DeepSeek API 批量翻译文本。"""
    if not api_key or "sk-" not in api_key:
        logging.error("DeepSeek API 密钥无效或未设置，无法翻译。")
        return None

    prompt = PROMPT_TEMPLATE.format(BATCH_CONTENT=batch_content)
    url = "https://api.deepseek.com/chat/completions"
    headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"}
    data = {
        "model": "deepseek-chat",
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.1,
        "stream": False
    }

    try:
        response = requests.post(url, headers=headers, json=data, timeout=180)
        response.raise_for_status()
        result_text = response.json()['choices'][0]['message']['content']
        # 清理和解析返回的JSON
        json_match = re.search(r'\[.*\]', result_text, re.DOTALL)
        if json_match:
            return json.loads(json_match.group(0))
        else:
            logging.error(f"无法从API响应中解析出JSON数组: {result_text}")
            return None
    except requests.exceptions.RequestException as e:
        logging.error(f"调用 DeepSeek API 时发生网络错误: {e}")
    except (KeyError, IndexError, json.JSONDecodeError) as e:
        logging.error(f"解析 DeepSeek API 响应时出错: {e}")
    return None

def translate_srt_file(ja_srt_path: Path, zh_srt_output_path: Path, api_key: str, batch_size=20):
    """读取日语SRT，分批翻译所有文本，并保存为中文SRT。"""
    logging.info(f"--- 开始翻译SRT文件: {ja_srt_path.name} ---")
    segments = parse_srt_file(ja_srt_path)
    if not segments: return False

    all_translated_segments = {}
    total_segments = len(segments)

    for i in range(0, total_segments, batch_size):
        batch = segments[i:i+batch_size]
        batch_text_input = "\n".join([f"{seg['index']}: {seg['text']}" for seg in batch])
        
        logging.info(f"  -> 正在翻译批次 [{i+1}-{min(i+batch_size, total_segments)}/{total_segments}]...")
        
        translated_batch = translate_batch_deepseek(batch_text_input, api_key)
        
        if translated_batch:
            for item in translated_batch:
                # API返回的id可能是数字或字符串，统一处理为字符串
                all_translated_segments[str(item['id'])] = item['translation']
        else:
            logging.error(f"批次 {i//batch_size + 1} 翻译失败，将使用占位符。")
            for seg in batch:
                all_translated_segments[seg['index']] = f"[翻译失败: {seg['text']}]"
        
        time.sleep(0.1) # 遵守API使用礼仪，避免过于频繁请求

    # 重新构建SRT文件
    final_srt_content = []
    for seg in segments:
        translated_text = all_translated_segments.get(seg['index'], f"[文本丢失: ID {seg['index']}]")
        final_srt_content.append(f"{seg['index']}\n{seg['time']}\n{translated_text}\n")

    try:
        with open(zh_srt_output_path, 'w', encoding='utf-8') as f:
            f.write("\n".join(final_srt_content))
        logging.info(f"✅ 中文SRT文件已成功保存到: {zh_srt_output_path}")
        return True
    except IOError as e:
        logging.error(f"❌ 保存中文SRT文件时出错: {e}")
        return False

# ===================================================================
# 测试区
# ===================================================================
logging.info("--- [测试开始] 步骤5：字幕翻译 ---")

# 假设这些变量已在之前的单元格中定义和创建
# OUTPUT_DIR = Path("workspace/output")
# japanese_srt_path = OUTPUT_DIR / "your_video_ja.srt" 
# DEEPSEEK_API_KEY = "sk-..."

if 'japanese_srt_path' in locals() and japanese_srt_path.exists():
    video_stem = japanese_srt_path.stem.replace('_ja', '')
    chinese_srt_path = OUTPUT_DIR / f"{video_stem}_zh.srt"
    
    if not DEEPSEEK_API_KEY or "sk-xxxxxxxx" in DEEPSEEK_API_KEY:
        logging.error("--- [测试失败] ---")
        logging.error("请在第一个单元格的'全局变量与路径配置'部分填入您的有效 DeepSeek API 密钥。")
    else:
        translate_srt_file(japanese_srt_path, chinese_srt_path, DEEPSEEK_API_KEY, batch_size=10)
        
        logging.info("--- [测试完成] 步骤5执行完毕 ---")
        if chinese_srt_path.exists():
             with open(chinese_srt_path, 'r', encoding='utf-8') as f:
                print("\n中文翻译文件预览 (前100行):")
                # 使用 for 循环安全地读取前10行
                preview_lines = []
                for _ in range(100):
                    line = f.readline()
                    if not line:
                        break
                    preview_lines.append(line)
                print("".join(preview_lines))
else:
    logging.error("--- [测试失败] 找不到上一步生成的 'japanese_srt_path' 文件。")
    logging.error("请确保步骤4.1的单元格已成功运行。")

2025-07-15 01:56:03 - INFO - --- [测试开始] 步骤5：字幕翻译 ---
2025-07-15 01:56:03 - INFO - --- 开始翻译SRT文件: vocals_ja.srt ---
2025-07-15 01:56:03 - INFO - 成功从 'vocals_ja.srt' 解析出 376 个字幕条目。
2025-07-15 01:56:03 - INFO -   -> 正在翻译批次 [1-10/376]...
2025-07-15 01:56:19 - INFO -   -> 正在翻译批次 [11-20/376]...
2025-07-15 01:56:36 - INFO -   -> 正在翻译批次 [21-30/376]...
2025-07-15 01:56:52 - INFO -   -> 正在翻译批次 [31-40/376]...
2025-07-15 01:57:07 - INFO -   -> 正在翻译批次 [41-50/376]...
2025-07-15 01:57:22 - INFO -   -> 正在翻译批次 [51-60/376]...
2025-07-15 01:57:36 - INFO -   -> 正在翻译批次 [61-70/376]...
2025-07-15 01:57:52 - INFO -   -> 正在翻译批次 [71-80/376]...
2025-07-15 01:58:07 - INFO -   -> 正在翻译批次 [81-90/376]...
2025-07-15 01:58:21 - INFO -   -> 正在翻译批次 [91-100/376]...
2025-07-15 01:58:35 - INFO -   -> 正在翻译批次 [101-110/376]...
2025-07-15 01:58:51 - INFO -   -> 正在翻译批次 [111-120/376]...
2025-07-15 01:59:06 - INFO -   -> 正在翻译批次 [121-130/376]...
2025-07-15 01:59:19 - INFO -   -> 正在翻译批次 [131-140/376]...
2025-07-15 01:59:34 - INFO - 


中文翻译文件预览 (前100行):
1
00:00:00,868 --> 00:00:01,724
感谢您的观看

2
00:00:47,236 --> 00:00:49,724
果然大海最棒了啊

3
00:00:51,844 --> 00:00:52,636
哦！

4
00:00:55,460 --> 00:00:57,084
来自诗织酱的

5
00:00:57,828 --> 00:00:58,876
诗织？

6
00:01:06,020 --> 00:01:09,372
冲啊青春街道Babystar！

7
00:01:10,148 --> 00:01:19,196
感谢您的观看

8
00:01:34,564 --> 00:01:39,708
月光下穷得叮当响的穷鬼

9
00:01:41,604 --> 00:01:46,300
今天的视频就到这里啦。记得下次也要来看哦！

10
00:01:46,692 --> 00:01:54,528
最近身体撞到卡罗比搞得腰酸背痛，但老子还能继续飙车服务啊！

11
00:01:54,528 --> 00:02:22,768
那天的光芒 让我们再次出发吧 青春街道 狂飙突进正当季 击破希望 就算变成订单也轻松搞定 追寻未知世界 潜水吧 所谓青春 就是不断追逐理想 所谓青春 无论几岁都不会褪色

12
00:02:22,768 --> 00:02:26,320
射门就是 坚持之前说过的话

13
00:02:26,320 --> 00:02:28,380
晚安

14
00:02:31,076 --> 00:02:32,764
大庆哥哥大人

15
00:02:32,804 --> 00:02:36,476
新绿青叶时节 您过得如何

16
00:02:36,484 --> 00:02:39,472
这边是喧嚣的教诲和前提歌渐渐着色

17
00:02:39,472 --> 00:02:41,980
群山已完全换上夏装

18
00:02:42,756 --> 00:02:45,948
自从哥哥大人上京已过去三个月

19
00:02:45,988 --> 00:02:48,400
七日姐姐大人和千纱姐姐大人

20
00:02:48,400 --> 00:02:51,836
没有给叔叔添麻烦吧

21
00