# Video Indexor (Gemini 2.5 Flash) – KAILASA PRO Version

This notebook implements the Video Indexor using Gemini 2.5 Flash. It includes scene detection, frame extraction, KAILASA Ritual Intelligence analysis, transcription, and summarization.

## Setup
First, we install the necessary dependencies.

In [1]:
!pip install google-genai opencv-python scenedetect[opencv] pillow tqdm

Collecting scenedetect[opencv]
  Downloading scenedetect-0.6.7.1-py3-none-any.whl.metadata (3.8 kB)
Collecting click<8.3.0,~=8.0 (from scenedetect[opencv])
  Downloading click-8.2.1-py3-none-any.whl.metadata (2.5 kB)
Downloading click-8.2.1-py3-none-any.whl (102 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m102.2/102.2 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading scenedetect-0.6.7.1-py3-none-any.whl (130 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m130.9/130.9 kB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: click, scenedetect
  Attempting uninstall: click
    Found existing installation: click 8.3.1
    Uninstalling click-8.3.1:
      Successfully uninstalled click-8.3.1
Successfully installed click-8.2.1 scenedetect-0.6.7.1


## Imports

In [2]:
import os
import io
import json
import time
import logging
import sqlite3
import functools
import random
import hashlib
from pathlib import Path
from typing import List, Dict, Any, Tuple, Optional
from datetime import datetime

import cv2
from PIL import Image
from tqdm.notebook import tqdm  # Use notebook version of tqdm

# PySceneDetect
from scenedetect import open_video, SceneManager
from scenedetect.detectors import ContentDetector

# Gemini API (google-genai SDK)
from google import genai
from google.genai import types
from google.api_core import exceptions as google_exceptions

from google.colab import drive
from google.colab import userdata

# Configure Logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
log = logging.getLogger("video-indexor-pro-kailasa")

  lines_video = [l for l in lines if ' Video: ' in l and re.search('\d+x\d+', l)]
  rotation_lines = [l for l in lines if 'rotate          :' in l and re.search('\d+$', l)]
  match = re.search('\d+$', rotation_line)
  IMAGEMAGICK_BINARY = r"C:\Program Files\ImageMagick-6.8.8-Q16\magick.exe"


## Configuration & Mount Drive
Mount Google Drive to access your videos and save the database.

In [3]:
# Mount Google Drive first: WORK_DIR below points inside the mounted tree.
drive.mount('/content/drive')

# CONFIGURATION
DEFAULT_MODEL = "gemini-2.5-flash"  # model name handed to process_video in the run cell
MAX_FRAMES_PER_VIDEO = 20  # default cap on the number of timestamps sampled per video
SCENE_THRESHOLD = 27  # passed as ContentDetector(threshold=...)
MIN_SCENE_LENGTH = 15  # passed as ContentDetector(min_scene_len=...); presumably in frames — confirm against PySceneDetect docs
LONG_SCENE_THRESHOLD = 3.0  # seconds; scenes longer than this also get a mid-scene sample
MAX_RES = 1024  # max dimension for frame downscale

# Set your working directory here (where videos are located)
# Examples: "/content/drive/MyDrive/Videos" or "/content/drive/MyDrive/KailasaArchives"
WORK_DIR = "/content/drive/MyDrive/VideoIndexer_Workspace"
DB_NAME = os.path.join(WORK_DIR, "video_index.db")  # default SQLite path used by every DB helper

# Ensure the directory exists
os.makedirs(WORK_DIR, exist_ok=True)
print(f"Working Directory: {WORK_DIR}")
print(f"Database Path: {DB_NAME}")

Mounted at /content/drive
Working Directory: /content/drive/MyDrive/VideoIndexer_Workspace
Database Path: /content/drive/MyDrive/VideoIndexer_Workspace/video_index.db


## API Key Management
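We will attempt to load the API key(s) from Colab Secrets (recommended) and fall back to manual input if none are found.
Add a secret named `GEMINI_API_KEYS` in the Colab secrets manager (key icon on the left). It may hold a single key or several keys separated by commas; multiple keys enable rotation when one hits its quota.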

In [4]:
class KeyManager:
    """Holds one or more Gemini API keys and builds clients for the active one.

    Keys are resolved in this order:
      1. the explicit ``key_list`` argument;
      2. a Colab secret named ``GEMINI_API_KEYS`` (comma-separated) or
         ``GEMINI_API_KEY``;
      3. an interactive prompt that does not echo the key into the cell output.

    No key is embedded in the notebook: if none of the sources yields a key the
    constructor raises ``ValueError`` instead of falling back to a shared
    hardcoded credential.
    """

    # Secret names probed in order; both spellings are accepted because the
    # notebook text and earlier code disagreed on singular vs. plural.
    SECRET_NAMES = ("GEMINI_API_KEYS", "GEMINI_API_KEY")

    def __init__(self, key_list: List[str] = None):
        """Resolve the key list.

        Args:
            key_list: Optional explicit keys. Blank entries are dropped and
                surrounding whitespace is stripped.

        Raises:
            ValueError: If no key could be obtained from any source.
        """
        self.keys: List[str] = self._clean(key_list or [])
        self.current_index = 0

        if not self.keys:
            self.keys = self._load_from_secrets()

        if not self.keys:
            self.keys = self._prompt_for_key()

        if not self.keys:
            raise ValueError(
                "No API keys available. Add a GEMINI_API_KEYS secret or pass key_list."
            )

    @staticmethod
    def _clean(values) -> List[str]:
        """Strip whitespace from each key and drop empty / non-string entries."""
        return [v.strip() for v in values if isinstance(v, str) and v.strip()]

    @staticmethod
    def _parse_keys(raw) -> List[str]:
        """Split a comma-separated secret value into a clean list of keys."""
        if not raw:
            return []
        return KeyManager._clean(str(raw).split(","))

    def _load_from_secrets(self) -> List[str]:
        """Return keys from the first Colab secret that exists and is non-empty."""
        for secret_name in self.SECRET_NAMES:
            try:
                found = self._parse_keys(userdata.get(secret_name))
            except Exception:
                # Missing secret / no notebook access: try the next name.
                continue
            if found:
                return found
        return []

    def _prompt_for_key(self) -> List[str]:
        """Ask for a key interactively without echoing it into saved output."""
        import getpass

        print("No keys found in secrets. Please enter your Gemini API Key below:")
        try:
            entered = getpass.getpass("Enter API Key: ")
        except (EOFError, NotImplementedError):
            # No interactive stdin available (e.g. headless execution).
            return []
        return self._clean([entered])

    def get_client(self) -> "genai.Client":
        """Get client for current key.

        Raises:
            ValueError: If the key list is empty.
        """
        if not self.keys:
            raise ValueError("No API keys available.")

        key = self.keys[self.current_index]
        return genai.Client(api_key=key)

    def rotate_key(self):
        """Advance to the next key (wrapping around) and log masked key tails.

        Raises:
            ValueError: If the key list is empty.
        """
        if not self.keys:
            raise ValueError("No API keys available during rotation.")

        prev_key = self.keys[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.keys)
        new_key = self.keys[self.current_index]

        # Only the last four characters are ever logged.
        masked_prev = f"...{prev_key[-4:]}" if len(prev_key) > 4 else "current"
        masked_new = f"...{new_key[-4:]}" if len(new_key) > 4 else "next"
        log.info(f"Rotating key: {masked_prev} -> {masked_new}")

## Utils & Retry Logic

In [5]:
def retry_with_key_rotation(retries: int = 3, initial_delay: float = 2.0, backoff_factor: float = 2.0):
    """Decorator to retry with backoff AND key rotation on quota errors.

    The decorated function must take a client as its first parameter; the
    wrapper instead accepts a key manager (anything with ``get_client()`` and
    ``rotate_key()``) in that position and builds a fresh client per attempt.

    Args:
        retries: Number of retries after the first attempt. Negative values are
            treated as 0 so the function is always attempted at least once.
        initial_delay: Base sleep (seconds) before the first retry.
        backoff_factor: Multiplier applied to the delay after each retry.

    Raises:
        Whatever the wrapped function raised on its final attempt.
    """
    max_retries = max(int(retries), 0)

    def _is_quota_error(exc: Exception) -> bool:
        """Recognise HTTP 429 / RESOURCE_EXHAUSTED in its various shapes."""
        # Structured attributes first (more reliable than substring matching).
        if getattr(exc, "code", None) == 429:
            return True
        if getattr(exc, "status", None) == "RESOURCE_EXHAUSTED":
            return True
        message = str(exc)
        if "429" in message or "Resource has been exhausted" in message:
            return True
        return isinstance(exc, google_exceptions.ResourceExhausted)

    def decorator(func):
        @functools.wraps(func)
        def wrapper(key_manager: 'KeyManager', *args, **kwargs):
            delay = initial_delay
            last_exception = None

            for attempt in range(max_retries + 1):
                try:
                    # Get fresh client for this attempt
                    client = key_manager.get_client()
                    return func(client, *args, **kwargs)

                except Exception as e:
                    last_exception = e
                    quota_hit = _is_quota_error(e)

                    if quota_hit:
                        log.warning("Resource Exhausted (429). Rotating key...")
                        key_manager.rotate_key()

                    if attempt >= max_retries:
                        # Out of attempts: do not sleep, just report and re-raise below.
                        log.error(f"Failed {func.__name__} after {max_retries} retries. Last error: {e}")
                        break

                    sleep_time = delay + random.uniform(0, 0.5)
                    if quota_hit:
                        # Extra pause after a rotation before trying the next key.
                        sleep_time += 1.0
                    else:
                        log.warning(f"Error in {func.__name__}: {e}. Retrying in {sleep_time:.2f}s (Attempt {attempt + 1}/{max_retries})")
                    time.sleep(sleep_time)
                    delay *= backoff_factor
            raise last_exception
        return wrapper
    return decorator

def strip_json_fences(text: str) -> str:
    """Remove an accidental Markdown code fence from a model response.

    Handles a fence with any alphabetic language tag in any case (```json,
    ```JSON, ```javascript ...), a bare fence, and a tag glued directly to the
    payload (```json{...}```). ``None`` is treated as an empty string.

    Returns:
        The payload with the fence and surrounding whitespace removed.
    """
    cleaned = (text or "").strip()

    if cleaned.startswith("```"):
        cleaned = cleaned[3:]
        first_line, newline, remainder = cleaned.partition("\n")
        tag = first_line.strip()
        # A purely alphabetic first line is a language tag, unless it is itself
        # a JSON literal that happens to sit right after the fence.
        if newline and tag.isalpha() and tag.lower() not in ("true", "false", "null"):
            cleaned = remainder
        elif cleaned[:4].lower() == "json":
            cleaned = cleaned[4:]

    if cleaned.endswith("```"):
        cleaned = cleaned[:-3]

    return cleaned.strip()

def downscale_image(pil_img: Image.Image) -> Image.Image:
    """Resize image so that max dimension = MAX_RES (if needed).

    Images already within MAX_RES (including zero-size images) are returned
    unchanged. Each target side is clamped to at least 1 pixel so extreme
    aspect ratios cannot truncate a dimension to 0, which resize() rejects.
    """
    w, h = pil_img.size
    longest = max(w, h)
    if longest <= MAX_RES:
        return pil_img

    scale = MAX_RES / float(longest)
    return pil_img.resize((max(1, int(w * scale)), max(1, int(h * scale))))

def numpy_to_jpeg_bytes(np_img) -> bytes:
    """Encode a numpy image as JPEG bytes after downscaling it.

    The array is wrapped in a PIL image, passed through downscale_image, and
    written to an in-memory buffer as JPEG at quality 90.
    """
    resized = downscale_image(Image.fromarray(np_img))
    with io.BytesIO() as jpeg_buffer:
        resized.save(jpeg_buffer, format="JPEG", quality=90)
        return jpeg_buffer.getvalue()

def calculate_file_hash(path: str, chunk_size: int = 8192) -> str:
    """Return the hex SHA-256 digest of the file at ``path``.

    The file is read in ``chunk_size``-byte pieces so it never has to fit in
    memory at once.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        # iter() with a sentinel stops at the first empty read (end of file).
        for block in iter(lambda: stream.read(chunk_size), b""):
            digest.update(block)
    return digest.hexdigest()

## Database Functions

In [6]:
def init_db(db_path: str = None):
    """Initialize SQLite database and create tables.

    Creates the ``videos`` and ``frames`` tables if missing and migrates an
    older ``videos`` table by adding the ``file_hash`` / ``frame_timestamps``
    columns when they are absent. Safe to call repeatedly.

    Args:
        db_path: Database file path; defaults to DB_NAME.
    """
    if db_path is None:
        db_path = DB_NAME
    conn = sqlite3.connect(db_path)
    try:
        c = conn.cursor()

        # Videos table
        c.execute("""
            CREATE TABLE IF NOT EXISTS videos (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                path TEXT UNIQUE NOT NULL,
                file_hash TEXT,  -- SHA-256 hash of the video file
                status TEXT NOT NULL,  -- NEW, PROCESSING, COMPLETED, FAILED, DUPLICATE
                transcription TEXT,
                summary_json TEXT,
                frame_timestamps TEXT,  -- JSON list of timestamps to avoid re-detecting scenes
                error_message TEXT,
                last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Migrate pre-existing tables by inspecting the actual schema rather
        # than swallowing every OperationalError from a blind ALTER TABLE
        # (which would also hide genuine failures such as a locked database).
        existing_columns = {row[1] for row in c.execute("PRAGMA table_info(videos)")}
        for column in ("file_hash", "frame_timestamps"):
            if column not in existing_columns:
                c.execute(f"ALTER TABLE videos ADD COLUMN {column} TEXT")

        c.execute("CREATE INDEX IF NOT EXISTS idx_videos_file_hash ON videos(file_hash)")

        # Frames table
        c.execute("""
            CREATE TABLE IF NOT EXISTS frames (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                video_id INTEGER,
                timestamp REAL,
                analysis_json TEXT,
                FOREIGN KEY(video_id) REFERENCES videos(id)
            )
        """)

        conn.commit()
    finally:
        # Always release the handle, even if a statement above failed.
        conn.close()

def get_video_record(path: str, db_path: str = None) -> Optional[Tuple]:
    """Look up a video row by its path.

    Args:
        path: Value of the ``path`` column to match.
        db_path: Database file path; defaults to DB_NAME.

    Returns:
        ``(id, status)`` for the matching row, or ``None`` if there is none.
    """
    if db_path is None:
        db_path = DB_NAME
    conn = sqlite3.connect(db_path)
    try:
        c = conn.cursor()
        c.execute("SELECT id, status FROM videos WHERE path = ?", (path,))
        return c.fetchone()
    finally:
        # Close even when the query raises so the handle is not leaked.
        conn.close()

def start_video_processing(path: str, file_hash: str, db_path: str = None) -> int:
    """Mark a video as PROCESSING, creating its row if needed.

    For an existing row the status, hash and timestamp are refreshed and any
    previous error message is cleared. If the stored hash is known and differs
    from ``file_hash`` (the file at this path has been replaced), the cached
    frame timestamps and frame analyses are discarded so results for the old
    content are not reused for the new content.

    Args:
        path: Video path (unique key of the ``videos`` table).
        file_hash: Content hash of the file as it is now.
        db_path: Database file path; defaults to DB_NAME.

    Returns:
        The ``videos.id`` of the row.
    """
    if db_path is None:
        db_path = DB_NAME
    conn = sqlite3.connect(db_path)
    try:
        c = conn.cursor()
        c.execute("SELECT id, file_hash FROM videos WHERE path = ?", (path,))
        row = c.fetchone()

        if row:
            video_id, stored_hash = row
            # A NULL stored hash (rows written before hashing existed) tells us
            # nothing, so the cache is only dropped on a definite mismatch.
            if stored_hash and stored_hash != file_hash:
                c.execute("DELETE FROM frames WHERE video_id = ?", (video_id,))
                c.execute("UPDATE videos SET frame_timestamps = NULL WHERE id = ?", (video_id,))
            c.execute("UPDATE videos SET status = 'PROCESSING', file_hash = ?, error_message = NULL, last_updated = CURRENT_TIMESTAMP WHERE id = ?", (file_hash, video_id))
        else:
            c.execute("INSERT INTO videos (path, file_hash, status) VALUES (?, ?, 'PROCESSING')", (path, file_hash))
            video_id = c.lastrowid

        conn.commit()
        return video_id
    finally:
        conn.close()

def update_video_timestamps(video_id: int, timestamps: List[float], db_path: str = None):
    """Store the sampled frame timestamps for a video as a JSON list.

    Args:
        video_id: ``videos.id`` of the row to update.
        timestamps: Timestamps (seconds) to persist.
        db_path: Database file path; defaults to DB_NAME.
    """
    if db_path is None:
        db_path = DB_NAME
    conn = sqlite3.connect(db_path)
    try:
        c = conn.cursor()
        c.execute("UPDATE videos SET frame_timestamps = ? WHERE id = ?", (json.dumps(timestamps), video_id))
        conn.commit()
    finally:
        # Close even when the statement raises so the handle is not leaked.
        conn.close()

def get_stored_timestamps(video_id: int, db_path: str = None) -> Optional[List[float]]:
    """Load the cached frame timestamps for a video.

    Args:
        video_id: ``videos.id`` of the row to read.
        db_path: Database file path; defaults to DB_NAME.

    Returns:
        The decoded JSON value, or ``None`` when the row is missing, the column
        is empty, or the stored text is not valid JSON.
    """
    if db_path is None:
        db_path = DB_NAME
    conn = sqlite3.connect(db_path)
    try:
        c = conn.cursor()
        c.execute("SELECT frame_timestamps FROM videos WHERE id = ?", (video_id,))
        row = c.fetchone()
    finally:
        conn.close()

    if not (row and row[0]):
        return None
    try:
        return json.loads(row[0])
    except json.JSONDecodeError:
        return None

def get_existing_frame_analysis(video_id: int, timestamp: float, db_path: str = None) -> Optional[Dict[str, Any]]:
    """Fetch a previously saved analysis for a frame, if one exists.

    Rows are matched on ``video_id`` and a timestamp within +/- 0.001 s of
    ``timestamp`` (stored values are REALs, so exact equality is avoided).
    When several rows match, the most recently inserted one is used.

    Args:
        video_id: ``videos.id`` the frame belongs to.
        timestamp: Frame timestamp in seconds.
        db_path: Database file path; defaults to DB_NAME.

    Returns:
        The decoded analysis, or ``None`` when nothing matches, the stored
        value is empty, or it is not valid JSON.
    """
    if db_path is None:
        db_path = DB_NAME
    conn = sqlite3.connect(db_path)
    try:
        c = conn.cursor()
        c.execute("SELECT analysis_json FROM frames WHERE video_id = ? AND timestamp BETWEEN ? AND ? ORDER BY id DESC LIMIT 1",
                  (video_id, timestamp - 0.001, timestamp + 0.001))
        row = c.fetchone()
    finally:
        conn.close()

    if not (row and row[0]):
        return None
    try:
        return json.loads(row[0])
    except json.JSONDecodeError:
        return None

def save_frame_result(video_id: int, timestamp: float, data: Dict[str, Any], db_path: str = None):
    """Insert one frame analysis row.

    Args:
        video_id: ``videos.id`` the frame belongs to.
        timestamp: Frame timestamp in seconds.
        data: Analysis payload; stored JSON-encoded in ``analysis_json``.
        db_path: Database file path; defaults to DB_NAME.
    """
    if db_path is None:
        db_path = DB_NAME
    conn = sqlite3.connect(db_path)
    try:
        c = conn.cursor()
        c.execute("INSERT INTO frames (video_id, timestamp, analysis_json) VALUES (?, ?, ?)", (video_id, timestamp, json.dumps(data)))
        conn.commit()
    finally:
        # Close even when the insert raises so the handle is not leaked.
        conn.close()

def complete_video_processing(video_id: int, transcription: str, summary: Dict[str, Any], db_path: str = None):
    """Mark a video COMPLETED and store its transcription and summary.

    Args:
        video_id: ``videos.id`` of the row to update.
        transcription: Transcript text to store.
        summary: Summary payload; stored JSON-encoded in ``summary_json``.
        db_path: Database file path; defaults to DB_NAME.
    """
    if db_path is None:
        db_path = DB_NAME
    conn = sqlite3.connect(db_path)
    try:
        c = conn.cursor()
        c.execute("UPDATE videos SET status = 'COMPLETED', transcription = ?, summary_json = ?, last_updated = CURRENT_TIMESTAMP WHERE id = ?", (transcription, json.dumps(summary), video_id))
        conn.commit()
    finally:
        # Close even when the update raises so the handle is not leaked.
        conn.close()

def fail_video_processing(video_id: int, error: str, db_path: str = None):
    """Mark a video FAILED and record the error message.

    Args:
        video_id: ``videos.id`` of the row to update.
        error: Text stored in ``error_message``.
        db_path: Database file path; defaults to DB_NAME.
    """
    if db_path is None:
        db_path = DB_NAME
    conn = sqlite3.connect(db_path)
    try:
        c = conn.cursor()
        c.execute("UPDATE videos SET status = 'FAILED', error_message = ?, last_updated = CURRENT_TIMESTAMP WHERE id = ?", (error, video_id))
        conn.commit()
    finally:
        # Close even when the update raises so the handle is not leaked.
        conn.close()

## Prompts

In [7]:
def build_frame_prompt() -> str:
    """Return the fixed instruction text sent alongside each frame image.

    The prompt asks the model for a single JSON object describing the frame
    (scene, objects, persons, activities, environment, lighting, overlays,
    key-moment flag) plus a nested ``kailasa_ritual_intel`` section. The text
    takes no parameters, so every frame receives the same instructions.
    """
    return """
You are operating in **KAILASA Ritual Intelligence Mode**.
Analyze this single video frame and return **ONLY ONE JSON OBJECT** with the
following structure (no explanation text, no Markdown, no comments):

{
  "scene_description": "What is happening in the frame in clear natural language",
  "objects": ["list of key visible objects"],
  "persons": ["short descriptions of each visible person or group"],
  "activities": ["concise phrases describing visible actions"],
  "environment": "indoor/outdoor/temple/ashram/stage/other",
  "lighting": "low/medium/high/mixed",
  "text_overlays": ["any visible on-screen text, subtitles or banners"],
  "is_key_moment": "yes/no",
  "key_moment_reason": "why this frame may be a key moment (or \"not a key moment\")",

  "kailasa_ritual_intel": {
    "is_kailasa_context": "yes/no/uncertain",

    "deity_layer": {
      "has_deity": "yes/no/uncertain",
      "deities_present": [
        "Names or descriptions of deities if recognizable (e.g. Nithyanandeshwara, Ganesha, Venkateshwara, Devi, Shiva Linga)"
      ],
      "deity_representation_type": "murti/statue/painting/linga/flag/photo/none/other",
      "deity_focal_point": "which deity or deity-group appears central in this frame, if any"
    },

    "ritual_layer": {
      "is_ritual_happening": "yes/no/uncertain",
      "ritual_general_type": "puja/homa/abhishekam/alankara/satsang/darshan/yajna/yoga/administrative/other/none/uncertain",
      "ritual_specific_name": "specific ritual name if visible or inferable (e.g. \"Nithya Puja\", \"Pada Puja\", \"Rudrabhishekam\", \"Rajyabhishek Alankara\", or 'unknown')",
      "ritual_stage": "start/middle/end/not-clear",

      "is_sph_present": "yes/no/uncertain",
      "sph_role": "main deity/priest/acharya/teacher/blesser/participant/on-screen-only/absent/uncertain",
      "is_sph_doing_ritual": "yes/no/uncertain",
      "sph_ritual_actions": [
        "very short phrases of what SPH is doing in the ritual if applicable (e.g. 'offering flowers', 'pouring water', 'placing kumkum', 'giving darshan')"]
    },

    "sph_visual_profile": {
      "is_sph_visible": "yes/no/uncertain",
      "sph_location_in_frame": "center/left/right/background/foreground/not-visible/uncertain",

      "alankara_class": "simple-sannyasi/temple-puja/royal-alankara/yoga-based/event-specific/unknown",
      "alankara_description": "detailed description of SPH's alankara – clothing, shawls, jewelry, headgear, sacred marks, garlands, etc.",

      "robes_color": "dominant visible colors of SPH's clothing/robes",
      "rudraksha_description": "description of rudraksha malas or beads if visible (layers, size, placement)",
      "jewelry_highlights": ["key jewelry items (e.g. crown, mala, armlets, bracelets, earrings)"] ,

      "throne_or_seat_type": "golden-throne/simhasana/peetha/regular-chair/floor-asana/sofa/other/none/uncertain",
      "throne_or_seat_description": "visual description of the throne or seat on which SPH is sitting or standing near if applicable"
    },

    "sph_state_layer": {
      "energetic_mode": "one of: upanishadic-silence/satsang-teaching/shakti-darshan/healing-blessing/ritual-performing/administrative/casual-interaction/unknown",
      "conscious_state_hint": "visual hints of SPH's inner state (e.g. deep samadhi, intense teaching, gentle blessing, ferocious compassion)",
      "emotional_state": "dominant observed emotion (e.g. compassion, ferocious-compassion, joy, humor, neutrality, focused-attention, serene, unknown)",
      "crowd_response": "how people around are responding, if visible (e.g. receiving darshan, listening, prostrating, chanting)"
    },

    "sph_posture_layer": {
      "body_posture": "standing/walking/cross-legged-on-floor/cross-legged-on-throne/sitting-on-chair/leaning/bowing/other/unknown",
      "leg_posture": "if seated, describe leg arrangement (e.g. padmasana, ardha-padmasana, sukhasana, feet-down-on-floor, not-visible)",
      "hand_mudras": [
        "names of clear mudras if recognizable (e.g. abhaya mudra, varada mudra) or short descriptive phrases (e.g. 'hands folded in namaskar', 'both hands blessing', 'holding kamandalu')"
      ],
      "gaze_direction": "towards-camera/left/right/up/down/eyes-closed/not-clear"
    }
  }
}

Important instructions:
- Always return **valid JSON** that can be parsed directly.
- If some information is not visible, fill the field with a clear string like "not-visible", "none", or "unknown" instead of leaving it empty or null.
- Do NOT add any keys beyond what is defined above.
- Do NOT wrap the JSON in Markdown fences.
"""

def build_summary_prompt(
    frame_json: List[Dict[str, Any]],
    transcription: str,
    max_frames: int = 10,
    max_transcript_chars: int = 1600,
) -> str:
    """Build the whole-video summary prompt from frame analyses and transcript.

    Args:
        frame_json: Per-frame analysis objects, in video order.
        transcription: Transcript text; ``None`` is treated as empty.
        max_frames: Maximum number of frame objects embedded in the prompt.
            When more are available they are picked evenly across the list
            (first and last included) so the sample covers the whole video
            rather than only its opening.
        max_transcript_chars: Maximum number of transcript characters embedded.

    Returns:
        The prompt text. The example JSON it shows the model is itself valid
        JSON (no trailing commas), since the model is told to return valid JSON.
    """
    frames = list(frame_json or [])
    limit = max(int(max_frames), 0)
    if len(frames) > limit:
        if limit <= 1:
            frames = frames[:limit]
        else:
            last_index = len(frames) - 1
            # Evenly spaced indices from 0 to last_index inclusive.
            frames = [frames[(i * last_index) // (limit - 1)] for i in range(limit)]

    excerpt = (transcription or "")[: max(int(max_transcript_chars), 0)]

    return f"""
Create a structured JSON summary for the entire video.

You are given:
- A list of per-frame analysis JSON objects, many of which include KAILASA Ritual Intelligence information.
- A transcription excerpt for the video's audio (may be empty or partial).

Frame observations (sample up to {max_frames} frames):
{json.dumps(frames, indent=2)}

Transcription excerpt (may be empty):
{excerpt}

Return ONLY ONE JSON OBJECT with this structure (no Markdown):

{{
  "title": "Short descriptive title for this video",
  "summary": "2–3 paragraph narrative summary in plain English",
  "key_events": ["important events in rough chronological order"],
  "detected_objects": ["distinct important objects across frames"],
  "detected_persons": ["distinct visible roles or persons (e.g. SPH, devotees, priests, musicians)"],
  "overall_environment": "overall environment inference (e.g. main-temple, outdoor-stage, satsang-hall)",
  "lighting_profile": "overall lighting style (e.g. bright daylight, indoor stage lighting, dim ritual lighting)",
  "recommended_tags": ["tags useful for searchability", "like: SPH, puja, homa, satsang, darshan"],

  "kailasa_summary": {{
    "main_deities": ["primary deities present across the video if identifiable"],
    "primary_rituals": ["main rituals or event types in this video (e.g. Nithya Puja, Pada Puja, Satsang, Shakti Darshan)"],
    "sph_presence_summary": "never-present/present-in-some-frames/present-throughout/uncertain",
    "sph_roles": ["summary of SPH's roles across the video (e.g. guru-teacher, ritual-performer, blesser, administrative-head)"],
    "sph_energetic_modes": ["set of energetic modes observed (e.g. upanishadic-silence, satsang-teaching, shakti-darshan)"],
    "sph_emotional_states": ["dominant emotional tones observed (e.g. compassion, ferocious-compassion, joy, serene)"],
    "throne_and_seating_patterns": "short narrative of what SPH usually sits on or stands near throughout the video",
    "notable_mudras": ["list of mudras or blessing gestures that appear repeatedly or are especially significant"]
  }}
}}

Important:
- Only use information that is reasonably supported by the frame JSON or transcription.
- If some aspect is unclear, mark it as "unknown" rather than guessing.
- Return **valid JSON** only, with no Markdown fences and no extra keys.
"""

## Scene Detection & Frame Extraction

In [8]:
def detect_scenes(video_path: str):
    """Detect scene boundaries in a video with PySceneDetect.

    A ContentDetector configured from SCENE_THRESHOLD and MIN_SCENE_LENGTH is
    run over the whole video; the scene list reported by the SceneManager is
    returned as-is.
    """
    stream = open_video(video_path)

    scene_manager = SceneManager()
    content_detector = ContentDetector(
        threshold=SCENE_THRESHOLD,
        min_scene_len=MIN_SCENE_LENGTH,
    )
    scene_manager.add_detector(content_detector)

    scene_manager.detect_scenes(stream)
    return scene_manager.get_scene_list()

def get_video_timestamps(video_path: str, max_frames: int = MAX_FRAMES_PER_VIDEO) -> List[float]:
    """Detect scenes and return list of timestamps to extract.

    Each detected scene contributes its start time, and scenes longer than
    LONG_SCENE_THRESHOLD seconds also contribute their midpoint, until
    ``max_frames`` timestamps have been collected. When no scenes are found
    (or detection fails) a fallback of start / middle / near-end is used.

    Args:
        video_path: Path of the video file.
        max_frames: Upper bound on timestamps taken from detected scenes.

    Returns:
        Sorted timestamps in seconds; an empty list if the file does not exist.
    """
    # Check if video file exists
    if not os.path.exists(video_path):
        log.error(f"Video file not found: {video_path}")
        return []

    log.info(f"Detecting scenes for {video_path}")
    try:
        scenes = detect_scenes(video_path)
    except Exception as e:
        log.error(f"Scene detection failed: {e}. Using fallback.")
        scenes = []

    if not scenes:
        # The video is only probed with OpenCV when the fallback needs its duration.
        timestamps = _fallback_timestamps(video_path)
        log.info(f"No scenes detected. Using fallback timestamps {timestamps}")
        return timestamps

    timestamps: List[float] = []

    for start_tc, end_tc in scenes:
        if len(timestamps) >= max_frames:
            break

        # Scene bounds may be timecode objects or plain numbers.
        start_time = float(start_tc.get_seconds() if hasattr(start_tc, "get_seconds") else start_tc)
        end_time = float(end_tc.get_seconds() if hasattr(end_tc, "get_seconds") else end_tc)

        timestamps.append(start_time)

        # Middle frame for long scenes
        if end_time - start_time > LONG_SCENE_THRESHOLD and len(timestamps) < max_frames:
            timestamps.append(start_time + (end_time - start_time) / 2.0)

    timestamps.sort()
    return timestamps


def _fallback_timestamps(video_path: str) -> List[float]:
    """Return start / middle / near-end timestamps derived from the video length."""
    cap = cv2.VideoCapture(video_path)
    try:
        fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    finally:
        # Release the capture even if reading a property raises.
        cap.release()

    duration = float(total_frames) / float(fps)

    timestamps = [0.0]
    if duration > 2.0:
        timestamps += [duration / 2.0, max(duration - 1.0, 0.0)]
    return timestamps

def extract_frames_at_timestamps(video_path: str, timestamps: List[float]) -> List[Tuple[Any, float]]:
    """Extract frames at specific timestamps.

    Args:
        video_path: Path of the video file.
        timestamps: Positions in seconds to grab.

    Returns:
        ``(rgb_frame, timestamp)`` pairs for every frame that could be read.
        Unreadable positions are logged and skipped; if the video cannot be
        opened at all an error is logged and the list is empty.
    """
    if not timestamps:
        return []

    cap = cv2.VideoCapture(video_path)
    frames: List[Tuple[Any, float]] = []
    try:
        if not cap.isOpened():
            log.error(f"Could not open video for frame extraction: {video_path}")
            return frames

        fps = cap.get(cv2.CAP_PROP_FPS) or 25.0

        for ts in timestamps:
            # Seek by frame index derived from the timestamp.
            frame_no = int(ts * fps)
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_no)
            ok, frame = cap.read()
            if ok and frame is not None:
                # OpenCV decodes to BGR; downstream code expects RGB.
                frames.append((cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), ts))
            else:
                log.warning(f"Could not read frame at {ts}s")
    finally:
        # Release the capture even if decoding or conversion raises.
        cap.release()

    return frames

## API Calling Functions

In [9]:
@retry_with_key_rotation(retries=5)
def analyze_frame(client: genai.Client, model_name: str, frame_np, timestamp: float) -> Dict[str, Any]:
    """Analyze a single frame with Gemini (sequential, with retries).

    Because of the decorator, callers pass a key manager where ``client``
    appears in the signature.

    Args:
        client: Gemini client for this attempt.
        model_name: Model to call.
        frame_np: Frame as a numpy image (encoded to JPEG before upload).
        timestamp: Frame position in seconds; copied into the result.

    Returns:
        The parsed JSON object with ``timestamp_sec`` added. If the reply is
        not valid JSON, or is valid JSON but not an object, a dict with
        ``error`` and ``raw`` keys is returned instead (best effort: a bad
        reply for one frame does not fail the whole video).
    """
    jpeg = numpy_to_jpeg_bytes(frame_np)

    contents = types.Content(
        parts=[
            types.Part.from_text(text=build_frame_prompt()),
            types.Part.from_bytes(data=jpeg, mime_type="image/jpeg"),
        ]
    )

    response = client.models.generate_content(
        model=model_name,
        contents=contents,
        config=types.GenerateContentConfig(temperature=0.2),
    )

    raw = response.text or ""
    trimmed = strip_json_fences(raw)

    try:
        data = json.loads(trimmed)
    except Exception:
        data = {"error": "Invalid JSON returned by model", "raw": trimmed}

    if not isinstance(data, dict):
        # A list / string / number reply cannot take the timestamp key below.
        data = {"error": "Model returned JSON that is not an object", "raw": trimmed}

    data["timestamp_sec"] = float(timestamp)
    return data

@retry_with_key_rotation(retries=5)
def transcribe_video(
    client: genai.Client,
    model_name: str,
    video_path: str,
    poll_interval: float = 2.0,
    poll_timeout: float = 1800.0,
) -> str:
    """Upload → poll for processing → single-shot transcription.

    Because of the decorator, callers pass a key manager where ``client``
    appears in the signature. The uploaded file is deleted afterwards whether
    or not transcription succeeded, so failed attempts and retries do not
    leave files behind.

    Args:
        client: Gemini client for this attempt.
        model_name: Model to call.
        video_path: Local path of the video to upload.
        poll_interval: Seconds between file-state checks.
        poll_timeout: Maximum seconds to wait for the upload to leave the
            PROCESSING state.

    Returns:
        The transcript text, stripped (empty string if the model returned none).

    Raises:
        RuntimeError: If the service reports the file as FAILED.
        TimeoutError: If the file is still PROCESSING after ``poll_timeout``.
    """
    def _state_name(file_obj) -> Optional[str]:
        """Name of the file's state, or None when the attribute is absent."""
        return getattr(getattr(file_obj, "state", None), "name", None)

    up = client.files.upload(file=video_path)
    try:
        # Poll until file is ready or failed, but never forever.
        deadline = time.monotonic() + poll_timeout
        while _state_name(up) == "PROCESSING":
            if time.monotonic() > deadline:
                raise TimeoutError(f"File processing did not finish within {poll_timeout:.0f}s")
            time.sleep(poll_interval)
            up = client.files.get(name=up.name)

        if _state_name(up) == "FAILED":
            raise RuntimeError("Transcription failed (file processing failed)")

        contents = types.Content(
            parts=[
                types.Part.from_uri(file_uri=up.uri, mime_type=up.mime_type),
                types.Part.from_text(
                    text=(
                        "Transcribe the audio of this video. Return plain text only. "
                        "If there are Sanskrit or other Indian-language chants, mantras, "
                        "or songs, transcribe them as faithfully as possible, and add a "
                        "simple English paraphrase in-line in brackets where appropriate."
                    )
                ),
            ]
        )

        resp = client.models.generate_content(
            model=model_name,
            contents=contents,
            config=types.GenerateContentConfig(temperature=0.1),
        )

        return (resp.text or "").strip()
    finally:
        # Best-effort cleanup: a failed delete must not mask the real outcome.
        try:
            client.files.delete(name=up.name)
        except Exception as cleanup_error:
            log.warning(f"Could not delete uploaded file {getattr(up, 'name', '?')}: {cleanup_error}")

@retry_with_key_rotation(retries=5)
def summarize_video(
    client: genai.Client,
    model_name: str,
    frame_json: List[Dict[str, Any]],
    transcription: str,
) -> Dict[str, Any]:
    """Ask the model for a whole-video summary and return it parsed.

    The prompt is built from the per-frame analyses and the transcription.
    A reply that cannot be parsed as JSON raises ValueError, which lets the
    retry decorator try again.
    """
    response = client.models.generate_content(
        model=model_name,
        contents=build_summary_prompt(frame_json, transcription),
        config=types.GenerateContentConfig(temperature=0.3),
    )

    cleaned = strip_json_fences(response.text or "")

    try:
        parsed = json.loads(cleaned)
    except Exception:
        raise ValueError(f"Invalid summary JSON received: {cleaned[:100]}...")
    return parsed

## Processing Orchestration

In [10]:
def process_video(
    key_manager: KeyManager,
    model_name: str,
    video_path: str,
    skip_transcript: bool = False,
    skip_summary: bool = False,
) -> Dict[str, Any]:
    """Run the full indexing pipeline for one video.

    Steps: skip if already COMPLETED → hash the file → skip if another path
    with the same hash is COMPLETED → mark PROCESSING → reuse or detect frame
    timestamps → analyze frames not yet stored → transcribe → summarize →
    write ``<video>.analysis.json`` next to the video → mark COMPLETED.

    Args:
        key_manager: Source of API clients / key rotation.
        model_name: Model used for every API call.
        video_path: Path of the video file.
        skip_transcript: When True, transcription is left empty.
        skip_summary: When True, the summary is left empty.

    Returns:
        Dict with ``video``, ``frames``, ``transcription`` and ``summary``; or
        an empty dict when the video was skipped (already completed or a
        duplicate).

    Raises:
        Exception: Any failure after the row is marked PROCESSING is recorded
            as FAILED in the database and re-raised.
    """
    # Check DB status
    record = get_video_record(video_path)
    if record:
        _, status = record
        if status == "COMPLETED":
            log.info(f"Skipping {video_path} (already COMPLETED in DB)")
            # Retrieve result from DB/File if needed, or just return empty to signal skip
            return {}

    # Calculate hash
    log.info(f"Calculating hash for {video_path}...")
    file_hash = calculate_file_hash(video_path)

    # Check for duplicates: same content already completed under another path.
    conn = sqlite3.connect(DB_NAME)
    try:
        duplicate = conn.execute("""
            SELECT id
            FROM videos
            WHERE file_hash = ? AND status = 'COMPLETED' AND path != ?
            LIMIT 1
        """, (file_hash, video_path)).fetchone()
    finally:
        conn.close()

    if duplicate:
        log.info(f"Duplicate detected! Match with video ID {duplicate[0]}. Skipping reprocessing.")
        return {}

    log.info(f"Processing video: {video_path}")
    video_id = start_video_processing(video_path, file_hash)

    try:
        # Step 1: Resume or Detect Timestamps
        stored_timestamps = get_stored_timestamps(video_id)
        if stored_timestamps:
            log.info(f"Resuming with {len(stored_timestamps)} cached frame timestamps.")
            timestamps = stored_timestamps
        else:
            timestamps = get_video_timestamps(video_path)
            update_video_timestamps(video_id, timestamps)

        # Step 2: Analysis Loop — reuse stored analyses, extract only the rest.
        frame_results: List[Dict[str, Any]] = []
        timestamps_to_extract = []

        for ts in timestamps:
            existing = get_existing_frame_analysis(video_id, ts)
            if existing:
                frame_results.append(existing)
            else:
                timestamps_to_extract.append(ts)

        if timestamps_to_extract:
            log.info(f"Extracting {len(timestamps_to_extract)} new frames.")
            new_frames = extract_frames_at_timestamps(video_path, timestamps_to_extract)
            if len(new_frames) < len(timestamps_to_extract):
                log.warning(
                    f"Only {len(new_frames)} of {len(timestamps_to_extract)} requested frames could be read."
                )

            for frame_np, ts in tqdm(new_frames, desc="Analyzing new frames"):
                analysis = analyze_frame(key_manager, model_name, frame_np, ts)
                frame_results.append(analysis)
                # Persist immediately so an interrupted run can resume here.
                save_frame_result(video_id, ts, analysis)
                time.sleep(0.3)
        else:
            log.info("All frames already analyzed.")

        frame_results.sort(key=lambda x: x.get("timestamp_sec", 0.0))

        # Transcription and summary are best effort: a failure is recorded in
        # the output instead of failing the whole video.
        transcription = ""
        if not skip_transcript:
            log.info("Transcribing audio…")
            try:
                transcription = transcribe_video(key_manager, model_name, video_path)
            except Exception as e:
                log.error(f"Transcription failed: {e}")
                transcription = f"[Transcription Failed: {e}]"

        summary: Dict[str, Any] = {}
        if not skip_summary:
            log.info("Generating summary…")
            try:
                summary = summarize_video(key_manager, model_name, frame_results, transcription)
            except Exception as e:
                log.error(f"Summary failed: {e}")
                summary = {"error": f"Summary generation failed: {e}"}

        output = {
            "video": video_path,
            "frames": frame_results,
            "transcription": transcription,
            "summary": summary,
        }

        out_path = Path(video_path).with_suffix(".analysis.json")
        # ensure_ascii=False emits raw non-ASCII text (e.g. Sanskrit), so the
        # file encoding must be explicit rather than the platform default.
        out_path.write_text(json.dumps(output, indent=2, ensure_ascii=False), encoding="utf-8")
        log.info(f"Saved analysis → {out_path}")

        complete_video_processing(video_id, transcription, summary)
        log.info(f"Marked {video_path} as COMPLETED")
        return output

    except Exception as e:
        log.error(f"Failed to process {video_path}: {e}")
        fail_video_processing(video_id, str(e))
        raise

## Run Processing
Run the following cell to start processing videos in the `WORK_DIR`.

In [11]:
# Initialize DB
init_db()

# Initialize Key Manager
# Note: Ensure you have added your API Key to the secrets or provided it above
key_manager = KeyManager()

# Extensions are compared case-insensitively so files such as "CLIP.MP4" are found too.
VIDEO_EXTENSIONS = {".mp4", ".mkv"}

work_path = Path(WORK_DIR)
videos = sorted(
    entry
    for entry in work_path.iterdir()
    if entry.is_file() and entry.suffix.lower() in VIDEO_EXTENSIONS
)

print(f"Found {len(videos)} videos in {WORK_DIR}")

for index, video_file in enumerate(videos):
    print(f"\n[{index+1}/{len(videos)}] Start: {video_file.name}")
    try:
        process_video(
            key_manager=key_manager,
            model_name=DEFAULT_MODEL,
            video_path=str(video_file)
        )
        print(f"[{index+1}/{len(videos)}] Done: {video_file.name}")
    except Exception as e:
        # One failing video is reported and the loop moves on to the next one.
        print(f"[{index+1}/{len(videos)}] Failed: {video_file.name} - {e}")

No keys found in secrets. Please enter your Gemini API Key below:
Enter API Key: [REDACTED — a live key was captured in this output; revoke it and clear cell outputs before sharing]
Found 3 videos in /content/drive/MyDrive/VideoIndexer_Workspace

[1/3] Start: How to Design Your Body by Conscious Will.mp4
[1/3] Done: How to Design Your Body by Conscious Will.mp4

[2/3] Start: Leader Consciousness.mp4


Analyzing new frames:   0%|          | 0/13 [00:00<?, ?it/s]

[2/3] Done: Leader Consciousness.mp4

[3/3] Start: output.mp4


Analyzing new frames:   0%|          | 0/3 [00:00<?, ?it/s]

[3/3] Done: output.mp4
