# video-translation-service（Colab + T4 GPU）一键安装运行

目标：从直链下载 `1.mp4` → 生成中文字幕 `1_zh.ass`（不启用润色）。

注意：首次运行会下载 Whisper + 翻译模型（可能几 GB），需要等待一段时间。

In [None]:
# 直接把你的直链贴到这里即可
VIDEO_URL = "https://xxx.com/d/存储/1.mp4"

In [None]:
#@title 0) 确认已启用 GPU（Runtime -> Change runtime type -> GPU -> T4）
!nvidia-smi -L

import sys
import torch

print("python:", sys.version)
print("torch:", torch.__version__)
print("cuda_available:", torch.cuda.is_available())
if torch.cuda.is_available():
    print("gpu:", torch.cuda.get_device_name(0))
else:
    raise RuntimeError("未检测到GPU：请在 Colab 切换到 GPU(T4) 运行时")


In [None]:
#@title 1) git clone 项目
# TODO: 改成你的仓库地址（私有仓库可用：https://<TOKEN>@github.com/<org>/<repo>.git）
REPO_URL = "https://github.com/MAE5blog/video-translation-service.git"
BRANCH = "main"
REPO_DIR = "/content/video-translation-service"

import os
import shutil

if os.path.exists(REPO_DIR):
    shutil.rmtree(REPO_DIR)

!git clone --depth 1 -b {BRANCH} {REPO_URL} {REPO_DIR}
%cd {REPO_DIR}


In [None]:
#@title 2) 安装系统依赖（ffmpeg + 字体）
!apt-get update -y
!apt-get install -y ffmpeg fonts-noto-cjk libsndfile1 build-essential cmake ninja-build
!ffmpeg -version | head -n 2


In [None]:
#@title 3) 安装 Python 依赖
# 说明：Colab 自带 CUDA 版 torch，避免从 requirements.txt 里重复安装 torch（否则可能被换成CPU版/或耗时升级）
!python -m pip install -U pip
!grep -vE '^torch' requirements.txt > /tmp/requirements_no_torch.txt
!python -m pip install -r /tmp/requirements_no_torch.txt

# 可选：人声分离（Demucs）用于嘈杂/背景音乐场景
!python -m pip install demucs

# torchaudio 新版本保存音频需要 torchcodec（否则 Demucs 会报错）
!python -m pip install torchcodec

# OpenList/AList 大文件上传：requests 默认 multipart 会把文件读进内存，需用 requests-toolbelt 流式上传
!python -m pip install requests-toolbelt

# 预编译 llama-cpp-python wheel（可复用，优先下载）
from pathlib import Path
import urllib.request

LLAMA_WHEEL_URL = "https://oplist.mae5.com/d/gdrive_lz26xg/share/llama_cpp_python-0.3.16-cp312-cp312-linux_x86_64.whl?sign=s08ZZHeakHFTTevv8Vja5I-6HPXyT4ojOHMesEXpZUQ=:0"
LLAMA_WHEEL_DIR = Path("/content/llama_wheels")
LLAMA_WHEEL_DIR.mkdir(parents=True, exist_ok=True)
LLAMA_WHEEL_PATH = LLAMA_WHEEL_DIR / "llama_cpp_python-0.3.16-cp312-cp312-linux_x86_64.whl"
if not LLAMA_WHEEL_PATH.exists():
    try:
        print("Downloading prebuilt llama-cpp-python wheel ...")
        urllib.request.urlretrieve(LLAMA_WHEEL_URL, LLAMA_WHEEL_PATH)
    except Exception as e:
        print("Prebuilt wheel download failed, will build from source:", e)
# GGUF/llama.cpp 翻译（默认模型为 GGUF 时需要，CPU 也可用）
# wheel 会保存在 /content/llama_wheels，便于上传复用
!bash -lc 'LLAMA_WHEEL_DIR=/content/llama_wheels; mkdir -p "$LLAMA_WHEEL_DIR"; if ls "$LLAMA_WHEEL_DIR"/llama_cpp_python-*.whl >/dev/null 2>&1; then python -m pip install "$LLAMA_WHEEL_DIR"/llama_cpp_python-*.whl; else MAX_JOBS=2 CMAKE_ARGS="-DGGML_CUDA=on -DCMAKE_CUDA_ARCHITECTURES=75" FORCE_CMAKE=1 python -m pip wheel llama-cpp-python -w "$LLAMA_WHEEL_DIR" -v; python -m pip install "$LLAMA_WHEEL_DIR"/llama_cpp_python-*.whl; fi'
# 如果编译失败，可改用 CPU 版：!python -m pip install llama-cpp-python --no-cache-dir -v



In [None]:
#@title 4) 生成 config.ini（GPU + 中文 + 不润色）
from pathlib import Path

# 分块识别开关（默认关闭）
# 说明：分块越小，越容易把单词/句子切开；但分块越大，越容易触发服务端 500/OOM。
# 建议从 900s（15分钟）起步，不稳定再降到 600/300。
ENABLE_ASR_CHUNKING = False  # True=分块识别（显示进度条/降低长音频500/OOM）
ASR_CHUNK_SEC = 900
ASR_CHUNK_OVERLAP_SEC = 1.0

# ReazonSpeech/transformers 内部分块（仅对 transformers 后端生效）
# 默认 30s/5s 可显著降低长音频 OOM/掉线风险；如稳定可调大或设 0 禁用
ASR_TRANSFORMERS_CHUNK_SEC = 90
ASR_TRANSFORMERS_STRIDE_SEC = 5.0

# Demucs 人声分离分段秒数（仅对超长/超大音频触发；越大越不容易出现边界接缝，但越吃内存）
# 如果仍然 OOM，可降到 600/300；如果视频特别长但内存足够，可升到 3600。
DEMUCS_CHUNK_SEC = 1800

# 人声+混音融合比例（0=仅人声；0.2=加入20%原始混音，减少漏词/空洞）
# 建议范围：0.1 ~ 0.4；越大越接近原音（噪声也会更多）
VOCAL_MIX_RATIO = 0.2

# 指定 ASR 音频语言（auto=自动检测；填错会变差）
# 备选示例：auto / en / zh / ja / ko / de / fr / es / ru / ar
ASR_LANGUAGE = "ja"

# ASR 回退到原始混音（人声分离出现长空洞/提前结束时）
ASR_FALLBACK_TO_MIX = True
# 触发阈值：最大空洞 / 末尾缺失（秒）
ASR_FALLBACK_MAX_GAP_SEC = 45
ASR_FALLBACK_END_DIFF_SEC = 60

# ASR 模型：默认 reazonspeech（日语优化，transformers 后端）
# 其它语言可改为 medium / large-v3；或自定义：reazonspeech:HF模型名
ASR_MODEL_SIZE = "reazonspeech"

# 字幕时间轴修复（避免长静音字幕滞留；参数太激进可能导致字幕提前结束）
# 一般无需改动；若字幕提前结束：先把 SUBTITLE_LINGER_KEEP_RATIO 调大（例如 6），或直接 SUBTITLE_FIX_LINGER=False。
SUBTITLE_FORMAT = "ass"  # 字幕格式：srt / ass（推荐 ass）
SUBTITLE_FIX_LINGER = True  # True=启用“滞留修复”；False=完全按 ASR 原始时间轴输出
SUBTITLE_MIN_DURATION_SEC = 1.2  # 单条字幕最短显示秒数（避免一闪而过）
SUBTITLE_MAX_DURATION_SEC = 20.0  # 单条字幕最长显示秒数（用于限制长静音滞留）
SUBTITLE_CHARS_PER_SEC = 5.0  # 阅读速度估算：每秒字符数（越大越快->字幕更短）
SUBTITLE_LINGER_SLACK_SEC = 0.8  # 容忍额外时长（秒），减少误判
SUBTITLE_LINGER_TRIGGER_SEC = 10.0  # 仅当原始持续时间>=该值才考虑收缩
SUBTITLE_LINGER_TRIGGER_RATIO = 6.0  # 仅当 原始时长 > 可读时长*ratio 才触发（越大越不容易触发）
SUBTITLE_LINGER_KEEP_RATIO = 4.0  # 触发后保留：可读时长*ratio（越大越不容易提前结束）

config_text = f"""[API]
deepseek_api_key =

[Service]
host = 127.0.0.1
port = 50515
lazy_load_models = true
manage_models = true
unload_models_after_tasks = true

[Models]
# 如果显存不够/加载太慢，可改：asr_model_size = small
asr_model_size = {ASR_MODEL_SIZE}

# 翻译模型：默认使用 GGUF 量化（T4 更稳）；如显存不够可改为 nllb-200-distilled-1.3B；GGUF 用法：gguf:/path/to/model.gguf（需安装 llama-cpp-python）
translation_model = gguf:hf:SakuraLLM/Sakura-7B-Qwen2.5-v1.0-GGUF@sakura-7b-qwen2.5-v1.0-iq4xs.gguf

use_gpu = true
beam_size = 5

[GPU]
# 在 GPU 重任务前清理 CUDA 缓存，降低 OOM 概率（略慢）
clear_cuda_cache_before_tasks = true

[ASR]
# 指定音频语言（可提升嘈杂场景稳定性；填错会变差）
language = {ASR_LANGUAGE}

# 分块识别：显示进度条，也可降低长音频导致的 500/OOM
chunk_sec = {ASR_CHUNK_SEC if ENABLE_ASR_CHUNKING else 0}
# 重叠一点点，避免切在单词中间（会产生少量重复，程序会尽量去重）
chunk_overlap_sec = {ASR_CHUNK_OVERLAP_SEC}

# transformers 内部分块（仅对 reazonspeech/transformers 后端生效）
transformers_chunk_sec = {ASR_TRANSFORMERS_CHUNK_SEC}
transformers_stride_sec = {ASR_TRANSFORMERS_STRIDE_SEC}

# 人声分离回退：检测到长空洞/末尾缺失时，改用原始混音识别
fallback_to_mix = {str(ASR_FALLBACK_TO_MIX).lower()}
fallback_max_gap_sec = {ASR_FALLBACK_MAX_GAP_SEC}
fallback_end_diff_sec = {ASR_FALLBACK_END_DIFF_SEC}

[Translation]
default_target_language = zh
use_deepseek_polish = false

[Subtitles]
format = {SUBTITLE_FORMAT}
fix_linger = {str(SUBTITLE_FIX_LINGER).lower()}
min_duration_sec = {SUBTITLE_MIN_DURATION_SEC}
max_duration_sec = {SUBTITLE_MAX_DURATION_SEC}
chars_per_sec = {SUBTITLE_CHARS_PER_SEC}
linger_slack_sec = {SUBTITLE_LINGER_SLACK_SEC}
linger_trigger_sec = {SUBTITLE_LINGER_TRIGGER_SEC}
linger_trigger_ratio = {SUBTITLE_LINGER_TRIGGER_RATIO}
linger_keep_ratio = {SUBTITLE_LINGER_KEEP_RATIO}

[Audio]
# 人声分离（Demucs）：改善背景音乐/嘈杂场景识别
enable_vocal_separation = true
vocal_separation_model = htdemucs
# 默认用 cuda 加速；如遇 OOM 可改为 cpu
vocal_separation_device = cuda
vocal_separation_chunk_sec = {DEMUCS_CHUNK_SEC}
vocal_mix_ratio = {VOCAL_MIX_RATIO}
"""

Path("config.ini").write_text(config_text, encoding="utf-8")
print("wrote config.ini")


In [None]:
#@title 5) 后台启动服务（不占用单元格）
import os
import pathlib
import signal
import subprocess
import sys
import time

import requests

SERVICE_URL = "http://127.0.0.1:50515"
FORCE_RESTART = False  # True=重启服务并生成 server.log（用于排查 500）
RETURN_TRACEBACK = False  # True=服务端 500 时返回 JSON traceback（更容易定位问题）

def health():
    try:
        return requests.get(f"{SERVICE_URL}/health", timeout=2).json()
    except Exception:
        return None

def stop_running_server():
    pid_path = pathlib.Path("server.pid")
    if pid_path.exists():
        try:
            pid = int(pid_path.read_text().strip())
            os.kill(pid, signal.SIGTERM)
            print("killed by server.pid:", pid)
        except Exception as e:
            print("failed to kill by server.pid:", e)

    # 兜底：按端口杀（Colab/Linux）
    subprocess.run(["bash", "-lc", "fuser -k 50515/tcp || true"], check=False)

h = health()
if h and FORCE_RESTART:
    print("force restart: stopping existing server ...")
    stop_running_server()
    time.sleep(2)
    h = health()

if not h:
    env = os.environ.copy()
    if RETURN_TRACEBACK:
        env["VTS_RETURN_TRACEBACK"] = "1"
    p = subprocess.Popen(
        [sys.executable, "server_optimized.py"],
        stdout=open("server.log", "wb"),
        stderr=subprocess.STDOUT,
        start_new_session=True,
        env=env,
    )
    pathlib.Path("server.pid").write_text(str(p.pid))
    print("server started, pid:", p.pid)
else:
    print("server already running:", h)
    if not pathlib.Path("server.log").exists():
        print("NOTE: 未找到 server.log；如需抓取服务端报错，请将 FORCE_RESTART=True 再运行本单元格。")


In [None]:
#@title 6) 等待服务启动（模型按需加载）
import json
import os
import time
from pathlib import Path

import requests

SERVICE_URL = "http://127.0.0.1:50515"
pid_path = Path("server.pid")
log_path = Path("server.log")

def pid_alive(pid: int) -> bool:
    try:
        os.kill(pid, 0)
        return True
    except Exception:
        return False

def tail_log(max_lines: int = 120) -> str:
    if not log_path.exists():
        return "(server.log not found)"
    try:
        lines = log_path.read_text(errors="ignore").splitlines()
        return "\n".join(lines[-max_lines:])
    except Exception as e:
        return f"(failed to read server.log: {e})"

for i in range(1800):  # 最多等 3600 秒
    try:
        h = requests.get(f"{SERVICE_URL}/health", timeout=5).json()
    except Exception as e:
        if i % 5 == 0:
            print(f"{i*2:>4}s", "waiting for server...", repr(e))
        if pid_path.exists():
            try:
                pid = int(pid_path.read_text().strip() or "0")
            except Exception:
                pid = 0
            if pid and not pid_alive(pid):
                print("\nserver process exited; last logs:\n")
                print(tail_log())
                raise
        time.sleep(2)
        continue

    if h.get("status") == "ok":
        print("HEALTH:\n", json.dumps(h, ensure_ascii=False, indent=2))
        if not h.get("ready"):
            print("NOTE: 未预加载模型，将在 ASR/翻译前按需加载。")
        break
    if i % 5 == 0:
        print(f"{i*2:>4}s", h.get("phase"), h.get("progress"), h.get("message"))
    if h.get("phase") == "error":
        raise RuntimeError(h.get("error") or "模型加载失败，请查看 server.log")
    time.sleep(2)
else:
    raise TimeoutError("等待服务就绪超时：请查看 server.log")


In [None]:
#@title 7) 下载测试视频（自动使用链接文件名）

OUT_PATH = ""  # 留空=使用链接里的文件名
FORCE = False  # True=总是重新下载

import urllib.parse
from pathlib import Path

import requests

try:
    from tqdm.auto import tqdm
except Exception:
    tqdm = None

def guess_filename(url: str) -> str:
    parts = urllib.parse.urlsplit(url)
    name = Path(urllib.parse.unquote(parts.path)).name
    if not name:
        q = urllib.parse.parse_qs(parts.query or "")
        for key in ("filename", "file", "name"):
            if q.get(key):
                name = q[key][0]
                break
    if not name:
        name = "video.mp4"
    return name

def normalize_url(url: str) -> str:
    parts = urllib.parse.urlsplit(url)
    # 对 path 做编码，避免包含中文路径时部分工具报错
    path = urllib.parse.quote(parts.path)
    return urllib.parse.urlunsplit((parts.scheme, parts.netloc, path, parts.query, parts.fragment))

out = Path(OUT_PATH) if OUT_PATH else Path(guess_filename(VIDEO_URL))
VIDEO_FILE = str(out)  # 给后续单元格使用
print("save as:", out)
if out.exists() and out.stat().st_size > 0 and not FORCE:
    print("exists:", out, out.stat().st_size)
else:
    url = normalize_url(VIDEO_URL)
    with requests.get(url, stream=True, timeout=60) as r:
        r.raise_for_status()
        total = int(r.headers.get("content-length") or 0)
        if tqdm and total > 0:
            pbar = tqdm(total=total, unit="B", unit_scale=True, desc="download")
        else:
            pbar = None
        with open(out, "wb") as f:
            for chunk in r.iter_content(chunk_size=1024 * 1024):
                if not chunk:
                    continue
                f.write(chunk)
                if pbar:
                    pbar.update(len(chunk))
        if pbar:
            pbar.close()
    print("downloaded:", out, out.stat().st_size)


In [None]:
#@title 8) 翻译生成中文字幕（不润色）
from pathlib import Path

TARGET_LANG = "zh"
VIDEO_PATH = Path(globals().get("VIDEO_FILE", "1.mp4"))
print("video:", VIDEO_PATH)

!python batch_translate.py "{VIDEO_PATH}" -t {TARGET_LANG} --translation-only --subtitle-format ass

SUB_PATH = VIDEO_PATH.with_name(f"{VIDEO_PATH.stem}_{TARGET_LANG}.ass")
globals()["TARGET_LANG"] = TARGET_LANG
globals()["SUBTITLE_FILE"] = str(SUB_PATH)
print("subtitle:", SUB_PATH)


In [None]:
#@title 9) 合并字幕与视频（生成带字幕视频）
import json
import subprocess
from pathlib import Path

VIDEO_PATH = Path(globals().get("VIDEO_FILE", "1.mp4"))
TARGET_LANG = globals().get("TARGET_LANG", "zh")

SUB_PATH = Path(globals().get("SUBTITLE_FILE") or VIDEO_PATH.with_name(f"{VIDEO_PATH.stem}_{TARGET_LANG}.ass"))
SUB_EXT = SUB_PATH.suffix.lower()
BASE_STEM = f"{VIDEO_PATH.stem}_{TARGET_LANG}"

# soft：内封字幕（不转码，大小≈原视频；字幕清晰，推荐）
#   - ASS 内封：MP4 不支持内封 ASS，因此会输出 MKV
# hard：硬字幕烧录（需要重编码；ASS 样式生效；编码质量决定清晰度/体积）
MODE = "soft"  # soft / hard

if not VIDEO_PATH.exists():
    raise FileNotFoundError(VIDEO_PATH)
if not SUB_PATH.exists():
    raise FileNotFoundError(SUB_PATH)

def ffprobe_json(path: Path) -> dict:
    out = subprocess.check_output(
        [
            "ffprobe",
            "-v",
            "error",
            "-show_entries",
            "format=duration,size,bit_rate",
            "-show_entries",
            "stream=codec_type,codec_name,width,height,bit_rate",
            "-of",
            "json",
            str(path),
        ],
        text=True,
    )
    return json.loads(out)

cmd_nvenc = None
cmd_x264 = None

if MODE == "soft":
    # 内封字幕：不重编码，文件大小基本不变（推荐）
    if SUB_EXT == ".ass":
        OUT_PATH = VIDEO_PATH.with_name(f"{BASE_STEM}.mkv")
        cmd = [
            "ffmpeg",
            "-y",
            "-i",
            str(VIDEO_PATH),
            "-i",
            str(SUB_PATH),
            "-map",
            "0:v",
            "-map",
            "0:a?",
            "-map",
            "1:0",
            "-c:v",
            "copy",
            "-c:a",
            "copy",
            "-c:s",
            "ass",
            "-metadata:s:s:0",
            "language=chi",
            "-metadata:s:s:0",
            "title=Chinese",
            "-disposition:s:0",
            "default",
            str(OUT_PATH),
        ]
    else:
        OUT_PATH = VIDEO_PATH.with_name(f"{BASE_STEM}{VIDEO_PATH.suffix or '.mp4'}")
        cmd = [
            "ffmpeg",
            "-y",
            "-i",
            str(VIDEO_PATH),
            "-i",
            str(SUB_PATH),
            "-map",
            "0:v",
            "-map",
            "0:a?",
            "-map",
            "1:0",
            "-c:v",
            "copy",
            "-c:a",
            "copy",
            "-c:s",
            "mov_text",
            "-metadata:s:s:0",
            "language=chi",
            "-metadata:s:s:0",
            "title=Chinese",
            "-disposition:s:0",
            "default",
            "-movflags",
            "+faststart",
            str(OUT_PATH),
        ]
else:
    OUT_PATH = VIDEO_PATH.with_name(f"{BASE_STEM}{VIDEO_PATH.suffix or '.mp4'}")
    # 硬字幕：会重编码；为了让体积接近原视频，按原视频码率做 ABR（并略加一点给字幕边缘）
    info = ffprobe_json(VIDEO_PATH)
    height = None
    v_bitrate = None
    for st in info.get("streams", []) or []:
        if st.get("codec_type") == "video":
            height = st.get("height")
            try:
                v_bitrate = int(st.get("bit_rate")) if st.get("bit_rate") else None
            except Exception:
                v_bitrate = None
            break
    try:
        duration = float((info.get("format", {}) or {}).get("duration") or 0)
    except Exception:
        duration = 0.0
    try:
        size = int((info.get("format", {}) or {}).get("size") or 0)
    except Exception:
        size = 0
    if not v_bitrate and duration > 0 and size > 0:
        v_bitrate = int(size * 8 / duration)  # 退化估算（包含音频，足够用来控体积）
    target_k = int((v_bitrate or 800_000) / 1000 * 1.05)  # 轻微余量

    if SUB_EXT == ".ass":
        # ASS 直接走 libass（样式以 ASS 为准）
        vf = f"ass={SUB_PATH.as_posix()}:fontsdir=/usr/share/fonts"
    else:
        if not height:
            height = 720
        font_size = min(60, max(24, int(height * 0.06)))
        outline = max(2, int(font_size / 16))
        margin_v = max(20, int(font_size * 1.2))

        style = (
            f"FontName=Noto Sans CJK SC,"
            f"FontSize={font_size},Bold=1,"
            f"Outline={outline},Shadow=0,"
            f"MarginV={margin_v},Alignment=2"
        )
        vf = f"subtitles={SUB_PATH.as_posix()}:charenc=UTF-8:fontsdir=/usr/share/fonts:force_style='{style}'"

    encoders = subprocess.check_output(
        ["ffmpeg", "-hide_banner", "-encoders"],
        text=True,
        stderr=subprocess.STDOUT,
    )
    use_nvenc = "h264_nvenc" in encoders

    cmd = [
        "ffmpeg",
        "-y",
        "-i",
        str(VIDEO_PATH),
        "-vf",
        vf,
        "-c:a",
        "copy",
        "-movflags",
        "+faststart",
    ]
    cmd_nvenc = None
    cmd_x264 = None
    if use_nvenc:
        # 注意：部分环境虽然编译了 nvenc，但运行时可能不可用（驱动/权限），因此下面会自动回退到 libx264。
        cmd_nvenc = cmd + [
            "-c:v",
            "h264_nvenc",
            "-preset",
            "p7",
            "-rc",
            "vbr",  # 比 vbr_hq 兼容性更好
            "-b:v",
            f"{target_k}k",
            "-maxrate",
            f"{int(target_k * 1.5)}k",
            "-bufsize",
            f"{int(target_k * 2)}k",
            "-pix_fmt",
            "yuv420p",
            str(OUT_PATH),
        ]

    cmd_x264 = cmd + [
        "-c:v",
        "libx264",
        "-preset",
        "slow",
        "-b:v",
        f"{target_k}k",
        "-maxrate",
        f"{int(target_k * 1.5)}k",
        "-bufsize",
        f"{int(target_k * 2)}k",
        "-pix_fmt",
        "yuv420p",
        str(OUT_PATH),
    ]

    cmd = cmd_nvenc or cmd_x264

print("Running:\n ", " ".join(cmd))
try:
    subprocess.run(cmd, check=True)
except subprocess.CalledProcessError as e:
    if MODE == "hard" and cmd_nvenc is not None:
        print("\nNVENC 合并失败，自动回退到 libx264 重试...\n")
        cmd = cmd_x264
        print("Running:\n ", " ".join(cmd))
        subprocess.run(cmd, check=True)
    else:
        raise
globals()["SUBBED_FILE"] = str(OUT_PATH)
globals()["SUBTITLE_FILE"] = str(SUB_PATH)
print("OK video:", OUT_PATH)
print("OK subtitle:", SUB_PATH)


In [None]:
#@title 10) 复制最终视频+字幕到目录
from pathlib import Path
import shutil

# 复制目标目录（可改）
EXPORT_DIR = Path("/content/drive/MyDrive/share")
EXPORT_DIR.mkdir(parents=True, exist_ok=True)

video = Path(globals().get("SUBBED_FILE", ""))
sub = Path(globals().get("SUBTITLE_FILE", ""))

if not video.exists():
    raise FileNotFoundError(video)
if not sub.exists():
    raise FileNotFoundError(sub)

# 让字幕和视频同名（仅扩展名不同）
sub_ext = (sub.suffix or ".ass").lower()
aligned_sub = video.with_suffix(sub_ext)
if aligned_sub != sub:
    shutil.copy2(sub, aligned_sub)
    sub = aligned_sub

dst_video = EXPORT_DIR / video.name
dst_sub = EXPORT_DIR / f"{dst_video.stem}{sub_ext}"
shutil.copy2(video, dst_video)
shutil.copy2(sub, dst_sub)

globals()["SUBBED_FILE"] = str(dst_video)
globals()["SUBTITLE_FILE"] = str(dst_sub)
print("copied video:", dst_video)
print("copied subtitle:", dst_sub)


In [None]:
#@title （备用）上传到 OpenList/AList（可选） + 下载文件
import json
from pathlib import Path
from urllib.parse import quote
from google.colab import userdata

import requests

# ====== 本地文件（由前面步骤生成）======
VIDEO_PATH = Path(globals().get("VIDEO_FILE", "1.mp4"))
TARGET_LANG = globals().get("TARGET_LANG", "zh")
SUB_FILE = Path(globals().get("SUBTITLE_FILE") or VIDEO_PATH.with_name(f"{VIDEO_PATH.stem}_{TARGET_LANG}.ass"))
LOCAL_FILE = Path(globals().get("SUBBED_FILE") or VIDEO_PATH.with_name(f"{VIDEO_PATH.stem}_{TARGET_LANG}{VIDEO_PATH.suffix or '.mp4'}"))
UPLOAD_FILE_OVERRIDE = ""  # 例如 /content/llama_wheels/llama_cpp_python-*.whl
if UPLOAD_FILE_OVERRIDE:
    import glob
    matches = glob.glob(UPLOAD_FILE_OVERRIDE)
    LOCAL_FILE = Path(matches[0]) if matches else Path(UPLOAD_FILE_OVERRIDE)
print("local subtitle:", SUB_FILE)
print("local video:", LOCAL_FILE)

# ====== OpenList/AList 上传配置（Colab Secrets/userdata）======
UPLOAD_TO_OPENLIST = True  # True=上传；False=跳过
OPENLIST_BASE_URL = userdata.get('OPENLIST_BASE_URL')  # 例如 https://oplist.example.com
OPENLIST_USERNAME = userdata.get('OPENLIST_USERNAME')
OPENLIST_PASSWORD = userdata.get('OPENLIST_PASSWORD')
REMOTE_DIR = userdata.get('REMOTE_DIR')  # 例如 /tianyi/test3
REMOTE_FILENAME = LOCAL_FILE.name  # 或写死："xxx.mp4"
VERIFY_TLS = True  # https 自签证书可设 False

def alist_login(base_url: str, username: str, password: str) -> str:
    url = f"{base_url}/api/auth/login"
    # 兼容不同参数名：先按博客（Username/Password），失败再试小写
    last = None
    for payload in (
        {"Username": username, "Password": password},
        {"username": username, "password": password},
    ):
        r = requests.post(url, data=payload, timeout=30, verify=VERIFY_TLS)
        r.raise_for_status()
        last = r.json()
        if last.get("code") == 200 and isinstance(last.get("data"), dict) and last["data"].get("token"):
            return last["data"]["token"]
    raise RuntimeError(f"login failed: {last}")

def alist_mkdir(base_url: str, token: str, path: str):
    url = f"{base_url}/api/fs/mkdir"
    headers = {"Authorization": token}
    r = requests.post(url, headers=headers, data={"path": path}, timeout=30, verify=VERIFY_TLS)
    # 目录已存在时通常返回非200或 code!=200，这里不强制失败
    try:
        return r.json()
    except Exception:
        return None

def alist_upload_file(base_url: str, token: str, local_path: Path, remote_dir: str, remote_name: str, as_task: bool = True):
    url = f"{base_url}/api/fs/form"
    remote_dir = remote_dir if str(remote_dir).startswith("/") else "/" + str(remote_dir)
    remote_path = remote_dir.rstrip("/") + "/" + remote_name
    file_path_header = quote(remote_path, safe="/")  # URL 编码（保留/）
    headers = {
        "Authorization": token,
        "file-path": file_path_header,
    }
    if as_task:
        # 让服务端以“任务”方式处理（可避免反代等待远端存储上传导致 504 超时）
        headers["As-Task"] = "true"

    def _do_put(hdrs: dict):
        # 用 requests-toolbelt 流式 multipart 上传 + 进度条/速度
        import time
        from tqdm.auto import tqdm
        from requests_toolbelt.multipart.encoder import MultipartEncoder, MultipartEncoderMonitor
        with local_path.open("rb") as f:
            encoder = MultipartEncoder(fields={"file": (remote_name, f, "application/octet-stream")})
            hdrs2 = dict(hdrs)
            hdrs2["Content-Type"] = encoder.content_type
            hdrs2["Content-Length"] = str(encoder.len)

            bar = tqdm(total=encoder.len, unit="B", unit_scale=True, unit_divisor=1024, desc="upload")
            t0 = time.time()
            last_t = t0
            last_b = 0

            def _cb(m: MultipartEncoderMonitor):
                nonlocal last_t, last_b
                cur = m.bytes_read
                if cur > bar.n:
                    bar.update(cur - bar.n)
                now = time.time()
                if now - last_t >= 0.5:
                    spd = (cur - last_b) / max(1e-6, (now - last_t))
                    bar.set_postfix_str(f"{spd/1024/1024:.2f} MB/s")
                    last_t = now
                    last_b = cur

            monitor = MultipartEncoderMonitor(encoder, _cb)
            try:
                # 读超时设很大：避免上传大文件时本地 read timeout
                r = requests.put(url, headers=hdrs2, data=monitor, timeout=(30, 24 * 3600), verify=VERIFY_TLS)
            finally:
                bar.close()
            return r

    r = _do_put(headers)
    if r.status_code in (401, 403) and not str(headers.get("Authorization", "")).lower().startswith("bearer "):
        headers2 = dict(headers)
        headers2["Authorization"] = f"Bearer {token}"
        r = _do_put(headers2)

    r.raise_for_status()
    j = r.json()
    if j.get("code") != 200:
        raise RuntimeError(f"upload failed: {j}")
    return j


def alist_task_info(base_url: str, token: str, tid: str):
    url = f"{base_url}/api/task/upload/info"
    headers = {"Authorization": token}
    r = requests.post(url, headers=headers, params={"tid": tid}, timeout=30, verify=VERIFY_TLS)
    if r.status_code in (401, 403) and not str(headers.get("Authorization", "")).lower().startswith("bearer "):
        headers2 = dict(headers)
        headers2["Authorization"] = f"Bearer {token}"
        r = requests.post(url, headers=headers2, params={"tid": tid}, timeout=30, verify=VERIFY_TLS)
    r.raise_for_status()
    return r.json()


def wait_upload_task(base_url: str, token: str, tid: str, poll_sec: float = 2.0, timeout_sec: int = 24 * 3600):
    import time
    from tqdm.auto import tqdm
    t0 = time.time()
    bar = tqdm(total=100, desc="server task", unit="%")
    last_p = 0
    try:
        while True:
            info = alist_task_info(base_url, token, tid)
            items = info.get("data") or []
            task = items[0] if items else None
            if not task:
                raise RuntimeError(f"task not found: {info}")
            state = str(task.get("state") or "")
            status = str(task.get("status") or "")
            err = str(task.get("error") or "")
            try:
                p = int(task.get("progress") or 0)
            except Exception:
                p = 0
            p = max(0, min(100, p))
            if p > last_p:
                bar.update(p - last_p)
                last_p = p
            if status:
                bar.set_postfix_str(status[:60])

            if state in ("succeeded", "success") or p >= 100:
                return task
            if state in ("failed", "error"):
                raise RuntimeError(f"upload task failed: {err or task}")
            if time.time() - t0 > timeout_sec:
                raise TimeoutError(f"upload task timeout after {timeout_sec}s: {tid}")
            time.sleep(poll_sec)
    finally:
        bar.close()

if UPLOAD_TO_OPENLIST:
    if not OPENLIST_BASE_URL:
        raise ValueError("OPENLIST_BASE_URL is empty")
    if not OPENLIST_USERNAME or not OPENLIST_PASSWORD:
        raise ValueError("OPENLIST_USERNAME/OPENLIST_PASSWORD is empty")
    if not LOCAL_FILE.exists():
        raise FileNotFoundError(LOCAL_FILE)

    print("login ...")
    token = alist_login(OPENLIST_BASE_URL, OPENLIST_USERNAME, OPENLIST_PASSWORD)
    print("token ok")

    if REMOTE_DIR and REMOTE_DIR != "/":
        print("mkdir ...", REMOTE_DIR)
        alist_mkdir(OPENLIST_BASE_URL, token, REMOTE_DIR)

    print("upload ...", LOCAL_FILE, "->", REMOTE_DIR, REMOTE_FILENAME)
    res = alist_upload_file(OPENLIST_BASE_URL, token, LOCAL_FILE, REMOTE_DIR, REMOTE_FILENAME, as_task=True)
    print("upload response:", json.dumps(res, ensure_ascii=False))
    tid = (((res.get("data") or {}) .get("task") or {}) .get("id")) if isinstance(res, dict) else None
    if tid:
        print("server task id:", tid)
        final_task = wait_upload_task(OPENLIST_BASE_URL, token, tid)
        print("upload done:", json.dumps(final_task, ensure_ascii=False))
    else:
        print("upload ok (no task id)")

# ====== 下载到本地（Colab）======
# from google.colab import files

# files.download(str(SUB_FILE))
# files.download(str(LOCAL_FILE))


In [None]:
#@title （备用）停止服务
import os
import signal
import pathlib

pid_path = pathlib.Path("server.pid")
if pid_path.exists():
    pid = int(pid_path.read_text().strip())
    os.kill(pid, signal.SIGTERM)
    print("killed:", pid)
else:
    print("server.pid not found")
