In [1]:
# -*- coding: utf-8 -*-
"""VideoRobot_Studio_Pro.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1zDTsiqfJzp_wdnlfXadMA9lwC6GfDUXO
"""

# @title 1) Install (pip + apt) — Self-contained, Idempotent (with Inter font fetch)
# Install prerequisites and reliably download the Inter fonts (static TTF) using several fallback sources

from __future__ import annotations
import os, sys, shutil, tempfile, subprocess as sp
from pathlib import Path
from shutil import which

# -----------------------------
# Helpers
# -----------------------------
def _env():
    e = os.environ.copy()
    e.setdefault("PIP_DISABLE_PIP_VERSION_CHECK", "1")
    e.setdefault("DEBIAN_FRONTEND", "noninteractive")
    e.setdefault("PYTHONWARNINGS", "ignore")
    e.setdefault("LC_ALL", "C")
    e.setdefault("LANG", "C")
    return e

def _run(cmd, timeout=None):
    p = sp.run(cmd, stdout=sp.PIPE, stderr=sp.PIPE, text=True, timeout=timeout, env=_env())
    return p.returncode, (p.stdout or "").strip(), (p.stderr or "").strip()

def _ok(msg): print(f"✅ {msg}")
def _warn(msg): print(f"⚠️  {msg}")
def _err(msg): print(f"❌ {msg}")

def _apt_install(pkgs):
    """Install Debian packages with apt-get, reporting failures instead of hiding them.

    No-op for an empty package list or when apt-get is not on PATH (a warning
    is printed in the latter case). Failures are surfaced as warnings and never
    raised, so the rest of the preflight cell can still run and report what is
    missing.
    """
    if not pkgs:
        return
    if which("apt-get") is None:
        _warn("apt-get unavailable; skipping APT installs")
        return
    # A failed index refresh is not fatal: cached package lists may still be usable.
    rc, _, err = _run(["apt-get","update","-qq"], timeout=60)
    if rc != 0:
        _warn(f"apt-get update failed (rc={rc}): {err[-1000:]}")
    rc, _, err = _run(["apt-get","install","-y","-qq",*pkgs], timeout=600)
    if rc != 0:
        _warn(f"apt-get install failed (rc={rc}) for {' '.join(pkgs)}: {err[-1000:]}")

def _pip_install(specs):
    """Install/upgrade the given pip requirement specs into the running interpreter.

    Uses ``sys.executable -m pip`` so packages land in the kernel's own
    environment. A non-zero pip exit is reported as a warning (with the tail of
    stderr, where pip prints the actual error) rather than silently ignored.
    """
    if not specs:
        return
    py = sys.executable
    rc, _, err = _run(
        [py,"-m","pip","install","--upgrade","--no-input","--no-cache-dir",*specs],
        timeout=1200,
    )
    if rc != 0:
        _warn(f"pip install failed (rc={rc}): {err[-2000:]}")

# -----------------------------
# APT tools
# -----------------------------
# Collect the APT packages this notebook relies on.
needed_apt = []
if which("ffmpeg") is None or which("ffprobe") is None:
    needed_apt.append("ffmpeg")
# curl performs the font downloads further down; make sure it is present.
if which("curl") is None:
    needed_apt.append("curl")
for req in ("fontconfig","unzip"):
    needed_apt.append(req)
# A baseline system font as a fallback as well.
needed_apt.append("fonts-dejavu-core")
# De-duplicate while preserving order.
needed_apt = list(dict.fromkeys(needed_apt))
_apt_install(needed_apt)

# Report which command-line tools are actually reachable after the install attempt.
for tool in ("ffmpeg","ffprobe","fc-match"):
    if which(tool): _ok(f"{tool} present")
    else: _err(f"{tool} missing (install via apt-get)")

# -----------------------------
# Python deps (bare essentials)
# -----------------------------
# Minimum-version requirement specs handed to `_pip_install` in one pip invocation.
pip_specs = [
    "gradio>=5.49.0",
    "Pillow>=11.3.0",
    "fonttools>=4.60.0",
    "ctranslate2>=4.5.0",
    "faster-whisper>=1.0.3",
]
_pip_install(pip_specs)

def _try_import(name, attr="__version__"):
    """Import module ``name`` and report the outcome.

    Prints a success line including the module's ``attr`` value (blank when the
    attribute is missing or falsy) and returns True. Any exception is reported
    as a warning and yields False.
    """
    try:
        module = __import__(name)
        version = getattr(module, attr, None)
        shown = version if version else ''
        _ok(f"{name} {shown} OK")
    except Exception as e:
        _warn(f"{name} import failed: {e}")
        return False
    return True

# Verify each dependency by its import name, which differs from the pip name
# for some packages (PIL vs Pillow, fontTools vs fonttools, faster_whisper vs faster-whisper).
# The boolean result is discarded; the printed line is the report.
_ = _try_import("PIL")
_ = _try_import("fontTools")
_ = _try_import("gradio")
_ = _try_import("ctranslate2")
_ = _try_import("faster_whisper")

# -----------------------------
# Smoke tests (x264 / libass / NVENC)
# -----------------------------
def _probe_x264():
    """Encode a 0.1 s synthetic clip with libx264 to a null sink and print the outcome."""
    fps = 30
    gop = str(2*fps)
    argv = ["ffmpeg","-hide_banner","-loglevel","error"]
    # Synthetic lavfi source, so no input file is needed.
    argv += ["-f","lavfi","-t","0.1","-i",f"testsrc2=size=16x16:rate={fps}"]
    argv += ["-c:v","libx264","-preset","fast","-crf","21"]
    argv += ["-pix_fmt","yuv420p","-profile:v","high","-bf","2","-g",gop]
    argv += ["-colorspace","bt709","-color_primaries","bt709","-color_trc","bt709","-color_range","tv"]
    argv += ["-f","null","-"]
    rc, _, err = _run(argv, timeout=6)
    if rc == 0:
        _ok("FFmpeg: libx264 OK")
        return
    _err(f"FFmpeg: libx264 probe failed: {err[:2000]}")

def _probe_ass():
    """Check that ffmpeg can burn ASS subtitles (libass) by rendering a tiny script.

    A minimal ASS file is written to a uniquely named temporary file, passed to
    the ``subtitles`` filter on a 0.1 s black clip, and removed afterwards
    regardless of the outcome. The result is printed, not returned.
    """
    txt = (
        "[Script Info]\nPlayResX:64\nPlayResY:64\nWrapStyle:2\nScaledBorderAndShadow:yes\nYCbCr Matrix: TV.709\n\n"
        "[V4+ Styles]\n"
        "Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, "
        "Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, "
        "Alignment, MarginL, MarginR, MarginV, Encoding\n"
        "Style: Default,DejaVu Sans,18,&H00FFFFFF,&H00FFFFFF,&H00000000,&H00000000,0,0,0,0,100,100,0,0,1,1,0,7,10,10,10,0\n\n"
        "[Events]\nFormat: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text\n"
        "Dialogue: 0,0:00:00.00,0:00:00.20,Default,,0,0,0,,Hello\\NWorld\n"
    )
    # mkstemp gives a private, collision-free file instead of a fixed name in the shared temp dir.
    fd, tmp_name = tempfile.mkstemp(prefix="ass_probe_", suffix=".ass")
    os.close(fd)
    ass_path = Path(tmp_name)
    try:
        ass_path.write_text(txt, encoding="utf-8")
        rc, _, err = _run([
            "ffmpeg","-hide_banner","-loglevel","error",
            "-f","lavfi","-t","0.1","-i","color=size=16x16:rate=30:color=black",
            "-vf", f"subtitles={ass_path}:charenc=UTF-8",
            "-f","null","-"
        ], timeout=6)
    finally:
        # The probe file has no use after the run; do not leave it behind.
        try:
            ass_path.unlink()
        except OSError:
            pass
    if rc==0: _ok("FFmpeg: libass (subtitles) OK")
    else: _err(f"FFmpeg: libass probe failed: {err[:2000]}")

def _probe_nvenc():
    """Try a tiny h264_nvenc encode and print whether it succeeded.

    The test frame is 256x256 rather than 16x16: NVENC hardware enforces a
    minimum frame size, so a 16x16 probe can fail on a working GPU and report a
    false negative. Any failure to run the probe (timeout, missing ffmpeg) is
    treated as "not available" instead of aborting the cell.
    """
    fps = 30
    try:
        rc, _, _ = _run([
            "ffmpeg","-hide_banner","-loglevel","error",
            "-f","lavfi","-t","0.1","-i",f"testsrc2=size=256x256:rate={fps}",
            "-c:v","h264_nvenc","-f","null","-"
        ], timeout=6)
    except Exception:
        rc = 1
    print(f"NVENC_AVAILABLE (detected): {rc==0}")

# Run the three ffmpeg smoke tests; each prints its own result line.
_probe_x264()
_probe_ass()
_probe_nvenc()

# -----------------------------
# Font install: Inter (static TTFs)
# -----------------------------
# Per-user font directories; the Inter files are placed in both.
HOME = Path.home()
FONTS_A = HOME/".local/share/fonts/Inter"
FONTS_B = HOME/".fonts/Inter"  # some builds only scan this path
for d in (FONTS_A, FONTS_B):
    d.mkdir(parents=True, exist_ok=True)

# Standard weights that are sufficient (and look good) for caption rendering
weights = ["Regular","Medium","SemiBold","Bold","ExtraBold","Black"]
# File names of the static TTFs, one per weight (e.g. "Inter-Bold.ttf").
static_names = [f"Inter-{w}.ttf" for w in weights]

def _download(url: str, dst: Path) -> bool:
    """Download ``url`` to ``dst`` with curl; return True when ``dst`` holds a usable file.

    An existing ``dst`` larger than 1000 bytes is treated as already downloaded.
    Otherwise the transfer goes to a sibling ``<name>.part`` file that is renamed
    onto ``dst`` only after curl succeeded and produced more than 1000 bytes.
    An interrupted or failed transfer therefore can never leave a truncated file
    that a later call would mistake for a complete download. Timeouts and a
    missing curl binary yield False instead of raising.
    """
    if dst.exists() and dst.stat().st_size > 1_000:
        return True
    part = dst.with_name(dst.name + ".part")
    try:
        rc, _, err = _run(["curl","-L","-f","-sS","-o",str(part), url], timeout=60)
        if rc == 0 and part.exists() and part.stat().st_size > 1_000:
            part.replace(dst)
            return True
        return False
    except (OSError, sp.SubprocessError):
        return False
    finally:
        # Remove whatever is left of an unsuccessful transfer.
        try:
            if part.exists():
                part.unlink()
        except OSError:
            pass

def _copy_to_both(src: Path):
    """Copy ``src`` into both Inter font directories, ignoring per-directory failures."""
    for font_dir in (FONTS_A, FONTS_B):
        try:
            shutil.copy2(src, font_dir/src.name)
        except Exception:
            # Best effort: a failure for one directory must not stop the other copy.
            pass

def install_inter_static() -> bool:
    """Install static Inter TTFs into FONTS_A and FONTS_B; return True when enough are present.

    "Enough" means at least four distinct ``Inter-*.ttf`` files (each larger than
    1000 bytes) exist in either font directory. The same criterion is applied
    after every source, so one font present in both directories is never
    counted twice.

    Sources are tried in order and the function stops at the first one that
    satisfies the criterion:
      A. raw.githubusercontent.com (google/fonts static TTFs)
      B. github.com raw endpoint for the same files
      C. the rsms v4.1 release zip, from which the Inter TTFs are extracted

    Each file is downloaded at most once per source and then copied to the other
    directory. The temporary extraction directory is always removed.
    """
    min_faces = 4
    font_dirs = (FONTS_A, FONTS_B)

    def _usable(p: Path) -> bool:
        # A font file counts only if it exists and is not an empty/stub download.
        try:
            return p.is_file() and p.stat().st_size > 1_000
        except OSError:
            return False

    def _have_enough() -> bool:
        names = {p.name for d in font_dirs for p in d.glob("Inter-*.ttf") if _usable(p)}
        return len(names) >= min_faces

    def _fetch_from(base: str) -> None:
        # Make every static weight available in both directories, fetching only what is missing.
        for name in static_names:
            targets = [d/name for d in font_dirs]
            good = next((t for t in targets if _usable(t)), None)
            if good is None and _download(base + name, targets[0]):
                good = targets[0]
            if good is None:
                continue
            for t in targets:
                if not _usable(t):
                    try:
                        shutil.copy2(good, t)
                    except OSError:
                        pass

    # Source A: Google Fonts repo (raw static TTFs)
    _fetch_from("https://raw.githubusercontent.com/google/fonts/main/ofl/inter/static/")
    if _have_enough():
        return True

    # Source B: gh mirror path (alternative raw endpoint)
    _fetch_from("https://github.com/google/fonts/raw/main/ofl/inter/static/")
    if _have_enough():
        return True

    # Source C: Official rsms release zip (v4.1), extract TTFs
    zip_url = "https://github.com/rsms/inter/releases/download/v4.1/Inter-4.1.zip"
    tmpdir = Path(tempfile.mkdtemp(prefix="inter_dl_"))
    try:
        zpath = tmpdir/"Inter-4.1.zip"
        if _download(zip_url, zpath):
            rc, _, err = _run(["unzip","-o","-qq",str(zpath),"-d",str(tmpdir)], timeout=120)
            if rc == 0:
                for ttf in tmpdir.rglob("*.ttf"):
                    # Copy only files whose name contains "Inter"
                    if "Inter" in ttf.name:
                        _copy_to_both(ttf)
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)

    # Final count
    return _have_enough()

# Install the Inter fonts and report the outcome (only static TTF sources are attempted).
inter_ok = install_inter_static()
if inter_ok:
    _ok("Inter static TTFs installed")
else:
    _warn("Inter font download failed (all static sources). Using system fallbacks only.")

# Refresh fontconfig cache (skipped with a warning when fontconfig is not installed)
if which("fc-cache"):
    _run(["fc-cache","-f","-v"], timeout=60)
else:
    _warn("fc-cache not found; skipping font cache refresh")

# Verify default font availability
if which("fc-match"):
    rc, out, err = _run(["fc-match","-f","%{family}\n","Inter,DejaVu Sans,Arial,Helvetica,sans"], timeout=5)
else:
    rc, out, err = 127, "", "fc-match not found"
if rc==0 and out.strip():
    fam = out.splitlines()[0]
    _ok(f"Font available: {fam}")
else:
    _warn(f"fontconfig could not resolve a font: {err[:2000]}")

print("\nPreflight complete. This cell installed and verified runtime prerequisites.")

✅ ffmpeg present
✅ ffprobe present
✅ fc-match present
✅ PIL 11.3.0 OK
✅ fontTools 4.60.1 OK
✅ gradio 5.49.1 OK
✅ ctranslate2 4.6.0 OK
✅ faster_whisper 1.2.1 OK
✅ FFmpeg: libx264 OK
✅ FFmpeg: libass (subtitles) OK
NVENC_AVAILABLE (detected): False
✅ Inter static TTFs installed
✅ Font available: Inter

Preflight complete. This cell installed and verified runtime prerequisites.


In [2]:
# @title 2) Roots & Cache — Refactored, Deterministic, Colab-aware
# Define Drive/Work roots, JSON cache, and ensure folders.

from __future__ import annotations

import json
import os
from pathlib import Path
from typing import Any, Dict, List, Tuple

# --------------------------------------------------------------------------------------
# Changelog (kept for provenance; not used at runtime)
# --------------------------------------------------------------------------------------
# Human-readable release notes; informational only (see the banner above: not used at runtime).
CHANGELOG: List[str] = [
    "vFINAL: Hidden cells + short titles",
    "Cache-safe ASS burn + moving pill highlight",
    "NVENC with clean x264 fallback",
    "Strict assets root: MyDrive/VideoRobot/Assets",
    "UI-first; heavy steps on demand",
]

# --------------------------------------------------------------------------------------
# Runtime / environment detection
# --------------------------------------------------------------------------------------

def _in_colab() -> bool:
    """
    Cheap Colab detection that tolerates non-notebook environments.
    Signals: COLAB_RELEASE_TAG, COLAB_GPU, PATH containing 'COLAB'.
    """
    try:
        env = os.environ.copy()
        if "COLAB_RELEASE_TAG" in env or "COLAB_GPU" in env:
            return True
        return "COLAB" in env.get("PATH", "")
    except Exception:
        return False


def _mount_drive(force: bool = False) -> None:
    """
    Mount Google Drive at /content/drive when running in Colab; do nothing elsewhere.

    ``force`` is forwarded as ``force_remount``. Every failure (missing
    google.colab module, mount error) is swallowed on purpose so that local
    runs fall back to local paths.
    """
    if _in_colab():
        try:
            from google.colab import drive  # type: ignore
            drive.mount("/content/drive", force_remount=bool(force))
        except Exception:
            # degrade gracefully to local paths
            pass

# --------------------------------------------------------------------------------------
# Path selection helpers
# --------------------------------------------------------------------------------------

def _first_existing(paths: List[Path]) -> Tuple[bool, Path]:
    """
    Return (found, path) for the first existing candidate; if none, (False, last candidate or '.').
    """
    last = paths[-1] if paths else Path(".")
    for p in paths:
        try:
            if p.exists():
                return True, p
        except Exception:
            continue
        last = p
    return False, last


def _pick_drive_root() -> Path:
    """
    Resolve the Drive root directory.

    Local runs: $VR_DRIVE_ROOT (expanded, made absolute) or ~/VideoRobot; the
    directory is created when missing.
    Colab: mounts Drive and returns the first existing of
    /content/drive/MyDrive/VideoRobot and /content/drive/My Drive/VideoRobot,
    retrying once with a forced remount. Raises RuntimeError when neither
    exists; nothing is created in that case.
    """
    if not _in_colab():
        configured = os.environ.copy().get("VR_DRIVE_ROOT")
        if configured:
            root = Path(configured).expanduser().absolute()
        else:
            root = Path.home() / "VideoRobot"
        root.mkdir(parents=True, exist_ok=True)
        return root

    _mount_drive(False)
    candidates = [
        Path("/content/drive/MyDrive/VideoRobot"),
        Path("/content/drive/My Drive/VideoRobot"),
    ]
    found, chosen = _first_existing(candidates)
    if not found:
        # one retry with forced remount
        _mount_drive(True)
        found, chosen = _first_existing(candidates)
    if found:
        return chosen
    raise RuntimeError("Drive not mounted or '/content/drive/.../VideoRobot' missing in Colab.")


def _pick_work_root() -> Path:
    """
    Resolve and create the work/cache root.

    Colab: /content/_studio_cache.
    Local: $VR_WORK_ROOT (expanded, made absolute) or ~/.videorobot_cache.
    """
    if _in_colab():
        work_dir = Path("/content/_studio_cache").absolute()
    else:
        configured = os.environ.copy().get("VR_WORK_ROOT")
        if configured:
            work_dir = Path(configured).expanduser().absolute()
        else:
            work_dir = Path.home() / ".videorobot_cache"
    work_dir.mkdir(parents=True, exist_ok=True)
    return work_dir

# --------------------------------------------------------------------------------------
# Drive/Work tree (materialize eagerly for downstream cells)
# --------------------------------------------------------------------------------------

# Drive-side tree: root plus the Assets and Output folders (created when missing).
DRIVE_ROOT: Path = _pick_drive_root().absolute()
ASSETS_ROOT: Path = DRIVE_ROOT / "Assets"
ASSETS_ROOT.mkdir(parents=True, exist_ok=True)
OUTPUT_ROOT: Path = DRIVE_ROOT / "Output"
OUTPUT_ROOT.mkdir(parents=True, exist_ok=True)

# Work-side tree: cache and assets folders plus the JSON metadata file path.
WORK_ROOT: Path = _pick_work_root()
WORK_CACHE: Path = WORK_ROOT / "cache"
WORK_CACHE.mkdir(parents=True, exist_ok=True)
WORK_ASSETS: Path = WORK_ROOT / "assets"
WORK_ASSETS.mkdir(parents=True, exist_ok=True)
WORK_META: Path = WORK_ROOT / "meta.json"

# --------------------------------------------------------------------------------------
# JSON cache helpers
# --------------------------------------------------------------------------------------

def _json_dump(path: Path, obj: Any) -> None:
    """
    Atomic JSON write: write to <path>.tmp then replace.
    Errors are swallowed to keep telemetry non-fatal.
    """
    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        tmp = path.with_suffix(path.suffix + ".tmp")
        tmp.write_text(json.dumps(obj, ensure_ascii=False, indent=2), encoding="utf-8")
        tmp.replace(path)
    except Exception:
        pass  # best-effort cache


def _json_load(path: Path, default: Any) -> Any:
    """
    Safe JSON load:
      • Returns 'default' for any error (missing, parse, permissions).
      • Leaves corrupt file untouched (non-destructive).
    """
    try:
        if not path.exists():
            return default
        return json.loads(path.read_text(encoding="utf-8"))
    except Exception:
        return default

# Global metadata cache (shared across cells).
# Loaded once from WORK_META; starts as an empty dict when the file is missing or unreadable.
META: Dict[str, Any] = _json_load(WORK_META, {})

# --------------------------------------------------------------------------------------
# Asset roots hook and tree ensure
# --------------------------------------------------------------------------------------

def DRIVE_ASSETS_FOLDERS() -> List[Path]:
    """
    List the Drive asset roots (currently just ASSETS_ROOT).

    A fresh list is returned on every call, so more roots can be added here
    without touching callers.
    """
    roots: List[Path] = []
    roots.append(ASSETS_ROOT)
    return roots


def ensure_drive_tree(root: Path) -> None:
    """
    Create the standard sub-folders under ``root`` if they are missing.

    Folders: Assets, Output, _tmp, Music, Figures. Safe to call repeatedly;
    a failure on one folder is ignored and the remaining ones are still tried.
    """
    layout = ("Assets", "Output", "_tmp", "Music", "Figures")
    for folder in layout:
        try:
            target = root / folder
            target.mkdir(parents=True, exist_ok=True)
        except Exception:
            # directory creation should never kill the session
            continue

# Ensure the expected subfolders under DRIVE_ROOT
# (Assets and Output were already created above; this adds _tmp, Music and Figures as well).
ensure_drive_tree(DRIVE_ROOT)

# Echo the resolved roots so the cell output shows which locations are in use.
print(f"ASSETS ROOT = {ASSETS_ROOT}")
print(f"WORK ROOT   = {WORK_ROOT}")

Mounted at /content/drive
ASSETS ROOT = /content/drive/MyDrive/VideoRobot/Assets
WORK ROOT   = /content/_studio_cache


In [3]:
# @title 3) Core Utils + NVENC Detect — Refactored, Clean, Deterministic
# Process helpers, escaping, hashing, probe, and NVENC detection.

from __future__ import annotations

import hashlib
import os
import re
import shlex
import subprocess as sp
from dataclasses import dataclass
from functools import lru_cache
from pathlib import Path
from typing import Dict, Iterable, Optional, Tuple, Union

__all__ = [
    "_RunResult", "_env", "_run",
    "ffesc", "sha1_file",
    "bgr_hex", "ass_timestamp",
    "probe_image_wh",
    "detect_nvenc", "NVENC_AVAILABLE",
]

# ============================================================
# Subprocess runner
# ============================================================

@dataclass(frozen=True)
class _RunResult:
    """Minimal, predictable result container for subprocess calls."""
    returncode: int
    stdout: str
    stderr: str


def _env(extra: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """
    Return a copy of the current environment with optional overrides.
    Never mutates os.environ.
    """
    e = os.environ.copy()
    if extra:
        e.update(extra)
    return e


def _run(
    cmd: Union[str, Iterable[Union[str, Path]]],
    check: bool = True,
    capture: bool = True,
    env: Optional[Dict[str, str]] = None,
    cwd: Optional[Path] = None,
    timeout: Optional[float] = None,
) -> _RunResult:
    """
    Safe subprocess runner with sane defaults:
      - cmd may be a string (shell-split) or an iterable of args/Paths.
      - capture toggles stdout/stderr pipes.
      - raises RuntimeError on non-zero exit when check=True.
      - provides clear timeout and not-found errors.
    """
    args = shlex.split(cmd) if isinstance(cmd, str) else [str(a) for a in cmd]
    try:
        p = sp.run(
            args,
            stdout=(sp.PIPE if capture else None),
            stderr=(sp.PIPE if capture else None),
            text=True,
            env=(env or _env()),
            cwd=(str(cwd) if cwd else None),
            timeout=timeout,
        )
    except sp.TimeoutExpired as te:
        raise RuntimeError(f"Command timeout: {' '.join(args)}") from te
    except FileNotFoundError as fe:
        missing = args[0] if args else "<empty>"
        raise RuntimeError(f"Command not found: {missing}") from fe

    rr = _RunResult(p.returncode, p.stdout or "", p.stderr or "")
    if check and rr.returncode != 0:
        tail = (rr.stderr or rr.stdout or "").strip()
        raise RuntimeError(tail or f"Command failed: {' '.join(args)}")
    return rr


# ============================================================
# FFmpeg escaping
# ============================================================

def ffesc(s: str) -> str:
    r"""
    Backslash-escape characters that are special in ffmpeg filter/option strings.
    Escaped set: \  :  '  ,  [ ]  ;  %  ( )  =
    The input is converted with str() first; each special character is escaped exactly once.
    """
    specials = "\\:',[];%()="
    table = {ord(ch): "\\" + ch for ch in specials}
    return str(s).translate(table)


# ============================================================
# Hashing
# ============================================================

def sha1_file(p: Path, chunk: int = 1 << 20) -> str:
    """
    Hex SHA1 digest of a file's bytes followed by its mtime_ns and size.

    The content is read in blocks of ``chunk`` bytes. Because mtime and size
    are mixed in, the digest changes when the file is edited or merely
    re-timestamped. Raises FileNotFoundError when ``p`` is not a regular file.
    """
    target = Path(p)
    if not target.is_file():
        raise FileNotFoundError(f"sha1_file: missing file: {target}")
    digest = hashlib.sha1()
    with target.open("rb") as fh:
        while True:
            block = fh.read(chunk)
            if not block:
                break
            digest.update(block)
    info = target.stat()
    for field in (info.st_mtime_ns, info.st_size):
        digest.update(str(field).encode("utf-8"))
    return digest.hexdigest()


# ============================================================
# Color helpers
# ============================================================

_HEX_DIGITS = set("0123456789abcdefABCDEF")

def _normalize_rgb_hex(rgb_hex: str) -> str:
    """
    Normalize '#RGB' or '#RRGGBB' (or bare) to 6-hex uppercase 'RRGGBB'.
    Returns 'FFFFFF' on invalid input.
    """
    hx = (rgb_hex or "").strip().lstrip("#")
    if len(hx) == 3 and all(c in _HEX_DIGITS for c in hx):
        hx = "".join(c * 2 for c in hx)
    if len(hx) != 6 or any(c not in _HEX_DIGITS for c in hx):
        return "FFFFFF"
    return hx.upper()


def bgr_hex(rgb_hex: str) -> str:
    """
    Reorder a CSS-like hex colour into ASS byte order (BBGGRR, no '&H', no alpha).
    Example: '#A1B2C3' -> 'C3B2A1'
    Input is normalized first, so invalid colours come out as 'FFFFFF'.
    """
    normalized = _normalize_rgb_hex(rgb_hex)
    red, green, blue = (normalized[i:i + 2] for i in (0, 2, 4))
    return blue + green + red


# ============================================================
# ASS helpers
# ============================================================

def ass_timestamp(seconds: float) -> str:
    """
    Render a duration in seconds as an ASS timestamp 'h:mm:ss.cc'.

    Negative input is clamped to zero and the value is rounded to the nearest
    centisecond. Hours are not zero-padded.
    """
    clamped = max(0.0, float(seconds))
    total_cs = int(round(clamped * 100.0))
    total_seconds, centis = divmod(total_cs, 100)
    total_minutes, secs = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours}:{minutes:02d}:{secs:02d}.{centis:02d}"


# ============================================================
# Probing helpers
# ============================================================

def _probe_wh_with_ffprobe(p: Path) -> Tuple[int, int]:
    """
    Probe width/height of the first video stream with ffprobe (videos and images).

    Returns (w, h), or (0, 0) when ffprobe fails or prints nothing usable.

    ffprobe's csv writer can print more than one row for a single stream query
    and can append a trailing separator. The output is therefore scanned line
    by line and the first row whose first two fields are integers is used,
    instead of parsing the whole output as one value.
    """
    try:
        text = _run(
            [
                "ffprobe",
                "-v", "error",
                "-select_streams", "v:0",
                "-show_entries", "stream=width,height",
                "-of", "csv=s=x:p=0",
                str(p),
            ],
            check=False,
            timeout=5.0,
        ).stdout
        for line in text.splitlines():
            fields = line.strip().split("x")
            if len(fields) < 2:
                continue
            try:
                w, h = int(fields[0]), int(fields[1])
            except ValueError:
                # e.g. 'N/A' values; try the next row
                continue
            return w, h
    except Exception:
        pass
    return 0, 0


def _probe_wh_with_pil(p: Path) -> Tuple[int, int]:
    """
    Fallback probing via PIL for image files.
    Returns (w, h) or (0, 0) on failure.
    """
    try:
        from PIL import Image  # type: ignore
        with Image.open(str(p)) as im:
            w, h = im.size
            return int(w), int(h)
    except Exception:
        return 0, 0


def probe_image_wh(p: Path) -> Tuple[int, int]:
    """
    Width/height of a media file: ffprobe first, Pillow as the fallback.

    Returns (0, 0) for falsy input, a path that does not exist, a value that
    cannot be turned into a Path, or when both probes fail.
    """
    try:
        usable = bool(p) and Path(p).exists()
    except Exception:
        return 0, 0
    if not usable:
        return 0, 0
    target = Path(p)
    width, height = _probe_wh_with_ffprobe(target)
    if width > 0 and height > 0:
        return width, height
    return _probe_wh_with_pil(target)


# ============================================================
# NVENC detection (cached for speed)
# ============================================================

def _preflight_tools() -> None:
    """Raise RuntimeError naming every one of ffmpeg/ffprobe that cannot be run."""

    def _unavailable(exe: str) -> bool:
        # `_run` raises when the executable is missing or does not answer in time.
        try:
            _run([exe, "-version"], check=False, timeout=3.0)
        except Exception:
            return True
        return False

    missing = [exe for exe in ("ffmpeg", "ffprobe") if _unavailable(exe)]
    if not missing:
        return
    raise RuntimeError(
        "WHAT: PATH tools check\n"
        f"WHY: Missing required executable(s): {', '.join(missing)}\n"
        "HOW_TO_FIX: Install the missing tools and ensure they are on PATH, then re-run this cell."
    )

@lru_cache(maxsize=1)
def _ffmpeg_encoders_text() -> str:
    """Output of `ffmpeg -encoders`, cached after the first call ('' on failure)."""
    try:
        listing = _run(["ffmpeg", "-hide_banner", "-encoders"], check=False, timeout=5.0).stdout
    except Exception:
        return ""
    return listing or ""


@lru_cache(maxsize=1)
def _ffmpeg_hwaccels_text() -> str:
    """Output of `ffmpeg -hwaccels`, cached after the first call ('' on failure)."""
    try:
        listing = _run(["ffmpeg", "-hide_banner", "-hwaccels"], check=False, timeout=5.0).stdout
    except Exception:
        return ""
    return listing or ""


def _ffmpeg_has_encoder(name: str) -> bool:
    """True when the cached encoder listing has a row for ``name`` (six flag characters, then the name)."""
    listing = _ffmpeg_encoders_text()
    row = re.compile(rf"(?m)^\s*[A-Z\.]{{6}}\s+{re.escape(name)}\b")
    return row.search(listing) is not None


def _has_nvidia_smi() -> bool:
    """True when `nvidia-smi` runs and exits with status 0 within two seconds."""
    try:
        status = _run(["nvidia-smi"], check=False, timeout=2.0).returncode
    except Exception:
        return False
    return status == 0


def _ffmpeg_has_hwaccel(tag: str) -> bool:
    """Case-insensitive substring check for ``tag`` (e.g. 'cuda') in the cached hwaccel listing."""
    haystack = _ffmpeg_hwaccels_text().lower()
    needle = tag.lower()
    return needle in haystack


@lru_cache(maxsize=1)
def _nvenc_smoke_test() -> bool:
    """
    Try a tiny encode with h264_nvenc; True only if ffmpeg exits 0 and prints nothing to stderr.
    Encode settings: bt709 tags, yuv420p, profile=high, bf=2, gop=2*fps.

    The test frame is 256x256 rather than 16x16: NVENC enforces a minimum frame
    size, so a 16x16 probe can be rejected on a working GPU and produce a false
    negative. A timeout or a missing ffmpeg counts as "not available" instead
    of propagating out of the detection. The result is cached.
    """
    fps = 30  # normalized integer fps
    gop = int(round(2 * fps))
    args = [
        "ffmpeg", "-hide_banner", "-loglevel", "error", "-nostdin",
        "-f", "lavfi", "-t", "0.1", "-i", f"color=size=256x256:rate={fps}:color=black",
        "-c:v", "h264_nvenc",
        "-pix_fmt", "yuv420p",
        "-profile:v", "high",
        "-bf", "2",
        "-g", str(gop),
        "-colorspace", "bt709", "-color_primaries", "bt709", "-color_trc", "bt709", "-color_range", "tv",
        "-f", "null", "-"
    ]
    try:
        rr = _run(args, check=False, timeout=3.0)
    except Exception:
        # `_run` raises on timeout / executable-not-found even with check=False.
        return False
    return rr.returncode == 0 and (rr.stderr.strip() == "")


def _nvenc_available() -> bool:
    """
    NVENC counts as available only when all three checks pass, in this order:
    ffmpeg lists h264_nvenc, a CUDA stack is visible (nvidia-smi works or ffmpeg
    reports the 'cuda' hwaccel), and a real test encode succeeds.
    The last step filters out false positives like 'Cannot load libcuda.so.1'.
    """
    listed = _ffmpeg_has_encoder("h264_nvenc")
    if listed and (_has_nvidia_smi() or _ffmpeg_has_hwaccel("cuda")):
        return _nvenc_smoke_test()
    return False


def detect_nvenc() -> bool:
    """Public entry point: report whether NVENC encoding is usable on this machine."""
    available = _nvenc_available()
    return available


# Preflight before evaluation to provide deterministic, actionable errors.
# Raises RuntimeError (with WHAT/WHY/HOW_TO_FIX text) when ffmpeg or ffprobe cannot be run.
_preflight_tools()

# Evaluated once when this cell runs; the printed value is the detection result.
NVENC_AVAILABLE: bool = detect_nvenc()
print("NVENC_AVAILABLE:", NVENC_AVAILABLE)


NVENC_AVAILABLE: False


In [4]:
# @title 4) Font Manager — Refactored, TTC-aware, ASS-friendly
# Resolve fonts from Assets to ~/.fonts and cache PIL fonts with robust fallbacks.

from __future__ import annotations

import hashlib
import os
import re
import shutil
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple

# Hard fail with remediation if dependencies are missing (no installs here).
# The install hints quote the requirement spec: an unquoted ">=" is parsed by the shell
# as an output redirection. Versions match the minimums used by the install cell.
try:
    from PIL import ImageFont  # type: ignore
except Exception as e:
    raise RuntimeError('Missing dependency: Pillow (PIL). Install with: pip install "Pillow>=11.3.0"') from e

try:
    from fontTools.ttLib import TTCollection, TTFont  # type: ignore
except Exception as e:
    raise RuntimeError('Missing dependency: fonttools. Install with: pip install "fonttools>=4.60.0"') from e

# Use shared sha1_file from Core Utils; do not re-declare hashing here.
# A NameError here means the Core Utils cell has not been executed in this kernel.
try:
    _HASH_FUNC = sha1_file  # type: ignore[name-defined]
except Exception as e:
    raise RuntimeError("Dependency missing: sha1_file not found. Execute the Core Utils cell before Font Manager.") from e

__all__ = [
    "FontInfo", "PREFERRED_FAMILIES",
    "FontManager", "FONTM",
]

# --------------------------------------------------------------------------------------
# Public data model
# --------------------------------------------------------------------------------------

@dataclass(frozen=True)
class FontInfo:
    """Resolved font choice for both ASS and PIL (immutable)."""
    family: str  # font family name
    subfam: str  # subfamily/style tag; presumably normalized via _normalize_subfamily (verify against callers)
    path: Optional[Path]  # font file on disk, or None (NOTE(review): meaning of None is not visible here; confirm)


# --------------------------------------------------------------------------------------
# Utility helpers (pure functions)
# --------------------------------------------------------------------------------------

# Name of an environment variable for a font directory.
# NOTE(review): presumably read by the ASS burn step; the consumer is not visible here.
_ASS_ENV_KEY = "ASS_FONT_DIR"
# Per-user font directory (~/.fonts, expanded once at import time).
_USER_FONTS_DIR = Path("~/.fonts").expanduser()
_ASS_FONT_DIR_COMPUTED: Optional[str] = None  # computed for callers that wish to pass via env

# Professional fallback chain (order matters)
PREFERRED_FAMILIES: List[str] = [
    "Inter",
    "Source Sans 3",
    "IBM Plex Sans",
    "Roboto",
    "Noto Sans",
    "DejaVu Sans",
]


def _which(cmd: str) -> bool:
    """Return True if command exists on PATH."""
    try:
        from shutil import which
        return which(cmd) is not None
    except Exception:
        return False


def _run_cmd(args: List[str], *, capture: bool = True) -> subprocess.CompletedProcess:
    """
    Run a command safely. If a project-provided `_run` exists, prefer it.
    Falls back to `subprocess.run`. Never raises; caller inspects returncode.
    """
    try:
        return _run(args, check=False, capture=capture)  # type: ignore[name-defined]
    except Exception:
        pass

    stdout = subprocess.PIPE if capture else None
    stderr = subprocess.PIPE if capture else None
    try:
        return subprocess.run(args, check=False, text=True, stdout=stdout, stderr=stderr)
    except Exception as e:
        cp = subprocess.CompletedProcess(args=args, returncode=1)
        cp.stdout = ""  # type: ignore[attr-defined]
        cp.stderr = str(e)  # type: ignore[attr-defined]
        return cp


def _atomic_copy(src: Path, dst: Path) -> None:
    """Atomic copy to dst via a temporary sibling file."""
    dst = Path(dst)
    src = Path(src)
    dst.parent.mkdir(parents=True, exist_ok=True)
    tmp = dst.with_suffix(dst.suffix + ".tmpcopy")
    try:
        if tmp.exists():
            try:
                tmp.unlink()
            except Exception:
                pass
        shutil.copy2(src, tmp)
        tmp.replace(dst)
    finally:
        try:
            if tmp.exists():
                tmp.unlink()
        except Exception:
            pass


def _dedup_existing(paths: Iterable[Path]) -> List[Path]:
    """Resolve, filter to existing unique paths; preserve order."""
    out: List[Path] = []
    seen: set[Path] = set()
    for p in paths:
        try:
            rp = Path(p).resolve()
        except Exception:
            continue
        if rp.exists() and rp not in seen:
            out.append(rp)
            seen.add(rp)
    return out


def _normalize_subfamily(name: str) -> str:
    """Normalize subfamily tags to a small, predictable set."""
    s = (name or "").strip().lower()
    mapping = {
        "regular": "regular",
        "book": "regular",
        "roman": "regular",
        "normal": "regular",
        "plain": "regular",
        "bold": "bold",
        "italic": "italic",
        "oblique": "italic",
        "bold italic": "bold italic",
        "italic bold": "bold italic",
        "black": "black",
        "heavy": "black",
        "light": "light",
        "thin": "thin",
        "medium": "medium",
        "semibold": "semibold",
        "semi bold": "semibold",
        "extra bold": "extrabold",
        "extrabold": "extrabold",
    }
    return mapping.get(s, s)


def _read_name_table(tt: TTFont) -> Tuple[str, str, str]:
    """
    Extract family, subfamily, and PostScript name from a TTFont.
    Prefers typographic family/subfamily (nameID 16/17) when present.
    Returns ('', '', '') on failure.
    """
    fam = sub = ps = ""
    try:
        n = tt["name"]
        f16 = next((r for r in n.names if r.nameID == 16), None)
        f1 = next((r for r in n.names if r.nameID == 1), None)
        f17 = next((r for r in n.names if r.nameID == 17), None)
        f2 = next((r for r in n.names if r.nameID == 2), None)
        f6 = next((r for r in n.names if r.nameID == 6), None)
        fam = (f16 or f1).toUnicode() if (f16 or f1) else ""
        sub = (f17 or f2).toUnicode() if (f17 or f2) else ""
        ps = f6.toUnicode() if f6 else ""
    except Exception:
        pass
    return (fam or "").strip(), (sub or "").strip(), (ps or "").strip()


def _read_font_meta(path: Path) -> Tuple[str, str]:
    """
    Extract (family, subfamily) from a font file.
    TTC/OTC collections: favor Regular/Book face when available.
    """
    fam, sub = Path(path).stem, ""
    try:
        if Path(path).suffix.lower() in (".ttc", ".otc"):
            coll = TTCollection(str(path), lazy=True)
            best: Optional[Tuple[str, str]] = None
            for face in coll.fonts:
                try:
                    f, s, _ = _read_name_table(face)
                    if not best:
                        best = (f or Path(path).stem, s)
                    if _normalize_subfamily(s) in ("regular", "book"):
                        best = (f or Path(path).stem, s)
                        break
                except Exception:
                    continue
            if best:
                fam, sub = best
            try:
                coll.close()
            except Exception:
                pass
        else:
            tt = TTFont(str(path), lazy=True)
            f, s, _ = _read_name_table(tt)
            fam, sub = (f or Path(path).stem), s
            try:
                tt.close()
            except Exception:
                pass
    except Exception:
        pass
    return (fam or Path(path).stem).strip(), _normalize_subfamily(sub)


def _fc_match_family_path(family: str) -> Optional[Path]:
    """
    Resolve a family to a concrete font file using fontconfig (`fc-match -v`).
    Returns a Path or None. Parsing is deliberately loose to survive distro quirks.
    """
    if not family or not _which("fc-match"):
        return None
    cp = _run_cmd(["fc-match", "-v", family], capture=True)
    text = (getattr(cp, "stdout", None) or "") + (getattr(cp, "stderr", None) or "")
    m = re.search(r'file:\s*"([^"]+)"', text)
    if not m:
        return None
    p = Path(m.group(1))
    return p if p.exists() else None


def _collect_asset_roots() -> List[Path]:
    """
    Return potential asset root directories in order of likelihood.
    Uses external globals if available, but degrades gracefully.
    """
    roots: List[Path] = []
    try:
        wa = WORK_ASSETS  # type: ignore[name-defined]
        roots.append(wa if isinstance(wa, Path) else Path(wa))
    except Exception:
        pass
    try:
        ar = ASSETS_ROOT if isinstance(ASSETS_ROOT, Path) else Path(ASSETS_ROOT)  # type: ignore[name-defined]
        roots.append(ar)
    except Exception:
        pass
    try:
        for p in DRIVE_ASSETS_FOLDERS():  # type: ignore[name-defined]
            roots.append(p)
    except Exception:
        pass
    return _dedup_existing(roots)


def _find_font_file_by_name(file_name: str) -> Optional[Path]:
    """
    Search known asset roots for an exact filename match (non-recursive).
    Returns the first match or None.
    """
    if not file_name:
        return None
    name = Path(file_name).name
    for base in _collect_asset_roots():
        candidate = Path(base) / name
        try:
            if candidate.exists() and candidate.is_file():
                return candidate
        except Exception:
            continue
    return None


def _ensure_user_fonts_dir() -> Path:
    """Create ~/.fonts if missing and return its path."""
    _USER_FONTS_DIR.mkdir(parents=True, exist_ok=True)
    return _USER_FONTS_DIR


def _needs_copy(src: Path, dst: Path) -> bool:
    """Return True if dst is missing or content hash differs (uses shared hash helper)."""
    if not Path(dst).exists():
        return True
    try:
        return _HASH_FUNC(Path(dst)) != _HASH_FUNC(Path(src))
    except Exception:
        return True


def _install_into_user_fonts(src: Path) -> Path:
    """
    Copy a font file into ~/.fonts atomically if content differs.
    Trigger `fc-cache -f` when available. Return destination path.
    """
    fonts_dir = _ensure_user_fonts_dir()
    dst = fonts_dir / Path(src).name
    try:
        if _needs_copy(Path(src), dst):
            _atomic_copy(Path(src), dst)
            if _which("fc-cache"):
                _run_cmd(["fc-cache", "-f"], capture=False)
    except Exception:
        try:
            shutil.copy2(src, dst)
        except Exception:
            pass
    return dst


def _set_ass_fonts_dir(extra_dirs: Iterable[Path]) -> None:
    """
    Compute a value for ASS_FONT_DIR including ~/.fonts and provided extras.
    libass accepts '|' separated list. Keep deterministic and de-duped.
    Do not mutate global environment per SCA; callers may pass via _run(env=...).
    """
    global _ASS_FONT_DIR_COMPUTED
    dirs = [_USER_FONTS_DIR, *extra_dirs]
    entries = [str(Path(p).resolve()) for p in _dedup_existing(dirs)]
    if not entries:
        entries = [str(_ensure_user_fonts_dir().resolve())]
    _ASS_FONT_DIR_COMPUTED = "|".join(entries)


# --------------------------------------------------------------------------------------
# FontManager: caches + public API
# --------------------------------------------------------------------------------------

class FontManager:
    """
    Resolve, install and cache fonts for caption rendering.

    Responsibilities:
      - Resolve a usable font file (explicit asset file, family via fontconfig, or fallbacks).
      - Install asset fonts into the user fonts dir (via `_install_into_user_fonts`).
      - Publish the computed ASS font-dir value through `_set_ass_fonts_dir`.
      - Cache fontconfig lookups, PIL font instances, and (family, subfamily) metadata.

    Caches are never invalidated automatically; call `clear_caches()` after
    fonts change on disk.
    """

    def __init__(self) -> None:
        # family name -> resolved file; None results are cached too so misses are not re-queried
        self._fc_cache: Dict[str, Optional[Path]] = {}
        # (font path or None, pixel size) -> loaded PIL font, or None if nothing could be loaded
        self._pil_cache: Dict[Tuple[Optional[Path], int], Optional[ImageFont.FreeTypeFont]] = {}
        # font path -> (family, normalized subfamily)
        self._meta_cache: Dict[Path, Tuple[str, str]] = {}

    # -----------------------
    # Internal cached helpers
    # -----------------------

    def _fc_cached(self, family: str) -> Optional[Path]:
        """Memoized `_fc_match_family_path`; the family is stripped before lookup."""
        if not family:
            return None
        key = family.strip()
        if key in self._fc_cache:
            return self._fc_cache[key]
        resolved = _fc_match_family_path(key)
        self._fc_cache[key] = resolved
        return resolved

    def _meta_cached(self, path: Path) -> Tuple[str, str]:
        """Memoized `_read_font_meta` keyed by the path as given."""
        key = Path(path)
        if key in self._meta_cache:
            return self._meta_cache[key]
        meta = _read_font_meta(key)
        self._meta_cache[key] = meta
        return meta

    def _info_for(self, path: Path) -> FontInfo:
        """Build a FontInfo for an existing file and register its directory for ASS."""
        font_path = Path(path)
        fam, sub = self._meta_cached(font_path)
        _set_ass_fonts_dir([font_path.parent])
        return FontInfo(family=fam, subfam=sub or "regular", path=font_path)

    def _resolve_by_family_chain(self, families: Iterable[str]) -> Optional[FontInfo]:
        """Try family names in order via fontconfig; return the first that maps to an existing file."""
        for fam in families:
            candidate = self._fc_cached(fam)
            if candidate and Path(candidate).exists():
                return self._info_for(candidate)
        return None

    # -----------------------
    # Public API
    # -----------------------

    def resolve_font_selection(self, font_file_name: Optional[str]) -> FontInfo:
        """
        Pick the font to render with.

        `font_file_name` may be an asset filename, a family name, or empty.
        The literal string "none" (any case, surrounding whitespace ignored)
        means "no selection" for every step, so it is never sent to fontconfig
        as if it were a family name.

        Resolution order:
          1) Explicit file in the asset roots -> installed into the user fonts dir.
          2) The same string interpreted as a family name via fontconfig.
          3) `PREFERRED_FAMILIES` via fontconfig.
          4) "DejaVu Sans" via fontconfig.
          5) Last resort: FontInfo for "DejaVu Sans" with `path=None`.

        Side effect: every outcome recomputes the ASS font-dir value.
        """
        requested = str(font_file_name).strip() if font_file_name else ""
        if requested.lower() == "none":
            requested = ""

        if requested:
            # 1) Explicit asset file
            asset = _find_font_file_by_name(requested)
            if asset and Path(asset).exists():
                return self._info_for(_install_into_user_fonts(Path(asset)))

            # 2) Interpret as a family name
            by_family = self._fc_cached(requested)
            if by_family and Path(by_family).exists():
                return self._info_for(by_family)

        # 3) Preferred professional families
        pick = self._resolve_by_family_chain(PREFERRED_FAMILIES)
        if pick:
            return pick

        # 4) DejaVu Sans via fontconfig
        pick = self._resolve_by_family_chain(["DejaVu Sans"])
        if pick:
            return pick

        # 5) Last resort: no concrete file
        _set_ass_fonts_dir([_USER_FONTS_DIR])
        return FontInfo(family="DejaVu Sans", subfam="regular", path=None)

    @staticmethod
    def _try_truetype(path: Optional[Path], px: int) -> Optional[ImageFont.FreeTypeFont]:
        """Load a TrueType/OpenType file with PIL; None if the path is unusable or loading fails."""
        try:
            if path and Path(path).exists():
                return ImageFont.truetype(str(Path(path)), int(px))
        except Exception:
            pass
        return None

    def load_pil_font(self, file_path: Optional[Path], px: int) -> Optional[ImageFont.FreeTypeFont]:
        """
        Return a PIL font for `file_path` at `px` pixels, cached per (path, px).

        Fallback order when the file cannot be loaded:
          1) The provided file path.
          2) `PREFERRED_FAMILIES`, then "DejaVu Sans", each via fontconfig.
          3) PIL's built-in default font.
        Returns None only if every step fails. The result (including a
        fallback) is cached under the requested key.
        """
        key = (Path(file_path) if file_path else None, int(px))
        if key in self._pil_cache:
            return self._pil_cache[key]

        font = self._try_truetype(file_path, px)

        if font is None:
            for fam in (*PREFERRED_FAMILIES, "DejaVu Sans"):
                try:
                    candidate = self._fc_cached(fam)
                except Exception:
                    continue
                font = self._try_truetype(candidate, px)
                if font is not None:
                    break

        if font is None:
            try:
                font = ImageFont.load_default()
            except Exception:
                font = None

        self._pil_cache[key] = font
        return font

    def clear_caches(self) -> None:
        """Drop all caches. Use when fonts are updated on disk."""
        self._fc_cache.clear()
        self._pil_cache.clear()
        self._meta_cache.clear()


# Singleton instance used by other modules
# Created once when this cell runs; later cells look the name up at call time
# (e.g. `measure_text_px` checks for "FONTM" in globals() before using it).
FONTM = FontManager()
print("Font manager ready.")


Font manager ready.


In [5]:
# @title 5) ASR + Text Utils — Hidden
# @markdown Faster-Whisper cached transcription (EN-only), LTR normalization/segmentation, precise sizing helpers.

from __future__ import annotations

import hashlib
import json
import os
import re
import shutil
import subprocess as sp
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# ---------------------------
# Imports with fail-fast hints
# ---------------------------
# Hard requirements: fail immediately with a copy-pasteable remediation hint.
# The version specifiers are quoted because an unquoted `>=` is parsed by the
# shell as output redirection, which would install an unpinned package.
try:
    from PIL import Image, ImageFont  # type: ignore
except Exception as e:
    raise RuntimeError('Pillow is required. Remediation: `pip install "Pillow>=11.3.0"` and re-run prior install cell.') from e

try:
    from faster_whisper import WhisperModel  # type: ignore
except Exception as e:
    raise RuntimeError('faster-whisper is required. Remediation: `pip install "faster-whisper>=1.0.3"`.') from e

# Optional CUDA probe (tolerate missing ctranslate2)
# `_ct2_cuda_count` is always a zero-argument callable; it reports 0 when
# ctranslate2 is missing or lacks `get_cuda_device_count`.
try:
    import ctranslate2  # type: ignore
    _ct2_cuda_count = getattr(ctranslate2, "get_cuda_device_count", lambda: 0)
except Exception:
    ctranslate2 = None  # type: ignore
    _ct2_cuda_count = lambda: 0  # type: ignore

# Public names of this cell. Underscore-prefixed helpers are intentionally
# left out; note that `is_punct_token` is defined below but not listed here.
__all__ = [
    "Word",
    "SENTENCE_GAP_SEC", "MIN_WORD_DUR", "END_PUNCT", "PUNCT",
    "normalize_words", "sentenceize",
    "ui_size_to_ass", "measure_text_px",
    "effective_render_size", "effective_render_size_pipeline",
    "detect_silences", "snap_to_voice_onset",
    "extract_speech_stats", "smart_decide",
    "transcribe_cached",
]

# ============================================================
# Data model
# ============================================================

@dataclass(frozen=True)
class Word:
    """Immutable timed transcript token produced by `normalize_words`."""
    # Times are compared against SENTENCE_GAP_SEC / MIN_WORD_DUR, i.e. seconds.
    start: float  # token start time
    end: float    # token end time
    text: str     # token text; trailing punctuation may have been merged in


# ============================================================
# Constants (EN/LTR only)
# ============================================================

# Pause between consecutive tokens (seconds) that counts as a sentence boundary.
SENTENCE_GAP_SEC: float = 0.60
MIN_WORD_DUR: float = 0.06  # hard floor enforced by generators

# English/LTR punctuation
# END_PUNCT: a token whose last character is in this set closes a sentence.
END_PUNCT = set(".?!…")
# PUNCT: a token made only of these characters is merged into the previous word.
PUNCT = set(".,?!:;…—–-\"'()[]{}")

# ============================================================
# Compiled regex
# ============================================================

_WS_RE = re.compile(r"\s+")
_SIL_START_RE = re.compile(r"silence_start:\s*([0-9]*\.?[0-9]+)")
_SIL_END_RE = re.compile(r"silence_end:\s*([0-9]*\.?[0-9]+)")
_DOTS_TO_ELLIP = re.compile(r"(?<!\.)\.\.\.(?!\.)")  # normalize "..." → "…"

# ============================================================
# Env & misc helpers
# ============================================================

def _safe_env() -> Dict[str, str]:
    """Best-effort to obtain the runtime env mapping (copy only)."""
    try:
        return _env()  # type: ignore[name-defined]
    except Exception:
        return dict(os.environ)

def _collapse_ws(s: str) -> str:
    """Collapse internal whitespace to single spaces and strip ends."""
    return _WS_RE.sub(" ", s).strip()

def is_punct_token(token: str) -> bool:
    """Return True when the stripped token is non-empty and consists solely of characters in `PUNCT`."""
    stripped = (token or "").strip()
    if not stripped:
        return False
    return set(stripped).issubset(PUNCT)

def _clean_token_text(token: str) -> str:
    """Normalize whitespace and convert ASCII '...' to a single ellipsis."""
    if not token:
        return ""
    token = token.replace("\n", " ").replace("\t", " ")
    token = _DOTS_TO_ELLIP.sub("…", token)
    return _collapse_ws(token)

def _clamp_duration(start: float, end: float, floor: float = MIN_WORD_DUR) -> Tuple[float, float]:
    """
    Return (start, end) as floats with end >= start and (end - start) >= floor.

    An end earlier than the start is first raised to the start; a span shorter
    than `floor` is then stretched to exactly `start + floor`.
    """
    begin = float(start)
    finish = max(float(end), begin)
    if finish - begin < floor:
        finish = begin + floor
    return begin, finish

# ============================================================
# Normalize & sentence split
# ============================================================

def normalize_words(words: List[Dict[str, Any]]) -> List[Word]:
    """
    Turn raw token dicts ({"start", "end", "text"}) into cleaned `Word`s.

      - Tokens with blank text are dropped; the rest are sorted by (start, end).
      - Each token's timing goes through `_clamp_duration` with MIN_WORD_DUR.
      - A punctuation-only token is glued (no space) onto the previous word,
        extending that word's end time if needed. With no previous word it is
        kept as its own entry.
      - Other tokens are cleaned with `_clean_token_text`.
    """
    def _has_text(item: Dict[str, Any]) -> bool:
        return bool((item.get("text") or "").strip())

    def _time_key(item: Dict[str, Any]) -> Tuple[float, float]:
        return float(item.get("start", 0.0)), float(item.get("end", 0.0))

    ordered = sorted(filter(_has_text, words or []), key=_time_key)

    result: List[Word] = []
    for item in ordered:
        token = str(item.get("text", "")).strip()
        if not token:
            continue
        begin, finish = _clamp_duration(item.get("start", 0.0), item.get("end", 0.0), floor=MIN_WORD_DUR)
        if is_punct_token(token) and result:
            previous = result.pop()
            result.append(Word(start=previous.start, end=max(previous.end, finish), text=previous.text + token))
        else:
            text = _clean_token_text(token)
            if text:
                result.append(Word(start=begin, end=finish, text=text))
    return result

def sentenceize(words: List[Word]) -> List[List[Word]]:
    """
    Group words into sentences.

    A sentence ends:
      - after a word whose last character is in END_PUNCT, or
      - before a word that starts at least SENTENCE_GAP_SEC after the previous
        word ended (the word following the pause opens the next sentence).

    Words are taken in the order given; empty or None input yields [].
    """
    sentences: List[List[Word]] = []
    current: List[Word] = []
    prev_end: Optional[float] = None
    for w in words or []:
        long_pause = prev_end is not None and (float(w.start) - float(prev_end)) >= SENTENCE_GAP_SEC
        if long_pause and current:
            # Close the running sentence first so the post-pause word starts a new one.
            sentences.append(current)
            current = []
        current.append(w)
        if w.text and w.text[-1] in END_PUNCT:
            sentences.append(current)
            current = []
        prev_end = float(w.end)
    if current:
        sentences.append(current)
    return sentences

# ============================================================
# Font sizing & render geometry
# ============================================================

def ui_size_to_ass(value: int) -> int:
    """
    Convert a UI size slider value to an ASS font size in pixels.

    The value is clamped to 10..100, normalized to 0..1, eased with smoothstep
    (3t^2 - 2t^3) and mapped onto 28..220 px, rounded to the nearest integer.
    """
    ui_min, ui_max = 10, 100
    px_min, px_max = 28, 220
    clamped = min(ui_max, max(ui_min, int(value)))
    t = (clamped - 10) / 90.0
    eased = t * t * (3 - 2 * t)  # smoothstep
    return int(round(px_min + (px_max - px_min) * eased))

def measure_text_px(file_path: Optional[Path], px: int, text: str) -> float:
    """
    Measure the rendered width of `text` in pixels.

    The text is cleaned with `_clean_token_text`; empty text measures 0.0.
    The font comes from the `FONTM` singleton. Measurement tries the font's
    `getlength`, then the width of `getbbox`; if no font is available or both
    fail, a linear estimate of 0.56 * max(14, px) per character (at least 1.0)
    is returned.

    Raises:
        RuntimeError: if `FONTM` has not been defined yet.
    """
    cleaned = _clean_token_text(text or "")
    if not cleaned:
        return 0.0
    # Require upstream FONTM singleton from Font Manager cell
    if "FONTM" not in globals():
        raise RuntimeError("FONTM is missing. Remediation: run the Font Manager cell before calling `measure_text_px`.")
    font = FONTM.load_pil_font(file_path, int(px))  # type: ignore[name-defined]
    if font is not None:
        try:
            return float(font.getlength(cleaned))  # type: ignore[attr-defined]
        except Exception:
            pass
        try:
            bbox = font.getbbox(cleaned)
            if bbox:
                return float(max(0, bbox[2] - bbox[0]))
        except Exception:
            pass
    # No usable metrics: conservative per-character estimate.
    estimate_per_char = max(1.0, 0.56 * max(14, int(px)))
    return estimate_per_char * len(cleaned)

def effective_render_size(final_w: int, final_h: int, subscale: float) -> Tuple[int, int, bool]:
    """
    Work out the size to render at when burning subtitles at a reduced scale.

    Returns (width, height, do_subscale). A `subscale` of 1.0 or more, or one
    that would round either dimension down to zero or below, disables
    sub-scaling and returns the final size unchanged.
    """
    def _full_size() -> Tuple[int, int, bool]:
        return int(final_w), int(final_h), False

    if subscale >= 1.0:
        return _full_size()
    factor = float(subscale)
    scaled_w, scaled_h = (int(round(side * factor)) for side in (final_w, final_h))
    if min(scaled_w, scaled_h) <= 0:
        return _full_size()
    return scaled_w, scaled_h, True

def effective_render_size_pipeline(subscale: float, final_w: int = 1920, final_h: int = 1080) -> Tuple[int, int, bool]:
    """
    Convenience wrapper around `effective_render_size` for the pipeline target.

    Args:
        subscale: Render scale factor, forwarded as a float.
        final_w: Target width in pixels; defaults to 1920.
        final_h: Target height in pixels; defaults to 1080.

    Returns:
        (width, height, do_subscale) exactly as `effective_render_size` reports.
        Calling with only `subscale` keeps the 1920x1080 target.
    """
    return effective_render_size(int(final_w), int(final_h), float(subscale))

# ============================================================
# Audio: silence detection & snapping
# ============================================================

def detect_silences(audio_path: Path, noise_db: int = -35, min_silence: float = 0.35) -> List[Tuple[float, float]]:
    """
    Detect silent intervals in an audio file with ffmpeg's `silencedetect`.

    Args:
        audio_path: File to analyse. A missing path yields [].
        noise_db: Noise threshold in dB passed as `noise=`.
        min_silence: Minimum silence duration in seconds passed as `d=`.

    Returns:
        (start, end) tuples sorted by start then end. A `silence_start`
        without a following `silence_end` is ignored.

    Caching:
        Results are stored in META under a key built from path, mtime, size
        and both parameters, and persisted with `_json_dump(WORK_META, META)`.
        Only successful ffmpeg runs (exit code 0) are cached, so a transient
        failure is not remembered as "no silences". Cache hits are converted
        back to float tuples because a JSON round-trip stores pairs as lists.

    Raises:
        RuntimeError: if META / WORK_META / _json_dump are not defined.
    """
    if not audio_path or not audio_path.exists():
        return []
    # Require global cache facilities from Roots & Cache cell
    if "META" not in globals() or "WORK_META" not in globals() or "_json_dump" not in globals():
        raise RuntimeError(
            "WHAT: Cache globals missing\n"
            "WHY: Required objects META/WORK_META/_json_dump are not defined\n"
            "HOW_TO_FIX: Run the Roots & Cache cell before calling `detect_silences`."
        )
    try:
        st = audio_path.stat()
        key = f"sil::{audio_path}::{int(st.st_mtime)}::{st.st_size}::{noise_db}::{min_silence}"
        if key in META:  # type: ignore[name-defined]
            # Normalize the cached shape; a malformed entry falls through to a fresh run.
            return [(float(s_st), float(s_en)) for s_st, s_en in META[key]]  # type: ignore[index]
    except Exception:
        key = ""  # cache disabled on error
    cmd = [
        "ffmpeg", "-hide_banner", "-nostdin", "-vn", "-nostats",
        "-i", str(audio_path),
        "-af", f"silencedetect=noise={int(noise_db)}dB:d={float(min_silence)}",
        "-f", "null", "-",
    ]
    proc = sp.run(cmd, capture_output=True, text=True, env=_safe_env())
    log = proc.stderr or ""  # silencedetect reports on stderr
    silences: List[Tuple[float, float]] = []
    pending_start: Optional[float] = None
    for line in log.splitlines():
        m1 = _SIL_START_RE.search(line)
        if m1:
            try:
                pending_start = float(m1.group(1))
            except Exception:
                pending_start = None
            continue
        m2 = _SIL_END_RE.search(line)
        if m2 and pending_start is not None:
            try:
                silences.append((pending_start, float(m2.group(1))))
            except Exception:
                pass
            pending_start = None
    silences.sort(key=lambda x: (x[0], x[1]))
    if key and proc.returncode == 0:
        try:
            META[key] = silences  # type: ignore[name-defined]
            _json_dump(WORK_META, META)  # type: ignore[name-defined]
        except Exception:
            pass
    return silences

def snap_to_voice_onset(t: float, silences: List[Tuple[float, float]], window: float = 0.18) -> float:
    """
    Move time `t` onto the end of a silence when one is close enough.

    Silences are checked in the order given; the first interval whose end lies
    within +/- `window` of `t`, or which contains `t`, wins and its end time is
    returned. Otherwise `t` is returned unchanged (as a float). A negative
    window is treated as 0.
    """
    moment = float(t)
    tolerance = max(0.0, float(window))
    if not silences:
        return moment
    for raw_start, raw_end in silences:
        sil_start, sil_end = float(raw_start), float(raw_end)
        near_end = (sil_end - tolerance) <= moment <= (sil_end + tolerance)
        inside = sil_start <= moment <= sil_end
        if near_end or inside:
            return sil_end
    return moment

# ============================================================
# Speech stats & heuristics (language-agnostic)
# ============================================================

def extract_speech_stats(words: List[Word]) -> Dict[str, float]:
    """
    Summarize speaking pace for a list of words.

    Returned keys:
      - wpm: words per minute over the span from earliest start to latest end
        (span floored at 1 ms).
      - median_gap: median pause between consecutive words (negative pauses
        count as 0); 0.0 for a single word.
      - avg_wdur: mean word duration, each duration floored at 10 ms.
      - wps: words per sentence, where a new sentence starts at every pause
        of at least SENTENCE_GAP_SEC.

    Empty input returns all zeros. Words are sorted by (start, end) first.
    """
    if not words:
        return {"wpm": 0.0, "median_gap": 0.0, "avg_wdur": 0.0, "wps": 0.0}

    ordered = sorted(words, key=lambda item: (float(item.start), float(item.end)))
    count = len(ordered)

    earliest = min(float(item.start) for item in ordered)
    latest = max(float(item.end) for item in ordered)
    total_span = max(0.001, latest - earliest)

    # Raw pauses drive sentence counting; clamped pauses drive the median.
    pauses = [float(nxt.start) - float(cur.end) for cur, nxt in zip(ordered, ordered[1:])]
    clamped = sorted(max(0.0, pause) for pause in pauses)
    if clamped:
        mid, is_odd = divmod(len(clamped), 2)
        median_gap = clamped[mid] if is_odd else (clamped[mid - 1] + clamped[mid]) / 2.0
    else:
        median_gap = 0.0

    avg_wdur = sum(max(0.01, float(item.end) - float(item.start)) for item in ordered) / count
    wpm = (count / total_span) * 60.0

    # The first word always opens a sentence; each long pause opens another.
    sentence_count = 1 + sum(1 for pause in pauses if pause >= SENTENCE_GAP_SEC)
    wps = count / max(1, sentence_count)

    return {"wpm": float(wpm), "median_gap": float(median_gap), "avg_wdur": float(avg_wdur), "wps": float(wps)}

def smart_decide(words: List[Word], bg_path: Path) -> Dict[str, Any]:
    """
    Suggest UI defaults from background brightness and speech pace.

    Brightness is the mean luma (0..1) of the background image shrunk to
    64x36 greyscale; 0.5 is assumed if the image cannot be read. Bright
    backgrounds raise the vertical margin (capped at 140), dark ones lower it
    (floored at 70). Fast speech gets a 40 ms karaoke offset, slow or pause-
    heavy speech 60 ms, anything else 50 ms.

    Returns a dict with keys: hmargin, vmargin, highlight_mode,
    karaoke_offset_ms, fps, subscale, enc_mode.
    """
    try:
        with Image.open(str(bg_path)) as im:
            resample = getattr(getattr(Image, "Resampling", Image), "BILINEAR")
            pixels = im.convert("L").resize((64, 36), resample).tobytes()
            luma = (sum(pixels) / (255.0 * len(pixels))) if pixels else 0.5
    except Exception:
        luma = 0.5

    stats = extract_speech_stats(words)
    wpm = float(stats["wpm"])
    median_gap = float(stats["median_gap"])
    avg_wdur = float(stats["avg_wdur"])

    base_vmargin = 97
    if luma > 0.65:
        vmargin = min(140, int(base_vmargin + 10 + (luma - 0.65) * 120))
    elif luma < 0.35:
        vmargin = max(70, int(base_vmargin - 10 - (0.35 - luma) * 80))
    else:
        vmargin = base_vmargin

    is_fast = wpm >= 165.0 or avg_wdur <= 0.21
    is_slow = wpm <= 120.0 or median_gap >= 0.35
    if is_fast:
        karaoke_offset = 40
    elif is_slow:
        karaoke_offset = 60
    else:
        karaoke_offset = 50

    return {
        "hmargin": 154,
        "vmargin": vmargin,
        "highlight_mode": "word_pill",  # every pace bucket uses the same mode
        "karaoke_offset_ms": karaoke_offset,
        "fps": 24,
        "subscale": 0.80,
        "enc_mode": "studio" if wpm <= 160.0 else "race",
    }

# ============================================================
# Transcription (cached)
# ============================================================

# Cache of loaded models filled by `_load_whisper_model`. Keyed by model name
# only, so the device used for the first load of a name sticks for later calls.
_whisper_models: Dict[str, WhisperModel] = {}

def _read_transcript_cache(path: Path) -> Optional[List[Word]]:
    """Load words from a JSON cache if it exists and is valid."""
    try:
        data = json.loads(Path(path).read_text(encoding="utf-8"))
        records = data.get("words", [])
        out: List[Word] = []
        for rec in records:
            if {"start", "end", "text"} <= set(rec.keys()):
                out.append(Word(float(rec["start"]), float(rec["end"]), str(rec["text"])))
        return out or None
    except Exception:
        return None

def _load_whisper_model(model_name: str, device: str) -> WhisperModel:
    """
    Lazy model loader with compute_type fallbacks.
    CUDA prefers half-precision/int8 hybrids; CPU favors int8.
    """
    if model_name in _whisper_models:
        return _whisper_models[model_name]
    last_error: Optional[Exception] = None
    candidates = ["float16", "int8_float16", "int8"] if device == "cuda" else ["int8", "int8_float32"]
    for compute_type in candidates:
        try:
            model = WhisperModel(
                model_name,
                device=device,
                compute_type=compute_type,
                download_root=str(WORK_CACHE),  # type: ignore[name-defined]
                cpu_threads=max(2, (os.cpu_count() or 2)),
                num_workers=1,
            )
            _whisper_models[model_name] = model
            return model
        except Exception as e:
            last_error = e
    raise last_error or RuntimeError(f"Failed to load Whisper model: {model_name}")

def transcribe_cached(audio_path: Path, model_name: str, lang: str = "en") -> List[Word]:
    """
    Transcribe audio to word-level `Word`s with Faster-Whisper, caching on disk.

    The cache file lives in WORK_CACHE and is named from a SHA-1 over the
    audio path, its content hash, model name, language and device ("cuda" when
    ctranslate2 reports a CUDA device, else "cpu"). Transcription first runs
    with VAD filtering; if that call raises, it is retried without VAD. Words
    are normalized with `normalize_words` and written to the cache on a
    best-effort basis.

    Raises:
        FileNotFoundError: the audio file does not exist.
        RuntimeError: WORK_CACHE is undefined or `sha1_file` fails.
    """
    if not audio_path or not Path(audio_path).exists():
        raise FileNotFoundError(f"Audio not found: {audio_path}")
    # Require upstream cache/work roots
    if "WORK_CACHE" not in globals():
        raise RuntimeError(
            "WHAT: WORK_CACHE missing\n"
            "WHY: Required cache directory not defined or created\n"
            "HOW_TO_FIX: Run the Roots & Cache cell before transcription."
        )

    try:
        device = "cuda" if bool(_ct2_cuda_count() > 0) else "cpu"
    except Exception:
        device = "cpu"

    try:
        audio_digest = sha1_file(audio_path)  # type: ignore[name-defined]
    except Exception as e:
        raise RuntimeError(
            "WHAT: sha1 helper unavailable\n"
            "WHY: sha1_file not found from Core Utils\n"
            "HOW_TO_FIX: Run the Core Utils cell before transcription."
        ) from e

    key_fields = {"v": 8, "audio": str(audio_path), "sha1": audio_digest, "model": model_name, "lang": lang, "device": device}
    key_digest = hashlib.sha1(json.dumps(key_fields, sort_keys=True).encode()).hexdigest()
    cache_path = Path(WORK_CACHE) / f"tr_{key_digest}.json"  # type: ignore[name-defined]
    if Path(cache_path).exists():
        hit = _read_transcript_cache(cache_path)
        if hit is not None:
            return hit

    model = _load_whisper_model(model_name, device)
    shared_options = dict(language=lang, word_timestamps=True, beam_size=1, best_of=1)
    try:
        segments, _info = model.transcribe(
            str(audio_path),
            vad_filter=True,
            vad_parameters=dict(threshold=0.55, min_speech_duration_ms=250),
            **shared_options,
        )
    except Exception:
        # Retry once without voice-activity detection.
        segments, _info = model.transcribe(str(audio_path), vad_filter=False, **shared_options)

    raw_words: List[Dict[str, Any]] = []
    for segment in segments:
        for item in (getattr(segment, "words", None) or ()):
            token = (getattr(item, "word", "") or "").strip()
            if not token:
                continue
            raw_words.append(
                {
                    "start": float(getattr(item, "start", 0.0)),
                    "end": float(getattr(item, "end", 0.0)),
                    "text": token,
                }
            )

    words = normalize_words(raw_words)
    try:
        if "_json_dump" not in globals():
            raise RuntimeError("Missing _json_dump for cache write. Remediation: run Roots & Cache cell.")
        payload = {"words": [{"start": w.start, "end": w.end, "text": w.text} for w in words]}
        _json_dump(cache_path, payload)  # type: ignore[name-defined]
    except Exception:
        # Cache write is best effort; the transcript is still returned.
        pass
    return words

# ============================================================
# Preemptive preflight (no side-effects beyond bounded probes)
# ============================================================

def _have_exe(name: str) -> bool:
    try:
        return shutil.which(name) is not None
    except Exception:
        return False

def _ffmpeg_audio_smoke() -> None:
    """
    Minimal audio pipeline probe to prevent runtime surprise:
    anullsrc (48 kHz stereo) → 0.1s → null mux. Raises on failure.
    """
    if not _have_exe("ffmpeg"):
        raise RuntimeError(
            "WHAT: ffmpeg tool check\n"
            "WHY: ffmpeg not found on PATH\n"
            "HOW_TO_FIX: Install ffmpeg and ensure it is on PATH, then re-run the install/setup cell."
        )
    cmd = [
        "ffmpeg", "-hide_banner", "-loglevel", "error", "-nostdin",
        "-f", "lavfi", "-i", "anullsrc=r=48000:cl=stereo",
        "-t", "0.1", "-ac", "2", "-ar", "48000",
        "-f", "null", "-"
    ]
    p = sp.run(cmd, stdout=sp.PIPE, stderr=sp.PIPE, text=True, env=_safe_env())
    if p.returncode != 0:
        err = (p.stderr or "").strip()
        raise RuntimeError(
            "WHAT: FFmpeg audio smoke test\n"
            f"WHY: pipeline failed with error: {err or 'unknown'}\n"
            "HOW_TO_FIX: Ensure ffmpeg has libavfilter enabled and audio filters available."
        )

def _preflight_asr_utils() -> None:
    # Tools presence
    if not _have_exe("ffmpeg"):
        raise RuntimeError(
            "WHAT: Required tools check\n"
            "WHY: ffmpeg is missing\n"
            "HOW_TO_FIX: Install ffmpeg and ensure it’s on PATH."
        )
    if not _have_exe("ffprobe"):
        raise RuntimeError(
            "WHAT: Required tools check\n"
            "WHY: ffprobe is missing\n"
            "HOW_TO_FIX: Install ffmpeg (includes ffprobe) and ensure PATH is set."
        )
    # Cache/work roots
    if "WORK_CACHE" not in globals() or not Path(WORK_CACHE).exists():  # type: ignore[name-defined]
        raise RuntimeError(
            "WHAT: WORK_CACHE directory missing\n"
            "WHY: Roots & Cache cell was not executed or path is invalid\n"
            "HOW_TO_FIX: Run the Roots & Cache cell to create WORK_CACHE."
        )
    if not os.access(str(WORK_CACHE), os.W_OK):  # type: ignore[name-defined]
        raise RuntimeError(
            "WHAT: Cache path not writable\n"
            "WHY: Insufficient permissions for WORK_CACHE\n"
            "HOW_TO_FIX: Fix permissions or configure a writable cache root."
        )
    # Shared helpers
    if "sha1_file" not in globals():
        raise RuntimeError(
            "WHAT: Helper dependency missing\n"
            "WHY: sha1_file is undefined\n"
            "HOW_TO_FIX: Run the Core Utils cell before this one."
        )
    # Audio smoke
    _ffmpeg_audio_smoke()

# Execute preflight now (bounded, deterministic).
# A failure must not abort the cell, but the WHAT/WHY/HOW_TO_FIX diagnostic is
# printed so the user sees why later steps may fail; __pref_ok records the outcome.
__pref_ok = True
try:
    _preflight_asr_utils()
except Exception as __pref_exc:
    __pref_ok = False
    print(f"⚠️  ASR utils preflight failed:\n{__pref_exc}")


In [6]:
# @title 6) Caption Layout + ASS (Static Word Box) — Final (SCA v3)
# Pixel-accurate English LTR captions with adaptive alignment, safe areas, and standards-correct ASS escaping.

from __future__ import annotations

import hashlib
import json
import math
import os
import re
from functools import lru_cache
from pathlib import Path
from shutil import which
from typing import List, Optional, Tuple

# ---------- Tunables ----------
BASE_MIN_WORD_DUR: float = 0.06
BASE_MIN_CAP_DUR: float = 0.25
BASE_LINE_SPACE: float = 0.20
BOX_ALPHA: str = "55"
BOX_PAD_X: float = 0.08
BOX_PAD_Y: float = 0.15
GUARD_PX: int = 2
CACHE_VER: int = 202

# Width calibration to match ASS renderer
WIDTH_CAL_MAP: dict[str, float] = {
    "Inter 18pt ExtraBold": 0.972,
    "Arial": 0.965,
    "DejaVu Sans": 0.982,
}
WIDTH_CAL_DEFAULT: float = 0.969

# ---------- Precompiled regex ----------
_RE_WS_MULTI  = re.compile(r"\s+")
_RE_WS_PUNCT1 = re.compile(r"\s+([,.:;!?])")
_RE_WS_PUNCT2 = re.compile(r"([(\[{\"])\s+")
_RE_WS_PUNCT3 = re.compile(r"\s+([)\]\"])")

# ---------- Small constants ----------
_END_PUNCT = (".", "?", "!", "…")
_MID_PUNCT = (",", ";", ":")

# ---------------- helpers: text ----------------
def _sanitize_tokens(tokens: List["Word"], sentence_head: bool = False) -> List["Word"]:
    """Merge/pad whitespace around punctuation and preserve timing alignment."""
    if not tokens:
        return tokens
    raw = " ".join(t.text for t in tokens)
    raw = _RE_WS_MULTI.sub(" ", raw).strip()
    raw = _RE_WS_PUNCT1.sub(r"\1", raw)
    raw = _RE_WS_PUNCT2.sub(r"\1", raw)
    raw = _RE_WS_PUNCT3.sub(r"\1", raw)
    if sentence_head and raw and raw[0].isalpha():
        raw = raw[0].upper() + raw[1:]
    parts = raw.split(" ")
    out: List["Word"] = []
    i = 0
    for p in parts:
        if i < len(tokens):
            t = tokens[i]
            out.append(Word(float(t.start), float(t.end), p))
            i += 1
        else:
            last = out[-1]
            out[-1] = Word(float(last.start), float(last.end), f"{last.text} {p}")
    while i < len(tokens):
        out.append(tokens[i])
        i += 1
    return out


def _ass_escape(text: str) -> str:
    """ASS v4+ escaping: ONLY backslash, braces, and newline→\\N (standards-correct)."""
    t = (text or "")
    t = t.replace("\\", "\\\\").replace("{", "\\{").replace("}", "\\}")
    return t.replace("\n", r"\N")


# ---------------- helpers: measure/geometry (cached) ----------------
@lru_cache(maxsize=65536)
def _measured_px_cached(fp_str: str, px: int, text: str, fam: str) -> int:
    """Cached text width with renderer calibration."""
    raw = float(measure_text_px(Path(fp_str) if fp_str else None, int(px), text))  # type: ignore[name-defined]
    cal = float(WIDTH_CAL_MAP.get(fam, WIDTH_CAL_DEFAULT))
    return max(0, int(round(raw * cal)))


@lru_cache(maxsize=512)
def _space_w_cached(fp_str: str, px: int, fam: str) -> int:
    return max(1, _measured_px_cached(fp_str, int(px), " ", fam))


def _measure_tok_w_cached(file_path: Optional[Path], px: int, text: str, fam: str) -> int:
    return _measured_px_cached(str(file_path) if file_path else "", int(px), text, fam)


def _space_w(file_path: Optional[Path], px: int, fam: str) -> int:
    return _space_w_cached(str(file_path) if file_path else "", int(px), fam)


def _font_metrics(px: int, file_path: Optional[Path]) -> Tuple[int, int, int]:
    """Return (ascent, descent, line_height) with robust fallbacks."""
    f = FONTM.load_pil_font(file_path, int(px))  # type: ignore[name-defined]
    if f:
        try:
            asc, desc = f.getmetrics()
            lh = max(1, int(round((asc + desc) * 1.05)))
            return int(asc), int(desc), int(lh)
        except Exception:
            pass
    lh = max(1, int(math.ceil(int(px) * 1.7)))
    asc = int(round(lh * 0.8))
    desc = max(1, lh - asc)
    return int(asc), int(desc), int(lh)


def _ensure_interval(st: float, et: float, mind: float) -> Tuple[float, float]:
    """Clamp interval to minimum duration with stable ordering."""
    st = float(st); et = float(et); mind = float(mind)
    if et - st < mind:
        if st >= et:
            st = max(0.0, et - mind)
        else:
            et = st + mind
    return float(st), float(et)


# ---------------- prefix sums & spans ----------------
def _prefix_adv(widths: List[int], sp_w: int) -> List[int]:
    """Prefix-advance for tokens with a fixed inter-word spacing."""
    run = 0
    out: List[int] = []
    for i, w in enumerate(widths):
        run += w
        if i > 0:
            run += sp_w
        out.append(run)
    return out


def _build_word_spans_from_adv(
    adv: List[int],
    rw: int,
    margin_h: int,
    align_mode: str,
    sp_w: int
) -> Tuple[List[Tuple[int, int]], int, int]:
    """
    Build inclusive [L, R) spans for each token in line space.
    Returns (spans, line_width, x_origin).
    """
    if not adv:
        return [], 0, max(0, int(margin_h))
    usable_line_w = min(adv[-1], max(80, int(rw) - 2 * max(0, int(margin_h))))

    al = (align_mode or "left").strip().lower()
    if al == "center":
        x0 = int(round(max(margin_h, (rw - usable_line_w) / 2)))
        x0 = min(x0, max(margin_h, rw - margin_h - usable_line_w))
    elif al == "right":
        x0 = int(round(max(margin_h, rw - margin_h - usable_line_w)))
    else:
        x0 = int(max(0, margin_h))

    spans: List[Tuple[int, int]] = []
    left_edge = 0
    for i, a in enumerate(adv):
        right_edge = a
        if i + 1 < len(adv):
            right_edge = min(right_edge, max(adv[i + 1] - sp_w // 2, a))
        L = max(0, min(rw, int(round(x0 + left_edge))))
        R = max(L + 1, min(rw, int(round(x0 + right_edge))))
        spans.append((L, R))
        left_edge = a + sp_w

    return spans, int(round(usable_line_w)), int(x0)


# ---------------- line breaking (1–2 lines, greedy but width-aware) -----------
def _fit_line_with_adv(usable: int, max_words: int, adv: List[int], i0: int) -> int:
    """
    Decide how many tokens from adv[i0:] go into the current line.
    Uses O(1) prefix width queries with a gentle rag penalty.
    """
    n = min(len(adv) - i0, max(1, int(max_words)))
    best = 1
    best_pen = 1e9

    def prefix(j: int) -> int:
        return adv[i0 + j - 1] - (adv[i0 - 1] if i0 > 0 else 0)

    for j in range(1, n + 1):
        lw = prefix(j)
        if lw > usable and j > 1:
            continue
        if lw > usable and j == 1:
            return 1
        rag = (usable - lw) / float(usable)
        pen = (rag * rag) * 0.25
        if (pen < best_pen) or (abs(pen - best_pen) < 1e-9 and j > best):
            best_pen, best = pen, j
    return int(best or 1)


def _words_to_lines(
    tokens: List["Word"],
    file_path: Optional[Path],
    px: int,
    vw: int,
    h_margin: int,
    max_wpl: int,
    line_count: int,
    fam: str,
) -> Tuple[List[List["Word"]], List["Word"]]:
    """Greedy line-fit for 1–2 lines with cached measurements."""
    usable = max(80, int(vw) - 2 * max(0, int(h_margin)))
    max_words = max(1, int(max_wpl))
    sp_w = _space_w(file_path, int(px), fam)

    buf = list(tokens)
    if not buf:
        return [], []

    widths = [_measure_tok_w_cached(file_path, int(px), w.text, fam) for w in buf]
    adv = _prefix_adv(widths, sp_w)

    lines: List[List["Word"]] = []
    i0 = 0
    for _ in range(max(1, min(2, int(line_count)))):  # 1–2 lines
        if i0 >= len(buf):
            break
        cut = _fit_line_with_adv(usable, max_words, adv, i0)
        line = buf[i0 : i0 + cut]
        lines.append(line)
        i0 += cut
        if i0 >= len(buf):
            break
    rest = buf[i0:] if i0 < len(buf) else []
    return lines, rest


def _split_sentence_to_captions(
    sent: List["Word"],
    file_path: Optional[Path],
    px: int,
    vw: int,
    h_margin: int,
    max_wpl: int,
    max_wpc: int,
    line_count: int,
    fam: str,
) -> List[List[List["Word"]]]:
    """Split one sentence into caption chunks with widow/orphan control."""
    toks = _sanitize_tokens(sent, sentence_head=True)
    out: List[List[List["Word"]]] = []
    i = 0
    n = len(toks)
    cap_max = max(1, int(max_wpc))
    while i < n:
        end = min(n, i + cap_max)
        chunk = toks[i:end]
        lines, _ = _words_to_lines(
            chunk, file_path, int(px), int(vw), int(h_margin), int(max_wpl), int(line_count), fam
        )
        if not lines:
            i += 1
            continue
        lines = [_sanitize_tokens(ln, sentence_head=(len(out) == 0)) for ln in lines if ln]
        out.append(lines)

        # widow/orphan guard
        MIN_CHARS_WIDOW = 6
        if len(out) >= 2:
            last_chars = sum(len(w.text) for ln in out[-1] for w in ln)
            if last_chars < MIN_CHARS_WIDOW:
                out[-2] = out[-2] + out[-1]
                out.pop()

        used = max(1, sum(len(ln) for ln in lines))
        i += used
    return out


# ---------------- main: generate_ass_track ----------------
def generate_ass_track(
    words: List["Word"],
    font_info: "FontInfo",
    ui_font_size: int,
    position: str,
    margin_h: int,
    margin_v: int,
    border_thickness: int,
    primary_color: str,
    highlight_color: Optional[str],
    time_offset: float,
    karaoke_offset_ms: int,
    style_profile: str,
    highlight_mode_ui: str,
    rw: int,
    rh: int,
    keyword_overlay: bool,
    keyword_list: List[str],
    key_txt_color: str,
    key_bg_color: str,
    key_pos: str,
    audio_for_snap: Optional[Path],
    force_bold_ui: bool,
    outline_override: int,
    line_count: int,
    max_words_per_line: int,
    max_words_per_caption: int,
    strict_word: bool,
    align_mode: str = "left",
    safe_area_pct: int = 92,
    line_space_pct: int = 100,
    target_cps: int = 17,
    min_caption_sec: float = BASE_MIN_CAP_DUR,
    max_extend_sec: float = 0.40,
) -> Path:
    """Render timed words into an ASS subtitle file inside WORK_CACHE and return its path.

    Steps: validate the required notebook globals and tools, derive geometry and
    styling, build a cache key and return the cached file if it already exists;
    otherwise split the words into sentences and captions, emit highlight and
    text events per line, and atomically move the temp file into place.

    Args:
        words: Objects with ``start``, ``end`` and ``text``.
        font_info: Object exposing ``family`` (ASS font name) and ``path`` (font file).
        ui_font_size: UI size, mapped through ``ui_size_to_ass``.
        position: "bottom", "middle", or anything else for the top placement.
        margin_h, margin_v: Requested margins; raised to the safe-area margins if smaller.
        border_thickness, outline_override: Outline source value (override wins when > 0);
            the result is kept between a font-size-based minimum and 8.
        primary_color, highlight_color: Text and highlight colours; defaults are
            "#FFFFFF" and "#19B5FF" when falsy.
        time_offset: Seconds added to all timings.
        karaoke_offset_ms: Extra offset in milliseconds, clamped to [-1000, 1000].
        highlight_mode_ui: "word_pill", "word_fill", "word_bg" or "none"
            (unknown values fall back to "word_pill").
        rw, rh: Render size (PlayResX / PlayResY), each at least 16.
        audio_for_snap: Optional audio used for silence detection and onset snapping.
        force_bold_ui: Sets the style's Bold flag.
        line_count, max_words_per_line, max_words_per_caption: Layout limits
            (a word limit of 0 means 999).
        strict_word: When false, any highlight mode other than "none" renders as "word_bg".
        align_mode: "left", "center" or "right".
        safe_area_pct: Clamped to 60–100; defines the minimum margins.
        line_space_pct: Line-gap scale in percent (factor clamped to 0.4–2.0).
        target_cps: Reading speed used for the pacing extension, clamped to 8–28.
        min_caption_sec: Minimum caption duration (never below BASE_MIN_CAP_DUR).
        max_extend_sec: Maximum pacing extension per caption.
        style_profile, keyword_overlay, keyword_list, key_txt_color, key_bg_color,
        key_pos: Accepted for API stability; not referenced in this function body.

    Returns:
        Path of the ``.ass`` file in WORK_CACHE.

    Raises:
        RuntimeError: a required global, ffmpeg/ffprobe, or WORK_CACHE is missing.
    """
    # ---- preflight guards (no side-effects) ----
    REQUIRED_GLOBALS = ("ui_size_to_ass", "measure_text_px", "sentenceize", "ass_timestamp", "bgr_hex", "FONTM", "WORK_CACHE")
    for name in REQUIRED_GLOBALS:
        if name not in globals():
            raise RuntimeError(f"Missing dependency `{name}`. HOW_TO_FIX: run prior cells (Core Utils, Font Manager, Roots & Cache).")
    if audio_for_snap is not None and "detect_silences" not in globals():
        raise RuntimeError("Missing `detect_silences`. HOW_TO_FIX: run the ASR/Text Utils cell providing silence detection.")
    if audio_for_snap is not None and "snap_to_voice_onset" not in globals():
        raise RuntimeError("Missing `snap_to_voice_onset`. HOW_TO_FIX: run the ASR/Text Utils cell.")
    if which("ffmpeg") is None or which("ffprobe") is None:
        raise RuntimeError("ffmpeg/ffprobe not found on PATH. HOW_TO_FIX: install ffmpeg (apt/pip/conda) or run your Install cell.")

    # ---- cache root sanity (no creation here) ----
    if not isinstance(WORK_CACHE, Path) or not WORK_CACHE.exists() or not WORK_CACHE.is_dir():  # type: ignore[name-defined]
        raise RuntimeError("WORK_CACHE is missing. HOW_TO_FIX: run the Roots & Cache cell to create the working cache directory.")

    # --------- geometry & styles ----------
    rw = max(16, int(rw)); rh = max(16, int(rh))
    margin_h = max(0, int(margin_h)); margin_v = max(0, int(margin_v))
    mapped_fs = int(ui_size_to_ass(int(ui_font_size)))  # type: ignore[name-defined]

    primary_color = primary_color or "#FFFFFF"
    highlight_color = highlight_color or "#19B5FF"
    text_bgr = bgr_hex(primary_color)   # type: ignore[name-defined]
    box_bgr  = bgr_hex(highlight_color) # type: ignore[name-defined]

    outline_source_val = int(border_thickness or 0)
    if outline_override and int(outline_override) > 0:
        outline_source_val = int(outline_override)
    min_outline = max(1, int(round(mapped_fs * 0.04)))  # WCAG-AA guard
    outline_px = max(min_outline, min(int(outline_source_val), 8))

    # Safe area: margins may only grow to keep text inside the central safe_pct of the frame.
    safe_pct = max(60, min(100, int(safe_area_pct)))
    safe_margin_h = int(round(rw * (1 - safe_pct / 100.0) / 2))
    safe_margin_v = int(round(rh * (1 - safe_pct / 100.0) / 2))
    margin_h = max(margin_h, safe_margin_h)
    margin_v = max(margin_v, safe_margin_v)

    align_norm = (align_mode or "left").strip().lower()
    hl_mode = (highlight_mode_ui or "word_pill").strip().lower()
    if hl_mode not in {"word_pill", "word_fill", "word_bg", "none"}:
        hl_mode = "word_pill"
    effective_hl = hl_mode if strict_word else ("word_bg" if hl_mode != "none" else "none")

    asc, desc, line_h = _font_metrics(int(mapped_fs), font_info.path)
    gap_factor = max(0.4, min(2.0, float(line_space_pct or 100) / 100.0))
    gap_y = max(4, int(round(mapped_fs * BASE_LINE_SPACE * gap_factor)))

    nlines = max(1, int(line_count))
    block_h = int(line_h if nlines == 1 else (line_h * nlines + (nlines - 1) * gap_y))

    pos_norm = (position or "Bottom").strip().lower()
    if pos_norm == "bottom":
        y_top = rh - margin_v - block_h
    elif pos_norm == "middle":
        y_top = max(0, rh // 2 - block_h // 2)
    else:
        y_top = max(margin_v, int(rh * 0.10))

    pad_x = max(2, int(round(mapped_fs * BOX_PAD_X)))
    pad_y = max(1, int(round(max(2, desc) * BOX_PAD_Y)))

    # SCA clamp for karaoke offset (ms)
    k_ms_in = int(karaoke_offset_ms)
    k_ms = max(-1000, min(1000, k_ms_in))
    base_off = float(time_offset or 0.0) + float(k_ms / 1000.0)

    min_cap_dur = max(BASE_MIN_CAP_DUR, float(min_caption_sec))
    max_extend = max(0.0, float(max_extend_sec))
    min_word_dur = float(BASE_MIN_WORD_DUR)
    target_cps = max(8, min(28, int(target_cps or 17)))  # SCA clamp

    # --------- cache key ----------
    words_fp = [{"s": float(w.start), "e": float(w.end), "t": w.text} for w in words]
    text_hash = hashlib.sha1(
        json.dumps(words_fp, ensure_ascii=False, separators=(",", ":")).encode("utf-8")
    ).hexdigest()

    # Identity of the snap audio: path plus (mtime, size) when it can be stat'ed,
    # because detected silences shift caption/word start times.
    snap_sig = ""
    if audio_for_snap is not None:
        snap_sig = str(audio_for_snap)
        try:
            _snap_st = Path(audio_for_snap).stat()
            snap_sig = f"{snap_sig}::{int(_snap_st.st_mtime)}::{int(_snap_st.st_size)}"
        except OSError:
            pass

    # Every input that changes the emitted file must be part of the key; otherwise
    # a stale cached file is returned. The entries from "tc" onward were missing.
    ckey = {
        "v": CACHE_VER, "rw": rw, "rh": rh,
        "fam": font_info.family, "fs": mapped_fs,
        "mh": margin_h, "mv": margin_v, "txt": text_hash,
        "out": outline_px, "lc": nlines, "pos": pos_norm,
        "align": align_norm, "safe": safe_pct, "gap": int(round(gap_factor * 100)),
        "hl": hl_mode, "strict": bool(strict_word), "target_cps": target_cps,
        "mincap": round(min_cap_dur, 3), "extend": round(max_extend, 3),
        "k_ms": k_ms,
        "tc": str(text_bgr), "bc": str(box_bgr),
        "bold": bool(force_bold_ui),
        "toff": round(float(time_offset or 0.0), 3),
        "mwpl": int(max_words_per_line or 0) or 999,
        "mwpc": int(max_words_per_caption or 0) or 999,
        "font": str(font_info.path) if font_info.path else "",
        "snap": snap_sig,
    }
    kpath = WORK_CACHE / f"ass_{hashlib.sha1(json.dumps(ckey, sort_keys=True).encode()).hexdigest()}.ass"  # type: ignore[name-defined]
    if kpath.exists():
        return kpath

    # --------- text + audio landmarks ----------
    sents = sentenceize(words)  # type: ignore[name-defined]
    sils = detect_silences(audio_for_snap) if audio_for_snap else []  # type: ignore[name-defined]
    _sils_tuple = tuple((float(a), float(b)) for a, b in sils) if sils else ()

    _snap_cache: dict[tuple[float, float], float] = {}

    def _snap(st: float, window: float) -> float:
        # Memoised onset snapping; without silences the time is returned unchanged.
        key = (round(st, 3), round(window, 3))
        v = _snap_cache.get(key)
        if v is not None:
            return v
        val = snap_to_voice_onset(st, _sils_tuple, window=window) if _sils_tuple else st  # type: ignore[name-defined]
        _snap_cache[key] = val
        return val

    # --------- write ASS ----------
    # Written to a sibling temp file first, then moved into place with os.replace.
    tmp = kpath.with_suffix(".tmp.ass")
    with tmp.open("w", encoding="utf-8", newline="\n") as f:
        fw = f.write
        fw(
            "[Script Info]\n"
            "ScriptType: v4.00+\n"
            f"PlayResX: {rw}\nPlayResY: {rh}\nWrapStyle: 2\nScaledBorderAndShadow: yes\nYCbCr Matrix: TV.709\n\n"
        )
        fw(
            "[V4+ Styles]\n"
            "Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, "
            "Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, "
            "Alignment, MarginL, MarginR, MarginV, Encoding\n"
        )
        bold_flag = -1 if force_bold_ui else 0
        fw(
            f"Style: Default,{font_info.family},{mapped_fs},&H00{text_bgr},&H00{text_bgr},&H00000000,&H00000000,"
            f"{bold_flag},0,0,0,100,100,0,0,1,{outline_px},0,7,{margin_h},{margin_h},{margin_v},1\n\n"
        )
        fw("[Events]\nFormat: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text\n")

        ts = ass_timestamp  # type: ignore[name-defined]
        ass_align_map = {"left": 7, "center": 8, "right": 9}
        an_default = ass_align_map.get(align_norm, 7)

        for sent in sents:
            caps = _split_sentence_to_captions(
                sent,
                font_info.path,
                int(mapped_fs),
                int(rw),
                int(margin_h),
                int(max_words_per_line) or 999,
                int(max_words_per_caption) or 999,
                nlines,
                font_info.family,
            )
            if not caps:
                continue

            events: List[str] = []
            apnd = events.append
            sp_w_global = _space_w(font_info.path, int(mapped_fs), font_info.family)

            for li_group, lines in enumerate(caps):
                if not lines:
                    continue

                # Caption window: snapped start of the first word to the end of the last word.
                st_raw = float(lines[0][0].start)
                et_raw = float(lines[-1][-1].end)
                st_abs = max(0.0, _snap(st_raw, 0.18) + base_off)
                et_abs = max(st_abs + min_cap_dur, et_raw + base_off)
                st_abs, et_abs = _ensure_interval(st_abs, et_abs, min_cap_dur)

                # pacing extension to meet target cps
                caption_chars = sum(len(w.text) for ln in lines for w in ln)
                current_span = max(0.001, et_abs - st_abs)
                need_extend = max(0.0, (caption_chars / float(target_cps)) - current_span)
                if max_extend > 0.0 and need_extend > 0.0:
                    et_abs += min(max_extend, need_extend)
                    st_abs, et_abs = _ensure_interval(st_abs, et_abs, min_cap_dur)

                # active word window clamp: the caption must cover every word's window
                act_ws = min(
                    max(0.0, _snap(float(w.start), 0.10) + base_off)
                    for ln in lines
                    for w in ln
                )
                act_we = max(max(0.0, float(w.end) + base_off) for ln in lines for w in ln)
                first_ws, last_we = _ensure_interval(act_ws, act_we, min_cap_dur)
                st_abs = min(st_abs, first_ws)
                et_abs = max(et_abs, last_we)

                for li, ln in enumerate(lines):
                    # precompute widths/adv once per line
                    tok_w = [_measure_tok_w_cached(font_info.path, int(mapped_fs), w.text, font_info.family) for w in ln]
                    adv = _prefix_adv(tok_w, sp_w_global)
                    spans, line_w, x_origin = _build_word_spans_from_adv(
                        adv, int(rw), int(margin_h), align_norm, sp_w_global
                    )
                    if not spans:
                        continue

                    # The \pos anchor follows the alignment: left edge, centre, or right edge.
                    if align_norm == "center":
                        anchor_x = int(round(x_origin + line_w / 2))
                    elif align_norm == "right":
                        anchor_x = int(round(x_origin + line_w))
                    else:
                        anchor_x = int(round(x_origin))
                    anchor_x = max(0, min(int(rw), int(anchor_x)))

                    y_line = int(y_top) + li * (line_h + gap_y)
                    txt = _ass_escape(" ".join(w.text for w in ln))

                    # ----- highlight layers -----
                    if effective_hl == "word_bg":
                        # single line bar
                        x1_line = max(0, spans[0][0] - pad_x)
                        x2_line = min(int(rw), spans[-1][1] + pad_x)
                        H = int(line_h + 2 * pad_y)
                        yT = max(0, y_line - pad_y)
                        yB = min(int(rh), yT + H)
                        if x2_line > x1_line + 1:
                            path = f"m {x1_line} {yT} l {x2_line} {yT} {x2_line} {yB} {x1_line} {yB}"
                            apnd(
                                f"Dialogue: 4,{ts(st_abs)},{ts(et_abs)},Default,,0,0,0,"
                                f"{{\\p1\\pos(0,0)\\bord0\\shad0\\c&H{box_bgr}&\\alpha&H{BOX_ALPHA}&\\blur0.8\\be0.8}}"
                                f"{path}{{\\p0}}\n"
                            )

                    elif effective_hl in {"word_pill", "word_fill"}:
                        for (x1, x2), w in zip(spans, ln):
                            # Word window, clipped to the caption window.
                            ws = max(0.0, _snap(float(w.start), 0.10) + base_off)
                            we = max(ws + min_word_dur, float(w.end) + base_off)
                            ws, we = _ensure_interval(ws, we, min_word_dur)
                            ws = max(ws, st_abs)
                            we = min(we, et_abs)
                            if we - ws <= 1e-4:
                                continue

                            yT_box = max(0, y_line - pad_y)
                            H = int(line_h + 2 * pad_y)
                            yB_box = min(int(rh), yT_box + H)

                            if effective_hl == "word_pill":
                                # Box drawn behind the word, widened for the outline.
                                FUDGE_L = max(0, int(round(outline_px * 0.6)))
                                FUDGE_R = max(0, int(round(outline_px * 0.4)))
                                EXTRA_SIDE = max(1, int(round(mapped_fs * 0.03)))
                                xL = max(0, x1 - pad_x - FUDGE_L - EXTRA_SIDE)
                                xR = min(int(rw), x2 + pad_x + FUDGE_R + EXTRA_SIDE)
                                if xR <= xL + 1:
                                    continue
                                path = f"m {xL} {yT_box} l {xR} {yT_box} {xR} {yB_box} {xL} {yB_box}"
                                apnd(
                                    f"Dialogue: 5,{ts(ws)},{ts(we)},Default,,0,0,0,"
                                    f"{{\\p1\\pos(0,0)\\bord0\\shad0\\c&H{box_bgr}&\\alpha&H{BOX_ALPHA}&\\blur0.8\\be0.8}}"
                                    f"{path}{{\\p0}}\n"
                                )
                            else:
                                # word_fill: the whole line re-drawn in the highlight colour,
                                # clipped to the active word's span.
                                clip_left = max(0, x1 + 1)
                                clip_right = min(int(rw), x2 - 1)
                                if clip_right <= clip_left + 1:
                                    continue
                                clip_tag = f"\\clip({clip_left},{yT_box},{clip_right},{yB_box})"
                                tag = (
                                    "{\\q2\\an" + str(an_default) +
                                    f"\\pos({anchor_x},{y_line})" +
                                    f"\\bord0\\shad0\\1a&H00&\\3a&H00&\\3c&H000000&\\c&H{box_bgr}&{clip_tag}" +
                                    "}"
                                )
                                apnd("Dialogue: 11," + ts(ws) + "," + ts(we) + ",Default,,0,0,0," + tag + txt + "\n")

                    # ----- text layer -----
                    text_tag = (
                        "{\\q2\\an" + str(an_default) +
                        f"\\pos({anchor_x},{y_line})" +
                        f"\\bord{outline_px}\\shad0.7\\1a&H00&\\3a&H00&\\3c&H000000&\\c&H{text_bgr}&" +
                        "}"
                    )
                    apnd("Dialogue: 10," + ts(st_abs) + "," + ts(et_abs) + ",Default,,0,0,0," + text_tag + txt + "\n")

            if events:
                fw(''.join(events))

    os.replace(tmp, kpath)
    return kpath


In [7]:
# @title 7) Build Main Track — Clean, Font-locked, BT.709, NVENC→x264 fallback (FINAL API, Refactored • SCA v3)
# Burns ASS at subscale, then upscales. Honors ASS_FONT_DIR for reliable font pick.

from __future__ import annotations

import hashlib
import json
from pathlib import Path
from typing import List, Optional, Tuple
from shutil import which

# Assumptions: _run, _json_dump, WORK_META, META, WORK_CACHE, ffesc, NVENC_AVAILABLE
# effective_render_size, probe_image_wh, sha1_file

# ---------------------------- small probes + cache ----------------------------

def _path_sig(p: Path) -> Tuple[int, int]:
    """(mtime, size) tuple for cache keys; raises if path is missing."""
    st = p.stat()
    return int(st.st_mtime), int(st.st_size)


def has_audio_stream_cached(p: Optional[Path]) -> bool:
    """
    Return True if the input has an audio stream, caching by file identity.

    The answer is stored in META under a key built from the path, mtime and
    size, and persisted with ``_json_dump(WORK_META, META)``.

    Falls back to False on probing failure. A failed probe (ffprobe could not
    be run or exited non-zero) is NOT cached, so a transient error cannot pin
    the file as "no audio" for later calls.
    """
    if not p or not p.exists():
        return False
    mt, sz = _path_sig(p)
    key = f"audiostream::{p}::{mt}::{sz}"
    if key in META:  # type: ignore[name-defined]
        return bool(META[key])  # type: ignore[index]
    try:
        cp = _run(  # type: ignore[name-defined]
            ["ffprobe", "-v", "error", "-select_streams", "a:0", "-show_entries", "stream=index",
             "-of", "compact=p=0:nk=1", str(p)],
            check=False,
        )
        failed = getattr(cp, "returncode", 0) != 0
        listing = (cp.stdout or "").strip()
    except Exception:
        return False
    if failed:
        return False
    # ffprobe prints the index of the first audio stream, or nothing if there is none.
    ok = bool(listing)
    META[key] = ok  # type: ignore[name-defined]
    _json_dump(WORK_META, META)  # type: ignore[name-defined]
    return ok


def probe_duration_cached(p: Optional[Path]) -> float:
    """
    Return media duration in seconds using ffprobe, cached by file identity.

    The value is stored in META under a key built from the path, mtime and
    size, and persisted with ``_json_dump(WORK_META, META)``.

    Returns 0.0 on error. A failed probe (ffprobe could not be run or exited
    non-zero) is NOT cached, so a transient error cannot pin the duration to
    0.0. When ffprobe succeeds but reports no usable duration (empty, "N/A",
    negative or non-finite), 0.0 is returned and cached.
    """
    if not p or not p.exists():
        return 0.0
    mt, sz = _path_sig(p)
    key = f"dur::{p}::{mt}::{sz}"
    if key in META:  # type: ignore[name-defined]
        return float(META[key])  # type: ignore[index]
    try:
        cp = _run(  # type: ignore[name-defined]
            ["ffprobe", "-v", "error", "-show_entries", "format=duration",
             "-of", "default=nw=1:nk=1", str(p)],
            check=False,
        )
        failed = getattr(cp, "returncode", 0) != 0
        r = (cp.stdout or "").strip()
    except Exception:
        return 0.0
    if failed:
        return 0.0
    try:
        d = float(r) if r else 0.0
    except ValueError:
        d = 0.0
    # Rejects NaN (comparison is False), negatives and infinity in one test.
    if not (0.0 <= d < float("inf")):
        d = 0.0
    META[key] = d  # type: ignore[name-defined]
    _json_dump(WORK_META, META)  # type: ignore[name-defined]
    return d


# ---------------------------- ffmpeg helpers ----------------------------

def _existing_dirs(paths: List[Path]) -> List[Path]:
    """Return unique, existing directories from the given list, preserving order."""
    out: List[Path] = []
    seen: set[Path] = set()
    for p in paths:
        try:
            q = Path(p).expanduser().resolve()
            if q.exists() and q.is_dir() and q not in seen:
                out.append(q)
                seen.add(q)
        except Exception:
            continue
    return out


def _ff_has_filter(name: str) -> bool:
    """True if ffmpeg has the named filter (cached)."""
    name = f" {name.strip()} "
    try:
        out = _run(["ffmpeg", "-hide_banner", "-filters"], check=False).stdout  # type: ignore[name-defined]
    except Exception:
        return False
    return name in out


def _fonts_resolvable() -> bool:
    """
    Ensure at least one font family is resolvable by libass/fontconfig.
    Prefer fc-match; fall back to ~/.fonts presence.
    """
    try:
        if which("fc-match"):
            r = _run(["fc-match", "-v", "Inter,DejaVu Sans"], check=False).stdout  # type: ignore[name-defined]
            return 'file: "' in (r or "")
    except Exception:
        pass
    try:
        fdir = Path("~/.fonts").expanduser()
        if fdir.exists():
            for ext in (".ttf", ".otf", ".ttc", ".otc"):
                if any(fdir.glob(f"*{ext}")):
                    return True
    except Exception:
        pass
    return False


def _preflight_requirements() -> None:
    """Fail-fast checks with explicit remediation; no side-effects."""
    missing = [n for n in ("_run", "WORK_CACHE", "ffesc", "effective_render_size", "probe_image_wh", "sha1_file") if n not in globals()]
    if missing:
        raise RuntimeError(f"Missing dependencies: {', '.join(missing)}. HOW_TO_FIX: run Roots/Core/Utils cells first.")
    if which("ffmpeg") is None or which("ffprobe") is None:
        raise RuntimeError("ffmpeg/ffprobe not found on PATH. HOW_TO_FIX: install ffmpeg or run your Install cell.")
    wc = globals().get("WORK_CACHE")
    if not isinstance(wc, Path) or not wc.exists() or not wc.is_dir():  # type: ignore[func-returns-value]
        raise RuntimeError("WORK_CACHE directory is missing. HOW_TO_FIX: run the Roots & Cache cell to initialize paths.")
    if not _ff_has_filter("subtitles"):
        raise RuntimeError("WHAT: FFmpeg subtitles filter check\nWHY: 'subtitles' filter (libass) not available\nHOW_TO_FIX: Install ffmpeg with libass enabled.")
    if not _fonts_resolvable():
        raise RuntimeError("WHAT: Font resolution\nWHY: No resolvable font family for libass\nHOW_TO_FIX: Ensure at least one TTF/OTF is in ~/.fonts or system fonts (e.g., DejaVu Sans).")


def _compute_loudnorm_params(audio_path: Path) -> Optional[str]:
    """
    Run a fast EBU R128 first pass and return a parameterized second-pass string.
    Returns None if parsing fails or loudnorm is unavailable.
    """
    if not has_audio_stream_cached(audio_path) or not _ff_has_filter("loudnorm"):
        return None
    cmd = [
        "ffmpeg", "-hide_banner", "-nostdin", "-v", "error",
        "-i", str(audio_path),
        "-af", "loudnorm=I=-16:LRA=11:TP=-1.5:print_format=json",
        "-f", "null", "-"
    ]
    cp = _run(cmd, check=False)  # type: ignore[name-defined]
    log = (cp.stderr or "") + (cp.stdout or "")
    # Extract the last JSON object (ffmpeg prints one per stream)
    start = log.rfind("{")
    end = log.rfind("}")
    if start == -1 or end == -1 or end <= start:
        return None
    try:
        data = json.loads(log[start:end+1])
        mi = data.get("input_i"); mlra = data.get("input_lra"); mtp = data.get("input_tp")
        mth = data.get("input_thresh"); off = data.get("target_offset", data.get("offset"))
        if None in (mi, mlra, mtp, mth, off):
            return None
        return (
            "loudnorm=I=-16:LRA=11:TP=-1.5:"
            f"measured_I={mi}:measured_LRA={mlra}:measured_TP={mtp}:"
            f"measured_thresh={mth}:offset={off}:linear=true:print_format=none"
        )
    except Exception:
        return None


def _ass_probe(subtitles_filter: str) -> None:
    """
    Minimal ASS parse probe (≤0.1s). Ensures filter arguments are valid.
    """
    cmd = [
        "ffmpeg", "-hide_banner", "-loglevel", "error", "-nostdin",
        "-f", "lavfi", "-i", "color=c=black:s=16x16:d=0.1",
        "-vf", subtitles_filter,
        "-f", "null", "-"
    ]
    cp = _run(cmd, check=False)  # type: ignore[name-defined]
    if cp.returncode != 0:
        err = (cp.stderr or "").strip()
        raise RuntimeError(
            "WHAT: ASS probe\n"
            f"WHY: ffmpeg subtitles filter failed to parse ({err or 'unknown error'})\n"
            "HOW_TO_FIX: Verify ASS path, fontsdir, and that ffmpeg is built with libass."
        )


# ---------------------------- video filtergraph ----------------------------

def _build_video_filters_main(
    bg_w: int,
    bg_h: int,
    render_w: int,
    render_h: int,
    do_subscale: bool,
    subtitles_filter: str,
    # inert CTA/outro params kept for API stability
    main_d: float,
    outro_d: float,
    cta_times: Optional[List[float]],
    cta_d: float,
    cta_key_hex: str,
    cta_similarity: float,
    cta_blend: float,
    cta_position: str,
) -> Tuple[List[str], str]:
    """
    Main-track filtergraph:
      1) Fit background to 1920x1080 (pillarbox/letterbox), setsar=1
      2) Optional downscale to render_w x render_h, burn ASS, upscale back to 1080p
      3) Output label is 'vfinal'
    """
    vf: List[str] = []
    cur = "vb"

    if int(bg_w) == 1920 and int(bg_h) == 1080:
        vf.append("[0:v]setsar=1[vb]")
    else:
        vf.append(
            "[0:v]scale=1920:1080:force_original_aspect_ratio=decrease,"
            "pad=1920:1080:-1:-1:color=black,setsar=1[vb]"
        )

    if do_subscale:
        vf.append(f"[{cur}]scale={int(render_w)}:{int(render_h)}:flags=lanczos[vbs]"); cur = "vbs"
        vf.append(f"[{cur}]{subtitles_filter}[vss]"); cur = "vss"
        vf.append(f"[{cur}]scale=1920:1080:flags=lanczos,format=yuv420p[vfinal]")
    else:
        vf.append(f"[{cur}]{subtitles_filter},format=yuv420p[vfinal]")

    return vf, "vfinal"


# ---------------------------- main build ----------------------------

def build_main_track_cached(
    bg: Path,
    ass_file: Path,
    audio_norm: Path,
    intro_delay: float,
    total_d: float,
    fps: int,
    subscale: float,
) -> Path:
    """
    Assemble the main track: looped still background + burned-in ASS + audio.

    The output lives in WORK_CACHE under a SHA-1 of the render parameters and
    input identities; an existing non-empty file is returned without encoding.
    Encoding uses h264_nvenc when NVENC_AVAILABLE is truthy and retries with
    libx264 when the NVENC attempt raises.

    Args:
        bg: Background still image.
        ass_file: ASS subtitle file to burn in.
        audio_norm: Audio track; when it has no audio stream a silent
            anullsrc input is used instead.
        intro_delay: Seconds of delay applied to the audio via adelay.
        total_d: Output duration in seconds; must be > 0.
        fps: Output frame rate (values below 1 are raised to 1).
        subscale: Subtitle render scale, clamped to [0.70, 1.00].

    Returns:
        Path of the encoded (or cached) MP4 inside WORK_CACHE.

    Raises:
        FileNotFoundError: ``bg`` or ``ass_file`` does not exist.
        ValueError: ``total_d`` is not positive.
        RuntimeError: preflight or ASS probe failed, or the encode left no output.
    """
    _preflight_requirements()

    if not bg or not Path(bg).exists():
        raise FileNotFoundError(f"Background not found: {bg}")
    if not ass_file or not Path(ass_file).exists():
        raise FileNotFoundError(f"ASS file not found: {ass_file}")
    if total_d <= 0:
        raise ValueError("total_d must be > 0")

    # SCA clamp subscale
    subscale = float(max(0.70, min(1.00, subscale)))

    fps_i = int(max(1, fps))
    # Use shared helper directly (no duplicate wrapper)
    rw, rh, do_sub = effective_render_size(1920, 1080, subscale)  # type: ignore[name-defined]

    bw, bh = probe_image_wh(bg)  # type: ignore[name-defined]
    if bw <= 0 or bh <= 0:
        # Fallback to pipeline base; ffprobe sometimes fails on odd stills
        bw, bh = 1920, 1080

    # Probe once; the answer feeds the cache key, the input list and the audio chain.
    has_audio = has_audio_stream_cached(audio_norm)

    # Cache key
    bg_mt, bg_sz = _path_sig(bg)
    aud_sha = sha1_file(audio_norm) if (audio_norm and Path(audio_norm).exists()) else None  # type: ignore[name-defined]
    key = {
        "v": 22,
        "bg": str(bg), "bg_m": bg_mt, "bg_s": bg_sz,
        "ass": sha1_file(ass_file),  # type: ignore[name-defined]
        "aud": aud_sha,
        "aud_has": has_audio,
        "fps": fps_i,
        "rw": int(rw), "rh": int(rh), "subscale": float(subscale), "do_sub": bool(do_sub),
        "bw": int(bw), "bh": int(bh),
        "intro_ms": int(round(intro_delay * 1000)),
        "total_ms": int(round(total_d * 1000)),
    }
    out = WORK_CACHE / f"main_{hashlib.sha1(json.dumps(key, sort_keys=True).encode()).hexdigest()}.mp4"  # type: ignore[name-defined]
    if out.exists() and out.stat().st_size > 0:
        return out

    # Inputs: #0 is the looped still, #1 is the audio (real or synthetic silence).
    inputs: List[str] = [
        "-thread_queue_size", "512",
        "-f", "image2", "-loop", "1",
        "-framerate", str(fps_i),
        "-i", str(bg),
    ]
    if has_audio:
        inputs += ["-thread_queue_size", "512", "-i", str(audio_norm)]
    else:
        inputs += ["-f", "lavfi", "-i", "anullsrc=r=48000:cl=stereo"]
    audio_label = "[1:a]"

    # Fonts and subtitles filter
    ass_esc = ffesc(str(ass_file))  # type: ignore[name-defined]
    font_dirs: List[Path] = _existing_dirs([Path("~/.fonts").expanduser()])
    try:
        ar = ASSETS_ROOT if isinstance(ASSETS_ROOT, Path) else Path(ASSETS_ROOT)  # type: ignore[name-defined]
        font_dirs += _existing_dirs([ar])
    except Exception:
        # ASSETS_ROOT is optional here; fall back to ~/.fonts / system fonts.
        pass

    force_font = "Inter 18pt ExtraBold"
    force_outline = 4
    force_style = ffesc(f"Fontname={force_font},Outline={int(force_outline)},Shadow=0,BorderStyle=3")  # type: ignore[name-defined]

    if font_dirs:
        # NOTE(review): several directories are joined with "|" into a single
        # fontsdir value; confirm the subtitles filter accepts more than one
        # directory in this form.
        fontsdirs_arg = ffesc(str(font_dirs[0]))  # type: ignore[name-defined]
        for extra in font_dirs[1:]:
            fontsdirs_arg += "|" + ffesc(str(extra))  # type: ignore[name-defined]
        sub_filter = (
            f"subtitles=filename='{ass_esc}':fontsdir='{fontsdirs_arg}':charenc=UTF-8:force_style='{force_style}'"
        )
    else:
        sub_filter = f"subtitles=filename='{ass_esc}':charenc=UTF-8:force_style='{force_style}'"

    # ASS probe (parse-only; fast)
    _ass_probe(sub_filter)

    vf_parts, v_final_label = _build_video_filters_main(
        int(bw), int(bh),
        int(rw), int(rh),
        bool(do_sub),
        sub_filter,
        main_d=float(max(0.0, total_d - intro_delay)),
        outro_d=0.0,
        cta_times=None,
        cta_d=0.0,
        cta_key_hex="0x00FF00",
        cta_similarity=0.42,
        cta_blend=0.08,
        cta_position="Middle",
    )

    # Audio processing chain (SCA: EBU R128 order with deterministic downgrades)
    dms = max(0, int(round(intro_delay * 1000)))
    af: List[str] = [
        f"{audio_label}asetpts=PTS-STARTPTS,aresample=48000:async=1[min_a]",
        f"[min_a]adelay={dms}|{dms}:all=1[aud_d]",
    ]
    ln2 = _compute_loudnorm_params(audio_norm) if has_audio else None
    if ln2:
        af.append(f"[aud_d]{ln2}[aud_n]")
    elif _ff_has_filter("loudnorm"):
        af.append("[aud_d]loudnorm=I=-16:LRA=11:TP=-1.5[aud_n]")
    elif _ff_has_filter("dynaudnorm"):
        af.append("[aud_d]dynaudnorm=f=150:g=15[aud_n]")
    else:
        af.append("[aud_d]volume=1.0[aud_n]")
    if _ff_has_filter("alimiter"):
        af.append("[aud_n]alimiter=limit=0.95[afinal]")
    else:
        af.append("[aud_n]volume=0.98[afinal]")

    filter_complex = ";".join(vf_parts + af)

    # Common encoder flags
    common = [
        "-threads", "0",
        "-filter_threads", "2",
        "-filter_complex_threads", "2",
        "-sws_flags", "lanczos+accurate_rnd+full_chroma_int",
    ]
    v_color = [
        "-colorspace", "bt709",
        "-color_primaries", "bt709",
        "-color_trc", "bt709",
        "-color_range", "tv",
    ]
    v_nvenc = [
        "-c:v", "h264_nvenc",
        "-preset", "p4", "-tune", "hq",
        "-rc", "vbr", "-cq", "19",
        "-b:v", "6M", "-maxrate", "8M", "-bufsize", "12M",
        "-g", str(fps_i * 2), "-bf", "2",
        "-profile:v", "high",
        "-pix_fmt", "yuv420p",
        "-movflags", "+faststart",
        *v_color,
    ]
    v_x264 = [
        "-c:v", "libx264",
        "-preset", "fast", "-crf", "21",
        "-g", str(fps_i * 2), "-bf", "2",
        "-profile:v", "high",
        "-pix_fmt", "yuv420p",
        "-movflags", "+faststart",
        *v_color,
    ]

    # Encode into a sibling temp file and rename at the end, so a cached path
    # never points at a half-written MP4.
    tmp = out.with_suffix(".tmp.mp4")
    try:
        if tmp.exists():
            tmp.unlink()
    except Exception:
        # Stale temp could not be removed; ffmpeg runs with -y and overwrites it.
        pass

    def _encode(vargs: List[str]) -> None:
        """Run the ffmpeg encode with the provided video args."""
        cmd = [
            "ffmpeg", "-hide_banner", "-nostdin", "-y",
            "-loglevel", "error",
            *common, *inputs,
            "-filter_complex", filter_complex,
            "-map", f"[{v_final_label}]", "-map", "[afinal]",
            "-t", str(float(total_d)),
            "-r", str(fps_i),
            *vargs,
            "-c:a", "aac", "-b:a", "160k", "-ac", "2",
            "-map_metadata", "-1", "-map_chapters", "-1",
            "-max_muxing_queue_size", "1024",
            str(tmp),
        ]
        _run(cmd)  # type: ignore[name-defined]

    if NVENC_AVAILABLE:  # type: ignore[name-defined]
        try:
            _encode(v_nvenc)
        except Exception:
            # The hardware attempt failed: retry in software. (Previously the
            # "NVENC was used" flag was only set after a successful encode, so
            # this fallback could never trigger.)
            _encode(v_x264)
    else:
        _encode(v_x264)

    if not tmp.exists() or tmp.stat().st_size <= 0:
        raise RuntimeError("Main track encode failed. HOW_TO_FIX: verify NVENC availability, or disable it and ensure libx264 + ffmpeg are installed.")
    tmp.replace(out)
    return out


In [8]:
# @title 8) Asset Staging & Discovery — Hidden (HARDLOCK v3 • SCA)
# Case-insensitive discovery and staging from Assets folder (atomic copy, safe fallbacks).

from __future__ import annotations

import os
import shutil
import unicodedata
from functools import lru_cache
from pathlib import Path
from typing import Any, Iterable, List, Optional, Tuple

# =========================
# Constants and categories
# =========================

# File-extension sets per asset category (lower-case, with the leading dot).
AUDIO_EXTS = {
    ".mp3", ".wav", ".m4a",
    ".aac", ".flac", ".ogg",
}
BG_EXTS = {
    ".png", ".jpg", ".jpeg", ".webp",
}
MP4_EXTS = {
    ".mp4",
}
FONT_EXTS = {
    ".ttf", ".otf", ".ttc", ".otc",
}


# =========================
# Preflight (no heavy I/O)
# =========================

def _preflight_assets() -> Path:
    """
    Assert required shared globals exist and WORK_ASSETS is writable.
    No heavy I/O; creates and deletes a tiny temp file to confirm writability.
    """
    missing: List[str] = []
    for name in ("WORK_ASSETS", "_atomic_copy"):
        if name not in globals():
            missing.append(name)
    if missing:
        raise RuntimeError(
            "WHAT: Missing shared dependencies for Asset Staging\n"
            f"WHY: Globals not defined: {', '.join(missing)}\n"
            "HOW_TO_FIX: Run the Roots & Cache cell (defines WORK_ASSETS) and the Font Manager cell (defines _atomic_copy)."
        )

    wa_obj = globals()["WORK_ASSETS"]
    work_assets = wa_obj if isinstance(wa_obj, Path) else Path(wa_obj)
    try:
        work_assets = work_assets.expanduser().resolve()
    except Exception:
        work_assets = Path(wa_obj)  # type: ignore[arg-type]
    work_assets.mkdir(parents=True, exist_ok=True)

    # Writable check (tiny temp)
    probe = work_assets / ".w_probe.tmp"
    try:
        probe.write_bytes(b"")
        probe.unlink(missing_ok=True)  # type: ignore[call-arg]
    except Exception as e:
        raise RuntimeError(
            "WHAT: WORK_ASSETS not writable\n"
            f"WHY: {e}\n"
            "HOW_TO_FIX: Change WORK_ASSETS to a writable directory or fix filesystem permissions."
        )
    return work_assets


# =========================
# Unicode & path utilities
# =========================

def _norm_casefold(s: str) -> str:
    """Case-insensitive, NFC-normalized key for robust filename matching."""
    return unicodedata.normalize("NFC", s).casefold()


def _path_signature(path: Path) -> Tuple[int, int]:
    """Stable signature (mtime, size) for quick equality checks; follows symlinks."""
    st = path.stat()
    return int(st.st_mtime), int(st.st_size)


def _same_enough(src: Path, dst: Path) -> bool:
    """True if dst exists and matches src by (mtime, size). Any error -> False."""
    try:
        return dst.exists() and _path_signature(src) == _path_signature(dst)
    except Exception:
        return False


def _existing_dir(p: Optional[Path]) -> Optional[Path]:
    """Return resolved directory path if it exists; else None."""
    if not p:
        return None
    try:
        q = Path(p).expanduser().resolve()
        return q if q.exists() and q.is_dir() else None
    except Exception:
        return None


def _add_if_dir(paths: List[Path], candidate: Optional[Path]) -> None:
    """Append candidate if it exists and is a directory (order-preserving, no duplicates)."""
    q = _existing_dir(candidate)
    if q and q not in paths:
        paths.append(q)


# =========================
# Environment resolution
# =========================

def _ensure_work_assets() -> Path:
    """Resolve and ensure WORK_ASSETS directory exists or raise RuntimeError."""
    wa = _preflight_assets()  # also asserts _atomic_copy presence
    return wa


@lru_cache(maxsize=8)
def _bases_from_inputs(user_bases_key: Optional[Tuple[str, ...]] = None) -> List[Path]:
    """
    Collect base asset folders from:
      - WORK_ASSETS
      - user_bases (if provided)
      - DRIVE_ASSETS_FOLDERS() (if available)
      - ASSETS_ROOT
    Keep only existing, unique, resolved paths (order-preserving).
    Memoized for stability across repeated lookups.
    """
    bases: List[Path] = []
    # WORK_ASSETS first
    try:
        _add_if_dir(bases, _ensure_work_assets())
    except Exception:
        pass

    # User-provided list or dynamic drive folders
    if user_bases_key is not None:
        for b in user_bases_key:
            _add_if_dir(bases, Path(b))
    else:
        try:
            for b in DRIVE_ASSETS_FOLDERS():  # type: ignore[name-defined]
                _add_if_dir(bases, b)
        except Exception:
            pass

    # ASSETS_ROOT fallback
    try:
        root = ASSETS_ROOT if isinstance(ASSETS_ROOT, Path) else Path(ASSETS_ROOT)  # type: ignore[name-defined]
        _add_if_dir(bases, root)
    except Exception:
        pass

    # De-dup by resolved path while preserving order
    seen = set()
    uniq: List[Path] = []
    for p in bases:
        try:
            rp = p.resolve()
        except Exception:
            rp = p
        if rp not in seen:
            uniq.append(rp)
            seen.add(rp)
    return uniq


def _bases_from_list(user_bases: Optional[List[Path]]) -> List[Path]:
    """Wrapper to keep cached key deterministic from a list of paths."""
    if user_bases is None:
        return _bases_from_inputs(None)
    key = tuple(str(Path(b)) for b in user_bases)
    return _bases_from_inputs(key)


# =========================
# Discovery helpers
# =========================

def _iter_top_files(base: Path) -> Iterable[Path]:
    """Efficient top-level file iterator using os.scandir (no recursion)."""
    try:
        with os.scandir(base) as it:
            for de in it:
                if de.is_file():
                    yield Path(de.path)
    except Exception:
        return


def _find_in_candidates(name: str, bases: List[Path]) -> Optional[Path]:
    """
    Case-insensitive filename lookup across 'bases' (top-level only).
    Try exact filename first; then normalized match via scandir.
    """
    if not name:
        return None
    target_name = Path(str(name)).name

    # Fast path: exact
    for base in bases:
        try:
            p = base / target_name
            if p.exists() and p.is_file():
                return p
        except Exception:
            continue

    # Slow path: normalized compare
    key = _norm_casefold(target_name)
    for base in bases:
        for f in _iter_top_files(base):
            if _norm_casefold(f.name) == key:
                return f
    return None


def _copy_to_work(src: Path) -> Path:
    """
    Ensure 'src' exists in WORK_ASSETS with atomic copy; skip if already identical.
    Returns destination path.
    """
    dst = _ensure_work_assets() / src.name
    if not _same_enough(src, dst):
        # Use shared atomic copy from Font Manager cell; do not re-declare here.
        _atomic_copy(src, dst)  # type: ignore[name-defined]
    return dst


def _list_media_from(base: Path) -> Tuple[List[str], List[str], List[str], List[str]]:
    """
    Collect file names (top-level only) for UI lists.
    Sorted case-insensitively. Returns (audio, backgrounds, mp4s, fonts).
    """
    if not base or not base.exists() or not base.is_dir():
        return [], [], [], []

    audio: List[str] = []
    backgrounds: List[str] = []
    mp4s: List[str] = []
    fonts: List[str] = []

    for f in _iter_top_files(base):
        ext = f.suffix.lower()
        name = f.name
        if ext in AUDIO_EXTS:
            audio.append(name)
        elif ext in BG_EXTS:
            backgrounds.append(name)
        elif ext in MP4_EXTS:
            mp4s.append(name)
        elif ext in FONT_EXTS:
            fonts.append(name)

    key = _norm_casefold
    return (
        sorted(audio, key=key),
        sorted(backgrounds, key=key),
        sorted(mp4s, key=key),
        sorted(fonts, key=key),
    )


def _uniq_casefold(items: List[str]) -> List[str]:
    """Case-insensitive de-duplication, preserving first occurrence."""
    seen = set()
    out: List[str] = []
    for s in items:
        k = _norm_casefold(s)
        if k not in seen:
            out.append(s)
            seen.add(k)
    return out


# =========================
# Public API (unchanged)
# =========================

def stage_by_name(name: Optional[str], drive_asset_folders: Optional[List[Path]] = None) -> Optional[Path]:
    """
    Look up ``name`` in the asset bases and stage it into WORK_ASSETS.

    The lookup is case-insensitive and top-level only. A file that already
    lives in WORK_ASSETS is returned as found; a file found elsewhere is
    copied in first. If staging fails, the original location is returned.

    Returns:
        The staged (or original) Path, or None when ``name`` is empty, is the
        literal "none" (any case), or cannot be found.
    """
    if not name:
        return None
    if str(name).strip().lower() == "none":
        return None

    found = _find_in_candidates(name, _bases_from_list(drive_asset_folders))
    if not found:
        return None

    try:
        work_dir = _ensure_work_assets()
        outside_work_dir = found.is_file() and found.parent.resolve() != work_dir.resolve()
        staged = _copy_to_work(found) if outside_work_dir else found
    except Exception:
        # Staging is best-effort: fall back to the file where it was found.
        staged = found
    return staged


def stage_by_upload(upload_obj: Any) -> Optional[Path]:
    """
    Stage an uploaded file into WORK_ASSETS.

    Args:
        upload_obj: Either a raw path (``str`` or any ``os.PathLike`` such as
            ``pathlib.Path``) or an upload object exposing a file location via
            a ``name``, ``orig_name`` or ``path`` attribute.

    Returns:
        The staged Path inside WORK_ASSETS, or None when ``upload_obj`` is
        None, no candidate points at an existing file, or staging fails.
    """
    if upload_obj is None:
        return None

    if isinstance(upload_obj, (str, os.PathLike)):
        # Raw paths are used as-is. Reading ``.name`` off a pathlib.Path would
        # keep only the basename and lose the directory.
        candidates = [os.fspath(upload_obj)]
    else:
        candidates = [getattr(upload_obj, attr, None) for attr in ("name", "orig_name", "path")]
        candidates.append(str(upload_obj))

    for candidate in candidates:
        if not candidate:
            continue
        try:
            source = Path(candidate)
            usable = source.exists() and source.is_file()
        except Exception:
            # Not something that can be a path; try the next candidate.
            continue
        if not usable:
            continue
        try:
            return _copy_to_work(source)
        except Exception:
            # Best-effort contract: a failed copy yields None, not an exception.
            return None
    return None


def discover_assets_union() -> Tuple[List[str], List[str], List[str], List[str]]:
    """
    List the assets available in ASSETS_ROOT and WORK_ASSETS (top level only).

    Names from both locations are merged, de-duplicated case-insensitively
    (first occurrence wins, ASSETS_ROOT first) and sorted for UI dropdowns.

    Returns:
        ``(audio_names, background_names, mp4_names, font_names)``.
    """
    # Prepare the environment up front; a failure here is tolerated.
    try:
        _ensure_work_assets()
    except Exception:
        pass

    roots: List[Path] = []
    # ASSETS_ROOT may be undefined or absent; skip it quietly in that case.
    try:
        assets_root = _existing_dir(
            ASSETS_ROOT if isinstance(ASSETS_ROOT, Path) else Path(ASSETS_ROOT)  # type: ignore[name-defined]
        )
        if assets_root:
            roots.append(assets_root)
    except Exception:
        pass
    # WORK_ASSETS
    try:
        roots.append(_ensure_work_assets())
    except Exception:
        pass

    # One accumulator per category, in return order.
    merged: Tuple[List[str], List[str], List[str], List[str]] = ([], [], [], [])
    for root in roots:
        for accumulator, names in zip(merged, _list_media_from(root)):
            accumulator.extend(names)

    audio, backgrounds, mp4s, fonts = (
        sorted(_uniq_casefold(accumulator), key=_norm_casefold) for accumulator in merged
    )
    return audio, backgrounds, mp4s, fonts


In [9]:
# @title 9) Audio Normalize + Progress UI — Hidden (HARDLOCK v3 • SCA)
# Loudness-safe normalize (fast 1-pass or true 2-pass), robust stream checks, and live progress widgets.

from __future__ import annotations

import html
import json
import re
import time
import shutil
from functools import lru_cache
from pathlib import Path
from typing import List, Optional, Tuple

# Expected from the environment (asserted at runtime by _preflight_audio_env):
# _run(cmd: List[str], check: bool = True, capture: bool = True) -> CompletedProcess-like
# WORK_CACHE: Path
# sha1_file(path: Path) -> str

# =========================
# Constants & precompiled
# =========================

FF_QUIET_ARGS: List[str] = ["-hide_banner", "-nostdin", "-loglevel", "error"]

LN_TARGET_I = -16.0   # Integrated loudness
LN_TARGET_LRA = 11.0  # Loudness range
LN_TARGET_TP = -1.5   # True peak (dBTP)

# ffmpeg prints the pass-1 stats as JSON; capture the first JSON object robustly
_LN_JSON_RX = re.compile(r"\{\s*\"input_i\".*?\}\s*", re.DOTALL)


# =========================
# Preflight & probes (bounded, no side effects on success)
# =========================

def _triad_err(what: str, why: str, how: str) -> RuntimeError:
    return RuntimeError(f"WHAT: {what}\nWHY: {why}\nHOW_TO_FIX: {how}")

def _require_globals() -> None:
    missing = []
    for name in ("_run", "WORK_CACHE", "sha1_file"):
        if name not in globals():
            missing.append(name)
    if missing:
        raise _triad_err(
            "Missing globals",
            f"Required names not defined: {', '.join(missing)}",
            "Run the Roots/Cache & Core Utils cells that define these globals before this cell."
        )
    # basic type sanity
    if not isinstance(globals()["WORK_CACHE"], Path):
        raise _triad_err(
            "WORK_CACHE type",
            f"Expected pathlib.Path, got {type(globals()['WORK_CACHE']).__name__}",
            "Ensure WORK_CACHE is created as a Path in the Roots & Cache cell."
        )

def _require_tools() -> None:
    for binname in ("ffmpeg", "ffprobe"):
        if shutil.which(binname) is None:
            raise _triad_err(
                f"{binname} not found",
                f"{binname} is not on PATH",
                f"Install {binname} and ensure it's on PATH (e.g., apt-get install ffmpeg) before running this cell."
            )

def _writable_cache() -> None:
    wc: Path = globals()["WORK_CACHE"]
    wc.mkdir(parents=True, exist_ok=True)
    probe = wc / ".probe_write.tmp"
    try:
        probe.write_bytes(b"")
        probe.unlink()
    except Exception as e:
        raise _triad_err(
            "WORK_CACHE not writable",
            str(e),
            f"Fix filesystem permissions or set WORK_CACHE to a writable directory (current: {wc})."
        )

def _probe_nvenc_quick() -> bool:
    """64x64 color → h264_nvenc → null; returns availability, never raises."""
    try:
        r = _run([  # type: ignore[name-defined]
            "ffmpeg", *FF_QUIET_ARGS, "-t", "0.10",
            "-f", "lavfi", "-i", "color=black:s=64x64:r=24",
            "-c:v", "h264_nvenc", "-f", "null", "-"
        ], check=False)
        return r.returncode == 0
    except Exception:
        return False

def _probe_x264_quick() -> None:
    """64x64 color → libx264 → null; must succeed."""
    r = _run([  # type: ignore[name-defined]
        "ffmpeg", *FF_QUIET_ARGS, "-t", "0.10",
        "-f", "lavfi", "-i", "color=black:s=64x64:r=24",
        "-c:v", "libx264", "-g", "48", "-bf", "2", "-pix_fmt", "yuv420p",
        "-f", "null", "-"
    ], check=False)
    if r.returncode != 0:
        err = (r.stderr or "").strip()
        raise _triad_err(
            "x264 probe failed",
            err or "libx264 encode failed",
            "Install ffmpeg with libx264 enabled, or use a build that includes x264."
        )

def _probe_ass_quick() -> None:
    """Minimal ASS parse via subtitles filter; must parse."""
    wc: Path = globals()["WORK_CACHE"]
    tmp_ass = wc / ".probe.ass"
    tmp_ass.write_text(
        "[Script Info]\nPlayResX:64\nPlayResY:64\nWrapStyle:2\nScaledBorderAndShadow:yes\nYCbCr Matrix: TV.709\n\n"
        "[V4+ Styles]\n"
        "Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, "
        "Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, "
        "Alignment, MarginL, MarginR, MarginV, Encoding\n"
        "Style: Default,Arial,16,&H00FFFFFF,&H00FFFFFF,&H00000000,&H00000000,0,0,0,0,100,100,0,0,1,1,0,7,2,2,2,1\n\n"
        "[Events]\nFormat: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text\n"
        "Dialogue: 0,0:00:00.00,0:00:00.10,Default,,0,0,0,,hello\\Nworld\n",
        encoding="utf-8"
    )
    try:
        r = _run([  # type: ignore[name-defined]
            "ffmpeg", *FF_QUIET_ARGS, "-t", "0.10",
            "-f", "lavfi", "-i", "color=black:s=64x64:r=24",
            "-vf", f"subtitles={str(tmp_ass)}:charenc=UTF-8",
            "-f", "null", "-"
        ], check=False)
        if r.returncode != 0:
            err = (r.stderr or "").strip()
            raise _triad_err(
                "ASS probe failed",
                err or "subtitles filter failed to initialize",
                "Ensure libass is enabled in your ffmpeg build and the ASS file is valid UTF-8."
            )
    finally:
        try:
            tmp_ass.unlink()
        except Exception:
            pass

def _has_filter(name: str) -> bool:
    """
    Check if ffmpeg has a given filter.
    Prefer a project-global helper named `_ff_has_filter` if present to avoid duplication.
    """
    try:
        gf = globals().get("_ff_has_filter")  # prefer previously defined shared helper
        if callable(gf):
            return bool(gf(name))
    except Exception:
        pass
    try:
        out = _run(["ffmpeg", "-hide_banner", "-filters"], check=False).stdout or ""  # type: ignore[name-defined]
        name = f" {name.strip()} "
        return name in out
    except Exception:
        return False

def _probe_audio_chain(fast: bool) -> None:
    """0.1s anullsrc through planned filter chain; must succeed or deterministically downgrade."""
    if fast:
        a_filter = "aresample=48000:async=1,aformat=channel_layouts=stereo"
    else:
        # Prefer loudnorm if present; otherwise dynaudnorm; otherwise volume
        if _has_filter("loudnorm"):
            a_filter = f"loudnorm=I={LN_TARGET_I}:LRA={LN_TARGET_LRA}:TP={LN_TARGET_TP},aresample=48000:async=1,aformat=channel_layouts=stereo"
        elif _has_filter("dynaudnorm"):
            a_filter = "dynaudnorm=f=150:g=15,aresample=48000:async=1,aformat=channel_layouts=stereo"
        else:
            a_filter = "volume=0.98,aresample=48000:async=1,aformat=channel_layouts=stereo"
    r = _run([  # type: ignore[name-defined]
        "ffmpeg", *FF_QUIET_ARGS, "-t", "0.10",
        "-f", "lavfi", "-i", "anullsrc=r=48000:cl=stereo",
        "-af", a_filter,
        "-f", "null", "-"
    ], check=False)
    if r.returncode != 0:
        err = (r.stderr or "").strip()
        raise _triad_err(
            "Audio probe failed",
            err or "filter chain failed",
            "Use a recent ffmpeg build with loudnorm/dynaudnorm or switch to fast=True to bypass loudness processing."
        )

def _preflight_audio_env(fast: bool) -> None:
    """Run zero/heavy-min preflight + bounded probes (≤ ~3s each)."""
    _require_globals()
    _require_tools()
    _writable_cache()
    # Probes (bounded, safe)
    _probe_x264_quick()          # must pass
    _probe_nvenc_quick()         # availability only; ignore result
    _probe_ass_quick()           # must pass
    _probe_audio_chain(fast)     # must pass for chosen path


# =========================
# FFmpeg / ffprobe helpers
# =========================

def _ff_args_common() -> List[str]:
    """Deterministic, quiet ffmpeg args."""
    return list(FF_QUIET_ARGS)

def _ffprobe_has_audio(src: Path) -> bool:
    """True if there is at least one audio stream; quiet on failure."""
    try:
        res = _run(  # type: ignore[name-defined]
            ["ffprobe", "-v", "error", "-select_streams", "a:0",
             "-show_entries", "stream=index", "-of", "compact=p=0:nk=1", str(src)],
            check=False,
        )
        return bool((res.stdout or "").strip())
    except Exception:
        return False

def _ffprobe_audio_format(src: Path) -> Tuple[int, int, str]:
    """Return (sample_rate, channels, codec_name) or (0, 0, "") on failure."""
    try:
        res = _run(  # type: ignore[name-defined]
            ["ffprobe", "-v", "error", "-select_streams", "a:0",
             "-show_entries", "stream=sample_rate,channels,codec_name", "-of", "json", str(src)],
            check=False,
        )
        data = json.loads(res.stdout or "{}")
        st = (data.get("streams") or [{}])[0]
        sr = int(st.get("sample_rate") or 0)
        ch = int(st.get("channels") or 0)
        cc = str(st.get("codec_name") or "")
        return sr, ch, cc
    except Exception:
        return 0, 0, ""

# =========================
# Loudnorm 2-pass helpers
# =========================

def _loudnorm_measure(src: Path) -> Optional[dict]:
    """
    Pass-1: measure loudness using loudnorm with print_format=json.
    Returns JSON dict or None on failure or when loudnorm is missing.
    """
    if not _has_filter("loudnorm"):
        return None
    cmd = [
        "ffmpeg", *_ff_args_common(), "-i", str(src),
        "-vn", "-sn", "-dn", "-map", "a:0",
        "-af", f"loudnorm=I={LN_TARGET_I}:LRA={LN_TARGET_LRA}:TP={LN_TARGET_TP}:print_format=json",
        "-f", "null", "-"
    ]
    r = _run(cmd, check=False)  # type: ignore[name-defined]
    blob = ((r.stderr or "") + "\n" + (r.stdout or ""))
    m = _LN_JSON_RX.search(blob)
    if not m:
        return None
    try:
        return json.loads(m.group(0))
    except Exception:
        return None

def _loudnorm_filter_from_measure(meas: dict) -> str:
    """
    Build pass-2 loudnorm filter using measured values.
    Missing fields fall back to 0.0 which loudnorm accepts.
    """
    def gv(key: str, default: float = 0.0) -> float:
        try:
            return float(meas.get(key, default))
        except Exception:
            return float(default)

    return (
        "loudnorm="
        f"I={LN_TARGET_I}:LRA={LN_TARGET_LRA}:TP={LN_TARGET_TP}:"
        f"measured_I={gv('input_i')}:measured_LRA={gv('input_lra')}:"
        f"measured_TP={gv('input_tp')}:measured_thresh={gv('input_thresh')}:"
        f"offset={gv('target_offset')}:linear=true:print_format=summary"
    )


# =========================
# Encoding strategies
# =========================

def _should_remux_fast(sr: int, ch: int, codec: str) -> bool:
    """True if we can stream-copy AAC stereo 48 kHz."""
    return sr == 48000 and ch == 2 and codec.lower() in {"aac", "aac_latm"}

def _remux_copy(src: Path, tmp_out: Path) -> bool:
    """Attempt AAC stream copy. Return True on success."""
    cmd = [
        "ffmpeg", *_ff_args_common(), "-y",
        "-i", str(src), "-vn", "-sn", "-dn", "-map", "a:0",
        "-c:a", "copy",
        "-movflags", "+faststart",
        str(tmp_out),
    ]
    r = _run(cmd, check=False)  # type: ignore[name-defined]
    return r.returncode == 0 and tmp_out.exists() and tmp_out.stat().st_size > 0

def _encode_fast_resample(src: Path, tmp_out: Path) -> None:
    """Fast one-pass path: enforce 48 kHz stereo without changing loudness."""
    a_filter = "aresample=48000:async=1,aformat=channel_layouts=stereo"
    cmd = [
        "ffmpeg", *_ff_args_common(), "-y",
        "-i", str(src),
        "-vn", "-sn", "-dn", "-map", "a:0",
        "-af", a_filter, "-ac", "2",
        "-c:a", "aac", "-b:a", "160k",
        "-movflags", "+faststart",
        "-map_metadata", "-1", "-map_chapters", "-1",
        str(tmp_out),
    ]
    r = _run(cmd, check=False)  # type: ignore[name-defined]
    if r.returncode != 0 or (not tmp_out.exists()) or tmp_out.stat().st_size <= 0:
        err = (r.stderr or "").strip()
        raise _triad_err(
            "Audio normalize (fast)",
            err or "ffmpeg returned non-zero status",
            "Use a recent ffmpeg with AAC encoder; verify input has a valid a:0 stream; try fast=False if persistent."
        )

def _encode_loudnorm_two_pass(src: Path, tmp_out: Path) -> None:
    """
    Accurate path: EBU R128 loudnorm then 48 kHz stereo.
    If loudnorm is unavailable, fall back to dynaudnorm or conservative volume.
    """
    if _has_filter("loudnorm"):
        meas = _loudnorm_measure(src)
        ln2 = (
            _loudnorm_filter_from_measure(meas)
            if meas
            else f"loudnorm=I={LN_TARGET_I}:LRA={LN_TARGET_LRA}:TP={LN_TARGET_TP}:print_format=summary"
        )
    elif _has_filter("dynaudnorm"):
        ln2 = "dynaudnorm=f=150:g=15"
    else:
        ln2 = "volume=0.98"

    a_filter = f"{ln2},aresample=48000:async=1,aformat=channel_layouts=stereo"
    cmd = [
        "ffmpeg", *_ff_args_common(), "-y",
        "-i", str(src),
        "-vn", "-sn", "-dn", "-map", "a:0",
        "-af", a_filter, "-ac", "2",
        "-c:a", "aac", "-b:a", "160k",
        "-movflags", "+faststart",
        "-map_metadata", "-1", "-map_chapters", "-1",
        str(tmp_out),
    ]
    r = _run(cmd, check=False)  # type: ignore[name-defined]
    if r.returncode != 0 or (not tmp_out.exists()) or tmp_out.stat().st_size <= 0:
        err = (r.stderr or "").strip()
        raise _triad_err(
            "Audio normalize (2-pass)",
            err or "ffmpeg returned non-zero status",
            "Ensure loudnorm/dynaudnorm filters are available, or run with fast=True to bypass loudness correction."
        )


# =========================
# Public API (cached, API unchanged)
# =========================

def normalize_audio_cached(src: Path, fast: bool = True) -> Path:
    """
    Produce (or reuse) an AAC / stereo / 48 kHz .m4a version of `src` in WORK_CACHE.

    The cache file name is derived from the SHA1 of the source and the mode:
      - fast=True  → ``norm_<sha1>.m4a``: stream copy when the source already
        matches, otherwise a one-pass resample/stereo encode (loudness untouched)
      - fast=False → ``normln_<sha1>.m4a``: loudness-normalized encode
        (loudnorm, with dynaudnorm/volume fallbacks)

    The result is written to a ``.tmp.m4a`` sibling first and renamed into place.

    Returns:
        Path to the normalized .m4a file in WORK_CACHE.

    Raises:
        FileNotFoundError if src is missing
        RuntimeError if no audio stream or encode fails
    """
    _preflight_audio_env(bool(fast))  # light checks + bounded probes

    if not src or not Path(src).exists():
        raise FileNotFoundError(f"Audio source not found: {src}")
    if not _ffprobe_has_audio(src):
        raise _triad_err(
            "Missing audio stream",
            f"No a:0 stream in {src}",
            "Provide a media file with an audio track; verify with ffprobe."
        )

    digest = sha1_file(src)  # type: ignore[name-defined]
    cache_name = f"norm_{digest}.m4a" if fast else f"normln_{digest}.m4a"
    cached = WORK_CACHE / cache_name  # type: ignore[name-defined]
    if cached.exists() and cached.stat().st_size > 0:
        return cached

    # Remove a stale scratch file left by an earlier, interrupted run (best effort).
    scratch = cached.with_suffix(".tmp.m4a")
    try:
        if scratch.exists():
            scratch.unlink()
    except Exception:
        pass

    sr, ch, codec = _ffprobe_audio_format(src)
    if not fast:
        _encode_loudnorm_two_pass(src, scratch)
    elif not (_should_remux_fast(sr, ch, codec) and _remux_copy(src, scratch)):
        # Either the source does not qualify for stream copy or the copy failed:
        # re-encode instead.
        _encode_fast_resample(src, scratch)

    scratch.replace(cached)
    return cached


# =========================
# Lightweight progress UI (unchanged API)
# =========================

def status_steps() -> List[Tuple[str, float]]:
    """
    Return the ordered pipeline steps as (name, weight) pairs.

    Each weight is the step's share of the overall progress estimate; the
    final "Done" step carries no weight.
    """
    names = (
        "Mount/Discover",
        "Fonts",
        "Transcribe+Normalize",
        "Timing",
        "ASS",
        "MainTrack",
        "Overlay+Mix",
        "Encode",
        "Cleanup",
        "Done",
    )
    weights = (0.06, 0.02, 0.18, 0.04, 0.12, 0.30, 0.12, 0.16, 0.01, 0.00)
    return list(zip(names, weights))

# Module-level snapshot of the step table used by the progress helpers.
STEPS: List[Tuple[str, float]] = status_steps()

def _clamp(x: float, lo: float, hi: float) -> float:
    """Clamp x to [lo, hi]."""
    return lo if x < lo else hi if x > hi else x

def _progress_fraction(cur_idx: int, hint: float) -> float:
    """Convert (step index, in-step hint 0..1) to a 0..1 overall fraction."""
    if not STEPS:
        return 1.0
    last = len(STEPS) - 1
    idx = min(int(cur_idx), last)
    done = sum(w for i, (_, w) in enumerate(STEPS) if i < idx)
    cur_w = STEPS[idx][1]
    frac = done + float(hint or 0.0) * cur_w
    if cur_idx >= len(STEPS):
        frac = 1.0
    return _clamp(frac, 0.0, 1.0)

def status_html(cur_idx: int, started: float, hint: Optional[float] = None, note: str = "") -> str:
    """
    Build the HTML status panel: step checklist, percent, elapsed time and ETA.

    Args:
        cur_idx: Index into STEPS of the step currently running.
        started: `time.monotonic()` value captured when the run began.
        hint: Progress within the current step, 0..1 (None counts as 0).
        note: Optional free text shown in italics below the progress line.
    """
    elapsed = max(0.0, time.monotonic() - float(started))
    frac = _progress_fraction(cur_idx, float(hint or 0.0))

    # Only the terminal state may display a full 100%.
    finished = cur_idx >= len(STEPS)
    pct = _clamp(frac * 100.0, 0.0, 100.0 if finished else 99.9)

    # Linear extrapolation; skipped at the very start/end where it is meaningless.
    if 0.005 < frac < 0.999 and elapsed > 0.2:
        eta = elapsed * (1.0 / max(frac, 1e-3) - 1.0)
    else:
        eta = 0.0

    items = []
    for pos, (label, _weight) in enumerate(STEPS):
        if pos < cur_idx:
            icon = "✅"
        elif pos == cur_idx:
            icon = "⏳"
        else:
            icon = "▫️"
        items.append(f"<li>{icon} {html.escape(label)}</li>")

    if eta > 0:
        eta_str = f"~{int(eta // 60)}m {int(eta % 60)}s"
    else:
        eta_str = "—"

    note_html = ""
    if note:
        note_html = f"<div style='margin-top:6px;opacity:.85'><i>{html.escape(note)}</i></div>"

    pieces = [
        "<div style='font-family:system-ui,Segoe UI,Roboto,Arial,sans-serif;line-height:1.4'>",
        "<div><b>Pipeline</b></div>",
        f"<ul style='margin:6px 0 8px 18px;padding:0'>{''.join(items)}</ul>",
        f"<div>Progress: <b>{pct:.1f}%</b> &nbsp;|&nbsp; Elapsed: ",
        f"<b>{int(elapsed//60)}m {int(elapsed%60)}s</b> &nbsp;|&nbsp; ETA: <b>{eta_str}</b></div>{note_html}</div>",
    ]
    return "".join(pieces)

def _overall_percent(cur_idx: int, hint: float) -> float:
    """Return overall percent (0..100) with one decimal precision."""
    return round(_progress_fraction(cur_idx, float(hint or 0.0)) * 100.0, 1)


In [10]:
# @title 10) Gradio UI & Render — Clean & Aligned (Refactored) { display-mode: "form" }
# @markdown Minimal UI; options match ASS generator (Cell 6). NVENC→x264 fallback kept.

from __future__ import annotations

import datetime
import os
import shutil
import subprocess as sp
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import gradio as gr

# =========================
# Constants
# =========================

# Fixed highlight colour ("#RRGGBB"); render_video passes it to
# generate_ass_track as `highlight_color`, so it is not user-configurable.
HL_FIXED_COLOR: str = "#19B5FF"

# =========================
# Asset index map (enables audio overlays for intro/outro/CTA)
# =========================

_ASSET_INDEX_MAP: Dict[int, Path] = {}

def _idx_to_asset(idx: int) -> Optional[Path]:
    """Resolve ffmpeg input index to a staged asset Path for audio-stream checks."""
    try:
        return _ASSET_INDEX_MAP.get(int(idx))
    except Exception:
        return None

# =========================
# Environment helpers
# =========================

def _safe_env() -> Dict[str, str]:
    """Return a copy of the environment from upstream _env() if present, else os.environ."""
    try:
        return _env()  # type: ignore[name-defined]
    except Exception:
        return dict(os.environ)


def _ensure_fonts_env(font_path: Optional[Path], fallback_dir: Optional[Path] = None) -> Tuple[Optional[str], Dict[str, str]]:
    """
    Ensure ASS_FONT_DIR points to the chosen font directory (font file parent or provided fallback).
    Returns (fontsdir string or None, effective environment dict).
    """
    env = _safe_env()
    fontsdir: Optional[str] = None
    try:
        if font_path and font_path.exists():
            fontsdir = str(font_path.parent.resolve())
        elif fallback_dir and fallback_dir.exists():
            fontsdir = str(fallback_dir.resolve())
        if fontsdir:
            env["ASS_FONT_DIR"] = fontsdir
    except Exception:
        pass
    return fontsdir, env


def _extract_internal_font_name(font_file: Optional[Path]) -> Optional[str]:
    """
    Attempt to extract a human-readable font name (Full name or Family) from a TTF/OTF/TTC.
    Returns None if fonttools is unavailable or parsing fails.
    """
    if not font_file or not font_file.exists():
        return None
    try:
        from fontTools.ttLib import TTFont  # type: ignore
    except Exception:
        return None
    try:
        tt = TTFont(str(font_file))
        full: Optional[str] = None
        fam: Optional[str] = None
        for n in tt["name"].names:
            try:
                val = n.toUnicode()
            except Exception:
                try:
                    val = n.string.decode("utf-16-be", errors="ignore")
                except Exception:
                    continue
            if n.nameID == 4 and not full:
                full = val
            if n.nameID == 1 and not fam:
                fam = val
        return full or fam
    except Exception:
        return None


def _safe_local_copy(final_path: Path) -> Path:
    """
    Copy the final artifact to a static 'web' folder for lightweight serving/preview.
    Returns the copied path if successful, otherwise the original.
    """
    web_dir = WORK_ROOT / "web"  # type: ignore[name-defined]
    web_dir.mkdir(parents=True, exist_ok=True)
    ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    local = web_dir / f"{ts}_output.mp4"
    try:
        if local.exists():
            local.unlink()
    except Exception:
        pass
    try:
        shutil.copy2(str(final_path), str(local))
    except Exception:
        sp.run(["/bin/cp", "-f", str(final_path), str(local)], check=False)
    return local if local.exists() and local.stat().st_size > 0 else final_path


def _status_or(*args, default: str = "") -> str:
    """Call status_html if available; otherwise return a provided default string."""
    try:
        return status_html(*args)  # type: ignore[name-defined]
    except Exception:
        return default

# =========================
# Asset helpers
# =========================

def refresh_assets():
    """
    Re-scan assets via `discover_assets_union()` and rebuild the selector choices.

    Returns six `gr.update` objects, in order: audio, background, three MP4
    selectors sharing the same choices (each reset to "None"), and font
    (reset to "None").
    """
    audio_files, bg_files, mp4_files, font_files = discover_assets_union()  # type: ignore[name-defined]

    def _first_or_none(items):
        return items[0] if items else None

    clip_choices = ["None"] + mp4_files

    audio_update = gr.update(choices=audio_files, value=_first_or_none(audio_files))
    bg_update = gr.update(choices=bg_files, value=_first_or_none(bg_files))
    clip_updates = [gr.update(choices=clip_choices, value="None") for _ in range(3)]
    font_update = gr.update(choices=["None"] + font_files, value="None")

    return (audio_update, bg_update, *clip_updates, font_update)


def _stage_optional(name: Optional[str]) -> Optional[Path]:
    """Stage an optional asset by name; graceful None for 'None' or empty."""
    if not name or str(name).strip().lower() == "none":
        return None
    return stage_by_name(name)  # type: ignore[name-defined]

# =========================
# Core pipeline helpers
# =========================

def _transcribe_and_normalize(audio_path: Path, whisper_size: str, fast_audio: bool) -> Tuple[List["Word"], Path]:  # type: ignore[name-defined]
    """
    Parallelize transcription and audio normalization (2 futures).
    Returns (list of Word, normalized audio path).
    """
    with ThreadPoolExecutor(max_workers=2) as ex:
        fut_tr = ex.submit(transcribe_cached, audio_path, whisper_size, "en")  # type: ignore[name-defined]
        fut_norm = ex.submit(normalize_audio_cached, audio_path, bool(fast_audio))  # type: ignore[name-defined]
        words = fut_tr.result()
        norm_path = Path(fut_norm.result())
    return words, norm_path


def _probe_durations(intro: Optional[Path], outro: Optional[Path], main_audio: Path) -> Tuple[float, float, float]:
    """
    Probe durations with ≤2 threads. Returns (intro_d, outro_d, main_d).
    """
    with ThreadPoolExecutor(max_workers=2) as ex:
        fut_main = ex.submit(probe_duration_cached, main_audio)  # type: ignore[name-defined]
        fut_intro = ex.submit(probe_duration_cached, intro) if intro else None  # type: ignore[name-defined]
        intro_d = fut_intro.result() if fut_intro else 0.0
        main_d = fut_main.result()
    outro_d = probe_duration_cached(outro) if outro else 0.0  # type: ignore[name-defined]
    return float(intro_d), float(outro_d), float(main_d)


def _compute_cta_schedule(
    start_sec: float,
    repeat_sec: float,
    intro_d: float,
    main_d: float,
    cta_d: float,
    cap_offset: float,
    max_instances: int = 50
) -> List[float]:
    """
    Compute CTA insertion times across the main section (intro + main).
    Returns a list of start times in seconds.
    """
    times: List[float] = []
    if cta_d <= 0 or repeat_sec <= 0:
        return times
    t = float(cap_offset) + float(start_sec or 0.0)
    end_main = float(intro_d) + float(main_d)
    while t < end_main - 0.1 and len(times) < max_instances:
        times.append(round(t, 3))
        t += float(repeat_sec)
    return times

# ---------------- video filter graph (intro/cta/outro) ----------------

def _compose_video_filters(
    fps: int,
    cur_label: str,
    intro_idx: Optional[int],
    outro_idx: Optional[int],
    cta_idx: Optional[int],
    intro_d: float,
    main_d: float,
    outro_d: float,
    cta_times: List[float],
    cta_d: float,
    cta_key_hex: str,
    cta_similarity: float,
    cta_blend: float,
    cta_position: str,
    intro_chroma_key: bool,
    outro_chroma_key: bool
) -> Tuple[List[str], str]:
    """
    Compose the overlay filtergraph for optional intro/outro and repeatable CTA insertions.
    Returns (list of vf clauses, last video label).
    """
    vf: List[str] = [f"[0:v]fps={int(fps)},format=yuv420p[{cur_label}]"]
    cur = cur_label

    # Intro overlay
    if intro_idx is not None and intro_d > 0:
        ck = (f",colorkey=0x00FF00:{cta_similarity:.2f}:{cta_blend:.2f}") if intro_chroma_key else ""
        vf.append(
            f"[{intro_idx}:v]scale=1920:1080:force_original_aspect_ratio=decrease,"
            f"pad=1920:1080:-1:-1,fps={int(fps)},format=rgba{ck},"
            f"trim=duration={intro_d:.3f},setpts=PTS-STARTPTS[intro]"
        )
        vf.append(f"[{cur}][intro]overlay=eof_action=pass:repeatlast=0:enable='between(t,0,{intro_d:.3f})'[v1]")
        cur = "v1"

    # CTA overlays
    if cta_idx is not None and cta_times:
        vf.append(f"[{cta_idx}:v]scale=1152:-1:force_original_aspect_ratio=decrease,fps={int(fps)},format=rgba[cta_base]")
        vf.append(f"[cta_base]split={len(cta_times)}" + "".join(f"[cta{i}]" for i in range(len(cta_times))))
        cta_y = "20" if cta_position == "Top" else "(H-h)/2" if cta_position == "Middle" else "H-h-20"
        for i, stt in enumerate(cta_times):
            en = stt + cta_d
            vf.append(
                f"[cta{i}]colorkey={cta_key_hex}:{cta_similarity:.2f}:{cta_blend:.2f},"
                f"trim=duration={cta_d:.3f},setpts=PTS-STARTPTS+{stt:.3f}/TB[ct{i}]"
            )
            vf.append(
                f"[{cur}][ct{i}]overlay=x=(W-w)/2:y={cta_y}:eof_action=pass:repeatlast=0:"
                f"enable='between(t,{stt:.3f},{en:.3f})'[vct{i}]"
            )
            cur = f"vct{i}"

    # Outro overlay
    if outro_idx is not None and outro_d > 0:
        out_start = float(intro_d) + float(main_d)
        ck = (f",colorkey=0x00FF00:{cta_similarity:.2f}:{cta_blend:.2f}") if outro_chroma_key else ""
        vf.append(
            f"[{outro_idx}:v]scale=1920:1080:force_original_aspect_ratio=decrease,"
            f"pad=1920:1080:-1:-1,fps={int(fps)},format=rgba{ck},"
            f"trim=duration={outro_d:.3f},setpts=PTS-STARTPTS+{out_start:.3f}/TB[outro]"
        )
        vf.append(
            f"[{cur}][outro]overlay=eof_action=pass:repeatlast=0:"
            f"enable='between(t,{out_start:.3f},{(out_start+outro_d):.3f})'[vout]"
        )
        cur = "vout"

    return vf, cur

# ---------------- audio mix graph ----------------

def _build_audio_filters(
    main_idx: int,
    intro_idx: Optional[int],
    outro_idx: Optional[int],
    cta_idx: Optional[int],
    intro_d: float,
    main_d: float,
    cta_times: List[float],
    cta_d: float
) -> List[str]:
    """
    Build audio filtergraph: mix main with optional intro/outro/CTA stems.
    Returns a list of af clauses ending in [afinal].
    """
    af: List[str] = []
    inputs: List[str] = [f"[{main_idx}:a]"]

    if intro_idx is not None and has_audio_stream_cached(_idx_to_asset(intro_idx)):  # type: ignore[name-defined]
        af.append(f"[{intro_idx}:a]volume=0.5,aresample=48000[introa]")
        inputs.append("[introa]")

    if outro_idx is not None and has_audio_stream_cached(_idx_to_asset(outro_idx)):  # type: ignore[name-defined]
        ms = int((float(intro_d) + float(main_d)) * 1000)
        af.append(f"[{outro_idx}:a]volume=0.5,adelay={ms}|{ms}:all=1,aresample=48000[outroa]")
        inputs.append("[outroa]")

    if cta_idx is not None and cta_times and has_audio_stream_cached(_idx_to_asset(cta_idx)):  # type: ignore[name-defined]
        af.append(f"[{cta_idx}:a]volume=0.5,aresample=48000[ctab]")
        for i, stt in enumerate(cta_times):
            ms = int(stt * 1000)
            af.append(f"[ctab]atrim=0:{cta_d:.3f},asetpts=PTS-STARTPTS,adelay={ms}|{ms}:all=1[cta{i}]")
            inputs.append(f"[cta{i}]")

    if len(inputs) > 1:
        af.append(f"{''.join(inputs)}amix=inputs={len(inputs)}:duration=longest:normalize=0,"
                  f"dynaudnorm=f=150:g=15,alimiter=limit=0.95[afinal]")
    else:
        af.append(f"{inputs[0]}dynaudnorm=f=150:g=15,alimiter=limit=0.95[afinal]")

    return af

# ---------------- ffmpeg runner ----------------

def _run_ffmpeg_with_env(cmd: List[str], env: Dict[str, str]) -> Tuple[int, str]:
    """
    Run ffmpeg with a custom environment, capturing a bounded stderr tail.
    Returns (returncode, stderr_tail).
    """
    p = sp.Popen(cmd, stdout=sp.PIPE, stderr=sp.PIPE, universal_newlines=True, bufsize=1, env=env)
    err_buf: List[str] = []
    while True:
        ln = p.stderr.readline()
        if not ln:
            break
        err_buf.append(ln)
        if len(err_buf) > 400:
            err_buf = err_buf[-300:]
    p.wait()
    return p.returncode, "".join(err_buf[-2000:])

# =========================
# Main render generator (yields progress updates, then the final result)
# =========================

def render_video(
    audio_name, audio_upload, bg_name, bg_upload,
    whisper_size, font_file,
    ui_font_size, caption_color, border_thickness,
    caption_pos, caption_hmargin, caption_vmargin, karaoke_offset_ms,
    intro_mp4, intro_chroma_key, outro_mp4, outro_chroma_key,
    cta_mp4, cta_start_sec, cta_repeat_sec, cta_key_color, cta_similarity, cta_blend, cta_position,
    profile, fps, fast_audio, subscale_percent, enc_mode, smart_mode,
    line_count, max_words_per_line, max_words_per_caption, strict_word,
    force_bold_ui,
    highlight_mode_ui, align_mode, safe_area_pct, target_cps, line_space_pct, max_extend_sec,
    progress=gr.Progress()
):
    """
    Orchestrate the full pipeline: stage assets, ASR, ASS, main track, overlays, encode.

    This is a generator. Every yielded item is a 3-tuple
    ``(video_path_or_None, status_html, percent)``; the video path is only set
    on the final, successful item. Failures are reported as a status item
    followed by a message item, after which the generator returns.

    Encoding strategy: when NVENC_AVAILABLE is true, h264_nvenc is tried up to
    twice, then libx264 is used as the fallback. An attempt only counts as
    successful when ffmpeg exits with 0 AND the temporary output file exists
    and is non-empty.

    Any unexpected exception is caught and reported as an HTML-escaped
    traceback, so frames such as ``<module>`` stay visible in the status panel.
    """
    started = time.monotonic()

    def tick(step_index: int, hint: float = 0.0, note: str = ""):
        # Yields one progress item for the given step; callers re-yield it.
        pc = _overall_percent(step_index, hint)  # type: ignore[name-defined]
        yield None, _status_or(step_index, started, hint, note, default="Working..."), pc  # type: ignore[name-defined]

    try:
        # Drive prep (mounting is best effort; the tree is ensured regardless)
        try:
            if _in_colab() and not Path("/content/drive").exists():  # type: ignore[name-defined]
                _mount_drive(force=False)  # type: ignore[name-defined]
        except Exception:
            pass
        ensure_drive_tree(DRIVE_ROOT)  # type: ignore[name-defined]
        for r in tick(0, 0.35, "Scanning assets..."):
            yield r

        # Stage core assets (an upload takes precedence over the dropdown choice)
        st_audio = stage_by_upload(audio_upload) or stage_by_name(audio_name)  # type: ignore[name-defined]
        st_bg = stage_by_upload(bg_upload) or stage_by_name(bg_name)  # type: ignore[name-defined]
        if not st_audio:
            yield None, _status_or(0, started), 0.0
            yield None, "Provide an Audio (upload or from Assets).", 0.0
            return
        if not st_bg:
            yield None, _status_or(0, started), 0.0
            yield None, "Provide a Background image (upload or from Assets).", 0.0
            return

        st_intro = _stage_optional(intro_mp4)
        st_outro = _stage_optional(outro_mp4)
        st_cta = _stage_optional(cta_mp4)

        # Font resolution and outline
        font_info = FONTM.resolve_font_selection(font_file if font_file and str(font_file).lower() != "none" else None)  # type: ignore[name-defined]
        selected_font_path = Path(font_file) if font_file and str(font_file).lower() != "none" else None
        internal_name = _extract_internal_font_name(selected_font_path) if selected_font_path else None
        if internal_name:
            font_info.family = internal_name  # type: ignore[attr-defined]

        mapped_fs = ui_size_to_ass(int(ui_font_size))  # type: ignore[name-defined]
        # Thin weights get a minimum outline (at least 2, or 5% of the mapped size).
        thinish = any(k in (getattr(font_info, "subfam", "") or "").lower() for k in ["thin", "extralight", "ultralight", "light"])
        auto_outline = max(int(border_thickness or 0), max(2, int(round(mapped_fs * 0.05)))) if thinish else int(border_thickness or 0)
        for r in tick(1):
            yield r

        # Transcription + normalization
        words, norm_audio = _transcribe_and_normalize(st_audio, whisper_size, bool(fast_audio))
        for r in tick(2):
            yield r

        # Durations & timing config
        intro_d, outro_d, main_d = _probe_durations(st_intro, st_outro, norm_audio)
        total_d = float(intro_d + main_d + (outro_d or 0.0))
        if total_d <= 0:
            yield None, _status_or(2, started), 5.0
            yield None, "Invalid duration", 5.0
            return

        # Captions are shifted by the intro length.
        cap_offset = float(intro_d)
        subscale = max(0.7, min(1.0, (subscale_percent or 80) / 100.0))
        rw, rh, _ = effective_render_size(1920, 1080, subscale)  # type: ignore[name-defined]
        fps_val = int(fps or 24)

        # Smart assist (optional): overrides margins, fps and encoder mode
        if smart_mode:
            dec = smart_decide(words, st_bg)  # type: ignore[name-defined]
            caption_hmargin, caption_vmargin = dec["hmargin"], dec["vmargin"]
            fps_val, enc_mode = dec["fps"], dec["enc_mode"]
            wpm = int(extract_speech_stats(words)["wpm"])  # type: ignore[name-defined]
            note = f"{wpm}wpm • {fps_val}fps • sub {int(subscale*100)}% • {str(enc_mode).upper()}"
            for r in tick(3, 0.22, note):
                yield r
        else:
            for r in tick(3, 0.22):
                yield r

        # Fonts env for ffmpeg
        default_assets = DRIVE_ROOT / "VideoRobot" / "Assets"  # type: ignore[name-defined]
        _, env_ffmpeg = _ensure_fonts_env(selected_font_path, default_assets)

        # ASS generation
        ass_file = generate_ass_track(  # type: ignore[name-defined]
            words=words,
            font_info=font_info,
            ui_font_size=int(ui_font_size),
            position=str(caption_pos),
            margin_h=int(caption_hmargin),
            margin_v=int(caption_vmargin),
            border_thickness=int(border_thickness or 0),
            primary_color=(caption_color or "#FFFFFF"),
            highlight_color=HL_FIXED_COLOR,
            time_offset=cap_offset,
            karaoke_offset_ms=int(karaoke_offset_ms or 0),
            style_profile=str(profile),
            highlight_mode_ui=str(highlight_mode_ui),
            rw=int(rw),
            rh=int(rh),
            keyword_overlay=False,
            keyword_list=[],
            key_txt_color="#FFFFFF",
            key_bg_color="#000000",
            key_pos="Top",
            audio_for_snap=Path(norm_audio),
            force_bold_ui=bool(force_bold_ui),
            outline_override=int(auto_outline),
            line_count=int(line_count),
            max_words_per_line=int(max_words_per_line),
            max_words_per_caption=int(max_words_per_caption),
            strict_word=bool(strict_word),
            align_mode=str(align_mode),
            safe_area_pct=int(safe_area_pct),
            line_space_pct=int(line_space_pct),
            target_cps=int(target_cps),
            min_caption_sec=0.25,
            max_extend_sec=float(max_extend_sec),
        )
        if not ass_file.exists():
            yield None, _status_or(5, started), 0.0
            yield None, f"ASS not generated: {ass_file}", 0.0
            return
        for r in tick(4, 0.55):
            yield r

        # Main track (burn ASS)
        main_track = build_main_track_cached(  # type: ignore[name-defined]
            st_bg, ass_file, Path(norm_audio), float(intro_d), float(total_d), int(fps_val), float(subscale)
        )
        if not main_track.exists():
            yield None, _status_or(6, started), 0.0
            yield None, f"Main track not built: {main_track}", 0.0
            return
        for r in tick(5):
            yield r

        # Optional overlays: intro/outro/cta. Input 0 is always the main track;
        # each optional clip that is actually used gets the next input index.
        inputs: List[str] = ["-thread_queue_size", "512", "-i", str(main_track)]
        idx = 1
        intro_idx = outro_idx = cta_idx = None
        cta_d = 0.0
        cta_times: List[float] = []

        if st_intro and intro_d > 0:
            inputs += ["-thread_queue_size", "512", "-i", str(st_intro)]
            intro_idx = idx
            idx += 1
        if st_outro and outro_d > 0:
            inputs += ["-thread_queue_size", "512", "-i", str(st_outro)]
            outro_idx = idx
            idx += 1
        if st_cta:
            cta_d = probe_duration_cached(st_cta)  # type: ignore[name-defined]
            if cta_d > 0:
                inputs += ["-thread_queue_size", "512", "-i", str(st_cta)]
                cta_idx = idx
                idx += 1
                cta_times = _compute_cta_schedule(
                    start_sec=float(cta_start_sec or 0.0),
                    repeat_sec=float(cta_repeat_sec or 0.0),
                    intro_d=float(intro_d),
                    main_d=float(main_d),
                    cta_d=float(cta_d),
                    cap_offset=float(cap_offset),
                )

        # Map indexes → asset Paths for audio-stream checks
        _ASSET_INDEX_MAP.clear()
        if intro_idx is not None and st_intro:
            _ASSET_INDEX_MAP[int(intro_idx)] = Path(st_intro)
        if outro_idx is not None and st_outro:
            _ASSET_INDEX_MAP[int(outro_idx)] = Path(st_outro)
        if cta_idx is not None and st_cta:
            _ASSET_INDEX_MAP[int(cta_idx)] = Path(st_cta)

        vf_parts, last_v_label = _compose_video_filters(
            fps=int(fps_val),
            cur_label="vbg",
            intro_idx=intro_idx,
            outro_idx=outro_idx,
            cta_idx=cta_idx,
            intro_d=float(intro_d),
            main_d=float(main_d),
            outro_d=float(outro_d),
            cta_times=cta_times,
            cta_d=float(cta_d),
            cta_key_hex="0x" + str(cta_key_color or "#00FF00").lstrip("#").upper(),
            cta_similarity=float(max(0.0, min(1.0, float(cta_similarity or 0.0)))),
            cta_blend=float(max(0.0, min(1.0, float(cta_blend or 0.0)))),
            cta_position=str(cta_position),
            intro_chroma_key=bool(intro_chroma_key),
            outro_chroma_key=bool(outro_chroma_key),
        )

        af_parts = _build_audio_filters(
            main_idx=0,
            intro_idx=intro_idx,
            outro_idx=outro_idx,
            cta_idx=cta_idx,
            intro_d=float(intro_d),
            main_d=float(main_d),
            cta_times=cta_times,
            cta_d=float(cta_d),
        )

        filter_complex = ";".join(vf_parts + af_parts)
        enc_common = ["-threads", "0", "-filter_threads", "2", "-filter_complex_threads", "2"]

        out_dir = OUTPUT_ROOT  # type: ignore[name-defined]
        out_dir.mkdir(parents=True, exist_ok=True)
        ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        out_path = out_dir / f"{ts}_{Path(st_audio).stem}_1920x1080_{str(enc_mode).upper()}.mp4"
        # Encode into a temporary sibling and rename on success.
        tmp = out_path.with_name(out_path.stem + ".tmp.mp4")
        try:
            if tmp.exists():
                tmp.unlink()
        except Exception:
            pass

        v_color = ["-colorspace", "bt709", "-color_primaries", "bt709", "-color_trc", "bt709", "-color_range", "tv"]

        v_nv = ["-c:v", "h264_nvenc"] + (
            ["-preset", "p3", "-rc", "constqp", "-qp", "18"] if str(enc_mode) == "race"
            else ["-preset", "p4", "-tune", "hq", "-rc", "vbr", "-cq", "19", "-b:v", "6M", "-maxrate", "8M", "-bufsize", "12M"]
        ) + ["-g", str(int(fps_val) * 2), "-bf", "2", "-profile:v", "high", "-pix_fmt", "yuv420p", "-movflags", "+faststart", *v_color]

        v_x = ["-c:v", "libx264", "-preset", "fast", "-crf", "21",
               "-g", str(int(fps_val) * 2), "-bf", "2", "-profile:v", "high", "-pix_fmt", "yuv420p", "-movflags", "+faststart", *v_color]

        vmap = f"[{last_v_label}]"
        cmd_base = ["ffmpeg", "-hide_banner", "-nostdin", "-loglevel", "error", "-y",
                    *enc_common, *inputs,
                    "-filter_complex", filter_complex, "-map", vmap, "-map", "[afinal]",
                    "-t", str(float(total_d)), "-r", str(int(fps_val))]
        cmd_nv = [*cmd_base, *v_nv, "-c:a", "aac", "-b:a", "128k", "-ac", "2",
                  "-map_metadata", "-1", "-map_chapters", "-1", "-max_muxing_queue_size", "1024", str(tmp)]
        cmd_x  = [*cmd_base, *v_x , "-c:a", "aac", "-b:a", "128k", "-ac", "2",
                  "-map_metadata", "-1", "-map_chapters", "-1", "-max_muxing_queue_size", "1024", str(tmp)]

        for r in tick(7, 0.05, "Encoding..."):
            yield r

        def _encoded_ok(returncode: int) -> bool:
            # A zero exit code alone is not trusted: the output must exist and be non-empty.
            return returncode == 0 and tmp.exists() and tmp.stat().st_size > 0

        tails: List[str] = []
        encoded = False
        if NVENC_AVAILABLE:  # type: ignore[name-defined]
            # Up to two NVENC attempts before falling back to software encoding.
            for _attempt in range(2):
                rc, tail = _run_ffmpeg_with_env(cmd_nv, env_ffmpeg)
                tails.append(tail)
                if _encoded_ok(rc):
                    encoded = True
                    break
        if not encoded:
            rcx, tailx = _run_ffmpeg_with_env(cmd_x, env_ffmpeg)
            tails.append(tailx)
            encoded = _encoded_ok(rcx)
        if not encoded:
            # Report the most recent non-empty stderr tail.
            err_tail = next((t for t in reversed(tails) if t), "")
            yield None, _status_or(7, started), _overall_percent(7, 0.0)  # type: ignore[name-defined]
            yield None, f"FFmpeg failed:\n{err_tail[-2000:]}", _overall_percent(7, 0.0)  # type: ignore[name-defined]
            return

        for r in tick(8, 0.7):
            yield r

        try:
            tmp.replace(out_path)
        except Exception:
            yield None, _status_or(8, started), _overall_percent(8, 0.7)  # type: ignore[name-defined]
            yield None, "Failed to finalize output file", _overall_percent(8, 0.7)  # type: ignore[name-defined]
            return

        # Flush filesystem buffers where supported (best effort).
        try:
            if hasattr(os, "sync"):
                os.sync()
        except Exception:
            pass

        local_play = _safe_local_copy(out_path)
        size_mb = (out_path.stat().st_size / (1024 * 1024)) if out_path.exists() else 0.0
        msg = f"Rendered<br>{out_path.name}<br>{size_mb:.1f} MB<br>1920x1080@{fps_val}fps"
        yield str(local_play), _status_or(len(STEPS), started, 1.0) + f"<div style='margin-top:8px'>{msg}</div>", 100.0  # type: ignore[name-defined]

    except Exception as e:
        import html as _html
        import traceback
        # Escape before embedding: tracebacks contain text such as "<module>"
        # that would otherwise be parsed as HTML tags and disappear.
        detail = _html.escape(f"{traceback.format_exc()}\n{e}")
        yield None, _status_or(0, started), 0.0  # type: ignore[name-defined]
        yield None, f"Failed:<br><pre>{detail}</pre>", 0.0
        return

# =========================
# UI factory
# =========================

def create_ui():
    """
    Build the Gradio Blocks UI and wire callbacks.

    The layout is five tabs (Inputs, Captions, Intro / Outro / CTA,
    Performance, Output). The Render button streams results from
    ``render_video`` into the video preview, the status HTML and the progress
    slider; the Refresh button calls ``refresh_assets`` to update the
    asset-backed dropdowns.

    Side effects:
        Calls ``ensure_drive_tree(DRIVE_ROOT)`` and ``discover_assets_union()``
        once each before any component is created.

    Returns:
        The constructed ``gr.Blocks`` instance (not yet queued or launched).
    """
    ensure_drive_tree(DRIVE_ROOT)  # type: ignore[name-defined]
    # One discovery pass feeds every asset dropdown below; the four lists are
    # used for audio, background, intro/outro/CTA videos and fonts respectively.
    audio_assets, bg_assets, video_assets, font_assets = discover_assets_union()  # type: ignore[name-defined]
    nv = "✅" if NVENC_AVAILABLE else "❌"  # type: ignore[name-defined]

    theme = gr.themes.Soft(primary_hue="blue", secondary_hue="cyan", neutral_hue="slate")
    css = ".pro-header{background:linear-gradient(135deg,#667eea 0%,#764ba2 100%);padding:1rem;border-radius:0.9rem;margin-bottom:1rem;color:white;text-align:center;}"

    # NOTE: every slider passes ``step`` by keyword. Recent Gradio releases
    # declare ``step`` keyword-only, so a fourth positional argument is not a
    # reliable way to set it.
    with gr.Blocks(theme=theme, title="VideoRobot Studio Pro — CLEAN", css=css) as ui:
        gr.HTML(
            f'<div class="pro-header"><h1>🎬 VideoRobot Studio Pro</h1>'
            f'<p style="opacity:.9">WORK:{WORK_ROOT} • DRIVE:{DRIVE_ROOT} • NVENC {nv}</p></div>'  # type: ignore[name-defined]
        )

        with gr.Tabs():
            with gr.Tab("Inputs"):
                with gr.Row():
                    with gr.Column():
                        audio    = gr.Dropdown(choices=audio_assets, value=(audio_assets[0] if audio_assets else None), label="Audio (Assets)")
                        audio_up = gr.File(file_count="single", file_types=[".mp3", ".wav", ".m4a", ".aac"], label="Or upload audio")
                        bg       = gr.Dropdown(choices=bg_assets, value=(bg_assets[0] if bg_assets else None), label="Background (Assets)")
                        bg_up    = gr.File(file_count="single", file_types=[".png", ".jpg", ".jpeg"], label="Or upload background")
                        whisper  = gr.Radio(["large-v3", "medium", "small"], value="small", label="Whisper")
                        refresh  = gr.Button("🔄 Refresh Assets", variant="secondary")
                    with gr.Column():
                        gr.HTML("<div style='opacity:.85'>Preview and live progress are in the <b>Output</b> tab.</div>")

            with gr.Tab("Captions"):
                with gr.Row():
                    with gr.Column():
                        size         = gr.Slider(10, 100, 60, step=1, label="Caption Size (0–100)")
                        color        = gr.ColorPicker("#FFFFFF", label="Text Color")
                        border       = gr.Slider(1, 12, 3, step=1, label="Outline")
                        pos          = gr.Radio(["Top", "Middle", "Bottom"], value="Bottom", label="Position")
                        hmargin      = gr.Slider(0, 400, 154, step=2, label="Horizontal Margin (px)")
                        # Default 97 is kept as authored even though it is off the step-2 grid.
                        vmargin      = gr.Slider(0, 400, 97, step=2, label="Vertical Margin (px)")
                        koffset      = gr.Slider(-400, 400, 60, step=10, label="Karaoke Offset (ms)")
                        strict_word  = gr.Checkbox(True, label="Strict per-word box")
                        line_count   = gr.Radio([1, 2], value=1, label="Lines per Caption")
                        max_wpl      = gr.Slider(1, 12, 6, step=1, label="Max Words per Line")
                        max_wpc      = gr.Slider(1, 24, 12, step=1, label="Max Words per Caption")
                    with gr.Column():
                        highlight    = gr.Radio(["word_fill", "word_pill", "word_bg", "none"], value="word_fill", label="Highlight Mode")
                        align_mode   = gr.Radio(["left", "center", "right"], value="left", label="Align")
                        safe_area    = gr.Slider(60, 100, 92, step=1, label="Safe Area %")
                        target_cps   = gr.Slider(8, 28, 17, step=1, label="Target CPS")
                        line_space   = gr.Slider(60, 200, 100, step=2, label="Line Spacing %")
                        max_extend   = gr.Slider(0.0, 1.0, 0.40, step=0.01, label="Max Extend (s)")
                        font         = gr.Dropdown(["None"] + font_assets, value="None", label="Font file (TTF/OTF)")
                        force_bold_ui= gr.Checkbox(False, label="Force Bold text")

            with gr.Tab("Intro / Outro / CTA"):
                # Reuse the video list from the single discovery pass above;
                # "None" is the sentinel choice meaning "no clip selected".
                mp4_opts = ["None"] + video_assets
                with gr.Row():
                    with gr.Column():
                        intro    = gr.Dropdown(mp4_opts, value="None", label="Intro")
                        intro_ck = gr.Checkbox(False, label="Chroma Key Intro")
                        outro    = gr.Dropdown(mp4_opts, value="None", label="Outro")
                        outro_ck = gr.Checkbox(False, label="Chroma Key Outro")
                    with gr.Column():
                        cta       = gr.Dropdown(mp4_opts, value="None", label="CTA Loop")
                        cta_pos   = gr.Radio(["Top", "Middle", "Bottom"], value="Middle", label="CTA Position")
                        cta_start = gr.Slider(5, 300, 30, step=5, label="CTA Start (s)")
                        cta_repeat= gr.Slider(10, 600, 120, step=10, label="CTA Repeat every (s)")
                        cta_key   = gr.ColorPicker("#00FF00", label="Chroma Key Color")
                        cta_sim   = gr.Slider(0.01, 1.0, 0.42, step=0.01, label="Similarity")
                        cta_blend = gr.Slider(0.0, 1.0, 0.08, step=0.01, label="Blend")

            with gr.Tab("Performance"):
                with gr.Row():
                    with gr.Column():
                        profile    = gr.Radio(["balanced", "turbo"], value="turbo", label="Style Profile")
                        fps        = gr.Slider(24, 60, 24, step=6, label="FPS")
                        fast_audio = gr.Checkbox(True, label="Fast Audio (skip loudnorm)")
                        subscale   = gr.Slider(70, 100, 80, step=1, label="Subtitle Subscale %")
                        enc_mode   = gr.Radio(["studio", "race"], value="studio", label="Encoder Mode")
                        # The initial value is given once (it was previously passed
                        # both positionally and as ``value=``).
                        smart_mode = gr.Checkbox(True, label="Smart Assist")

            with gr.Tab("Output"):
                with gr.Row():
                    with gr.Column(scale=1):
                        btn  = gr.Button("🎬 Render", variant="primary")
                        sout = gr.HTML(value=_status_or(0, time.monotonic()), label="📊 Status")
                        pbar = gr.Slider(0, 100, 0, step=1, interactive=False, label="Live Progress %")
                    with gr.Column(scale=1):
                        vout = gr.Video(label="📹 Output Preview", height=420, autoplay=True, show_download_button=True)

        # Inputs are passed to render_video positionally in this exact order.
        # NOTE(review): presumably this must mirror render_video's parameter
        # order — confirm against its signature before reordering.
        btn.click(
            render_video,
            [
                audio, audio_up, bg, bg_up, whisper, font,
                size, color, border, pos, hmargin, vmargin, koffset,
                intro, intro_ck, outro, outro_ck,
                cta, cta_start, cta_repeat, cta_key, cta_sim, cta_blend, cta_pos,
                profile, fps, fast_audio, subscale, enc_mode, smart_mode,
                line_count, max_wpl, max_wpc, strict_word,
                force_bold_ui,
                highlight, align_mode, safe_area, target_cps, line_space, max_extend
            ],
            [vout, sout, pbar],
        )
        # refresh_assets takes no inputs and updates the six asset-backed dropdowns.
        refresh.click(refresh_assets, outputs=[audio, bg, intro, outro, cta, font])
    return ui

# Announce that create_ui() is defined, then build a Blocks instance right away
# and bind it to the module-level name `ui`.
# NOTE(review): the launch cell calls create_ui() again and rebinds `ui`, so
# this instance is replaced before it is launched — confirm the eager build is
# still wanted here.
print("UI factory ready.")
ui = create_ui()


UI factory ready.


In [11]:
#@title 11) Launch — Hidden
#@markdown Start the app inline in Colab.
# Build a fresh Blocks instance for this launch.
ui = create_ui()

# Enable the request queue with the API closed; queue() configures `ui` in place.
ui.queue(api_open=False)

# Launch without blocking the cell so the notebook stays usable afterwards.
ui.launch(
    inline=True,
    show_error=True,
    prevent_thread_lock=True,
    debug=False,
)
print("READY")


It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://9f5dd1a10eeb841a6d.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


READY
