### This will be copied to a Google Colab File

In [None]:
# %pip install -r requirements.txt

Collecting numpy<2,>=1.26 (from -r requirements.txt (line 4))
  Using cached numpy-1.26.4-cp312-cp312-win_amd64.whl.metadata (61 kB)
Collecting torch==2.9.1 (from -r requirements.txt (line 11))
  Using cached torch-2.9.1-cp312-cp312-win_amd64.whl.metadata (30 kB)
Collecting transformers==4.57.1 (from -r requirements.txt (line 12))
  Using cached transformers-4.57.1-py3-none-any.whl.metadata (43 kB)
Collecting Pillow==12.0.0 (from -r requirements.txt (line 13))
  Using cached pillow-12.0.0-cp312-cp312-win_amd64.whl.metadata (9.0 kB)
Collecting sentencepiece==0.2.1 (from -r requirements.txt (line 16))
  Using cached sentencepiece-0.2.1-cp312-cp312-win_amd64.whl.metadata (10 kB)
Collecting torchvision==0.24.1 (from -r requirements.txt (line 17))
  Using cached torchvision-0.24.1-cp312-cp312-win_amd64.whl.metadata (5.9 kB)
Collecting sympy>=1.13.3 (from torch==2.9.1->-r requirements.txt (line 11))
  Using cached sympy-1.14.0-py3-none-any.whl.metadata (12 kB)
Collecting huggingface-hub<1.0,

    extract-msg (<=0.29.*)
                 ~~~~~~~^
  You can safely remove it manually.
  You can safely remove it manually.
  You can safely remove it manually.
  You can safely remove it manually.
  You can safely remove it manually.
  You can safely remove it manually.
  You can safely remove it manually.
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
autogluon-multimodal 1.2 requires Pillow<12,>=10.0.1, but you have pillow 12.0.0 which is incompatible.
autogluon-multimodal 1.2 requires torch<2.6,>=2.2, but you have torch 2.9.1 which is incompatible.
autogluon-multimodal 1.2 requires torchvision<0.21.0,>=0.16.0, but you have torchvision 0.24.1 which is incompatible.
autogluon-timeseries 1.2 requires torch<2.6,>=2.2, but you have torch 2.9.1 which is incompatible.
datasets 3.3.1 requires requests>=2.32.2, but you have requests 2.31.0 which is incompatible.

# FUNCTIONS

In [None]:
# src\debug_utils.py

def see_first_scene(df):
    """
    Print every key/value pair of the first scene, except the raw frames.

    Parameters
    ----------
    df :
        Sequence of scene dictionaries (despite the name, it is indexed
        like a list: only ``df[0]`` is read).

    Notes
    -----
    The "frames" entry is skipped so image arrays are not dumped to the
    output. An empty sequence prints a short notice instead of raising
    IndexError.
    """
    # Guard against an empty pipeline result before touching df[0].
    if len(df) == 0:
        print("No scenes to display.")
        return

    first_scene = df[0]
    print("Printing first captioned scene:")
    print("{")
    for key in first_scene:
        if key == "frames":
            continue
        print(f"{key}, {first_scene[key]},")
    print("}")

def see_scenes_cuts(df):
    """
    Print how many scenes were found, then one summary line per scene.

    Each scene dictionary must provide "scene_index", "start_timecode",
    "end_timecode" and "duration_seconds".
    """
    scene_count = len(df)
    print(f"Found {scene_count} scenes.")

    for scene in df:
        # Assemble the line from its three parts: label, time span, duration.
        label = f"Scene {scene['scene_index']:03d}: "
        span = f"{scene['start_timecode']} -> {scene['end_timecode']} "
        length = f"({scene['duration_seconds']:.2f} sec)"
        print(label + span + length)

In [None]:
# src\frame_captioning_blip.py

from typing import List, Dict
import cv2
import numpy as np
import torch
from typing import Optional
from PIL import Image

# Prefer the GPU when torch can see one; otherwise run on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# ======================================================================
# Load BLIP model and processor
from transformers import BlipProcessor, BlipForConditionalGeneration

# Model and processor are loaded from the same checkpoint.
_BLIP_CHECKPOINT = "Salesforce/blip-image-captioning-base"

model = BlipForConditionalGeneration.from_pretrained(_BLIP_CHECKPOINT)
model = model.to(device)
processor = BlipProcessor.from_pretrained(_BLIP_CHECKPOINT)
# ======================================================================
# # Load BLIP2 model and processor
# from transformers import Blip2Processor, Blip2ForConditionalGeneration
# model = Blip2ForConditionalGeneration.from_pretrained(
#     "Salesforce/blip2-flan-t5-xl",
#     torch_dtype=torch.float32,    # CPU-friendly
# ).to(device)

# processor = Blip2Processor.from_pretrained("Salesforce/blip2-flan-t5-xl")
# ======================================================================


def blip_frame(
    image,
    model: BlipForConditionalGeneration,
    processor: BlipProcessor,
    prompt: Optional[str] = None,
    max_length: int = 30,
    num_beams: int = 3,
    do_sample: bool = False,
) -> str:
    """
    Caption one frame with BLIP.

    Parameters
    ----------
    image :
        A PIL.Image or a NumPy array. A 3-channel array is always treated
        as OpenCV BGR and converted to RGB; any other array shape is handed
        to PIL unchanged.
    model : BlipForConditionalGeneration
        Preloaded BLIP captioning model.
    processor : BlipProcessor
        Processor matching `model`.
    prompt : str, optional
        Conditioning text passed to the processor together with the image
        (e.g. "a cartoon frame of"). None means image-only input.
    max_length : int
        Forwarded to `model.generate` as `max_length`.
    num_beams : int
        Forwarded to `model.generate` as `num_beams`.
    do_sample : bool
        Forwarded to `model.generate` as `do_sample`.

    Returns
    -------
    str
        Decoded caption with special tokens removed and surrounding
        whitespace stripped.

    Raises
    ------
    TypeError
        If `image` is neither a PIL image nor a NumPy array.
    """
    # --- Bring the input to a PIL image ---
    if isinstance(image, Image.Image):
        pil_image = image.convert("RGB")
    elif isinstance(image, np.ndarray):
        looks_like_bgr = image.ndim == 3 and image.shape[2] == 3
        # Only 3-channel arrays get the BGR -> RGB swap.
        rgb_array = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) if looks_like_bgr else image
        pil_image = Image.fromarray(rgb_array)
    else:
        raise TypeError("image must be a PIL.Image.Image or a numpy.ndarray")

    # Inputs must live on whatever device the model weights are on.
    target_device = next(model.parameters()).device

    # --- Encode image (and prompt, when given) ---
    processor_args = (pil_image,) if prompt is None else (pil_image, prompt)
    encoded = processor(*processor_args, return_tensors="pt")
    inputs = {name: tensor.to(target_device) for name, tensor in encoded.items()}

    # --- Decode ---
    generation_options = dict(
        max_length=max_length,
        num_beams=num_beams,
        do_sample=do_sample,
        no_repeat_ngram_size=2,
        repetition_penalty=1.2,
    )
    with torch.no_grad():
        output_ids = model.generate(**inputs, **generation_options)

    return processor.decode(output_ids[0], skip_special_tokens=True).strip()


def caption_frames(
    scenes: List[Dict],
    model: BlipForConditionalGeneration = model,
    processor: BlipProcessor = processor,
    prompt: Optional[str] = None,
    max_length: int = 30,
    num_beams: int = 3,
    do_sample: bool = False,
    debug: bool = False,
) -> List[Dict]:
    """
    Caption every frame of every scene with BLIP.

    Parameters
    ----------
    scenes : List[Dict]
        Scene dictionaries. The "frames" entry (a list of numpy arrays or
        PIL images) is read; a scene without it gets an empty caption list.
    model : BlipForConditionalGeneration
        BLIP captioning model; defaults to the module-level `model` as it
        was when this function was defined.
    processor : BlipProcessor
        Processor matching `model`; same default-binding rule.
    prompt : str, optional
        Conditioning text applied to every caption.
    max_length : int
        Max caption length, forwarded to `blip_frame`.
    num_beams : int
        Beam search width, forwarded to `blip_frame`.
    do_sample : bool
        Sampling switch, forwarded to `blip_frame`.
    debug : bool
        When True, print the scene index and each caption as it is produced.

    Returns
    -------
    List[Dict]
        One shallow copy per input scene with an added
        "frame_captions": List[str], aligned 1:1 with "frames".
        The input dictionaries are not modified.
    """
    # Everything except the image is the same for every blip_frame call.
    shared_kwargs = dict(
        model=model,
        processor=processor,
        prompt=prompt,
        max_length=max_length,
        num_beams=num_beams,
        do_sample=do_sample,
    )

    result: List[Dict] = []
    for scene in scenes:
        if debug:
            print("Scene", scene.get("scene_index", "??"))

        scene_captions: List[str] = []
        for frame in scene.get("frames", []):
            text = blip_frame(image=frame, **shared_kwargs)
            scene_captions.append(text)
            if debug:
                print(f"  {text}")

        # New dict, so the caller's scene is left untouched.
        result.append({**scene, "frame_captions": scene_captions})

    return result


In [None]:
# src\frame_captioning_heavy.py

import cv2
import numpy as np
from typing import List, Optional, Dict, Any
from PIL import Image
import torch
from transformers import AutoModel, AutoTokenizer

ckpt_path = "internlm/internlm-xcomposer2-vl-7b"

# NOTE: this rebinds the module-level names `device` and `model` that the BLIP
# cell also defines; functions that captured the BLIP objects as default
# arguments keep the ones they were defined with.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# tokenizer + vision-language model with custom code
tokenizer = AutoTokenizer.from_pretrained(ckpt_path, trust_remote_code=True)

model = AutoModel.from_pretrained(ckpt_path, trust_remote_code=True)
model = model.to(device)
model = model.eval()

def xcomposer_frame_and_captions(
    prev_captions: List[str],
    current_caption: str,
    next_captions: List[str],
    frame_image,
    model= model,
    tokenizer= tokenizer,
    extra_instruction: str = (
        "Using the image and these captions as temporal context, "
        "write ONE concise sentence describing what is happening "
        "in this frame, focusing on new details or clarifications."
    ),
    do_sample: bool = False,
) -> str:
    """
    Use InternLM-XComposer2 to refine a single frame caption with
    surrounding context (previous & next captions).

    Parameters
    ----------
    prev_captions : List[str]
        Captions BEFORE this frame (older → newer order). May be empty.
    current_caption : str
        BLIP caption of this frame.
    next_captions : List[str]
        Captions AFTER this frame (newer → future order). May be empty.
    frame_image :
        np.ndarray (3-channel arrays are treated as OpenCV BGR),
        PIL.Image, or a string that is passed through as an image path.
    model, tokenizer :
        InternLM-XComposer2 loaded with trust_remote_code=True.
    extra_instruction : str
        Instruction text appended at the end of the query.
    do_sample : bool
        Forwarded to `model.chat`.

    Returns
    -------
    str
        The model's response with surrounding whitespace stripped.

    Raises
    ------
    TypeError
        If `frame_image` is not an ndarray, a PIL image, or a str.

    Notes
    -----
    Gradient tracking is disabled only for the duration of the call; the
    global autograd state is left as the caller had it.
    """
    import contextlib  # local import: only needed for the no-op context below

    # ---- Normalize image type for model.chat() ----
    # NOTE(review): the checkpoint's remote `chat` implementation is not
    # visible here. Its published examples pass an image *path* and put an
    # "<ImageHere>" placeholder in the query — confirm that PIL images and a
    # placeholder-free query are actually accepted by the loaded model.
    if isinstance(frame_image, Image.Image):
        image_for_model = frame_image.convert("RGB")
    elif isinstance(frame_image, np.ndarray):
        # assume OpenCV BGR
        if frame_image.ndim == 3 and frame_image.shape[2] == 3:
            img_rgb = cv2.cvtColor(frame_image, cv2.COLOR_BGR2RGB)
        else:
            img_rgb = frame_image
        image_for_model = Image.fromarray(img_rgb)
    elif isinstance(frame_image, str):
        # assume it's a file path
        image_for_model = frame_image
    else:
        raise TypeError("frame_image must be np.ndarray, PIL.Image, or str path")

    # ---- Build textual context prompt ----
    prev_block = "\n".join(f"- {c}" for c in prev_captions) if prev_captions else "None."
    next_block = "\n".join(f"- {c}" for c in next_captions) if next_captions else "None."

    query = (
        "Previous context:\n"
        f"{prev_block}\n\n"
        "Current frame caption:\n"
        f"- {current_caption}\n\n"
        "Upcoming context:\n"
        f"{next_block}\n\n"
        "Instruction:\n"
        f"{extra_instruction}"
    )

    model_device = next(model.parameters()).device

    # Mixed precision only on CUDA; everywhere else run the call as-is.
    if model_device.type == "cuda":
        autocast_ctx = torch.autocast(device_type="cuda")
    else:
        autocast_ctx = contextlib.nullcontext()

    # ---- Call InternLM-XComposer2's chat API ----
    # Scoped no_grad instead of torch.set_grad_enabled(False), which would
    # leave autograd disabled for the rest of the process.
    with torch.no_grad(), autocast_ctx:
        response, _ = model.chat(
            tokenizer,
            query=query,
            image=image_for_model,
            history=[],
            do_sample=do_sample,
        )

    return response.strip()

def refine_caption_frames(
    scenes: List[Dict],
    model = model,
    tokenizer = tokenizer,
    num_prev: int = 1,
    num_next: int = 1,
    extra_instruction: str = (
        "Using the image and these captions as temporal context, "
        "write ONE concise sentence describing what is happening "
        "in this frame, focusing on new details or clarifications."
    ),
    do_sample: bool = False,
    debug: bool = False,
) -> List[Dict]:
    """
    Refine each frame's BLIP caption with InternLM-XComposer2, giving the
    model a sliding window of neighbouring captions from the same scene.

    For frame i the model receives:
      - up to `num_prev` captions immediately before it
      - the frame's own caption
      - up to `num_next` captions immediately after it

    Each scene dict is read for:
      - "frames": List[np.ndarray or PIL.Image]
      - "frame_captions": List[str]  (must be the same length as frames)

    When `debug` is True, the scene index and every refined caption are
    printed as they are produced.

    Returns a NEW list of shallow-copied scenes, each with an added
      - "frame_detailed_captions": List[str] aligned 1:1 with frames

    Raises ValueError when a scene's frame and caption counts differ.
    """
    output: List[Dict] = []

    for scene in scenes:
        if debug:
            print("Scene", scene.get("scene_index", "??"))

        frames = scene.get("frames", [])
        base_captions = scene.get("frame_captions", [])
        frame_total = len(frames)

        # Captions are matched to frames by position, so counts must agree.
        if frame_total != len(base_captions):
            raise ValueError(
                f"Scene {scene.get('scene_index', '?')} has "
                f"{len(frames)} frames but {len(base_captions)} captions."
            )

        detailed: List[str] = []
        for idx, frame_image in enumerate(frames):
            # Window bounds: [window_start, idx) before, (idx, window_end) after.
            window_start = max(0, idx - num_prev)
            window_end = min(frame_total, idx + 1 + num_next)

            refined = xcomposer_frame_and_captions(
                prev_captions=base_captions[window_start:idx],
                current_caption=base_captions[idx],
                next_captions=base_captions[idx + 1:window_end],
                frame_image=frame_image,
                model=model,
                tokenizer=tokenizer,
                extra_instruction=extra_instruction,
                do_sample=do_sample,
            )
            detailed.append(refined)
            if debug:
                print(f"  {refined}")

        # New dict, so the caller's scene is left untouched.
        output.append({**scene, "frame_detailed_captions": detailed})

    return output


In [18]:
# src\frame_sampling.py

import os
from typing import List, Dict, Optional
import cv2
import numpy as np  

# 
def sample_from_clip(
    input_video_path: str,
    scene_index: int,
    start_seconds: float,
    end_seconds: float,
    num_frames: int = 5,
) -> List[np.ndarray]:
    """
    Sample `num_frames` frames from a single scene interval.
    Returns ONLY the images (as numpy arrays), no saving, no dicts.

    Parameters
    ----------
    input_video_path : str
        Path to the input video file.
    scene_index : int
        Scene index (not used in logic, just for potential logging/debug).
    start_seconds : float
        Scene start time in seconds.
    end_seconds : float
        Scene end time in seconds.
    num_frames : int, default 5
        Number of evenly spaced positions to sample within
        [start_seconds, end_seconds]. Values <= 1 yield a single frame
        taken at the scene start.

    Returns
    -------
    List[np.ndarray]
        Frames exactly as returned by `cv2.VideoCapture.read`.
        Length may be < num_frames when positions collapse onto the same
        frame index after rounding, or when a position cannot be decoded.

    Raises
    ------
    ValueError
        If the video cannot be opened.
    """
    cap = cv2.VideoCapture(input_video_path)
    # try/finally so the capture handle is released on every exit path,
    # including the "cannot open" error and any failure while decoding.
    try:
        if not cap.isOpened():
            raise ValueError(f"Cannot open video: {input_video_path}")

        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Convert seconds → frame indices (inclusive range)
        start_frame = int(round(start_seconds * fps))
        end_frame = int(round(end_seconds * fps)) - 1

        # Clamp to valid range
        start_frame = max(0, min(start_frame, total_frames - 1))
        end_frame = max(0, min(end_frame, total_frames - 1))

        if end_frame < start_frame:
            end_frame = start_frame

        # Compute evenly spaced positions
        if num_frames <= 1 or start_frame == end_frame:
            frame_positions = [start_frame]
        else:
            frame_positions = [
                int(round(start_frame + (i / (num_frames - 1)) * (end_frame - start_frame)))
                for i in range(num_frames)
            ]

        # Final clamp and deduplicate (just in case of rounding collisions)
        frame_positions = sorted(
            set(max(0, min(p, total_frames - 1)) for p in frame_positions)
        )

        frames: List[np.ndarray] = []

        for frame_num in frame_positions:
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
            ret, frame = cap.read()
            if not ret or frame is None:
                # Skip unreadable frames, but keep going
                continue
            frames.append(frame)

        return frames
    finally:
        cap.release()

def sample_frames(
    input_video_path: str,
    scenes: List[Dict],
    num_frames: int = 4,
    output_dir: Optional[str] = None,
) -> List[Dict]:
    """
    Loop over a list of scene dictionaries and attach sampled frames to each.

    Parameters
    ----------
    input_video_path : str
        Path to the input video file.
    scenes : List[Dict]
        Output of get_scene_list(...), each with at least:
        - "scene_index"
        - "start_seconds"
        - "end_seconds"
    num_frames : int, default 4
        Number of frames to sample per scene.
    output_dir : Optional[str], default None
        If None  -> do NOT save frames to disk.
        If str   -> save frames under this directory (with subfolders per scene).

    Returns
    -------
    List[Dict]
        New list of scene dicts. Each scene dict is a shallow copy of the
        input, plus:
            - "frames": List[np.ndarray]    (sampled images in memory)
            - "frame_paths": List[str] or None
              (target paths of the saved frames, if output_dir is provided;
              aligned 1:1 with "frames")

    Warns
    -----
    RuntimeWarning
        When `cv2.imwrite` reports that a frame could not be written. The
        path is still recorded so "frame_paths" stays aligned with "frames".
    """
    import warnings  # local import: only used to report failed writes

    # Prepare saving directory if requested
    if output_dir is not None:
        os.makedirs(output_dir, exist_ok=True)

    enriched_scenes: List[Dict] = []

    for scene in scenes:
        scene_index = scene["scene_index"]
        start_seconds = scene["start_seconds"]
        end_seconds = scene["end_seconds"]

        # Use the singular helper: no dictionary involved here
        frames = sample_from_clip(
            input_video_path=input_video_path,
            scene_index=scene_index,
            start_seconds=start_seconds,
            end_seconds=end_seconds,
            num_frames=num_frames,
        )

        frame_paths: Optional[List[str]] = None

        # Optionally save frames if output_dir is provided
        if output_dir is not None:
            scene_folder = os.path.join(output_dir, f"scene_{scene_index:03d}")
            os.makedirs(scene_folder, exist_ok=True)

            frame_paths = []
            for idx, frame in enumerate(frames):
                filename = f"frame_{idx:02d}.jpg"
                frame_path = os.path.join(scene_folder, filename)
                # imwrite signals failure through its return value, not an
                # exception, so check it instead of assuming the file exists.
                if not cv2.imwrite(frame_path, frame):
                    warnings.warn(
                        f"Could not write frame to {frame_path}",
                        RuntimeWarning,
                    )
                frame_paths.append(frame_path)

        # Build new scene dict with frames attached
        new_scene = dict(scene)  # shallow copy
        new_scene["frames"] = frames                # in-memory images
        new_scene["frame_paths"] = frame_paths      # list of paths or None

        enriched_scenes.append(new_scene)

    return enriched_scenes


In [None]:
# src\scene_cutting.py

from typing import List, Dict
from scenedetect import open_video, SceneManager
from scenedetect.detectors import ContentDetector

def get_scene_list(input_video_path: str, threshold: float = 27.0, min_scene_len: int = 15) -> List[Dict]:
    """
    Detect scenes in a video using PySceneDetect and return structured metadata.

    Parameters
    ----------
    input_video_path : str
        Path to the input video file.
    threshold : float, optional
        Sensitivity for the ContentDetector. Lower values detect more scene cuts.
        Default is 27.0.
    min_scene_len : int, optional
        Minimum scene length in frames. Default is 15.

    Returns
    -------
    List[Dict]
        A list of dictionaries, each containing:
        - "scene_index": Index of the detected scene.
        - "start_timecode": Start timecode (HH:MM:SS.mmm).
        - "end_timecode": End timecode (HH:MM:SS.mmm).
        - "start_seconds": Start time in seconds (float).
        - "end_seconds": End time in seconds (float).
        - "duration_seconds": Duration of the scene in seconds.

    Notes
    -----
    This function uses PySceneDetect's ContentDetector to locate abrupt content
    changes. It is suitable for preprocessing steps in segmentation, retrieval,
    summarization, and other video analysis workflows.

    The scene list is requested with ``start_in_scene=True`` so that a video
    in which no cut is detected is reported as one scene spanning the whole
    video instead of an empty list, which would leave downstream steps with
    nothing to process.
    """
    video = open_video(input_video_path)

    scene_manager = SceneManager()
    scene_manager.add_detector(ContentDetector(threshold=threshold, min_scene_len=min_scene_len))

    scene_manager.detect_scenes(video)
    # start_in_scene=True: treat the video as starting inside a scene, so a
    # cut-free video still yields one scene.
    scene_list = scene_manager.get_scene_list(start_in_scene=True)

    result = []
    for idx, (start_time, end_time) in enumerate(scene_list):
        start_sec = start_time.get_seconds()
        end_sec = end_time.get_seconds()
        result.append({
            "scene_index": idx,
            "start_timecode": str(start_time),
            "end_timecode": str(end_time),
            "start_seconds": start_sec,
            "end_seconds": end_sec,
            "duration_seconds": end_sec - start_sec,
        })
    return result


def test():
    """
    Manual smoke test: detect scenes in the sample video and print one
    summary line per scene.
    """
    # Forward slash works on both Windows and Linux/Colab; a backslash is
    # only a path separator on Windows.
    test_video = 'Videos/SpongeBob SquarePants - Writing Essay - Some of These - Meme Source.mp4'
    scenes = get_scene_list(test_video)

    print(f"Found {len(scenes)} scenes.")
    for s in scenes:
        print(
            f"Scene {s['scene_index']:03d}: "
            f"{s['start_timecode']} -> {s['end_timecode']} "
            f"({s['duration_seconds']:.2f} sec)"
        )
# test()

# MAIN CODE

In [None]:
# Forward slash works on both Windows and Linux/Colab; a backslash is only a
# path separator on Windows.
test_video = 'Videos/SpongeBob SquarePants - Writing Essay - Some of These - Meme Source.mp4'

# 1) Cut the video into scenes and list them.
scenes = get_scene_list(test_video)
see_scenes_cuts(scenes)

# 2) Sample frames per scene (also written under ./output/frames).
scenes_with_frames = sample_frames(
    input_video_path=test_video,
    scenes=scenes,
    num_frames=2,
    output_dir="./output/frames",
)

# 3) First-pass captions with BLIP.
captioned_scenes = caption_frames(
    scenes=scenes_with_frames,
    max_length=30,
    num_beams=4,
    do_sample=False,
    debug=True,
    prompt="a video frame of"
)

# 4) Refine each caption with InternLM-XComposer2, using one neighbouring
#    caption on each side as context.
refined_scenes = refine_caption_frames(
    scenes=captioned_scenes,
    num_prev=1,
    num_next=1,
    extra_instruction=(
        "Using the image and these captions as temporal context, "
        "write ONE concise sentence describing what is happening "
        "in this frame, focusing on new details or clarifications."
    ),
    do_sample=False,
    debug=True,

)

see_first_scene(refined_scenes)