# Video Transcription and Subtitle Embedding Pipeline (Production-Grade)

This notebook extends the refactored version with:
- Hash-based audio fingerprints (idempotent processing)
- CPU/GPU-aware parallel transcription
- Structured JSON logs for post-run audit


## 1. Imports

In [22]:
from src.audio_transcribe import transcribe_audio
from src.embed_caption import embed_subtitles
from media_tool.media_tool import run
import os, json, hashlib, time
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import torch
import whisper

## 2. Parameters

In [23]:
from pathlib import Path

# Directory holding the source .mkv episodes.
# NOTE(review): this is a WSL-style path (/mnt/c/...), while the recorded cell
# output shows WORKDIR as a WindowsPath — confirm the path matches the
# interpreter the notebook actually runs on.
VIDEO_DIR = Path(r"/mnt/c/Users/ingca/Videos/Diavoli S01 (2020) 1080p WEB-DL H264 iTA ENG AC3 - iDN_CreW")

# Languages passed to the audio-extraction step, and the one to transcribe.
LANGS = ["eng"]
DEFAULT_LANG = "eng"
WHISPER_MODEL_SIZE = "medium"

# os.cpu_count() may return None, and halving a single core yields 0;
# ThreadPoolExecutor rejects max_workers <= 0, so clamp to at least 1.
MAX_WORKERS_CPU = max(1, (os.cpu_count() or 1) // 2)
MAX_WORKERS_GPU = 1

# All output directories are created under the resolved current directory.
WORKDIR = Path.cwd().resolve()
WORKDIR.mkdir(parents=True, exist_ok=True)
print(f"→ Working directory: {WORKDIR}")




→ Working directory: C:\Users\ingca\OneDrive\Documents\python\test_ac


## 3. Directories

In [None]:
from pathlib import Path

# Diagnostic cell: report the state of WORKDIR as it currently is, then make
# sure it and the "fingerprints" sub-directory exist.
WORKDIR = Path(WORKDIR)
print(f"→ Working directory: {WORKDIR}")
print(f"→ WORKDIR type: {type(WORKDIR)}")
print("WORKDIR =", WORKDIR)
print("Exists  =", WORKDIR.exists())
print("Is dir  =", WORKDIR.is_dir())
print("Is file =", WORKDIR.is_file())


# Resolve and create the parent first, and only then derive the child path,
# so that `p` is always built from the same absolute directory that was
# just ensured to exist.
WORKDIR = Path(WORKDIR).resolve()
WORKDIR.mkdir(parents=True, exist_ok=True)
p = WORKDIR / "fingerprints"
p.mkdir(parents=True, exist_ok=True)


print(p.exists(), p.is_dir(), p.is_file())



→ Working directory: C:\Users\ingca\OneDrive\Documents\python\test_ac
→ WORKDIR type: <class 'pathlib.WindowsPath'>
WORKDIR = C:\Users\ingca\OneDrive\Documents\python\test_ac
Exists  = True
Is dir  = True
Is file = False


FileNotFoundError: [WinError 2] Impossibile trovare il file specificato: 'c:\\Users\\ingca\\OneDrive\\Documents\\python\\test_ac\\fingerprints'

In [None]:

# Every pipeline output lives in a sub-directory of WORKDIR that is named
# after its key in DIRS.
DIRS = {
    name: WORKDIR / name
    for name in ("subbed", "srt", "input_files", "fingerprints", "logs")
}

# Create the whole tree up front; directories that already exist are left alone.
for d in DIRS.values():
    d.mkdir(parents=True, exist_ok=True)


## 4. Structured Logger

In [None]:
import threading

LOG_FILE = DIRS["logs"] / "run_log.jsonl"

# log_event is called from ThreadPoolExecutor worker threads; the lock keeps
# each JSON line intact when several episodes log at the same time.
_LOG_LOCK = threading.Lock()


def log_event(event: dict):
    """Append *event* as one JSON line to LOG_FILE, stamped with the current time.

    The caller's dict is not modified: the ``"timestamp"`` key (seconds since
    the epoch, from ``time.time()``) is added to a copy, replacing any
    existing ``"timestamp"`` value in the written record.

    Args:
        event: JSON-serializable mapping describing what happened.

    Raises:
        TypeError: if *event* contains values ``json.dumps`` cannot serialize.
        OSError: if LOG_FILE cannot be opened or written.
    """
    record = {**event, "timestamp": time.time()}
    # Serialize outside the lock so the critical section is only the write.
    line = json.dumps(record) + "\n"
    with _LOG_LOCK:
        with open(LOG_FILE, "a", encoding="utf-8") as f:
            f.write(line)

## 5. Audio Fingerprinting

In [None]:
def fingerprint_file(path: Path) -> str:
    """Return the SHA-256 hex digest of the bytes stored at *path*.

    The file is read in 8192-byte pieces, so it is never loaded into memory
    as a whole.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        while True:
            piece = stream.read(8192)
            if not piece:
                # An empty read marks end of file.
                break
            digest.update(piece)
    return digest.hexdigest()

## 6. Whisper Model (GPU aware)

In [None]:
# Run on the GPU when torch can see a CUDA device, otherwise on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# NOTE(review): this single model instance is handed to every worker thread;
# confirm that concurrent inference on one model is safe when MAX_WORKERS > 1.
model = whisper.load_model(WHISPER_MODEL_SIZE, device=device)

# Worker count follows the selected device.
MAX_WORKERS = {"cuda": MAX_WORKERS_GPU, "cpu": MAX_WORKERS_CPU}[device]


## 7. Episode Discovery

In [None]:
# Episode tags to leave out of this run.
ep_to_exclude = ['S01E01', 'S01E02', 'S01E03', 'S01E04', 'S01E05', 'S01E06', 'S01E07']

# Collect the .mkv files (any letter case) in VIDEO_DIR, sorted by path.
# The exclusion tags are matched against the file *name*: a Path object does
# not support the `in` operator, so testing `bad in ep` raises TypeError.
episodes = sorted(
    p
    for p in VIDEO_DIR.iterdir()
    if p.is_file()
    and p.suffix.lower() == ".mkv"
    and not any(bad in p.name for bad in ep_to_exclude)
)
print(f"→ Episodes to process: {[ep.name for ep in episodes]}")

## 8. Episode Processing Function

In [None]:
def _recorded_srt(fp_file: Path, fallback: Path):
    """Return the existing SRT file tied to a fingerprint record, or None.

    None means the subtitles have to be (re)generated: the record is missing
    or unreadable, or the subtitle file it refers to is not on disk.
    Records written without an ``"srt"`` entry fall back to *fallback*.
    """
    if not fp_file.exists():
        return None
    try:
        record = json.loads(fp_file.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # Unreadable or corrupt record: treat the audio as not yet transcribed.
        return None
    recorded = record.get("srt") if isinstance(record, dict) else None
    candidate = Path(recorded) if recorded else fallback
    return candidate if candidate.is_file() else None


def process_episode(video_path: Path):
    """Run the full pipeline for one video and log each stage.

    Steps, in order:
      1. Call ``run`` on the video with DIRS["input_files"] as output directory.
      2. Pick the ``.mka`` file whose name contains DEFAULT_LANG from
         DIRS["input_files"] / <video stem>.
      3. Fingerprint that audio file. Transcribe it unless a fingerprint
         record already exists AND its subtitle file is still on disk.
      4. Embed the subtitle file into a copy of the video under DIRS["subbed"].

    Args:
        video_path: Path of the source ``.mkv`` file.

    Raises:
        FileNotFoundError: if no matching ``.mka`` audio file is found.
        Exception: any error from the pipeline steps is logged with stage
            ``"error"`` and re-raised unchanged.
    """
    episode = video_path.stem
    try:
        run(
            input_path=str(video_path),
            output_dir=str(DIRS["input_files"]),
            languages=LANGS,
            default_language=DEFAULT_LANG,
        )

        audio_dir = DIRS["input_files"] / episode
        audio_file = next(
            (f for f in audio_dir.iterdir() if f.suffix == ".mka" and DEFAULT_LANG in f.name),
            None,
        )
        if audio_file is None:
            # A bare next() would raise StopIteration, whose message is empty
            # and therefore useless in the error log.
            raise FileNotFoundError(
                f"No .mka audio file containing '{DEFAULT_LANG}' found in {audio_dir}"
            )

        fp = fingerprint_file(audio_file)
        fp_file = DIRS["fingerprints"] / f"{fp}.json"

        # Skip transcription only when the subtitles from the earlier run are
        # actually available; otherwise embedding would get a missing file.
        srt_path = _recorded_srt(fp_file, DIRS["srt"] / f"{episode}_subtitles.srt")

        if srt_path is None:
            log_event({"episode": episode, "stage": "transcription_start"})
            srt_path = transcribe_audio(
                audio_file=str(audio_file),
                output_dir=str(DIRS["srt"]),
                output_format="srt",
                model=model,
            )
            # Record where the subtitles were written so a later run reuses
            # exactly that file instead of guessing its name.
            fp_file.write_text(
                json.dumps({"episode": episode, "audio": str(audio_file), "srt": str(srt_path)}),
                encoding="utf-8",
            )
            log_event({"episode": episode, "stage": "transcription_done"})
        else:
            log_event({"episode": episode, "stage": "transcription_skipped"})

        embed_subtitles(
            input_mkv=str(video_path),
            subtitle_files=[{"file": str(srt_path), "language": DEFAULT_LANG}],
            output_mkv=str(DIRS["subbed"] / f"{episode}_with_subs.mkv"),
        )

        log_event({"episode": episode, "stage": "completed"})
    except Exception as e:
        # Boundary handler: record the failure for the audit log, then let it
        # propagate to whoever scheduled this episode.
        log_event({
            "episode": episode,
            "stage": "error",
            "error": str(e),
            "error_type": type(e).__name__,
        })
        raise

## 9. Parallel Execution

In [None]:
# Failures are collected instead of aborting on the first one, so the
# progress bar covers every episode and no error goes unreported.
failures = []

# ThreadPoolExecutor raises ValueError for max_workers <= 0, so clamp to 1.
with ThreadPoolExecutor(max_workers=max(1, MAX_WORKERS or 1)) as executor:
    # Map each future back to its episode so errors can be reported by name.
    futures = {executor.submit(process_episode, ep): ep for ep in episodes}
    for f in tqdm(as_completed(futures), total=len(futures), desc="Episodes"):
        try:
            f.result()
        except Exception as exc:
            failures.append((futures[f], exc))

if failures:
    print(f"→ {len(failures)} episode(s) failed:")
    print("\n".join(f"   {path.name}: {err!r}" for path, err in failures))
    # Re-raise the first failure so the cell still ends in an error.
    raise failures[0][1]

## 10. Auditability

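- Audio fingerprints make transcription idempotent across runs: audio that was already transcribed is skipped, while audio extraction and subtitle embedding still run every time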
- JSONL logs allow replay, filtering, and forensic inspection
- Parallelism adapts automatically to the available hardware: a single worker when a CUDA GPU is used, otherwise half of the CPU cores
