## 

# Context Prompting with InternVL 3.5

In [None]:
# Install the Python dependencies listed in requirements.txt into the kernel's environment
%pip install -r requirements.txt

In [7]:
import os
import math
import json
import torch
import traceback
import numpy as np
from PIL import Image
from pathlib import Path
import torchvision.transforms as T
from decord import VideoReader, cpu
from torchvision.transforms.functional import InterpolationMode
from transformers import AutoModel, AutoTokenizer, BitsAndBytesConfig

In [8]:
# ─── Runtime Parameters ────────────────────────────────────────────────
# Tunable knobs for the whole notebook; the functions below read these as
# default argument values or directly as globals.
NUM_FRAMES = 32          # frames sampled per clip (default num_segments of get_index / load_video)
CLIP_DURATION = 10       # seconds per segment (default segment_duration of describe_video)
TEMPERATURE = 0.1        # sampling temperature forwarded through GEN_CONFIG

INPUT_SIZE = 448         # default edge length in pixels used by load_video for resizing
# NOTE(review): MAX_PATCHES is not referenced by any visible cell — dynamic_preprocess
# hard-codes its own default of 12 and load_video passes max_num=1. Confirm whether
# it is still needed.
MAX_PATCHES = 12


# Generation settings handed to model.chat() in process_video_segment.
GEN_CONFIG = {
    "max_new_tokens": 225,
    "do_sample": True,
    "temperature": TEMPERATURE
}

# File suffixes treated as videos by generate_video_descriptions; they are
# compared against the lower-cased filename, so keep every entry lower-case.
VIDEO_EXTENSIONS = [
    ".mp4", ".mov", ".avi", ".mkv", ".flv", ".wmv", ".webm",
    ".mpeg", ".mpg", ".m4v", ".3gp", ".3g2", ".mts", ".m2ts",
    ".ts", ".ogv", ".divx", ".vob", ".rm", ".rmvb", ".asf"
]

In [9]:
# ─── Quantized Model ─────────────────────────────────────────────────────────────
# Loads the InternVL3.5-8B checkpoint with 8-bit bitsandbytes quantization and
# the matching tokenizer. `model` and `tokenizer` are used as globals by
# process_video_segment.
# NOTE(review): `bnb_8bit_compute_dtype` does not appear among the documented
# BitsAndBytesConfig arguments (the documented compute-dtype option is
# `bnb_4bit_compute_dtype`) — confirm this keyword has any effect.
bnb_config = BitsAndBytesConfig(
    load_in_8bit=True,
    bnb_8bit_compute_dtype=torch.float16
)

# Hub repository id; reused for both the model and the tokenizer.
path = "OpenGVLab/InternVL3_5-8B"
# trust_remote_code=True lets the repository's own modelling code run locally.
# device_map="cuda:1" pins the model to the GPU with index 1, so this cell needs
# editing on a machine that has a single GPU.
model = AutoModel.from_pretrained(
    path,
    quantization_config=bnb_config,
    low_cpu_mem_usage=True,
    use_flash_attn=True,
    trust_remote_code=True,
    device_map="cuda:1"
).eval()
tokenizer = AutoTokenizer.from_pretrained(path, trust_remote_code=True, use_fast=False)

Loading checkpoint shards: 100%|██████████| 4/4 [00:10<00:00,  2.60s/it]


In [10]:
# ─── Preprocessing ─────────────────────────────────────────────────────
# Per-channel mean and standard deviation passed to T.Normalize in
# build_transform (three values each, applied to RGB tiles).
IMAGENET_MEAN = (0.485, 0.456, 0.406)
IMAGENET_STD = (0.229, 0.224, 0.225)

def build_transform(input_size):
    """Create the preprocessing pipeline applied to every image tile.

    The pipeline converts non-RGB images to RGB, resizes to a square of
    ``input_size`` pixels with bicubic interpolation, converts to a tensor
    and normalizes with ``IMAGENET_MEAN`` / ``IMAGENET_STD``.

    Args:
        input_size: Edge length of the square output, in pixels.

    Returns:
        A ``T.Compose`` object wrapping the four steps above.
    """
    def _ensure_rgb(img):
        # Images already in RGB mode are passed through untouched.
        if img.mode != "RGB":
            return img.convert("RGB")
        return img

    square = (input_size, input_size)
    steps = [
        T.Lambda(_ensure_rgb),
        T.Resize(square, interpolation=InterpolationMode.BICUBIC),
        T.ToTensor(),
        T.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
    ]
    return T.Compose(steps)

def find_closest_aspect_ratio(aspect_ratio, target_ratios, width, height, image_size):
    """Pick the tiling grid whose aspect ratio is nearest to the image's.

    Args:
        aspect_ratio: Width / height of the source image.
        target_ratios: Iterable of ``(columns, rows)`` candidates, scanned in
            the order given.
        width: Source image width; only used to compute the image area.
        height: Source image height; only used to compute the image area.
        image_size: Edge length of a single square tile.

    Returns:
        The chosen ``(columns, rows)`` candidate, or ``(1, 1)`` when
        ``target_ratios`` is empty.
    """
    source_area = width * height
    chosen = (1, 1)
    smallest_gap = float('inf')

    for candidate in target_ratios:
        cols, rows = candidate[0], candidate[1]
        gap = abs(aspect_ratio - cols / rows)

        if gap < smallest_gap:
            # Strictly closer candidate: always takes over.
            chosen, smallest_gap = candidate, gap
            continue

        # On an exact tie, a later candidate replaces the current one only
        # when the source image covers more than half of that grid's pixels.
        is_tie = gap == smallest_gap
        if is_tie and source_area > 0.5 * image_size * image_size * cols * rows:
            chosen = candidate

    return chosen

def dynamic_preprocess(image, min_num=1, max_num=12, image_size=448, use_thumbnail=False):
    """Resize an image onto a tile grid and cut it into square tiles.

    Args:
        image: Image object exposing ``size``, ``resize`` and ``crop``.
        min_num: Minimum number of tiles a candidate grid may contain.
        max_num: Maximum number of tiles a candidate grid may contain.
        image_size: Edge length of every square tile, in pixels.
        use_thumbnail: When true and more than one tile was produced, append
            the whole image resized to a single tile.

    Returns:
        List of tiles in row-major order, optionally followed by the thumbnail.
    """
    src_width, src_height = image.size
    src_ratio = src_width / src_height

    # Every (columns, rows) grid whose tile count lies within
    # [min_num, max_num], ordered by tile count.
    grid_options = set(
        (cols, rows)
        for limit in range(min_num, max_num + 1)
        for cols in range(1, limit + 1)
        for rows in range(1, limit + 1)
        if min_num <= cols * rows <= max_num
    )
    grid_options = sorted(grid_options, key=lambda grid: grid[0] * grid[1])

    # Grid that best matches the source aspect ratio.
    grid = find_closest_aspect_ratio(
        src_ratio, grid_options, src_width, src_height, image_size)

    canvas_width = image_size * grid[0]
    canvas_height = image_size * grid[1]
    tile_count = grid[0] * grid[1]
    tiles_per_row = canvas_width // image_size

    # Stretch the image over the whole grid, then slice it tile by tile.
    canvas = image.resize((canvas_width, canvas_height))
    tiles = []
    for tile_idx in range(tile_count):
        row, col = divmod(tile_idx, tiles_per_row)
        tiles.append(canvas.crop((
            col * image_size,
            row * image_size,
            (col + 1) * image_size,
            (row + 1) * image_size,
        )))
    assert len(tiles) == tile_count

    if use_thumbnail and len(tiles) != 1:
        tiles.append(image.resize((image_size, image_size)))
    return tiles


def get_index(bound, fps, max_frame, first_idx=0, num_segments=NUM_FRAMES):
    """Compute the frame indices to sample from a time window of a video.

    Args:
        bound: ``(start, end)`` in seconds, or a falsy value to use the whole
            video.
        fps: Frames per second used to convert seconds to frame indices.
        max_frame: Highest frame index the window may reach.
        first_idx: Lowest frame index the window may start at.
        num_segments: Number of indices to produce.

    Returns:
        NumPy array holding ``num_segments`` integer frame indices.
    """
    # Without a bound, a very wide sentinel window is used; the clamping
    # below reduces it to [first_idx, max_frame].
    start, end = (bound[0], bound[1]) if bound else (-100000, 100000)

    first_frame = max(first_idx, round(start * fps))
    last_frame = min(round(end * fps), max_frame)
    step = float(last_frame - first_frame) / num_segments

    # Each pick is offset by half a step from the start of the window.
    offset = first_frame + (step / 2)
    picks = [int(offset + np.round(step * k)) for k in range(num_segments)]
    return np.array(picks)

def load_video(video_path, bound=None, input_size=INPUT_SIZE, max_num=1, num_segments=NUM_FRAMES):
    """Sample frames from a video and turn them into normalized tile tensors.

    Args:
        video_path: Path handed to ``VideoReader``.
        bound: Optional ``(start, end)`` window in seconds, forwarded to
            ``get_index``.
        input_size: Tile edge length used for tiling and for the transform.
        max_num: Maximum tile count per frame, forwarded to
            ``dynamic_preprocess``.
        num_segments: Number of frames to sample.

    Returns:
        ``(pixel_values, num_patches_list)``: the tiles of all sampled frames
        concatenated along the first dimension, and the tile count of each
        frame in sampling order.
    """
    reader = VideoReader(video_path, ctx=cpu(0), num_threads=1)
    last_frame = len(reader) - 1
    avg_fps = float(reader.get_avg_fps())

    tile_transform = build_transform(input_size=input_size)
    selected = get_index(bound, avg_fps, last_frame, first_idx=0, num_segments=num_segments)

    frame_tensors = []
    tiles_per_frame = []
    for frame_no in selected:
        frame = Image.fromarray(reader[frame_no].asnumpy()).convert('RGB')
        tiles = dynamic_preprocess(frame, image_size=input_size, use_thumbnail=True, max_num=max_num)
        # One stacked tensor per frame; its first dimension is the tile count.
        stacked = torch.stack([tile_transform(tile) for tile in tiles])
        tiles_per_frame.append(stacked.shape[0])
        frame_tensors.append(stacked)

    return torch.cat(frame_tensors), tiles_per_frame

In [None]:
# ─── Prompt Helpers ────────────────────────────────────────────────────
def clip_as_prompt(num_patches_list):
    """Build the frame placeholder block of a prompt.

    Emits one ``FrameN: <image>`` line per entry of ``num_patches_list``
    (numbered from 1), joined with newlines. Only the length of the list is
    used.
    """
    frame_lines = []
    for frame_no in range(1, len(num_patches_list) + 1):
        frame_lines.append(f"Frame{frame_no}: <image>")
    return "\n".join(frame_lines)


def build_prompt(start, num_patches_list, prev_output, first_prompt, second_prompt):
    """Assemble the text prompt for one video segment.

    For the segment starting at 0 the frame placeholders come first, followed
    by ``first_prompt``. For every other segment the prompt opens with the
    previous clip's output, then ``second_prompt``, then the frame
    placeholders.
    """
    frame_block = clip_as_prompt(num_patches_list)
    if start != 0:
        # Later segments carry the previous description forward as context.
        return f"The previous clip showed: {prev_output}\n{second_prompt}\n{frame_block}"
    return f"{frame_block}\n{first_prompt}"


# ─── Inference ─────────────────────────────────────────────────────────
def load_video_metadata(video_file_path):
    """Read the average frame rate and duration of a video.

    Args:
        video_file_path: Path to the video; converted with ``str`` before it
            is handed to ``VideoReader``.

    Returns:
        ``(fps, duration_seconds)`` on success, or ``(None, None)`` when the
        file cannot be opened or its metadata is unusable (for example a
        frame rate of zero). Failures are reported on stdout rather than
        raised, so one bad file does not abort a batch.
    """
    try:
        reader = VideoReader(str(video_file_path), ctx=cpu(0))
        fps = reader.get_avg_fps()
        duration = len(reader) / fps
    except Exception as exc:
        # Still best effort, but say why the video is being skipped instead of
        # silently swallowing the error.
        print(f"Could not read video metadata for {video_file_path}: {exc!r}")
        return None, None
    return fps, duration


def process_video_segment(video_path, start, end, prev_output, first_prompt, second_prompt, system_prompt):
    """Describe one time window of a video with the global ``model``.

    Args:
        video_path: Path to the video file.
        start: Window start in seconds; ``0`` selects the first-segment prompt.
        end: Window end in seconds.
        prev_output: Description of the previous segment, used as context.
        first_prompt: Instruction used for the first segment.
        second_prompt: Instruction used for every later segment.
        system_prompt: System message set on the model before generation.

    Returns:
        A ``(summary, context)`` pair on every path, so callers can always
        unpack two values. ``summary`` is the model response, or ``None``
        when no frames were loaded. ``context`` is the text to carry into the
        next segment: the new response, or the unchanged ``prev_output`` when
        the segment was empty.
    """
    print(f" Segment {start:>5.1f}s – {end:>6.1f}s")

    bound = (start, end)
    vid_t, num_patches_list = load_video(str(video_path), bound=bound)
    if vid_t.numel() == 0:
        return None, prev_output

    vid_t = vid_t.to(device=model.device, dtype=torch.float16).contiguous()
    prompt = build_prompt(start, num_patches_list, prev_output, first_prompt, second_prompt)

    model.system_message = system_prompt.strip()
    try:
        response, _ = model.chat(
            tokenizer, vid_t, prompt, GEN_CONFIG,
            history=None, return_history=True,
            num_patches_list=num_patches_list
        )
    finally:
        # Release the frame tensor even when generation fails, so a crashed
        # segment does not keep memory pinned for the rest of the session.
        del vid_t
        torch.cuda.empty_cache()

    # The response is both this segment's summary and the next one's context.
    return response, response


def describe_video(video_file_path, first_prompt, second_prompt, system_prompt, segment_duration=CLIP_DURATION):
    """Describe a video segment by segment, carrying context forward.

    The video is walked in windows of ``segment_duration`` seconds. Only whole
    seconds are covered: the duration is truncated to an integer, so a
    trailing fraction of a second is not processed. Each call to
    ``process_video_segment`` is unpacked into a summary and the context for
    the next window.

    Args:
        video_file_path: Path to the video file.
        first_prompt: Instruction for the first segment.
        second_prompt: Instruction for later segments.
        system_prompt: System message forwarded to every segment.
        segment_duration: Window length in seconds.

    Returns:
        List of the non-empty segment summaries in chronological order, or an
        empty list when the video metadata could not be read.
    """
    fps, total_duration = load_video_metadata(video_file_path)
    if fps is None:
        return []

    print(f"Processing {Path(video_file_path).name} ({total_duration:.1f}s)")
    whole_seconds = int(total_duration + 1e-6)

    collected = []
    carried_context = ""
    for seg_start in range(0, whole_seconds, segment_duration):
        # The last window is clipped to the end of the video.
        seg_end = min(seg_start + segment_duration, whole_seconds)
        seg_summary, carried_context = process_video_segment(
            video_file_path, seg_start, seg_end, carried_context,
            first_prompt, second_prompt, system_prompt
        )
        if not seg_summary:
            continue
        collected.append(seg_summary)

    return collected


def generate_video_descriptions(media_dir, first_prompt, second_prompt, system_prompt):
    """Describe every video in a directory and keep each one's final summary.

    Args:
        media_dir: Directory scanned (non-recursively) for video files.
        first_prompt: Instruction for the first segment of each video.
        second_prompt: Instruction for later segments.
        system_prompt: System message forwarded to the model.

    Returns:
        Dict mapping each video filename to the summary of its last segment,
        or to ``""`` when no segment produced a summary. Files whose
        lower-cased name does not end in one of ``VIDEO_EXTENSIONS`` are
        skipped.
    """
    results = {}
    for filename in sorted(os.listdir(media_dir)):
        if not filename.lower().endswith(tuple(VIDEO_EXTENSIONS)):
            continue
        path = os.path.join(media_dir, filename)
        clips = describe_video(path, first_prompt, second_prompt, system_prompt)
        last = clips[-1] if clips else ""
        # describe_video collects plain response strings, so indexing the last
        # item with ["description"] would raise TypeError. Dict-shaped
        # summaries are still accepted for backward compatibility.
        results[filename] = last["description"] if isinstance(last, dict) else last
    return results

In [None]:
# Instruction for the first segment of each video (start == 0 in build_prompt).
FIRST_PROMPT = "Describe in detail what is happening in this video."
# Instruction for every later segment; build_prompt prefixes it with the
# previous clip's description. The earlier wording ("generate a cumulative of
# the this clip") was missing its noun, so the sentence has been repaired.
SECOND_PROMPT = "Use the description of the previous clip to generate a cumulative description of this clip from the same video."

# System message asking the model to reason inside <think> tags before answering.
# NOTE(review): GEN_CONFIG caps generation at 225 new tokens, and that budget is
# shared by the <think> section and the final answer — confirm it is enough.
R1_SYSTEM_PROMPT = '''
You are an AI assistant that rigorously follows this response protocol:

1. First, conduct a detailed analysis of the question. Consider different angles, potential solutions, and reason through the problem step-by-step. Enclose this entire thinking process within <think> and </think> tags.

2. After the thinking section, provide a clear, concise, and direct answer to the user's question. Separate the answer from the think section with a newline.

Ensure that the thinking process is thorough but remains focused on the query. The final answer should be standalone and not reference the thinking section.
'''.strip()

# Describe every video found in ./toy_ds and persist {filename: final description}.
descriptions = generate_video_descriptions("toy_ds", FIRST_PROMPT, SECOND_PROMPT, R1_SYSTEM_PROMPT)
with open("video_descriptions.json", "w", encoding="utf-8") as f:
    json.dump(descriptions, f, indent=2, ensure_ascii=False)

▶ Processing TNS_0002_V.MP4 (29.5s)
 Segment   0.0s –   10.0s


Setting `pad_token_id` to `eos_token_id`:151645 for open-end generation.


 Segment  10.0s –   20.0s


Setting `pad_token_id` to `eos_token_id`:151645 for open-end generation.


 Segment  20.0s –   29.0s


Setting `pad_token_id` to `eos_token_id`:151645 for open-end generation.
