# **Transcribe YouTube Videos Efficiently with Faster Whisper**

[![Notebook](https://img.shields.io/static/v1?label=&message=在Colab中打开&color=blue&style=for-the-badge&logo=googlecolab)](https://colab.research.google.com/github/lewangdev/whisper-youtube/blob/main/faster_whisper_youtube.ipynb)
[![Repository](https://img.shields.io/static/v1?label=&message=查看代码仓库&color=blue&style=for-the-badge&logo=github)](https://github.com/lewangdev/faster_whisper_youtube)

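**Faster-Whisper** is an optimized implementation of OpenAI's Whisper model that runs inference on the CTranslate2 engine. Compared with the original Whisper, it transcribes up to 4x faster while using less memory. It also supports 8-bit quantization for further efficiency on both CPU and GPU.

This notebook walks you through transcribing YouTube videos with Faster Whisper. You can customize the inference parameters or keep the defaults, and save the transcripts and audio to your Google Drive.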
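
As a minimal sketch of the core call this notebook is built around, assuming the `faster-whisper` package is installed: `WhisperModel` and its `transcribe` method are the library's API, while `format_segment` and `transcribe_file` are hypothetical helper names used only for illustration. The library import is deferred so the formatting helper can be tried on its own.

```python
def format_segment(start: float, end: float, text: str) -> str:
    """Render one transcribed segment as a single readable line."""
    return f"[{start:.2f}s -> {end:.2f}s] {text.strip()}"


def transcribe_file(audio_path: str, model_size: str = "small"):
    """Transcribe one file on CPU with 8-bit quantization (illustrative defaults)."""
    # Deferred import: requires `pip install faster-whisper`.
    from faster_whisper import WhisperModel

    model = WhisperModel(model_size, device="cpu", compute_type="int8")
    # `segments` is a lazy generator; the work happens while it is iterated.
    segments, info = model.transcribe(audio_path, beam_size=5)
    lines = [format_segment(s.start, s.end, s.text) for s in segments]
    return info.language, lines


print(format_segment(0.0, 2.5, " Hello world "))  # [0.00s -> 2.50s] Hello world
```

The cells below do the same thing with more options (GPU selection, time ranges, SRT output).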

## **1. Check the GPU Environment** 🕵️

In [None]:
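#@markdown The GPU type that Colab assigns affects transcription speed: the higher the FLOPS (floating-point operations per second), the faster the processing.
#@markdown Make sure **"GPU"** is selected as the hardware accelerator under **"Runtime" → "Change runtime type"**.

#@markdown | GPU type | GPU memory | FP32 throughput (TFLOPS) | Colab availability |
#@markdown |----------|------------|--------------------------|--------------------|
#@markdown | T4       | 16 GB      | 8.1                      | Free tier          |
#@markdown | P100     | 16 GB      | 10.6                     | Colab Pro          |
#@markdown | V100     | 16 GB      | 15.7                     | Colab Pro (rare)   |

#@markdown **Note**: To switch to a different GPU type, use "Factory reset runtime" and then reconnect.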

!nvidia-smi -L
!nvidia-smi
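
The same information can also be read programmatically. The sketch below is illustrative only: `parse_gpu_csv` and `query_gpus` are hypothetical helper names, while `--query-gpu` and `--format=csv,noheader,nounits` are standard `nvidia-smi` options. With `nounits`, `memory.total` is reported in MiB.

```python
import shutil
import subprocess


def parse_gpu_csv(csv_text: str):
    """Parse `name, memory.total` CSV rows into (name, vram_gib) tuples."""
    gpus = []
    for row in csv_text.strip().splitlines():
        # Split on the last comma so a comma inside the GPU name would not break parsing.
        name, mem_mib = (field.strip() for field in row.rsplit(",", 1))
        gpus.append((name, int(mem_mib) / 1024))
    return gpus


def query_gpus():
    """Return the GPUs reported by nvidia-smi, or an empty list if it is unavailable."""
    if shutil.which("nvidia-smi") is None:
        return []
    result = subprocess.run(
        ["nvidia-smi", "--query-gpu=name,memory.total", "--format=csv,noheader,nounits"],
        capture_output=True, text=True, check=False,
    )
    return parse_gpu_csv(result.stdout) if result.returncode == 0 else []


print(parse_gpu_csv("Tesla T4, 15360\n"))  # [('Tesla T4', 15.0)]
```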

## **2. Install Dependencies** 🏗️

In [None]:
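#@markdown This installs Faster Whisper and the libraries needed to process YouTube videos. It may take a few minutes.
#@markdown The `-q` flag keeps pip quiet to reduce unnecessary output.

# Colab GPU instances usually ship with the CUDA toolkit, so libcublas is normally available.
# If you hit CUDA-related errors, uncomment the line below.
# !apt-get install -y -qq libcublas11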

!pip install -q faster-whisper yt-dlp ctranslate2

import sys
import warnings
from pathlib import Path
import os
import shutil
import re
import logging
import subprocess
import json # Not explicitly used in this cell, but good to keep if other parts might use it.

import torch
import yt_dlp
from faster_whisper import WhisperModel
from IPython.display import display, Markdown

# --- Configure logging ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S',
    stream=sys.stdout,
    force=True  # Force reconfiguration to avoid problems when the cell is re-run in Colab
)

# --- Environment check ---
if torch.cuda.is_available():
    device = "cuda"
    try:
        gpu_info = !nvidia-smi --query-gpu=memory.total --format=csv,noheader,nounits
        vram_gb = int(gpu_info[0]) / 1024
        logging.info(f"✅ GPU detected, device: {device} | VRAM: {vram_gb:.2f} GB")
    except Exception as e:
        logging.warning(f"✅ GPU detected, device: {device} | could not read VRAM info: {e}")
else:
    device = "cpu"
    logging.warning("⚠️ No GPU detected; falling back to CPU. Processing will be very slow.")
    logging.warning("   Select 'GPU' as the hardware accelerator under 'Runtime' -> 'Change runtime type'.")
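
After installation it can help to confirm which versions were actually picked up. This is a small sketch using only the standard library; `installed_versions` is a hypothetical helper name, and the package names are the ones passed to `pip install` above.

```python
from importlib import metadata


def installed_versions(packages):
    """Map each distribution name to its installed version, or None if it is missing."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


for name, version in installed_versions(["faster-whisper", "yt-dlp", "ctranslate2"]).items():
    print(f"{name}: {version or 'not installed'}")
```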

## **3. (Optional) Mount Google Drive** 💾

In [None]:
#@markdown Mount Google Drive to keep the transcription results permanently.
from google.colab import drive

DRIVE_ROOT_PATH = Path('/content/drive')
MY_DRIVE_PATH = DRIVE_ROOT_PATH / 'My Drive' # Standard path for 'My Drive'
drive_whisper_path = None # Initialize to None

try:
    if not DRIVE_ROOT_PATH.exists() or not os.listdir(DRIVE_ROOT_PATH):
        drive.mount(str(DRIVE_ROOT_PATH), force_remount=True)
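        logging.info("✅ Google Drive mounted successfully.")
    else:
        logging.info("✅ Google Drive is already mounted.")

    #@markdown ---
    #@markdown ### **Output folder**
    #@markdown Folder path in Google Drive (relative to 'My Drive') where the audio and subtitle files are stored.
    #@markdown The folder is created automatically if it does not exist.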
    drive_results_folder_name = 'Colab Notebooks/Faster Whisper YouTube/2025-06-15 起信论' #@param {type:'string'}

    if not MY_DRIVE_PATH.exists():
        # This case is unlikely if drive.mount succeeded but good for robustness
        logging.error(f"❌ Error: no 'My Drive' folder found under '{DRIVE_ROOT_PATH}'.")
        logging.error("   This is rare and may mean your Google Drive root folder has a different name. It is normally 'My Drive'.")
    else:
        # Remove leading slashes from user input to prevent issues with Path.joinpath
        clean_folder_name = drive_results_folder_name.lstrip('/')
        drive_whisper_path = MY_DRIVE_PATH / clean_folder_name
        drive_whisper_path.mkdir(parents=True, exist_ok=True)
        display(Markdown(f"**✅ Transcription results will be saved to: `{drive_whisper_path}`**"))

except Exception as e:
    logging.error(f"❌ Google Drive operation failed: {e}")
    display(Markdown(f"**❌ Google Drive operation failed: {e}**"))

if not drive_whisper_path or not drive_whisper_path.is_dir():
    drive_whisper_path = None # Ensure it's None if any step failed
    display(Markdown("**⚠️ Google Drive was not configured; all results will be kept only in the temporary Colab session.**"))
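
The cell strips leading slashes before joining the folder name onto `My Drive`, because joining an absolute path would discard the base. A slightly stricter variant, sketched here under the assumption that paths containing `..` should be rejected, could look like this. `resolve_results_dir` is a hypothetical helper and is not used by the notebook.

```python
import posixpath
from pathlib import PurePosixPath


def resolve_results_dir(my_drive: str, user_folder: str) -> PurePosixPath:
    """Join a user-supplied folder onto My Drive, rejecting paths that escape it."""
    base = PurePosixPath(my_drive)
    # A leading slash would make the join ignore `base`, so strip it first.
    joined = base / user_folder.strip().lstrip("/")
    # Collapse any `..` components purely lexically (no filesystem access).
    normalized = PurePosixPath(posixpath.normpath(str(joined)))
    if normalized != base and base not in normalized.parents:
        raise ValueError(f"Folder escapes My Drive: {user_folder!r}")
    return normalized


print(resolve_results_dir("/content/drive/My Drive", "/Colab Notebooks/out"))
```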

## **4. Choose a Model** 🧠

In [None]:
#@markdown Choose a pretrained Whisper model. Larger models are more accurate but need more VRAM and compute time.

#@markdown | Size      | Parameters | English-only model | Multilingual model | Required VRAM | Relative speed |
#@markdown |-----------|------------|--------------------|--------------------|---------------|----------------|
#@markdown | tiny      | 39 M       | `tiny.en`          | `tiny`             | ~1 GB         | ~32x           |
#@markdown | base      | 74 M       | `base.en`          | `base`             | ~1 GB         | ~16x           |
#@markdown | small     | 244 M      | `small.en`         | `small`            | ~2 GB         | ~6x            |
#@markdown | medium    | 769 M      | `medium.en`        | `medium`           | ~3 GB         | ~2x            |
#@markdown | large-v1  | 1.55 B     | N/A                | `large-v1`         | ~5 GB         | 1x             |
#@markdown | large-v2  | 1.55 B     | N/A                | `large-v2`         | ~5 GB         | 1x             |
#@markdown | large-v3  | 1.55 B     | N/A                | `large-v3`         | ~5 GB         | 1x             |

model_size = 'large-v2' #@param ['tiny', 'tiny.en', 'base', 'base.en', 'small', 'small.en', 'medium', 'medium.en', 'large-v1', 'large-v2', 'large-v3']

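#@markdown ---
#@markdown ### **Compute precision**
#@markdown Choose the model's compute type. It affects speed and VRAM usage.
#@markdown - `float16`: **(recommended)** for modern GPUs (T4, V100, A100); balances speed and accuracy.
#@markdown - `int8_float16`: faster with lower VRAM usage, at a slight cost in accuracy.
#@markdown - `int8`: fastest with the smallest memory footprint; suited to CPUs or GPUs with very limited VRAM.
#@markdown - `float32`: full precision; slowest and most memory-hungry, rarely needed.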

compute_type = 'float16' #@param ['float16', 'float32', 'int8_float16', 'int8']

# Free the memory held by a previously loaded model before loading a new one
if 'model' in globals() and isinstance(model, WhisperModel):
    logging.info("Found a loaded model; releasing its memory...")
    del model
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    logging.info("Previous model released from memory.")

# Load the model
logging.info(f"Loading model {model_size} on device {device} with compute type {compute_type}...")
try:
    model = WhisperModel(model_size, device=device, compute_type=compute_type)
    display(Markdown(f'**✅ Model `{model_size}` loaded successfully!**'))
except Exception as e:
    display(Markdown(f'**❌ Failed to load the model: {e}**'))
    logging.error(f"Failed to load the model: {e}")
    logging.error("   Check that the selected model size fits in your GPU's VRAM.")
    logging.error("   On free-tier Colab (T4 GPU), 'large' models may fail for lack of VRAM; try 'medium' or smaller.")
    # Stop execution if model fails to load
    raise SystemExit("Model loading failed; check the settings and try again.") from e
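
One case the cell does not guard against is a float16 compute type combined with the CPU fallback from step 2. The sketch below assumes, as is typically the case for CTranslate2 on CPU, that the float16 variants are not efficiently supported there and swaps them for `int8`. `resolve_compute_type` and `GPU_ONLY_TYPES` are hypothetical names, not part of faster-whisper.

```python
# Compute types this sketch treats as GPU-only (an assumption, see the lead-in).
GPU_ONLY_TYPES = {"float16", "int8_float16"}


def resolve_compute_type(device: str, requested: str) -> str:
    """Return a compute type suited to the device, downgrading GPU-only types on CPU."""
    if device == "cpu" and requested in GPU_ONLY_TYPES:
        return "int8"
    return requested


print(resolve_compute_type("cpu", "float16"))   # int8
print(resolve_compute_type("cuda", "float16"))  # float16
```

In the cell above, the result would be passed as `compute_type=` when constructing `WhisperModel`.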

## **5. Choose the Media Source and Preprocess** 📺

In [None]:
#@markdown ## **Data source**
#@markdown Choose where the video/audio comes from (YouTube or Google Drive).
source_type = 'Youtube video or playlist' #@param ['Youtube video or playlist', 'Google Drive file or folder']

#@markdown ---
#@markdown ### **YouTube options**
#@markdown If the source is YouTube, enter the URL here. **Separate multiple URLs with commas (,).**
youtube_urls_input = 'https://youtu.be/c-H-MYSDXQU' #@param {type:'string'}
#@markdown **Playlist range (optional):** selects specific videos from a playlist, e.g. `1-3`, `5`, `8:12`. Leave empty to download the whole playlist. The setting applies to every playlist URL entered.
playlist_range = '' #@param {type:'string'}
#@markdown **Use a cookie file?** Check this to use a cookie file for videos that require login or are age-restricted.
use_youtube_cookie = False #@param {type:'boolean'}
#@markdown **Cookie file path (optional):** If the option above is checked, give the path to the cookie file here. First upload `cookies.txt` to the Colab root directory (`/content/`) or to Google Drive, then enter the full path, e.g. `/content/cookies.txt` or `/content/drive/My Drive/cookies.txt`.
youtube_cookie_file_path = '' #@param {type:'string'}

#@markdown ---
#@markdown ### **Google Drive options**
#@markdown If the source is Google Drive, enter the file or folder path here (relative to the root of your Google Drive, i.e. "My Drive").
#@markdown - **File example:** `My Videos/Lecture 1.mp4`
#@markdown - **Folder example:** `My Audiobooks/` (every media file in the folder and its subfolders is processed)
gdrive_media_path = '' #@param {type:'string'}

# --- Variable initialization ---
initial_media_paths = []  # All original file paths found or downloaded
files_to_transcribe = [] # Audio files finally ready for transcription (WAV or M4A)
WORKSPACE_DIR = Path('/content/transcription_workspace') # Local working directory for downloads and processing
WORKSPACE_DIR.mkdir(exist_ok=True)

logging.info("🚀 Audio preprocessing started 🚀")

# ===================================================================
# Step 1: Fetch media files from the selected source
# ===================================================================
if source_type == 'Youtube video or playlist':
    urls_to_download = [url.strip() for url in youtube_urls_input.split(',') if url.strip()]
    if not urls_to_download:
        logging.warning("  - ⚠️ The YouTube URL list is empty or invalid.")
    else:
        logging.info(f"▶️ [1/3] Preparing to download {len(urls_to_download)} link(s) from YouTube...")

        ydl_opts = {
            'format': 'm4a/bestaudio/best', # Prefer the m4a format
            'outtmpl': str(WORKSPACE_DIR / '%(title)s.%(ext)s'),
            'postprocessors': [{'key': 'FFmpegExtractAudio', 'preferredcodec': 'm4a'}],
            'quiet': False,
            'no_warnings': True,
            'ignoreerrors': True, # Skip videos that fail to download within a playlist
        }
        if playlist_range:
            ydl_opts['playlist_items'] = playlist_range
            logging.info(f"  - Playlist range: {playlist_range}")

        if use_youtube_cookie and youtube_cookie_file_path:
            cookie_file = Path(youtube_cookie_file_path.strip())
            if cookie_file.exists():
                ydl_opts['cookiefile'] = str(cookie_file)
                logging.info(f"  - Using cookie file: {cookie_file}")
            else:
                logging.warning(f"  - ⚠️ The specified cookie file does not exist: {cookie_file}; continuing without cookies.")
        elif use_youtube_cookie and not youtube_cookie_file_path:
            logging.warning("  - ⚠️ Cookie use is enabled but no cookie file path was given; continuing without cookies.")

        for i, current_url in enumerate(urls_to_download, 1):
            logging.info(f"  Downloading URL {i}/{len(urls_to_download)}: {current_url}")
            try:
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    info_dict = ydl.extract_info(current_url, download=True)
                    if not info_dict:
                        # This might happen if all items in a playlist failed or were skipped
                        logging.warning(f"  - ⚠️ yt-dlp 未返回任何信息，下载可能已失败或被跳过: {current_url}")
                        continue

                    # Handle single video or playlist entries
                    entries = info_dict.get('entries') or ([info_dict] if info_dict else [])

                    downloaded_count_for_url = 0
                    for entry in entries:
                        if not entry: continue # Skip invalid entries
                        # Resolve the final path of the downloaded audio. After a download,
                        # yt-dlp typically lists the written files under 'requested_downloads',
                        # each with a 'filepath' that reflects post-processing.
                        requested = entry.get('requested_downloads') or []
                        downloaded_filepath_str = (requested[0].get('filepath') if requested else None) or entry.get('filepath')
                        if not downloaded_filepath_str:
                            # Fallback: prepare_filename returns the name *before* post-processing,
                            # so assume the FFmpegExtractAudio step changed the extension to .m4a.
                            base_filename = Path(ydl.prepare_filename(entry))
                            downloaded_filepath_str = str(base_filename.with_suffix('.m4a'))

                        downloaded_file = Path(downloaded_filepath_str)

                        if downloaded_file.exists() and downloaded_file.is_file():
                            # Ensure it's in our workspace, yt-dlp might put it elsewhere if outtmpl is complex
                            if downloaded_file.parent != WORKSPACE_DIR:
                                target_file = WORKSPACE_DIR / downloaded_file.name
                                try:
                                   shutil.move(str(downloaded_file), target_file) # Use str() for older python compatibility with shutil
                                   downloaded_file = target_file
                                except Exception as e_move:
                                   logging.warning(f"  - ⚠️ 无法移动文件 {downloaded_file} 到工作目录: {e_move}")
                                   continue # Skip this file

                            if downloaded_file not in initial_media_paths:
                                initial_media_paths.append(downloaded_file)
                                logging.info(f"    - ✅ 已处理/找到: {downloaded_file.name}")
                                downloaded_count_for_url += 1
                            else:
                                logging.info(f"    - ℹ️ 已跳过重复文件: {downloaded_file.name}")
                        elif entry.get('extractor_key'): # If it's a valid entry but file not found
                            logging.warning(f"    - ⚠️ 文件下载后未找到: {entry.get('title', '未知标题')} (预期路径: {downloaded_file})")

                    if downloaded_count_for_url == 0 and entries:
                         logging.warning(f"  - ⚠️ URL {current_url} 未成功下载任何音频文件。可能是播放列表范围或错误导致。")

            except yt_dlp.utils.DownloadError as e:
                logging.error(f"❌ YouTube 下载失败 ({current_url}): {e}")
            except Exception as e:
                logging.error(f"❌ 下载 ({current_url}) 过程中发生未知错误: {e}")

elif source_type == 'Google Drive file or folder':
    logging.info(f"▶️ [1/3] 从 Google Drive 搜索文件: {gdrive_media_path}")
    if not drive_whisper_path: # Check if Drive is mounted and path configured from cell 3
         logging.error("❌ Google Drive 未成功挂载或输出路径未配置。请先运行 '挂载 Google Drive' 单元格。")
    elif not gdrive_media_path:
         logging.warning("⚠️ Google Drive 媒体路径为空，已跳过。")
    else:
        # MY_DRIVE_PATH should be defined in cell 3
        if 'MY_DRIVE_PATH' not in globals() or not MY_DRIVE_PATH.exists():
            logging.error("❌ 'MY_DRIVE_PATH' 未定义或不存在。请确保已成功运行 Google Drive 挂载单元格。")
        else:
            full_gdrive_media_path = MY_DRIVE_PATH / gdrive_media_path.lstrip('/')
            supported_formats = ['.mp4', '.m4a', '.mp3', '.wav', '.mkv', '.webm', '.flac', '.ogg', '.aac']

            if full_gdrive_media_path.is_file():
                if full_gdrive_media_path.suffix.lower() in supported_formats:
                    initial_media_paths.append(full_gdrive_media_path)
                    logging.info(f"  - ✅ 找到文件: {full_gdrive_media_path.name}")
                else:
                    logging.warning(f"  - ⚠️ 不支持的文件类型，已跳过: {full_gdrive_media_path.name}")
            elif full_gdrive_media_path.is_dir():
                logging.info(f"  - 正在搜索文件夹 '{full_gdrive_media_path.name}' 中的所有媒体文件...")
                found_files_in_dir = []
                for ext in supported_formats:
                    found_files_in_dir.extend(list(full_gdrive_media_path.rglob(f'*{ext}'))) # Use list() to eagerly evaluate
                    found_files_in_dir.extend(list(full_gdrive_media_path.rglob(f'*{ext.upper()}'))) # Case-insensitive search for extensions

                # Remove duplicates that might arise from rglobbing both lower and upper case
                unique_found_files = sorted(list(set(found_files_in_dir)))

                if not unique_found_files:
                    logging.warning("  - ⚠️ 在该文件夹及其子目录中未找到支持的媒体文件。")
                else:
                    initial_media_paths.extend(unique_found_files)
                    for f_path in unique_found_files:
                        logging.info(f"  - ✅ 找到文件: {f_path.relative_to(MY_DRIVE_PATH)}")
            else:
                logging.error(f"❌ 指定的 Google Drive 路径不存在: {full_gdrive_media_path}")

# ===================================================================
# Step 2: Validate the file format and convert if needed
# ===================================================================
logging.info("\n▶️ [2/3] Validating and converting audio formats...")
if not initial_media_paths:
    logging.warning("  - ⚠️ No media files found; stopping.")
else:
    for original_file_path in initial_media_paths:
        logging.info(f"--- 处理: {original_file_path.name} ---")
        target_wav_path = WORKSPACE_DIR / f"{original_file_path.stem}.wav"

        # Determine the path of the file to be processed (might be on Drive or already in WORKSPACE_DIR)
        current_processing_file = original_file_path

        # If the original file is on Drive, copy it to the local workspace for processing
        # This avoids issues with ffmpeg directly on Drive paths and keeps originals safe.
        # Check if DRIVE_ROOT_PATH is defined and original_file_path is relative to it.
        if 'DRIVE_ROOT_PATH' in globals() and DRIVE_ROOT_PATH and original_file_path.resolve().is_relative_to(DRIVE_ROOT_PATH.resolve()):
             local_copy_path = WORKSPACE_DIR / original_file_path.name
             if local_copy_path.exists() and local_copy_path.stat().st_size == original_file_path.stat().st_size:
                 logging.info(f"  - 使用已存在于工作区的本地副本: {local_copy_path.name}")
                 current_processing_file = local_copy_path
             else:
                 try:
                     logging.info(f"  - 从 Drive 复制 '{original_file_path.name}' 到本地进行处理...")
                     shutil.copy(str(original_file_path), str(local_copy_path))
                     current_processing_file = local_copy_path
                 except Exception as e:
                     logging.error(f"  - ❌ 复制文件 '{original_file_path.name}' 失败: {e}，跳过此文件。")
                     continue
        elif original_file_path.parent != WORKSPACE_DIR : # If it's local but not in WORKSPACE_DIR (e.g. /content/)
            local_copy_path = WORKSPACE_DIR / original_file_path.name
            if local_copy_path.exists() and local_copy_path.stat().st_size == original_file_path.stat().st_size:
                 logging.info(f"  - 使用已存在于工作区的本地副本: {local_copy_path.name}")
                 current_processing_file = local_copy_path
            else:
                try:
                    logging.info(f"  - 将文件 '{original_file_path.name}' 移动/复制到工作目录...")
                    shutil.copy(str(original_file_path), str(local_copy_path)) # Copy to be safe
                    current_processing_file = local_copy_path
                except Exception as e:
                    logging.error(f"  - ❌ 移动/复制文件 '{original_file_path.name}' 到工作目录失败: {e}，跳过此文件。")
                    continue

        # Whisper can handle M4A directly, often high quality AAC. Let's prefer it.
        if current_processing_file.suffix.lower() == '.m4a':
            logging.info(f"  - ✅ 格式为 M4A，高质量音频，直接使用: {current_processing_file.name}")
            if current_processing_file not in files_to_transcribe:
                 files_to_transcribe.append(current_processing_file)
            continue

        # For other formats, convert to 16kHz mono WAV for best Whisper compatibility
        logging.info(f"  - 格式为 {current_processing_file.suffix.upper()}，正在转换为 16kHz 单声道 WAV ({target_wav_path.name})...")
        ffmpeg_cmd = [
            'ffmpeg', '-i', str(current_processing_file),
            '-y',          # Overwrite output files without asking
            '-vn',         # Disable video recording
            '-acodec', 'pcm_s16le', # Audio codec: PCM signed 16-bit little-endian
            '-ar', '16000', # Audio sample rate: 16kHz
            '-ac', '1',     # Audio channels: 1 (mono)
            str(target_wav_path)
        ]

        try:
            # Using subprocess.run for better control and error capture
            process = subprocess.run(ffmpeg_cmd, check=True, capture_output=True, text=True, encoding='utf-8')
            if target_wav_path.exists() and target_wav_path.stat().st_size > 0:
                if target_wav_path not in files_to_transcribe:
                    files_to_transcribe.append(target_wav_path)
                logging.info(f"  - ✅ 成功转换为: {target_wav_path.name}")
            else:
                # This case should ideally be caught by check=True if ffmpeg fails, but good to have.
                logging.error(f"  - ❌ 转换后文件不存在或为空。FFmpeg 输出: {process.stdout} \n FFmpeg 错误: {process.stderr}")
        except subprocess.CalledProcessError as e:
            logging.error(f"  - ❌ FFmpeg 转换失败! 命令: {' '.join(e.cmd)}")
            logging.error(f"     返回码: {e.returncode}")
            logging.error(f"     标准输出: {e.stdout}")
            logging.error(f"     标准错误: {e.stderr}")
        except FileNotFoundError:
            logging.error("  - ❌ FFmpeg 未找到。请确保 ffmpeg 已安装并位于系统的 PATH 中。在 Colab 中，它通常是预装的。")

# ===================================================================
# Step 3: Final report
# ===================================================================
print("\n" + "="*70)
logging.info("🎉 [3/3] Preprocessing complete - final report 🎉")
if files_to_transcribe:
    logging.info(f"{len(files_to_transcribe)} audio file(s) ready for transcription:")
    for i, f_path in enumerate(files_to_transcribe, 1):
        try:
            size_mb = f_path.stat().st_size / (1024 * 1024)
            print(f"  {i}. {f_path.name} (size: {size_mb:.2f} MB, path: {f_path})")
        except FileNotFoundError:
            print(f"  {i}. {f_path.name} (file not found during the final check!)")
else:
    logging.warning("This run produced no audio files to transcribe. Check the source settings or the error log.")
print("="*70 + "\n")
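
The format decision in step 2 (use M4A as-is, convert everything else to 16 kHz mono WAV) can be isolated into a few pure functions, which makes it easy to check without running FFmpeg. This is only a sketch: `needs_conversion`, `build_ffmpeg_cmd`, and `plan_conversion` are hypothetical names, and the FFmpeg flags are the ones already used in the cell above.

```python
from pathlib import Path

PASSTHROUGH_SUFFIXES = {".m4a"}  # formats handed to the model unchanged


def needs_conversion(path: Path) -> bool:
    """True when the file should be converted to WAV before transcription."""
    return path.suffix.lower() not in PASSTHROUGH_SUFFIXES


def build_ffmpeg_cmd(src: Path, dst: Path) -> list:
    """Build the FFmpeg argument list for a 16 kHz mono 16-bit PCM WAV."""
    return [
        "ffmpeg", "-y",          # overwrite the output without asking
        "-i", str(src),
        "-vn",                   # drop any video stream
        "-acodec", "pcm_s16le",  # 16-bit little-endian PCM
        "-ar", "16000",          # 16 kHz sample rate
        "-ac", "1",              # mono
        str(dst),
    ]


def plan_conversion(src: Path, workspace: Path):
    """Return (output_path, command); command is None when no conversion is needed."""
    if not needs_conversion(src):
        return src, None
    dst = workspace / f"{src.stem}.wav"
    return dst, build_ffmpeg_cmd(src, dst)


out, cmd = plan_conversion(Path("/data/talk.mp4"), Path("/work"))
print(out.name, cmd[0], cmd[-1] == str(out))
```

The returned command list can be passed directly to `subprocess.run(cmd, check=True)`.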

## **6. Run the Transcription** 🚀

In [None]:
#@markdown ### **Time range**
#@markdown Set the start and end times of the audio you want transcribed, in `HH:MM:SS` or `HH:MM:SS.mmm` (milliseconds) format.
start_time_str = '00:00:00' #@param {type:'string'}
end_time_str = '' #@param {type:'string'}
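#@markdown Leave `end_time_str` empty to transcribe to the end of the audio.

#@markdown **Relative timestamps:** when checked, subtitle timestamps start from 00:00:00 (relative to `start_time_str`).
use_relative_time = False #@param {type:'boolean'}

#@markdown ---
#@markdown ### **Task and language**
#@markdown - **Task:** `transcribe` (speech to text) or `translate` (translate into English).
#@markdown - **Language:** the language of the audio (e.g. 'zh' for Chinese, 'en' for English). `auto` detects it automatically, but specifying the language improves accuracy.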
transcription_task = 'transcribe' #@param ['transcribe', 'translate']
audio_language = 'auto' #@param ['auto', 'en', 'zh', 'ja', 'fr', 'de', 'es', 'ru', 'ko', 'it'] {allow-input: true}

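#@markdown ---
#@markdown ### **Advanced parameters**
#@markdown - **Initial prompt:** gives the model context such as proper nouns or formatting requirements, which can improve recognition of specific vocabulary. (The value below is a detailed example; adapt it to your content.)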
initial_prompt_text = "请将以下佛教讲经音频准确转录，无需翻译。注意以下要求：   - **专有名词优先**：     - 名号：释迦牟尼佛、阿弥陀佛、药师佛、观世音菩萨、地藏菩萨、大圣欢喜天、欢喜天、大黑天、玛哈嘎啦     - 术语：护摩、灌顶、菩提心、般若波罗蜜、阿耨多罗三藐三菩提、涅槃、五蕴、十二因缘、正念、观呼吸、数息法、止观、昏沉、掉举、五盖（贪嗔痴慢疑）、四念处、轻安、气脉、妄念、唯识、唯识学、涅槃、般若、起信论       - 经典名：《金刚经》《心经》《法华经》《楞严经》（不加书名号也可）   - **保留原文风格**：       - 文言虚词（如\"之、乎、者、也\"）全部保留     - 重复性诵念需完整记录（如\"南无阿弥陀佛×10\"）   - **标记说明**：     - 听不清的咒语用 [咒语不明] 标注     - 法会背景声（如引磬、木鱼）用 [法器声] 标注" #@param {type:'string'}
#@markdown - **Enable VAD (Voice Activity Detection):** filters out long stretches of silence, improving efficiency and accuracy. **Strongly recommended.**
enable_vad_filter = True #@param {type:'boolean'}
#@markdown - **Word-level timestamps:** emits a timestamp for every word instead of every segment. The SRT file gets longer but more precise.
enable_word_level_timestamps = False #@param {type:'boolean'}

#@markdown ---
#@markdown ### **Output**
#@markdown Choose the output file format. `both` writes both a `.txt` and an `.srt` file.
output_file_format = 'both' #@param ["srt", "txt", "both"]

# --- Globals ---
generated_transcription_files = []

# --- Helper Functions ---
from typing import Optional  # needed for the return annotation below

def time_str_to_seconds_float(time_str: str) -> Optional[float]:
    """Convert an HH:MM:SS.mmm time string to seconds (float). Returns None when unset."""
    if not time_str or not time_str.strip():
        return None
    match = re.match(r'(\d{1,2}):(\d{2}):(\d{2})(?:[.,](\d{1,3}))?$', time_str.strip())
    if not match:
        raise ValueError(f'Invalid time format: "{time_str}". Use HH:MM:SS or HH:MM:SS.mmm')
    h, m, s, ms_str = match.groups()
    ms = int(ms_str.ljust(3, '0')) / 1000.0 if ms_str else 0.0
    return int(h) * 3600 + int(m) * 60 + int(s) + ms

def seconds_to_srt_timestamp(seconds: float) -> str:
    """Convert seconds to an SRT timestamp (HH:MM:SS,mmm)."""
    # Round to whole milliseconds first so float error cannot truncate e.g. 1.001 s to ,000.
    total_ms = max(0, round(seconds * 1000))
    h, rem = divmod(total_ms, 3_600_000)
    m, rem = divmod(rem, 60_000)
    s, ms = divmod(rem, 1000)
    return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"

# --- Checks and preparation ---
if 'model' not in globals() or not isinstance(model, WhisperModel):
    display(Markdown("**❌ Error: the Whisper model is not loaded.**"))
    raise SystemExit("Model not loaded")

if 'files_to_transcribe' not in locals() or not files_to_transcribe:
    display(Markdown("**❌ Error: no files available for transcription.**"))
    raise SystemExit("No files to transcribe")

try:
    clip_start_seconds = time_str_to_seconds_float(start_time_str)
    clip_end_seconds = time_str_to_seconds_float(end_time_str)

    start_display = start_time_str if start_time_str.strip() else "start of audio"
    end_display = end_time_str if end_time_str.strip() else "end of audio"
    display(Markdown(f'**Time range: `{start_display}` to `{end_display}`**'))

    if clip_start_seconds is not None and clip_end_seconds is not None and clip_start_seconds >= clip_end_seconds:
        display(Markdown(f"**⚠️ Warning: the start time ({start_time_str}) is greater than or equal to the end time ({end_time_str}). Transcribing from the start time to the end of the audio.**"))
        clip_end_seconds = None # None means transcribe to the end

    display(Markdown(f'**Timestamp mode:** `{"relative (starting from 0)" if use_relative_time else "absolute (based on the original audio)"}`'))
except ValueError as e_time:
    display(Markdown(f'**❌ Invalid time format: {e_time}**'))
    raise

should_save_to_drive = 'drive_whisper_path' in globals() and drive_whisper_path and drive_whisper_path.is_dir()
if should_save_to_drive:
    logging.info(f"✅ Google Drive path detected; transcription results will be saved to: {drive_whisper_path}")
else:
    logging.warning("⚠️ Google Drive path is not configured or not accessible; results will be kept only in the temporary Colab environment.")

# --- Main transcription loop ---
for audio_idx, audio_file_path in enumerate(files_to_transcribe):
    display(Markdown(f"--- \n### 🎤 **Transcribing ({audio_idx+1}/{len(files_to_transcribe)}): `{audio_file_path.name}`**"))

    transcription_params = {
        "beam_size": 5,
        "task": transcription_task,
        # An empty or whitespace-only prompt is passed as None
        "initial_prompt": initial_prompt_text.strip() or None,
        "word_timestamps": enable_word_level_timestamps,
        "vad_filter": enable_vad_filter,
    }
    if audio_language and audio_language.lower() != 'auto':
        transcription_params['language'] = audio_language

    try:
        segments_iterable, audio_info = model.transcribe(str(audio_file_path), **transcription_params)
        display(Markdown(f"**- Detected language:** `{audio_info.language}` (confidence: {audio_info.language_probability:.2f}) | **Audio duration:** {seconds_to_srt_timestamp(audio_info.duration)}"))

        do_create_srt = output_file_format in ['srt', 'both']
        do_create_txt = output_file_format in ['txt', 'both']
        base_output_filename = audio_file_path.stem
        local_srt_file_path = WORKSPACE_DIR / f"{base_output_filename}.srt"
        local_txt_file_path = WORKSPACE_DIR / f"{base_output_filename}.txt"

        srt_entry_index = 1
        processed_segments_count = 0

        # Precompute the clip boundaries and time offset to keep the loop body simple
        clip_start = clip_start_seconds if clip_start_seconds is not None else 0.0
        clip_end = clip_end_seconds if clip_end_seconds is not None else audio_info.duration
        time_offset = clip_start if use_relative_time else 0.0

        with open(local_srt_file_path, 'w', encoding='utf-8') if do_create_srt else open(os.devnull, 'w') as srt_file, \
             open(local_txt_file_path, 'w', encoding='utf-8') if do_create_txt else open(os.devnull, 'w') as txt_file:

            for segment in segments_iterable:
                if segment.end < clip_start or segment.start > clip_end:
                    continue

                processed_segments_count += 1
                text_content = segment.text.strip()
                if do_create_txt:
                    txt_file.write(text_content + '\n')

                if do_create_srt:
                    time_units_to_process = segment.words if enable_word_level_timestamps and hasattr(segment, 'words') else [segment]

                    for unit in time_units_to_process:
                        unit_start = max(unit.start, clip_start)
                        unit_end = min(unit.end, clip_end)

                        if unit_end <= unit_start: continue

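                        # Word objects expose `.word` and have no `.text`; Segment objects expose `.text`.
                        # getattr(unit, 'word', unit.text) would evaluate unit.text eagerly and fail on words.
                        unit_text = (unit.word if hasattr(unit, 'word') else unit.text).strip()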
                        if not unit_text: continue

                        # Apply the precomputed time_offset
                        display_start_seconds = unit_start - time_offset
                        display_end_seconds = unit_end - time_offset

                        # Ensure a minimal positive duration (negative values are clamped in seconds_to_srt_timestamp)
                        if display_end_seconds <= display_start_seconds:
                            display_end_seconds = display_start_seconds + 0.001

                        srt_timestamp_start = seconds_to_srt_timestamp(display_start_seconds)
                        srt_timestamp_end = seconds_to_srt_timestamp(display_end_seconds)

                        print(f"[{srt_timestamp_start} --> {srt_timestamp_end}] {unit_text}")
                        srt_file.write(f"{srt_entry_index}\n")
                        srt_file.write(f"{srt_timestamp_start} --> {srt_timestamp_end}\n")
                        srt_file.write(f"{unit_text}\n\n")
                        srt_entry_index += 1

        if processed_segments_count == 0:
            display(Markdown(f"**ℹ️ Note: no transcribable audio was found in `{audio_file_path.name}` within the time range `{start_display}`-`{end_display}`.**"))

        # Save and copy each output file in one loop to avoid duplicated code
        output_files_to_process = []
        if do_create_srt:
            output_files_to_process.append(("SRT", local_srt_file_path))
        if do_create_txt:
            output_files_to_process.append(("TXT", local_txt_file_path))

        for file_type, local_path in output_files_to_process:
            if local_path.exists() and local_path.stat().st_size > 0:
                generated_transcription_files.append(str(local_path))
                display(Markdown(f"**📄 {file_type} file saved: `{local_path}`**"))
                if should_save_to_drive:
                    try:
                        shutil.copy(str(local_path), str(drive_whisper_path))
                        display(Markdown(f"**↳ Copied to Google Drive: `{drive_whisper_path / local_path.name}`**"))
                    except Exception as e_copy:
                        display(Markdown(f"**❌ Failed to copy the {file_type} file to Google Drive: {e_copy}**"))
            elif processed_segments_count > 0:
                # Warn only when segments were processed but the file is empty or missing
                display(Markdown(f"**⚠️ Warning: segments were processed for `{audio_file_path.name}`, but no valid {file_type} file was produced. Check the settings.**"))


    except Exception as e_transcribe:
        display(Markdown(f"**❌ A serious error occurred while transcribing `{audio_file_path.name}`: {e_transcribe}**"))
        logging.error(f"Transcription of {audio_file_path.name} failed: {e_transcribe}", exc_info=True)
        continue

display(Markdown("--- \n**🎉 Transcription finished for all selected files!** ---"))
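
To make the time-range logic concrete, here is a small worked sketch of how one segment is clamped to the clip window and, in relative mode, shifted so the clip starts at zero. `clip_and_shift` is a hypothetical helper that mirrors the arithmetic in the loop above; it is not part of the notebook or of faster-whisper.

```python
from typing import Optional, Tuple


def clip_and_shift(start: float, end: float, clip_start: float, clip_end: float,
                   relative: bool) -> Optional[Tuple[float, float]]:
    """Clamp [start, end] to the clip window; return None if nothing is left."""
    clamped_start = max(start, clip_start)
    clamped_end = min(end, clip_end)
    if clamped_end <= clamped_start:
        return None  # the unit lies entirely outside the window
    offset = clip_start if relative else 0.0
    return clamped_start - offset, clamped_end - offset


# A segment spanning 58 s to 63 s, with a clip window of 60 s to 120 s:
print(clip_and_shift(58.0, 63.0, 60.0, 120.0, relative=True))   # (0.0, 3.0)
print(clip_and_shift(58.0, 63.0, 60.0, 120.0, relative=False))  # (60.0, 63.0)
print(clip_and_shift(10.0, 20.0, 60.0, 120.0, relative=True))   # None
```

Note that the notebook still transcribes the whole file and filters afterwards, so a narrow time range limits the output but not the processing time.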

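## **7. Package and Download the Results** 📥

In [None]:
#@markdown Run this cell to bundle the subtitle and text files generated above into **a single ZIP archive** and download it to your computer.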

from google.colab import files
import zipfile

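#@markdown ---
#@markdown **Choose which file types to include in the download:**
download_file_type_choice = 'both' #@param ["srt_only", "txt_only", "both"]

# Check that the list of generated files exists
if 'generated_transcription_files' not in locals() or not generated_transcription_files:
    display(Markdown("### ⚠️ No files available for download.\nGo back and run **Step 6 (Run the Transcription)** successfully to generate transcription files first." ))
else:
    files_to_include_in_zip = []
    # Select the files to package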
    for file_path_str in generated_transcription_files:
        file_to_check = Path(file_path_str)
        should_add_file = False
        if download_file_type_choice == 'both':
            should_add_file = True
        elif download_file_type_choice == 'srt_only' and file_to_check.suffix.lower() == '.srt':
            should_add_file = True
        elif download_file_type_choice == 'txt_only' and file_to_check.suffix.lower() == '.txt':
            should_add_file = True

        if should_add_file and file_to_check.exists() and file_to_check.stat().st_size > 0:
            files_to_include_in_zip.append(file_to_check)
        elif should_add_file and (not file_to_check.exists() or file_to_check.stat().st_size == 0):
            logging.warning(f"File '{file_to_check.name}' matches the download type but is missing or empty; leaving it out of the archive.")

    # If there is anything to package
    if files_to_include_in_zip:
        zip_output_filename = "transcription_results.zip"
        # WORKSPACE_DIR should be defined in cell 5, zip will be created there temporarily
        zip_full_path = WORKSPACE_DIR / zip_output_filename

        display(Markdown(f"**⏳ Packaging {len(files_to_include_in_zip)} file(s) into `{zip_output_filename}`...**"))

        try:
            # Create the ZIP file and write to it
            with zipfile.ZipFile(zip_full_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                for file_path_to_zip in files_to_include_in_zip:
                    # arcname=file_path_to_zip.name ensures only filename is used in zip, not full path
                    zipf.write(file_path_to_zip, arcname=file_path_to_zip.name)
                    logging.info(f"Added '{file_path_to_zip.name}' to the archive.")

            logging.info(f"Archive '{zip_output_filename}' created at {zip_full_path}; ready to download.")
            display(Markdown(f"**✅ Packaging complete! Starting the download of `{zip_output_filename}`...**"))

            # Download the ZIP file from its location in WORKSPACE_DIR
            files.download(str(zip_full_path))

        except Exception as e_zip:
            logging.error(f"Error while creating or downloading the archive: {e_zip}")
            display(Markdown(f"**❌ An error occurred while creating or downloading the archive:**\n`{e_zip}`"))
    else:
        display(Markdown(f"**ℹ️ No matching non-empty files were found to package for your choice `{download_file_type_choice}`.**"))
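
Outside Colab, the packaging step reduces to a few lines of standard-library code. The sketch below is a standalone illustration; `zip_flat` is a hypothetical helper name. Like the cell above, it skips missing or empty files and stores each file under its bare name.

```python
import tempfile
import zipfile
from pathlib import Path


def zip_flat(files, zip_path: Path) -> list:
    """Write non-empty files into a ZIP under their bare names; return the archived names."""
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for f in files:
            f = Path(f)
            if f.is_file() and f.stat().st_size > 0:
                # arcname drops the directory part so the archive has a flat layout.
                zf.write(f, arcname=f.name)
        return zf.namelist()


with tempfile.TemporaryDirectory() as tmp:
    tmp = Path(tmp)
    (tmp / "talk.srt").write_text("1\n00:00:00,000 --> 00:00:01,000\nhello\n", encoding="utf-8")
    (tmp / "empty.txt").write_text("", encoding="utf-8")
    print(zip_flat([tmp / "talk.srt", tmp / "empty.txt", tmp / "missing.txt"], tmp / "out.zip"))
```

Because the layout is flat, two source files with the same name in different folders would collide inside the archive.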

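## **8. (Optional) Clean Up Temporary Files** 🧹

In [None]:
#@markdown Run this cell to delete the audio downloaded and the subtitle files generated in `/content/transcription_workspace` during this session.
#@markdown **This does not delete files saved to your Google Drive. It also removes leftover `.srt`, `.txt`, `.m4a`, `.wav`, and `.zip` files directly under `/content/`, so keep anything you still need there (for example an uploaded `cookies.txt`) in another location.**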

# WORKSPACE_DIR should be defined in Cell 5
if 'WORKSPACE_DIR' in globals() and WORKSPACE_DIR.exists():
    try:
        shutil.rmtree(WORKSPACE_DIR)
        display(Markdown(f'**✅ Temporary working directory `{WORKSPACE_DIR}` has been cleared.**'))
        # Recreate the directory for subsequent runs in the same session
        WORKSPACE_DIR.mkdir(exist_ok=True)
    except Exception as e_clean:
        display(Markdown(f'**❌ Error while clearing `{WORKSPACE_DIR}`: {e_clean}**'))
else:
    display(Markdown('**ℹ️ No temporary working directory found, or `WORKSPACE_DIR` is not defined; nothing to clean.**'))

# Optional: clean legacy temp files that older versions wrote directly to /content.
# The configured cookie file is skipped so an uploaded cookies.txt survives the cleanup.
logging.info("Cleaning up legacy temporary files in /content (if any)...")
_cookie_setting = globals().get('youtube_cookie_file_path', '').strip()
cookie_to_keep = Path(_cookie_setting).resolve() if _cookie_setting else None
for pattern in ('*.srt', '*.txt', '*.m4a', '*.wav', '*.zip'):
    for legacy_file in Path('/content').glob(pattern):
        if cookie_to_keep is not None and legacy_file.resolve() == cookie_to_keep:
            continue
        try:
            legacy_file.unlink()
        except OSError as e_legacy_clean:
            logging.warning(f"Could not remove legacy temp file {legacy_file}: {e_legacy_clean}")
logging.info("Legacy temporary file cleanup finished.")