# Audio Extraction Review Pipeline

Notebook-first local pipeline for:

1. Single-file or batch media input discovery
2. Deterministic ffmpeg extraction to mono 16 kHz FLAC
3. Local `faster-whisper` transcription with reusable transcript JSON artifacts
4. Optional best-effort diarization via WhisperX/pyannote with `UNKNOWN` fallback
5. Per-run metadata snapshots with stage timings and resumable checkpoints
6. Schema-constrained local LLM segmentation into chat-style blocks with chronology/overlap validation



In [None]:
from __future__ import annotations

import json
import os
import shlex
import subprocess
import time
import traceback
from datetime import datetime, timezone
try:
    from dotenv import load_dotenv
except Exception:
    def load_dotenv(*_args: Any, **_kwargs: Any) -> bool:
        return False
from pathlib import Path
from typing import Any
from uuid import uuid4



In [None]:
# Pipeline settings consumed by the stage functions defined in this notebook.
CONFIG: dict[str, Any] = {
    "input_mode": "single",  # "single" | "batch"
    # Relative paths starting with "large-files" resolve against the media root;
    # other relative paths resolve against the project root.
    "single_input": "large-files/Doug and Twitch Chat TAKE OVER EUROPE-VpmmuHlLPM0.mkv",
    "batch_inputs": [],
    "batch_glob": "*.mkv",  # matched against the media root in batch mode
    # Existing audio / transcript artifacts are reused unless these are set.
    "force_reextract": False,
    "force_retranscribe": False,
    "ffmpeg": {
        "audio_codec": "flac",
        "sample_rate": 16000,
        "channels": 1,
        "overwrite": True,  # True passes -y to ffmpeg, False passes -n
    },
    "transcription": {
        "model_name": "tiny.en",
        "device": "cpu",
        "compute_type": "int8",
        "beam_size": 5,
        "vad_filter": True,
        "word_timestamps": True,
    },
    "diarization": {
        "enabled": True,
        "provider": "whisperx",
        "device": "cpu",
        # Segments overlapping a speaker turn by less than this are "UNKNOWN".
        "min_overlap_seconds": 0.2,
    },
    "segmentation": {
        "enabled": True,
        "base_url": "http://localhost:1234/v1",
        "api_key": "lm-studio",
        "model": "qwen3-vl-8b-instruct",
        "temperature": 0.2,
        "window_seconds": 240.0,
        "window_overlap_seconds": 20.0,
        "minimum_overlap_seconds": 1.5,
        "max_summary_words": 80,
        "style_reference": "00-dev-log/2026-02-09.md",  # relative to project root
        "smoke_check_timeout_seconds": 5,
    },
}



In [None]:
def now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string ending in ``Z``.

    Microseconds are dropped, so the result has whole-second precision.
    """
    utc_now = datetime.now(timezone.utc)
    whole_second = utc_now.replace(microsecond=0)
    stamp = whole_second.isoformat()
    # isoformat() renders UTC as "+00:00"; use the shorter "Z" designator.
    return stamp.replace("+00:00", "Z")


def resolve_project_paths(start: Path | None = None) -> dict[str, Path]:
    """Locate the project layout and return its well-known paths.

    Walks upward from ``start`` (default: the current working directory)
    until a directory containing ``00-supporting-files`` is found. That
    directory's parent is the project root.

    Side effects: loads ``00-supporting-files/data/.env`` via ``load_dotenv``
    (without overriding existing variables) when the file exists, and
    creates the data, audio, logs, runs and transcripts directories.

    Raises:
        FileNotFoundError: if no ``00-supporting-files`` entry is found.
    """
    origin = (start or Path.cwd()).resolve()

    anchor: Path | None = None
    for directory in (origin, *origin.parents):
        marker = directory / "00-supporting-files"
        if marker.exists():
            anchor = marker
            break
    if anchor is None:
        raise FileNotFoundError("Could not locate 00-supporting-files anchor from current notebook path")

    project_root = anchor.parent
    project_parent = project_root.parent

    # Prefer a media folder next to the repo; fall back to one inside it.
    sibling_media = project_parent / "large-files"
    media_root = sibling_media if sibling_media.exists() else project_root / "large-files"

    env_path = anchor / "data" / ".env"
    if env_path.exists():
        load_dotenv(env_path, override=False)

    data_root = anchor / "data" / "audio-extraction-review"
    output_dirs = {
        "data_root": data_root,
        "audio_dir": data_root / "audio",
        "logs_dir": data_root / "logs",
        "runs_dir": data_root / "runs",
        "transcripts_dir": data_root / "transcripts",
    }
    for directory in output_dirs.values():
        directory.mkdir(parents=True, exist_ok=True)

    return {
        "project_root": project_root,
        "supporting_files": anchor,
        "project_parent": project_parent,
        "media_root": media_root,
        "env_path": env_path,
        **output_dirs,
    }


def as_project_relative(path: Path, project_root: Path) -> str:
    """Render ``path`` relative to ``project_root`` when possible.

    The path is resolved first. If it cannot be expressed relative to
    ``project_root``, the resolved absolute path is returned instead.
    """
    try:
        relative = path.resolve().relative_to(project_root)
    except Exception:
        # Not under the project root: fall back to the absolute form.
        return str(path.resolve())
    return str(relative)


def _resolve_media_input(raw: Any, *, project_root: Path, media_root: Path) -> Path:
    """Turn one configured input entry into a path.

    Absolute paths are returned unchanged. Relative paths whose first
    component is ``large-files`` are resolved against ``media_root``;
    all other relative paths are resolved against ``project_root``.
    """
    candidate = Path(raw)
    if candidate.is_absolute():
        return candidate
    if candidate.parts and candidate.parts[0] == "large-files":
        return media_root.joinpath(*candidate.parts[1:]).resolve()
    return (project_root / candidate).resolve()


def discover_inputs(config: dict[str, Any], paths: dict[str, Path]) -> list[Path]:
    """Return the media files selected by ``config``.

    Args:
        config: Uses ``input_mode`` ("single" or "batch", case-insensitive,
            surrounding whitespace ignored), ``single_input``,
            ``batch_inputs`` and ``batch_glob``.
        paths: Needs ``project_root`` and ``media_root``.

    Returns:
        Single mode: a one-element list with the configured input.
        Batch mode: the explicit ``batch_inputs`` plus everything in
        ``media_root`` matching ``batch_glob``, resolved, de-duplicated
        and sorted. Existence of the files is not checked here.

    Raises:
        ValueError: for any other ``input_mode``.
    """
    project_root = paths["project_root"]
    media_root = paths["media_root"]
    mode = config["input_mode"].strip().lower()

    if mode == "single":
        return [
            _resolve_media_input(
                config["single_input"],
                project_root=project_root,
                media_root=media_root,
            )
        ]

    if mode == "batch":
        # ``or []`` also tolerates an explicit ``"batch_inputs": None``.
        discovered = [
            _resolve_media_input(item, project_root=project_root, media_root=media_root)
            for item in (config.get("batch_inputs") or [])
        ]

        pattern = config.get("batch_glob")
        if pattern and media_root.exists():
            discovered.extend(media_root.glob(pattern))

        # The set removes entries listed explicitly and also matched by the glob.
        return sorted({entry.resolve() for entry in discovered})

    raise ValueError(f"Unsupported input_mode={mode}; expected 'single' or 'batch'")


def output_audio_path(input_media: Path, audio_dir: Path) -> Path:
    """Return the FLAC path in ``audio_dir`` derived from the media file's stem.

    Spaces in the stem are replaced with underscores.
    """
    underscored = "_".join(input_media.stem.split(" "))
    return audio_dir.joinpath(underscored + ".flac")


def append_jsonl(path: Path, record: dict[str, Any]) -> None:
    """Append ``record`` to ``path`` as one ASCII-escaped JSON line.

    Missing parent directories are created first.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, mode="a", encoding="utf-8") as handle:
        serialized = json.dumps(record, ensure_ascii=True)
        handle.write(f"{serialized}\n")


def ffprobe_duration_seconds(path: Path) -> float | None:
    """Return the container duration of ``path`` in seconds, or ``None``.

    Runs ``ffprobe`` and parses the ``format=duration`` value. ``None`` is
    returned when ffprobe cannot be launched, exits with a non-zero status,
    prints nothing, or prints a value that is not a number.
    """
    cmd = [
        "ffprobe",
        "-v",
        "error",
        "-show_entries",
        "format=duration",
        "-of",
        "default=noprint_wrappers=1:nokey=1",
        str(path),
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except OSError:
        # ffprobe missing or not executable: the duration is optional
        # metadata, so report "unknown" instead of failing the caller.
        return None
    if result.returncode != 0:
        return None

    raw = result.stdout.strip()
    if not raw:
        return None
    try:
        return float(raw)
    except ValueError:
        return None


def extract_audio(input_media: Path, output_audio: Path, ffmpeg_cfg: dict[str, Any]) -> subprocess.CompletedProcess[str]:
    """Run ffmpeg to write the audio of ``input_media`` to ``output_audio``.

    ``ffmpeg_cfg`` may provide ``overwrite`` (default True: ``-y``, otherwise
    ``-n``), ``channels`` (default 1), ``sample_rate`` (default 16000) and
    ``audio_codec`` (default "flac"). Video is dropped with ``-vn``.

    Returns the completed process with captured text output. The return
    code is not checked here.
    """
    overwrite_flag = "-y" if ffmpeg_cfg.get("overwrite", True) else "-n"
    channel_count = str(ffmpeg_cfg.get("channels", 1))
    sample_rate = str(ffmpeg_cfg.get("sample_rate", 16000))
    codec = str(ffmpeg_cfg.get("audio_codec", "flac"))

    argv = ["ffmpeg", "-v", "error", overwrite_flag]
    argv += ["-i", str(input_media), "-vn"]
    argv += ["-ac", channel_count, "-ar", sample_rate, "-c:a", codec]
    argv.append(str(output_audio))
    return subprocess.run(argv, capture_output=True, text=True)


def _discard_partial_audio(output_audio: Path, stat_before: os.stat_result | None) -> None:
    """Remove output that a failed ffmpeg run created or modified.

    ``stat_before`` is the file's stat taken before ffmpeg ran, or ``None``
    if it did not exist. A file that is unchanged since then is left alone
    (for example when ffmpeg refused to overwrite it).
    """
    try:
        stat_after = output_audio.stat()
    except OSError:
        return  # nothing was left on disk
    unchanged = (
        stat_before is not None
        and stat_before.st_mtime_ns == stat_after.st_mtime_ns
        and stat_before.st_size == stat_after.st_size
    )
    if unchanged:
        return
    try:
        output_audio.unlink()
    except OSError:
        # Best effort: the failure itself is already being recorded.
        pass


def run_extraction_stage(
    *,
    inputs: list[Path],
    paths: dict[str, Path],
    config: dict[str, Any],
    run_id: str,
) -> dict[str, Any]:
    """Extract audio for every input and log one JSONL record per input.

    For each input: a missing file is recorded as ``failed``; an existing
    output is recorded as ``reused`` unless ``config["force_reextract"]`` is
    set; otherwise ffmpeg is run and the result recorded as ``ok`` or
    ``failed``. Failures never abort the loop.

    When ffmpeg fails, any output file it created or modified is deleted so
    that a later run does not mistake a truncated file for a reusable one.

    Args:
        inputs: Media files to process.
        paths: Needs ``logs_dir`` and ``audio_dir``.
        config: Needs ``ffmpeg``; ``force_reextract`` is optional.
        run_id: Identifier embedded in records and the per-run log name.

    Returns:
        Dict with ``stage``, ``duration_seconds``, ``records``, ``failures``,
        ``log_path`` and ``failure_log_path``.
    """
    stage_start = time.perf_counter()
    extraction_log = paths["logs_dir"] / f"extraction-{run_id}.jsonl"
    failure_log = paths["logs_dir"] / "extraction-failures.jsonl"

    records: list[dict[str, Any]] = []
    failures: list[dict[str, Any]] = []

    def _log(record: dict[str, Any], *, failed: bool = False) -> None:
        # Every record goes to the per-run log; failures additionally go to
        # the shared failure log.
        append_jsonl(extraction_log, record)
        if failed:
            append_jsonl(failure_log, record)
            failures.append(record)
        records.append(record)

    for input_media in inputs:
        record: dict[str, Any] = {
            "run_id": run_id,
            "timestamp": now_iso(),
            "stage": "extract",
            "input_media": str(input_media),
            "status": "pending",
        }

        if not input_media.exists():
            record.update({
                "status": "failed",
                "error": "input_not_found",
            })
            _log(record, failed=True)
            continue

        output_audio = output_audio_path(input_media, paths["audio_dir"])
        if output_audio.exists() and not config.get("force_reextract", False):
            record.update({
                "status": "reused",
                "audio_path": str(output_audio),
                "resume_marker": True,
            })
            _log(record)
            continue

        # Snapshot the output so a failed run's leftovers can be told apart
        # from a pre-existing file that ffmpeg never touched.
        stat_before = output_audio.stat() if output_audio.exists() else None
        try:
            cmd_result = extract_audio(input_media, output_audio, config["ffmpeg"])
        except OSError as exc:
            # ffmpeg could not be launched at all (e.g. not installed).
            _discard_partial_audio(output_audio, stat_before)
            record.update({
                "status": "failed",
                "error": "ffmpeg_unavailable",
                "stderr": str(exc),
            })
            _log(record, failed=True)
            continue

        if cmd_result.returncode != 0:
            _discard_partial_audio(output_audio, stat_before)
            record.update({
                "status": "failed",
                "error": "ffmpeg_failed",
                "stderr": cmd_result.stderr.strip(),
                "command": " ".join(shlex.quote(part) for part in cmd_result.args),
            })
            _log(record, failed=True)
            continue

        record.update({
            "status": "ok",
            "audio_path": str(output_audio),
            "audio_duration_seconds": ffprobe_duration_seconds(output_audio),
            "resume_marker": False,
        })
        _log(record)

    duration = time.perf_counter() - stage_start
    return {
        "stage": "extract",
        "duration_seconds": round(duration, 3),
        "records": records,
        "failures": failures,
        "log_path": str(extraction_log),
        "failure_log_path": str(failure_log),
    }


def _segment_overlap_seconds(segment_start: float, segment_end: float, diar_start: float, diar_end: float) -> float:
    """Return how many seconds the two intervals share (0.0 if disjoint)."""
    latest_start = max(segment_start, diar_start)
    earliest_end = min(segment_end, diar_end)
    shared = earliest_end - latest_start
    return shared if shared > 0.0 else 0.0


def best_effort_diarization(
    *,
    audio_path: Path,
    config: dict[str, Any],
) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Try to diarize ``audio_path``; never raise.

    Returns ``(speaker_turns, meta)``. ``speaker_turns`` is a list of
    ``{"start", "end", "speaker"}`` dicts and is empty whenever diarization
    is disabled, the ``HUGGINGFACE_TOKEN`` environment variable is unset,
    whisperx cannot be imported, or the diarization call fails. ``meta``
    records ``attempted``, ``provider``, ``used`` and ``fallback_reason``.
    """
    diar_cfg = config.get("diarization", {})

    def _meta(attempted: bool, used: bool, fallback_reason: str | None) -> dict[str, Any]:
        # Single place that fixes the metadata shape and key order.
        return {
            "attempted": attempted,
            "provider": diar_cfg.get("provider", "whisperx"),
            "used": used,
            "fallback_reason": fallback_reason,
        }

    if not diar_cfg.get("enabled", True):
        return [], _meta(False, False, "disabled_in_config")

    token = os.getenv("HUGGINGFACE_TOKEN")
    if not token:
        return [], _meta(True, False, "missing_huggingface_token")

    try:
        import whisperx
        from whisperx.diarize import DiarizationPipeline
    except Exception:
        return [], _meta(True, False, "whisperx_or_pyannote_not_installed")

    try:
        waveform = whisperx.load_audio(str(audio_path))
        pipeline = DiarizationPipeline(token=token, device=diar_cfg.get("device", "cpu"))
        turns_frame = pipeline(waveform)

        speaker_turns: list[dict[str, Any]] = [
            {
                "start": float(row["start"]),
                "end": float(row["end"]),
                "speaker": str(row["speaker"]),
            }
            for _, row in turns_frame.iterrows()
        ]
        return speaker_turns, _meta(True, True, None)
    except Exception as exc:
        return [], _meta(True, False, f"diarization_failed: {exc}")


def pick_segment_speaker(
    *,
    segment_start: float,
    segment_end: float,
    diar_segments: list[dict[str, Any]],
    min_overlap_seconds: float,
) -> str:
    """Return the speaker whose turn overlaps the segment the most.

    ``"UNKNOWN"`` is returned when there are no speaker turns, none of them
    overlaps the segment, or the largest overlap is shorter than
    ``min_overlap_seconds``. On equal overlaps the earliest turn in
    ``diar_segments`` wins.
    """
    if not diar_segments:
        return "UNKNOWN"

    scored = (
        (
            _segment_overlap_seconds(segment_start, segment_end, turn["start"], turn["end"]),
            turn["speaker"],
        )
        for turn in diar_segments
    )
    overlapping = [pair for pair in scored if pair[0] > 0]
    if not overlapping:
        return "UNKNOWN"

    # max() keeps the first maximal element, matching first-wins tie-breaking.
    top_overlap, top_speaker = max(overlapping, key=lambda pair: pair[0])
    if top_overlap < min_overlap_seconds:
        return "UNKNOWN"
    return top_speaker


def transcribe_audio_with_faster_whisper(audio_path: Path, config: dict[str, Any]) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Transcribe ``audio_path`` with a ``faster_whisper`` model.

    Model and decoding options come from ``config["transcription"]``.

    Returns:
        ``(segments, info)``. Each segment has a 1-based ``id``, ``start``,
        ``end``, stripped ``text`` and a ``words`` list (empty when the
        segment carries no word data). ``info`` holds ``language``,
        ``language_probability``, ``duration`` and ``duration_after_vad``,
        with 0.0 substituted for missing numeric values.

    Raises:
        RuntimeError: ``faster_whisper_not_installed`` if the import fails.
    """
    try:
        from faster_whisper import WhisperModel
    except Exception as exc:
        raise RuntimeError("faster_whisper_not_installed") from exc

    settings = config["transcription"]
    whisper = WhisperModel(
        settings.get("model_name", "tiny.en"),
        device=settings.get("device", "cpu"),
        compute_type=settings.get("compute_type", "int8"),
    )
    segment_stream, info = whisper.transcribe(
        str(audio_path),
        beam_size=settings.get("beam_size", 5),
        vad_filter=settings.get("vad_filter", True),
        word_timestamps=settings.get("word_timestamps", True),
    )

    def _word_entries(piece: Any) -> list[dict[str, Any]]:
        return [
            {
                "start": float(token.start),
                "end": float(token.end),
                "word": token.word,
                "probability": float(token.probability),
            }
            for token in (piece.words or [])
        ]

    segments: list[dict[str, Any]] = []
    for position, piece in enumerate(segment_stream, start=1):
        word_entries = _word_entries(piece)
        segments.append({
            "id": position,
            "start": float(piece.start),
            "end": float(piece.end),
            "text": piece.text.strip(),
            "words": word_entries,
        })

    def _info_float(attribute: str) -> float:
        # Missing or falsy attributes collapse to 0.0.
        return float(getattr(info, attribute, 0.0) or 0.0)

    info_payload = {
        "language": getattr(info, "language", None),
        "language_probability": _info_float("language_probability"),
        "duration": _info_float("duration"),
        "duration_after_vad": _info_float("duration_after_vad"),
    }
    return segments, info_payload


def transcript_output_path(audio_path: Path, transcripts_dir: Path) -> Path:
    """Return ``<transcripts_dir>/<audio stem>.json`` for ``audio_path``."""
    return transcripts_dir.joinpath(audio_path.stem + ".json")


def _write_json_atomically(path: Path, payload: dict[str, Any]) -> None:
    """Write ``payload`` as indented JSON to ``path`` via a temp file + rename.

    The final name only appears once the content is fully written, so an
    interrupted run cannot leave a truncated file under ``path``.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = path.with_name(path.name + ".tmp")
    try:
        tmp_path.write_text(json.dumps(payload, indent=2, ensure_ascii=True), encoding="utf-8")
        tmp_path.replace(path)
    finally:
        # After a successful rename the temp name is gone; otherwise remove
        # whatever partial content was written.
        try:
            tmp_path.unlink()
        except OSError:
            pass


def run_transcription_stage(
    *,
    extraction_records: list[dict[str, Any]],
    paths: dict[str, Path],
    config: dict[str, Any],
    run_id: str,
) -> dict[str, Any]:
    """Transcribe every successfully extracted audio file.

    Extraction records whose status is not ``ok``/``reused`` are skipped.
    An existing transcript is recorded as ``reused`` unless
    ``config["force_retranscribe"]`` is set. Otherwise the audio is
    transcribed, each segment is labelled with a speaker from best-effort
    diarization (``UNKNOWN`` when unavailable), and the transcript JSON is
    written atomically so the reuse check never sees a partial file.
    Any exception for one file is recorded as a ``failed`` record and the
    loop continues.

    Args:
        extraction_records: ``records`` from the extraction stage.
        paths: Needs ``logs_dir`` and ``transcripts_dir``.
        config: Needs ``transcription`` when a file is actually transcribed;
            ``diarization`` and ``force_retranscribe`` are optional.
        run_id: Identifier embedded in records and the log file name.

    Returns:
        Dict with ``stage``, ``duration_seconds``, ``records``, ``failures``
        and ``log_path``.
    """
    stage_start = time.perf_counter()
    run_log = paths["logs_dir"] / f"transcription-{run_id}.jsonl"

    records: list[dict[str, Any]] = []
    failures: list[dict[str, Any]] = []

    for item in extraction_records:
        if item.get("status") not in {"ok", "reused"}:
            continue

        audio_path = Path(item["audio_path"])  # absolute path from extraction stage
        transcript_path = transcript_output_path(audio_path, paths["transcripts_dir"])
        record: dict[str, Any] = {
            "run_id": run_id,
            "timestamp": now_iso(),
            "stage": "transcribe",
            "audio_path": str(audio_path),
            "transcript_path": str(transcript_path),
            "status": "pending",
        }

        if transcript_path.exists() and not config.get("force_retranscribe", False):
            record.update({
                "status": "reused",
                "resume_marker": True,
            })
            append_jsonl(run_log, record)
            records.append(record)
            continue

        try:
            segments, info_payload = transcribe_audio_with_faster_whisper(audio_path, config)
            diar_segments, diar_meta = best_effort_diarization(audio_path=audio_path, config=config)

            min_overlap = float(config.get("diarization", {}).get("min_overlap_seconds", 0.2))
            normalized_segments: list[dict[str, Any]] = []
            for seg in segments:
                speaker = pick_segment_speaker(
                    segment_start=seg["start"],
                    segment_end=seg["end"],
                    diar_segments=diar_segments,
                    min_overlap_seconds=min_overlap,
                )
                normalized_segments.append({
                    "id": seg["id"],
                    "start": seg["start"],
                    "end": seg["end"],
                    "speaker": speaker,
                    "text": seg["text"],
                    "words": seg["words"],
                })

            transcript_payload = {
                "schema_version": "1.0",
                "run_id": run_id,
                "created_at": now_iso(),
                "source": {
                    "media_path": item.get("input_media"),
                    "audio_path": str(audio_path),
                    "audio_duration_seconds": ffprobe_duration_seconds(audio_path),
                },
                "transcription": {
                    "engine": "faster-whisper",
                    "model_name": config["transcription"].get("model_name"),
                    "device": config["transcription"].get("device"),
                    "compute_type": config["transcription"].get("compute_type"),
                    "language": info_payload.get("language"),
                    "language_probability": info_payload.get("language_probability"),
                    "duration_seconds": info_payload.get("duration"),
                    "duration_after_vad_seconds": info_payload.get("duration_after_vad"),
                },
                "diarization": diar_meta,
                "segments": normalized_segments,
            }

            _write_json_atomically(transcript_path, transcript_payload)

            record.update({
                "status": "ok",
                "segment_count": len(normalized_segments),
                "word_timestamp_count": sum(len(seg["words"]) for seg in normalized_segments),
                "resume_marker": False,
                "diarization_fallback_reason": diar_meta.get("fallback_reason"),
            })
            append_jsonl(run_log, record)
            records.append(record)
        except Exception as exc:
            # Per-file boundary: record the failure and keep processing.
            record.update({
                "status": "failed",
                "error": str(exc),
                "traceback": traceback.format_exc(),
            })
            append_jsonl(run_log, record)
            failures.append(record)
            records.append(record)

    duration = time.perf_counter() - stage_start
    return {
        "stage": "transcribe",
        "duration_seconds": round(duration, 3),
        "records": records,
        "failures": failures,
        "log_path": str(run_log),
    }



def read_style_reference_excerpt(paths: dict[str, Path], config: dict[str, Any], limit_chars: int = 1800) -> str:
    """Return the first ``limit_chars`` characters of the style reference file.

    The file is ``config["segmentation"]["style_reference"]`` (default
    ``00-dev-log/2026-02-09.md``) relative to ``paths["project_root"]``.
    An empty string is returned when that path does not exist.
    """
    seg_cfg = config.get("segmentation", {})
    style_path = paths["project_root"] / seg_cfg.get("style_reference", "00-dev-log/2026-02-09.md")
    if style_path.exists():
        return style_path.read_text(encoding="utf-8")[:limit_chars]
    return ""


def chunk_transcript_for_segmentation(
    transcript_payload: dict[str, Any],
    *,
    window_seconds: float,
    window_overlap_seconds: float,
) -> list[dict[str, Any]]:
    """Split transcript segments into overlapping time windows.

    Segments are ordered by ``start``. Windows are at least 30 s long and
    advance by ``window_seconds - window_overlap_seconds`` (at least 20 s,
    and never more than the window length, so consecutive windows cannot
    leave a gap). A segment belongs to every window it touches. Windows
    containing no segments are omitted.

    Windowing stops as soon as a window reaches the end of the last
    segment: any later window could only repeat segments that window
    already holds.

    Returns:
        A list of ``{"window_start", "window_end", "segments"}`` dicts,
        empty when the payload has no segments.
    """
    segments = transcript_payload.get("segments", [])
    if not segments:
        return []

    ordered = sorted(segments, key=lambda item: float(item.get("start", 0.0)))
    window_length = max(30.0, window_seconds)
    # Clamp so a negative overlap cannot make the stride exceed the window.
    step = min(window_length, max(20.0, window_seconds - window_overlap_seconds))

    cursor = float(ordered[0].get("start", 0.0))
    final_end = float(ordered[-1].get("end", cursor))

    chunks: list[dict[str, Any]] = []
    while cursor <= final_end + 1e-6:
        window_end = cursor + window_length
        in_window = [
            seg
            for seg in ordered
            if float(seg.get("end", 0.0)) >= cursor and float(seg.get("start", 0.0)) <= window_end
        ]
        if in_window:
            chunks.append({
                "window_start": cursor,
                "window_end": window_end,
                "segments": in_window,
            })
        if window_end >= final_end:
            # This window already covers the tail of the transcript.
            break
        cursor += step

    return chunks


def build_segmentation_schema() -> dict[str, Any]:
    """Return the strict JSON schema requested from the segmentation model.

    The response must be an object with a ``segments`` array whose items
    carry exactly ``id``, ``start_time``, ``end_time``, ``speaker`` and
    ``summary``.
    """
    field_types = {
        "id": "string",
        "start_time": "number",
        "end_time": "number",
        "speaker": "string",
        "summary": "string",
    }
    segment_schema = {
        "type": "object",
        "properties": {name: {"type": kind} for name, kind in field_types.items()},
        "required": list(field_types),
        "additionalProperties": False,
    }
    root_schema = {
        "type": "object",
        "properties": {
            "segments": {"type": "array", "items": segment_schema},
        },
        "required": ["segments"],
        "additionalProperties": False,
    }
    return {"name": "chat_segments", "strict": True, "schema": root_schema}




def _http_json_request(
    *,
    method: str,
    url: str,
    api_key: str,
    payload: dict[str, Any] | None = None,
    timeout_seconds: float = 10.0,
) -> dict[str, Any]:
    """Send a JSON request with a bearer token and return the decoded JSON body.

    ``payload``, when given, is serialized as the UTF-8 request body.

    Raises:
        RuntimeError: ``http_error_<code>: <body>`` for HTTP error responses.
        Other urllib or JSON decoding errors propagate unchanged.
    """
    import urllib.error
    import urllib.request

    body = json.dumps(payload).encode("utf-8") if payload is not None else None
    request = urllib.request.Request(
        url=url,
        data=body,
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        },
        method=method,
    )
    try:
        with urllib.request.urlopen(request, timeout=timeout_seconds) as response:
            return json.loads(response.read().decode("utf-8"))
    except urllib.error.HTTPError as exc:
        # Surface the server's error body; it usually explains the rejection.
        detail = exc.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"http_error_{exc.code}: {detail}") from exc


def local_model_smoke_check(config: dict[str, Any]) -> dict[str, Any]:
    """Check that the local model server answers and offers the configured model.

    Queries ``<base_url>/models`` using the ``segmentation`` settings.

    Returns:
        ``{"ok": True, "available_models": [...]}`` on success. Otherwise
        ``{"ok": False, "reason": ...}``, plus ``available_models`` when the
        server answered but does not list the requested model.
    """
    seg_cfg = config.get("segmentation", {})
    timeout_seconds = float(seg_cfg.get("smoke_check_timeout_seconds", 5))
    models_url = str(seg_cfg.get("base_url", "http://localhost:1234/v1")).rstrip("/") + "/models"
    api_key = str(seg_cfg.get("api_key", "lm-studio"))

    try:
        listing = _http_json_request(
            method="GET",
            url=models_url,
            api_key=api_key,
            timeout_seconds=timeout_seconds,
        )
    except Exception as exc:
        return {"ok": False, "reason": f"local_model_unreachable: {exc}"}

    available = []
    for entry in listing.get("data", []):
        model_id = entry.get("id")
        if model_id:
            available.append(model_id)

    requested = seg_cfg.get("model")
    if not requested or requested in available:
        return {"ok": True, "available_models": available}
    return {
        "ok": False,
        "reason": f"requested_model_not_available: {requested}",
        "available_models": available,
    }


def llm_segment_chunk(
    *,
    chunk: dict[str, Any],
    style_excerpt: str,
    config: dict[str, Any],
) -> list[dict[str, Any]]:
    """Ask the configured chat-completions endpoint to segment one window.

    The window's transcript lines, its time bounds and the style excerpt
    are sent in a single user message, with the segmentation JSON schema as
    the response format. Returns the ``segments`` list from the model's
    JSON reply (empty if the key is absent).
    """
    seg_cfg = config.get("segmentation", {})

    def _render(seg: dict[str, Any]) -> str:
        begin = float(seg.get("start", 0.0))
        finish = float(seg.get("end", 0.0))
        speaker = seg.get("speaker", "UNKNOWN")
        text = str(seg.get("text", "")).strip()
        return f"[{begin:.2f}-{finish:.2f}] {speaker}: {text}"

    transcript_lines = [_render(seg) for seg in chunk["segments"]]

    max_words = int(seg_cfg.get("max_summary_words", 80))
    prompt_parts = [
        "You segment stream transcript text into chat-style blocks. ",
        "Return JSON only. Keep summaries faithful to transcript content and avoid making up details. ",
        "Transcribe exactly what is visible in transcript content and do not continue a repeating pattern. ",
        "Keep chronology valid and include slight overlap between adjacent segments.\n\n",
        f"Window: {chunk['window_start']:.2f}s to {chunk['window_end']:.2f}s\n",
        "Use hybrid boundaries: semantic or topic shifts, but stay within this time window.\n",
        "Required fields: id, start_time, end_time, speaker, summary\n",
        f"Summary length: <= {max_words} words per segment.\n\n",
        "Style reference excerpt (2026-02-09):\n",
        f"{style_excerpt or '(unavailable)'}\n\n",
        "Transcript lines:\n",
        "\n".join(transcript_lines),
    ]
    prompt = "".join(prompt_parts)

    request_body = {
        "model": seg_cfg.get("model"),
        "temperature": float(seg_cfg.get("temperature", 0.2)),
        "messages": [
            {"role": "system", "content": "You are a precise transcript segmenter."},
            {"role": "user", "content": prompt},
        ],
        "response_format": {
            "type": "json_schema",
            "json_schema": build_segmentation_schema(),
        },
    }

    endpoint = str(seg_cfg.get("base_url", "http://localhost:1234/v1")).rstrip("/") + "/chat/completions"
    response = _http_json_request(
        method="POST",
        url=endpoint,
        api_key=str(seg_cfg.get("api_key", "lm-studio")),
        payload=request_body,
        timeout_seconds=120.0,
    )
    # The structured reply arrives as a JSON string in the message content.
    reply_text = response["choices"][0]["message"]["content"]
    return json.loads(reply_text).get("segments", [])

def normalize_and_validate_segments(
    raw_segments: list[dict[str, Any]],
    *,
    minimum_overlap_seconds: float,
) -> list[dict[str, Any]]:
    """Clean model-produced segments and enforce chronology and overlap.

    Steps:
      1. Require ``id``, ``start_time``, ``end_time``, ``speaker`` and
         ``summary`` on every item.
      2. Coerce times to floats rounded to milliseconds; a non-positive
         duration becomes 0.5 s. Blank speakers become ``UNKNOWN``.
      3. Sort by ``(start_time, end_time)``.
      4. Pull each segment's start back so it overlaps its predecessor by
         ``minimum_overlap_seconds``, but never before the predecessor's
         own start (or before 0), so the result stays chronological even
         when the predecessor is shorter than the required overlap.
      5. Renumber ids as ``seg-0001``, ``seg-0002``, ...

    Raises:
        ValueError: ``segment_missing_required_fields`` if a key is absent.
    """
    required_fields = {"id", "start_time", "end_time", "speaker", "summary"}

    normalized: list[dict[str, Any]] = []
    for item in raw_segments:
        if not required_fields.issubset(item.keys()):
            raise ValueError("segment_missing_required_fields")

        start_time = float(item["start_time"])
        end_time = float(item["end_time"])
        if end_time <= start_time:
            end_time = start_time + 0.5

        normalized.append({
            "id": "",  # assigned after sorting
            "start_time": round(start_time, 3),
            "end_time": round(end_time, 3),
            "speaker": str(item["speaker"]).strip() or "UNKNOWN",
            "summary": str(item["summary"]).strip(),
        })

    normalized.sort(key=lambda item: (item["start_time"], item["end_time"]))

    # Adjacent pairs share dict objects, so an adjusted segment is the
    # "previous" of the next pair with its updated times.
    for previous, current in zip(normalized, normalized[1:]):
        latest_allowed_start = previous["end_time"] - minimum_overlap_seconds
        if current["start_time"] > latest_allowed_start:
            current["start_time"] = round(
                max(0.0, previous["start_time"], latest_allowed_start), 3
            )

        if current["end_time"] <= current["start_time"]:
            current["end_time"] = round(current["start_time"] + 0.5, 3)

    for position, item in enumerate(normalized, start=1):
        item["id"] = f"seg-{position:04d}"

    return normalized


def run_segmentation_stage(
    *,
    transcription_records: list[dict[str, Any]],
    paths: dict[str, Path],
    config: dict[str, Any],
    run_id: str,
) -> dict[str, Any]:
    """Segment every available transcript with the local LLM.

    The stage is skipped (no records, no failures) when
    ``config["segmentation"]["enabled"]`` is false. Otherwise a smoke check
    against the model server runs first; if it fails, a single failure
    record is logged and returned without touching any transcript.

    For each transcription record with status ``ok``/``reused`` the
    transcript is loaded, windowed, each window is sent to the model, and
    the combined segments are normalized. An exception for one transcript
    is recorded as a ``failed`` record and the loop continues.

    Args:
        transcription_records: ``records`` from the transcription stage.
        paths: Needs ``logs_dir``; ``project_root`` is needed once the
            stage actually runs (style reference lookup).
        config: ``segmentation`` settings are read from here.
        run_id: Identifier embedded in records and the log file name.

    Returns:
        Dict with ``stage``, ``duration_seconds``, ``records``, ``failures``,
        ``log_path`` and ``smoke_check``.
    """
    stage_start = time.perf_counter()
    run_log = paths["logs_dir"] / f"segmentation-{run_id}.jsonl"
    seg_cfg = config.get("segmentation", {})

    # Honor the config switch the same way diarization does.
    if not seg_cfg.get("enabled", True):
        return {
            "stage": "segment",
            "duration_seconds": round(time.perf_counter() - stage_start, 3),
            "records": [],
            "failures": [],
            "log_path": str(run_log),
            "smoke_check": {"ok": False, "reason": "disabled_in_config"},
        }

    smoke = local_model_smoke_check(config)
    if not smoke.get("ok"):
        failure = {
            "run_id": run_id,
            "timestamp": now_iso(),
            "stage": "segment",
            "status": "failed",
            "error": smoke.get("reason"),
            "available_models": smoke.get("available_models", []),
        }
        # Persist the failure so the returned log path points at a real file.
        append_jsonl(run_log, failure)
        return {
            "stage": "segment",
            "duration_seconds": round(time.perf_counter() - stage_start, 3),
            "records": [],
            "failures": [failure],
            "log_path": str(run_log),
            "smoke_check": smoke,
        }

    records: list[dict[str, Any]] = []
    failures: list[dict[str, Any]] = []
    style_excerpt = read_style_reference_excerpt(paths, config)

    for item in transcription_records:
        if item.get("status") not in {"ok", "reused"}:
            continue

        transcript_path = Path(item["transcript_path"])
        record: dict[str, Any] = {
            "run_id": run_id,
            "timestamp": now_iso(),
            "stage": "segment",
            "transcript_path": str(transcript_path),
            "status": "pending",
        }

        try:
            transcript_payload = json.loads(transcript_path.read_text(encoding="utf-8"))
            chunks = chunk_transcript_for_segmentation(
                transcript_payload,
                window_seconds=float(seg_cfg.get("window_seconds", 240.0)),
                window_overlap_seconds=float(seg_cfg.get("window_overlap_seconds", 20.0)),
            )

            normalized: list[dict[str, Any]] = []
            if chunks:
                raw_segments: list[dict[str, Any]] = []
                for chunk in chunks:
                    raw_segments.extend(
                        llm_segment_chunk(
                            chunk=chunk,
                            style_excerpt=style_excerpt,
                            config=config,
                        )
                    )
                normalized = normalize_and_validate_segments(
                    raw_segments,
                    minimum_overlap_seconds=float(seg_cfg.get("minimum_overlap_seconds", 1.5)),
                )

            record.update({
                "status": "ok",
                "segment_count": len(normalized),
                "segments": normalized,
            })
            append_jsonl(run_log, record)
            records.append(record)
        except Exception as exc:
            # Per-transcript boundary: record the failure and keep going.
            record.update({
                "status": "failed",
                "error": str(exc),
                "traceback": traceback.format_exc(),
            })
            append_jsonl(run_log, record)
            failures.append(record)
            records.append(record)

    duration = time.perf_counter() - stage_start
    return {
        "stage": "segment",
        "duration_seconds": round(duration, 3),
        "records": records,
        "failures": failures,
        "log_path": str(run_log),
        "smoke_check": smoke,
    }


def write_run_metadata(
    *,
    run_id: str,
    paths: dict[str, Path],
    config: dict[str, Any],
    extraction: dict[str, Any],
    transcription: dict[str, Any],
) -> Path:
    """Write the per-run summary JSON and return its path.

    The snapshot combines the resolved paths, the config as passed in,
    per-stage timings and status counts, produced artifact paths, reuse
    counters and the failure records of both stages. It is written to
    ``<runs_dir>/run-<run_id>.json``.
    """
    records_ex = extraction["records"]
    records_tx = transcription["records"]

    def _count(records: list[dict[str, Any]], status: str) -> int:
        return sum(1 for entry in records if entry.get("status") == status)

    audio_reused = _count(records_ex, "reused")
    transcripts_reused = _count(records_tx, "reused")

    inputs_summary = {
        "total_discovered": len({entry.get("input_media") for entry in records_ex}),
        "processed_ok_or_reused": sum(
            1 for entry in records_ex if entry.get("status") in {"ok", "reused"}
        ),
        "failed": _count(records_ex, "failed"),
    }
    extract_summary = {
        "duration_seconds": extraction["duration_seconds"],
        "log_path": extraction["log_path"],
        "failure_log_path": extraction["failure_log_path"],
        "reused_count": audio_reused,
        "new_count": _count(records_ex, "ok"),
        "failed_count": len(extraction["failures"]),
    }
    fallback_reasons = []
    for entry in records_tx:
        reason = entry.get("diarization_fallback_reason")
        if reason:
            fallback_reasons.append(reason)
    transcribe_summary = {
        "duration_seconds": transcription["duration_seconds"],
        "log_path": transcription["log_path"],
        "reused_count": transcripts_reused,
        "new_count": _count(records_tx, "ok"),
        "failed_count": len(transcription["failures"]),
        "fallback_reasons": fallback_reasons,
    }

    run_payload = {
        "run_id": run_id,
        "created_at": now_iso(),
        "paths": {name: str(location) for name, location in paths.items()},
        "config_snapshot": config,
        "inputs": inputs_summary,
        "stages": {
            "extract": extract_summary,
            "transcribe": transcribe_summary,
        },
        "artifacts": {
            "audio": [entry.get("audio_path") for entry in records_ex if entry.get("audio_path")],
            "transcripts": [
                entry.get("transcript_path") for entry in records_tx if entry.get("transcript_path")
            ],
        },
        "resume_markers": {
            "audio_reused": audio_reused,
            "transcripts_reused": transcripts_reused,
        },
        "failures": {
            "extract": extraction["failures"],
            "transcribe": transcription["failures"],
        },
    }

    output_path = paths["runs_dir"] / f"run-{run_id}.json"
    serialized = json.dumps(run_payload, indent=2, ensure_ascii=True)
    output_path.write_text(serialized, encoding="utf-8")
    return output_path


def run_pipeline(config: dict[str, Any]) -> dict[str, Any]:
    """Run extraction and transcription for the configured inputs.

    Generates a run id (UTC timestamp plus a short random suffix), resolves
    the project paths, discovers inputs, runs the extraction and
    transcription stages, and writes the run metadata file.

    Returns:
        A summary with the run id, input count, per-stage failure counts,
        the metadata file path and the audio/transcript directories.
    """
    started = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    run_id = f"{started}-{uuid4().hex[:8]}"

    paths = resolve_project_paths()
    inputs = discover_inputs(config, paths)

    # Arguments shared by every stage call below.
    shared = {"paths": paths, "config": config, "run_id": run_id}

    extraction = run_extraction_stage(inputs=inputs, **shared)
    transcription = run_transcription_stage(
        extraction_records=extraction["records"],
        **shared,
    )
    run_meta_path = write_run_metadata(
        extraction=extraction,
        transcription=transcription,
        **shared,
    )

    summary: dict[str, Any] = {
        "run_id": run_id,
        "input_count": len(inputs),
        "extract_failures": len(extraction["failures"]),
        "transcribe_failures": len(transcription["failures"]),
        "run_metadata": str(run_meta_path),
        "audio_dir": str(paths["audio_dir"]),
        "transcripts_dir": str(paths["transcripts_dir"]),
    }
    return summary




In [None]:
# Resolve the workspace layout, then display each path relative to the
# project root (the bare expression is this cell's output).
paths = resolve_project_paths()
{name: as_project_relative(location, paths["project_root"]) for name, location in paths.items()}


## Run the pipeline

- Set `CONFIG["input_mode"]` to `"single"` or `"batch"`.
- Place `HUGGINGFACE_TOKEN` in `00-supporting-files/data/.env` for diarization-enabled runs.
- Media paths that start with `large-files/` resolve against the `large-files` directory one level above the repo root when it exists, and otherwise against `large-files` inside the repo.
- For batch mode, set `CONFIG["batch_inputs"]` and/or `CONFIG["batch_glob"]`.
- Use `force_reextract` / `force_retranscribe` to override resumable behavior.


In [None]:
# Example execution:
# result = run_pipeline(CONFIG)
# result
