### Natural language media search
### Goals & scope
- Given one or many video/audio files and a natural-language query, return the most relevant segments with: start/end timestamps, transcript snippet, thumbnail(s), and a relevance score.
- **MVP scope:** a single-machine prototype that can index and search a handful of videos (hours of content in total).
- **Core capabilities** to learn: audio transcription and alignment, audio/visual/text embeddings, vector indexing (FAISS), multimodal retrieval & reranking.

In [None]:
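# Required packages: ffmpeg-python (plus the ffmpeg/ffprobe binaries on PATH);
# optional: webrtcvad (only needed for VAD-based audio segmentation, use_vad=True).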

In [1]:
# media_preprocessor.py
import json
import math
import os
import re
import shutil
import sys
import uuid
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import ffmpeg

# Optional VAD dependency
try:
    import webrtcvad
    import wave
    _VAD_AVAILABLE = True
except Exception:
    _VAD_AVAILABLE = False

class MediaPreprocessor:
    """
    Prepare a video or audio file for indexing.

    Each call to ``process_media_file`` creates a fresh working directory
    ``<temp_dir>/<stem>_<8-hex-uid>`` and fills it with:
      - a mono 16-bit PCM WAV of the audio track (loudness-normalized),
      - audio chunks, either fixed-length or speech-only (VAD),
      - for videos, resized JPEG keyframes (fixed interval or scene changes).
    The returned dict lists every artifact together with its timing information.
    """

    # Extensions routed to the video / audio pipelines (compared lower-cased).
    VIDEO_EXTENSIONS = frozenset({'.mp4', '.avi', '.mov', '.mkv', '.wmv', '.flv', '.webm'})
    AUDIO_EXTENSIONS = frozenset({'.wav', '.mp3', '.flac', '.aac', '.ogg', '.m4a'})

    # webrtcvad only accepts 16-bit mono PCM sampled at one of these rates.
    _VAD_SAMPLE_RATES = (8000, 16000, 32000, 48000)

    # Matches the per-frame line logged by ffmpeg's `showinfo` filter, e.g.
    # "[Parsed_showinfo_1 @ 0x..] n:   3 pts:  12800 pts_time:0.512 ..."
    _SHOWINFO_RE = re.compile(r"\bn:\s*(\d+)\s+pts:\s*\S+\s+pts_time:\s*(\S+)")

    def __init__(self,
                 keyframe_interval: int = 5,
                 audio_segment_length: int = 30,
                 target_sample_rate: int = 16000,
                 frame_size: Tuple[int, int] = (224, 224),
                 temp_dir: str = "./media_processed",
                 target_lufs: float = -16.0,
                 use_vad: bool = False,
                 scene_detect: bool = False):
        """
        keyframe_interval: seconds between extracted keyframes (if scene_detect=False);
            values below 1 are treated as 1. Also the fallback spacing for scene-detect
            frames whose timestamp ffmpeg did not report.
        audio_segment_length: seconds per chunk for fixed segmentation (also used when
            VAD falls back to fixed segmentation); must be >= 1 after int() truncation.
        target_sample_rate: e.g., 16000 for Whisper
        frame_size: (width, height) for resizing keyframes (224,224)
        temp_dir: root for all outputs (a per-file UUID subdir will be created)
        target_lufs: target loudness in LUFS for loudnorm (default -16)
        use_vad: if True, attempt VAD-based segmentation (requires webrtcvad)
        scene_detect: if True, use ffmpeg scene-change selection instead of fixed interval

        Raises:
            ValueError: if audio_segment_length truncates to 0 or less (it would
                otherwise cause a division by zero during fixed segmentation).
        """
        # Validate before touching the filesystem so a bad config has no side effects.
        segment_length = int(audio_segment_length)
        if segment_length <= 0:
            raise ValueError(
                f"audio_segment_length must be at least 1 second, got {audio_segment_length!r}")

        self.keyframe_interval = int(keyframe_interval)
        self.audio_segment_length = segment_length
        self.target_sample_rate = int(target_sample_rate)
        self.frame_size = frame_size
        self.root_temp_dir = Path(temp_dir)
        self.root_temp_dir.mkdir(parents=True, exist_ok=True)
        self.target_lufs = float(target_lufs)
        # VAD is silently disabled when webrtcvad could not be imported.
        self.use_vad = bool(use_vad) and _VAD_AVAILABLE
        self.scene_detect = bool(scene_detect)

        print("✅ MediaPreprocessor initialized:")
        print(f"  - Keyframe interval: {self.keyframe_interval}s")
        print(f"  - Audio segment length: {self.audio_segment_length}s")
        print(f"  - Target sample rate: {self.target_sample_rate}Hz")
        print(f"  - Frame size: {self.frame_size}")
        print(f"  - Root temp directory: {self.root_temp_dir.resolve()}")
        print(f"  - Target LUFS: {self.target_lufs}")
        print(f"  - VAD enabled: {self.use_vad} (webrtcvad available: {_VAD_AVAILABLE})")
        print(f"  - Scene-detect keyframes: {self.scene_detect}")

    # --------------------
    @staticmethod
    def _ffmpeg_stderr(err: Exception) -> str:
        """Return ffmpeg's captured stderr (decoded leniently), or str(err) if none was captured."""
        stderr = getattr(err, "stderr", None)
        # errors="replace": ffmpeg may echo file names/metadata that are not valid UTF-8,
        # and a UnicodeDecodeError here would mask the real ffmpeg failure.
        return stderr.decode("utf-8", errors="replace") if stderr else str(err)

    @staticmethod
    def _parse_seconds(value) -> Optional[float]:
        """Convert an ffprobe duration field to float; None if missing or unparsable (e.g. 'N/A')."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    # --------------------
    def _make_run_dir(self, file_path: Path) -> Path:
        """
        Create a unique directory for processing this file to avoid collisions.
        """
        uid = uuid.uuid4().hex[:8]
        out_dir = self.root_temp_dir / f"{file_path.stem}_{uid}"
        out_dir.mkdir(parents=True, exist_ok=True)
        return out_dir

    # --------------------
    def _get_duration(self, file_path: Path) -> float:
        """
        Return the media duration in seconds as reported by ffprobe.

        Uses the container-level duration when present; otherwise the longest
        per-stream duration.

        Raises:
            ffmpeg.Error: if ffprobe fails.
            RuntimeError: if no duration is reported at all.
        """
        probe = ffmpeg.probe(str(file_path))
        duration = self._parse_seconds(probe.get("format", {}).get("duration"))
        if duration is not None:
            return duration
        stream_durations = [
            d for d in (self._parse_seconds(s.get("duration")) for s in probe.get("streams", []))
            if d is not None
        ]
        if stream_durations:
            return max(stream_durations)
        raise RuntimeError(f"Could not determine duration of {file_path}")

    # --------------------
    def _extract_audio(self, file_path: Path, out_dir: Path, normalize: bool = True) -> Path:
        """
        Extract audio as mono PCM WAV (pcm_s16le) at target_sample_rate.
        Uses loudnorm single-pass if normalize=True. For better accuracy you could
        run 2-pass loudnorm (analyze then apply measured params).

        Raises:
            RuntimeError: if ffmpeg fails (message includes ffmpeg's stderr).
        """
        out_path = out_dir / f"{file_path.stem}_audio.wav"

        stream = ffmpeg.input(str(file_path))

        if normalize:
            # Simple single-pass loudnorm. For best fidelity: run two-pass approach (analyze, then apply).
            ffmpeg_stream = stream.filter('loudnorm', I=self.target_lufs, TP=-1.5, LRA=7)
        else:
            ffmpeg_stream = stream

        try:
            (
                ffmpeg_stream
                .output(str(out_path),
                        format='wav',
                        acodec='pcm_s16le',
                        ac=1,  # mono
                        ar=self.target_sample_rate)
                .overwrite_output()
                .run(quiet=True)
            )
        except ffmpeg.Error as e:
            raise RuntimeError(f"ffmpeg failed extracting audio: {self._ffmpeg_stderr(e)}") from e

        return out_path

    # --------------------
    def _segment_audio(self, audio_path: Path, out_dir: Path) -> List[Dict]:
        """Split the extracted WAV using VAD when enabled, otherwise fixed-length chunks."""
        if self.use_vad:
            return self._segment_audio_vad(audio_path, out_dir=out_dir)
        return self._segment_audio_fixed(audio_path, out_dir=out_dir)

    def _write_audio_chunk(self, audio_path: Path, chunk_path: Path,
                           start: float, length: float, error_prefix: str) -> None:
        """
        Re-encode `length` seconds of `audio_path`, starting at `start` seconds,
        into a mono pcm_s16le WAV at target_sample_rate.

        Raises:
            RuntimeError: "<error_prefix>: <ffmpeg stderr>" if ffmpeg fails.
        """
        try:
            (
                ffmpeg
                .input(str(audio_path), ss=start, t=length)
                .output(str(chunk_path),
                        format='wav',
                        acodec='pcm_s16le',
                        ac=1,
                        ar=self.target_sample_rate)
                .overwrite_output()
                .run(quiet=True)
            )
        except ffmpeg.Error as e:
            raise RuntimeError(f"{error_prefix}: {self._ffmpeg_stderr(e)}") from e

    # --------------------
    def _segment_audio_fixed(self, audio_path: Path, out_dir: Path) -> List[Dict]:
        """
        Cut the WAV into consecutive chunks of audio_segment_length seconds (the last
        chunk may be shorter), re-encoding each chunk with its own ffmpeg call.
        Returns list of {path, start_sec, end_sec}.
        """
        duration = self._get_duration(audio_path)
        segments = []
        num_chunks = math.ceil(duration / self.audio_segment_length)

        for i in range(num_chunks):
            start = i * self.audio_segment_length
            seg_len = min(self.audio_segment_length, max(0.0, duration - start))
            chunk_path = out_dir / f"{audio_path.stem}_chunk_{i:04d}_{int(start)}s.wav"
            self._write_audio_chunk(audio_path, chunk_path, start, seg_len,
                                    "ffmpeg failed segmenting audio")
            segments.append({
                "path": str(chunk_path),
                "start_sec": float(start),
                "end_sec": float(start + seg_len)
            })

        return segments

    # --------------------
    def _segment_audio_vad(self, audio_path: Path, out_dir: Path, aggressiveness: int = 2) -> List[Dict]:
        """
        Speech-only segmentation using webrtcvad.

        The WAV is split into 30 ms frames, each frame is classified as speech or
        not, and every run of consecutive speech frames is exported as its own WAV.
        No smoothing is applied, so short pauses split segments; tune for production.

        Falls back to fixed-length segmentation when webrtcvad is missing, when the
        WAV is not 16-bit at a webrtcvad-supported rate, or when no speech is found.
        Returns list of {path, start_sec, end_sec}.

        Raises:
            ValueError: if the WAV is not mono.
            RuntimeError: if ffmpeg fails to export a segment.
        """
        if not _VAD_AVAILABLE:
            print("webrtcvad not available; falling back to fixed segmentation.")
            return self._segment_audio_fixed(audio_path, out_dir)

        with wave.open(str(audio_path), 'rb') as wf:
            sample_rate = wf.getframerate()
            n_channels = wf.getnchannels()
            width = wf.getsampwidth()
            pcm = wf.readframes(wf.getnframes())

        # Explicit check rather than `assert`, which is stripped under `python -O`.
        if n_channels != 1:
            raise ValueError("VAD expects mono WAV")

        # Any other format makes every is_speech() call fail; skip straight to the
        # same fixed-length fallback that an all-silence result would produce.
        if width != 2 or sample_rate not in self._VAD_SAMPLE_RATES:
            print(f"webrtcvad cannot process {sample_rate} Hz / {8 * width}-bit audio; "
                  "falling back to fixed segmentation.")
            return self._segment_audio_fixed(audio_path, out_dir)

        vad = webrtcvad.Vad(aggressiveness)

        # frame size in ms. webrtcvad supports 10,20,30
        frame_ms = 30
        frame_sec = frame_ms / 1000.0
        bytes_per_frame = int(sample_rate * frame_sec * width)

        voiced_flags = []
        for offset in range(0, len(pcm), bytes_per_frame):
            # Zero-pad the trailing partial frame so it still has a valid length.
            frame = pcm[offset:offset + bytes_per_frame].ljust(bytes_per_frame, b'\0')
            try:
                voiced_flags.append(vad.is_speech(frame, sample_rate))
            except Exception:
                # Best effort: a frame webrtcvad rejects is treated as silence.
                voiced_flags.append(False)

        # Collect half-open [first_frame, end_frame) runs of consecutive voiced frames.
        runs = []
        run_start = None
        for idx, voiced in enumerate(voiced_flags):
            if voiced and run_start is None:
                run_start = idx
            elif not voiced and run_start is not None:
                runs.append((run_start, idx))
                run_start = None
        if run_start is not None:
            runs.append((run_start, len(voiced_flags)))

        segments = []
        for seg_idx, (first_frame, end_frame) in enumerate(runs):
            start_time = first_frame * frame_sec
            end_time = end_frame * frame_sec
            # The run index keeps names unique: several short runs can share the same
            # whole-second bounds and would otherwise overwrite each other's file.
            seg_name = f"{audio_path.stem}_vad_{seg_idx:04d}_{int(start_time)}s_{int(end_time)}s.wav"
            chunk_path = out_dir / seg_name
            # extract segment with ffmpeg for robust encoding
            self._write_audio_chunk(audio_path, chunk_path, start_time, end_time - start_time,
                                    "ffmpeg failed extracting VAD segment")
            segments.append({"path": str(chunk_path),
                             "start_sec": float(start_time),
                             "end_sec": float(end_time)})

        # if no voiced segments found, fallback to fixed
        if not segments:
            return self._segment_audio_fixed(audio_path, out_dir)
        return segments

    # --------------------
    @staticmethod
    def _list_numbered_frames(out_dir: Path, prefix: str) -> List[Tuple[int, Path]]:
        """
        Return (number, path) for files named ``<prefix><digits>.jpg`` in out_dir,
        sorted numerically.

        Names are matched literally (Path.glob would misread '[', ']', '*' or '?' in a
        file stem) and sorted by value, so ordering stays correct past the padding width.
        """
        numbered = []
        for path in out_dir.iterdir():
            name = path.name
            if not (name.startswith(prefix) and name.endswith(".jpg")):
                continue
            digits = name[len(prefix):-len(".jpg")]
            if digits.isdecimal():
                numbered.append((int(digits), path))
        numbered.sort(key=lambda item: item[0])
        return numbered

    @classmethod
    def _parse_showinfo_pts(cls, log_text: str) -> Dict[int, float]:
        """
        Map showinfo frame number (0-based) -> pts_time in seconds from ffmpeg's log.
        Frames whose pts_time is not a number (e.g. NOPTS) are omitted.
        """
        pts_by_frame = {}
        for match in cls._SHOWINFO_RE.finditer(log_text):
            try:
                pts_by_frame[int(match.group(1))] = float(match.group(2))
            except ValueError:
                continue
        return pts_by_frame

    def _extract_keyframes(self, file_path: Path, out_dir: Path) -> List[Dict]:
        """
        Extract resized JPEG keyframes; returns [{"path": str, "timestamp": float}, ...].

        scene_detect=True: keep frames whose scene-change score exceeds 0.4, timestamped
        from ffmpeg's `showinfo` log; frames without a parsed time fall back to
        index * interval.
        scene_detect=False: one frame every `interval` seconds, timestamped
        index * interval (approximate: the fps filter picks the nearest frame).

        `interval` is keyframe_interval clamped to at least 1 second.

        Raises:
            RuntimeError: if ffmpeg fails (message includes ffmpeg's stderr).
        """
        interval = max(1, self.keyframe_interval)
        frames = []

        if self.scene_detect:
            prefix = f"{file_path.stem}_frame_scene_"
            # image2 treats '%' in the path as a pattern escape; double it to keep it literal.
            out_pattern = str(out_dir / prefix).replace("%", "%%") + "%04d.jpg"
            try:
                _, stderr = (
                    ffmpeg
                    .input(str(file_path))
                    .filter("select", "gt(scene,0.4)")
                    # Logs pts_time of every selected frame to stderr (captured by quiet=True).
                    .filter("showinfo")
                    .filter("scale", self.frame_size[0], self.frame_size[1])
                    .output(out_pattern, vsync="vfr", format='image2')
                    .overwrite_output()
                    .run(quiet=True)
                )
            except ffmpeg.Error as e:
                raise RuntimeError(f"ffmpeg scene-detect failed: {self._ffmpeg_stderr(e)}") from e

            pts_by_frame = self._parse_showinfo_pts((stderr or b"").decode("utf-8", errors="replace"))
            for idx, (number, path) in enumerate(self._list_numbered_frames(out_dir, prefix)):
                # image2 numbers output files from 1; showinfo counts frames from 0.
                timestamp = pts_by_frame.get(number - 1, float(idx * interval))
                frames.append({"path": str(path), "timestamp": float(timestamp)})
            return frames

        # Fixed interval
        prefix = f"{file_path.stem}_frame_"
        out_pattern = str(out_dir / prefix).replace("%", "%%") + "%06d.jpg"
        try:
            (
                ffmpeg
                .input(str(file_path))
                # fps=1/N -> one frame every N seconds
                .filter("fps", fps=1.0 / interval)
                .filter("scale", self.frame_size[0], self.frame_size[1])
                .output(out_pattern, vsync="vfr", format='image2')
                .overwrite_output()
                .run(quiet=True)
            )
        except ffmpeg.Error as e:
            raise RuntimeError(
                f"ffmpeg fixed-interval keyframe extraction failed: {self._ffmpeg_stderr(e)}") from e

        for idx, (_, path) in enumerate(self._list_numbered_frames(out_dir, prefix)):
            frames.append({"path": str(path), "timestamp": float(idx * interval)})

        return frames

    # --------------------
    def _process_video_file(self, file_path: Path) -> Dict:
        """Extract audio chunks and keyframes from a video into a fresh working dir."""
        run_dir = self._make_run_dir(file_path)
        print(f"Processing video -> working directory: {run_dir}")

        audio_path = self._extract_audio(file_path, out_dir=run_dir, normalize=True)
        audio_segments = self._segment_audio(audio_path, out_dir=run_dir)
        keyframes = self._extract_keyframes(file_path, out_dir=run_dir)
        duration = self._get_duration(file_path)

        return {
            "file_type": "video",
            "original_file": str(file_path.resolve()),
            "working_dir": str(run_dir.resolve()),
            "audio_segments": audio_segments,
            "video_keyframes": keyframes,
            "metadata": {
                "duration": duration,
                "frame_size": self.frame_size,
                "sample_rate": self.target_sample_rate
            }
        }

    # --------------------
    def _process_audio_file(self, file_path: Path) -> Dict:
        """Extract and segment the audio of an audio-only file into a fresh working dir."""
        run_dir = self._make_run_dir(file_path)
        print(f"Processing audio -> working directory: {run_dir}")

        audio_path = self._extract_audio(file_path, out_dir=run_dir, normalize=True)
        audio_segments = self._segment_audio(audio_path, out_dir=run_dir)
        duration = self._get_duration(audio_path)

        return {
            "file_type": "audio",
            "original_file": str(file_path.resolve()),
            "working_dir": str(run_dir.resolve()),
            "audio_segments": audio_segments,
            "metadata": {
                "duration": duration,
                "sample_rate": self.target_sample_rate
            }
        }

    # --------------------
    def process_media_file(self, file_path: str) -> Dict:
        """
        Preprocess one media file, dispatching on its (case-insensitive) extension.

        Raises:
            FileNotFoundError: if the path does not exist.
            ValueError: if the extension is in neither VIDEO_EXTENSIONS nor AUDIO_EXTENSIONS.
        """
        file_path = Path(file_path)
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        ext = file_path.suffix.lower()
        if ext in self.VIDEO_EXTENSIONS:
            return self._process_video_file(file_path)
        if ext in self.AUDIO_EXTENSIONS:
            return self._process_audio_file(file_path)
        raise ValueError(f"Unsupported file format: {ext}")

    # --------------------
    def cleanup_run_dir(self, run_dir: str):
        """
        Remove a previous working directory if needed.
        """
        p = Path(run_dir)
        if p.exists() and p.is_dir():
            shutil.rmtree(p)
            print(f"Removed working dir: {p}")

# Quick CLI test (runs when the file is executed directly as a script).
# Skipped inside a Jupyter kernel: there __name__ is also "__main__", but sys.argv
# holds the kernel's own launch arguments (e.g. "-f <connection-file>"), which would
# otherwise be treated as a media path and fail with FileNotFoundError.
if __name__ == "__main__" and "ipykernel" not in sys.modules:
    if len(sys.argv) < 2:
        print("Usage: python media_preprocessor.py <path-to-media-file>")
        sys.exit(1)

    mp = MediaPreprocessor(use_vad=False, scene_detect=False)
    result = mp.process_media_file(sys.argv[1])
    print(json.dumps(result, indent=2))


In [None]:
# Notebook-wide preprocessor; every setting is written out (all equal to the
# constructor defaults) so the active configuration is visible at a glance.
mymediaprocessor = MediaPreprocessor(
    keyframe_interval=5,
    audio_segment_length=30,
    target_sample_rate=16000,
    frame_size=(224, 224),
    temp_dir="./media_processed",
    target_lufs=-16.0,
    use_vad=False,
    scene_detect=False,
)

