In [None]:
# Mount Google Drive into the Colab runtime at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
%%capture
import os, re
# Colab is detected by looking for "COLAB_" anywhere in the concatenated
# environment variable names.
if "COLAB_" not in "".join(os.environ.keys()):
    !pip install unsloth
else:
    # Do this only in Colab notebooks! Otherwise use pip install unsloth
    # `v` is the "major.minor" prefix of the installed torch version; it selects
    # the xformers pin below (2.9 -> 0.0.33.post1, 2.8 -> 0.0.32.post2,
    # anything else -> 0.0.29.post3).
    import torch; v = re.match(r"[0-9]{1,}\.[0-9]{1,}", str(torch.__version__)).group(0)
    xformers = "xformers==" + ("0.0.33.post1" if v=="2.9" else "0.0.32.post2" if v=="2.8" else "0.0.29.post3")
    # {xformers} is interpolated by IPython into the shell command.
    !pip install --no-deps bitsandbytes accelerate {xformers} peft trl triton cut_cross_entropy unsloth_zoo
    !pip install sentencepiece protobuf "datasets==4.3.0" "huggingface_hub>=0.34.0" hf_transfer
    !pip install --no-deps unsloth
# These two pins run in both branches, after the installs above.
!pip install transformers==4.57.1
!pip install --no-deps trl==0.22.2

In [None]:
from unsloth import FastVisionModel # FastLanguageModel for LLMs
import torch

# Alternative "Thinking" checkpoint, left disabled.
#model_id = "unsloth/Qwen3-VL-2B-Thinking-unsloth-bnb-4bit"
# Checkpoint id; the same id is used later to load the matching AutoProcessor.
model_id = "unsloth/Qwen3-VL-2B-Instruct-unsloth-bnb-4bit"
# NOTE(review): `tokenizer` is not referenced again in the visible cells; the
# later cells go through the AutoProcessor instead.
model, tokenizer = FastVisionModel.from_pretrained(
    model_id,
    load_in_4bit = True, # Use 4bit to reduce memory use. False for 16bit LoRA.
    use_gradient_checkpointing = "unsloth", # True or "unsloth" for long context
)

==((====))==  Unsloth 2026.1.2: Fast Qwen3_Vl patching. Transformers: 4.57.1.
   \\   /|    Tesla T4. Num GPUs = 1. Max memory: 14.741 GB. Platform: Linux.
O^O/ \_/ \    Torch: 2.9.0+cu126. CUDA: 7.5. CUDA Toolkit: 12.6. Triton: 3.5.0
\        /    Bfloat16 = FALSE. FA [Xformers = 0.0.33.post1. FA2 = False]
 "-____-"     Free license: http://github.com/unslothai/unsloth
Unsloth: Fast downloading is enabled - ignore downloading bars which are red colored!


In [None]:
from transformers import AutoProcessor

# The Hugging Face processor is usually the more convenient choice here (multi-image input).
processor = AutoProcessor.from_pretrained(model_id)
# Inference optimization (unsloth style). As the last expression of the cell,
# the call's return value (the model repr) is echoed as the cell output.
FastVisionModel.for_inference(model)  # inference optimization (unsloth style)

Qwen3VLForConditionalGeneration(
  (model): Qwen3VLModel(
    (visual): Qwen3VLVisionModel(
      (patch_embed): Qwen3VLVisionPatchEmbed(
        (proj): Conv3d(3, 1024, kernel_size=(2, 16, 16), stride=(2, 16, 16))
      )
      (pos_embed): Embedding(2304, 1024)
      (rotary_pos_emb): Qwen3VLVisionRotaryEmbedding()
      (blocks): ModuleList(
        (0-23): 24 x Qwen3VLVisionBlock(
          (norm1): LayerNorm((1024,), eps=1e-06, elementwise_affine=True)
          (norm2): LayerNorm((1024,), eps=1e-06, elementwise_affine=True)
          (attn): Qwen3VLVisionAttention(
            (qkv): Linear(in_features=1024, out_features=3072, bias=True)
            (proj): Linear(in_features=1024, out_features=1024, bias=True)
          )
          (mlp): Qwen3VLVisionMLP(
            (linear_fc1): Linear(in_features=1024, out_features=4096, bias=True)
            (linear_fc2): Linear(in_features=4096, out_features=1024, bias=True)
            (act_fn): GELUTanh()
          )
        )
      )
 

In [None]:
def show_thumb_grid(
    images,
    scale=0.10,      # shrink each frame to 10% of its original size
    cols=4,          # 4x2 layout for an 8-frame chunk
    gap=6,           # spacing between thumbnails (px)
    bg=(18, 18, 18)  # background colour
):
    """Display a list of PIL images as a single thumbnail contact sheet.

    Args:
        images: List[PIL.Image.Image]. An empty (or None) value is a no-op.
        scale: Factor applied to each image's width and height; every side is
            clamped to at least 1 px.
        cols: Number of grid columns; must be >= 1.
        gap: Gap between neighbouring tiles, in pixels.
        bg: RGB colour of the canvas behind the tiles.

    Returns:
        The composed grid image (it is also displayed inline), or None when
        ``images`` is empty.

    Raises:
        ValueError: if ``cols`` is smaller than 1.
    """
    if not images:
        return None
    if cols < 1:
        # Fail before any resizing work instead of hitting a ZeroDivisionError
        # (cols == 0) or building a nonsensical canvas (cols < 0) further down.
        raise ValueError(f"cols must be >= 1, got {cols}")

    # Imported here on purpose: this cell has no imports of its own and these
    # names are otherwise only imported by a later cell, so the function would
    # depend on cell execution order.
    from PIL import Image
    from IPython.display import display

    # 1) Build the thumbnails.
    thumbs = []
    for im in images:
        w, h = im.size
        tw, th = max(1, int(w * scale)), max(1, int(h * scale))
        thumbs.append(im.resize((tw, th), resample=Image.BILINEAR))

    # 2) Size the canvas: ceil(len / cols) rows of equally sized cells, where a
    #    cell is as large as the largest thumbnail.
    rows = (len(thumbs) + cols - 1) // cols
    cell_w = max(t.size[0] for t in thumbs)
    cell_h = max(t.size[1] for t in thumbs)

    grid_w = cols * cell_w + (cols - 1) * gap
    grid_h = rows * cell_h + (rows - 1) * gap

    grid = Image.new("RGB", (grid_w, grid_h), bg)

    # 3) Paste row by row; each thumbnail goes to the top-left corner of its cell.
    for idx, t in enumerate(thumbs):
        r, c = divmod(idx, cols)
        grid.paste(t, (c * (cell_w + gap), r * (cell_h + gap)))

    display(grid)
    return grid  # returned so the caller can save or post-process it

In [None]:
"""
영상 크기가 큰 경우(700mb 이상) ram 절약하기 위해 개선한 코드임

코드 수정한 프롬프트(제미나이 2.5 플래시):
마지막 셀 코드에 리스트에 frames을 리스트 저장이 아닌 yield 하고, 아래 for문도 generator를 인자값으로 받아서 실행할 수 있게 함수화 및 실행 코드로 바꿔줘, ram 절약 관점에서만 개선해주는 "리팩토링"이 되어야 해
"""
import cv2
from PIL import Image
from IPython.display import display
import time
from datetime import datetime
import torch

# Helper functions
def ts():
    """Return the current local time as a ``YYYY-MM-DD HH:MM:SS`` string (used as a log prefix)."""
    now = datetime.now()
    return f"{now:%Y-%m-%d %H:%M:%S}"

def sync():
    """Call ``torch.cuda.synchronize()`` when CUDA is available; otherwise do nothing.

    Used around timed sections so the measured wall-clock time includes the
    GPU work that was launched inside them.
    """
    if not torch.cuda.is_available():
        return
    torch.cuda.synchronize()


def frame_generator(video_path, max_seconds=None):
    """Lazily yield one RGB PIL image per second of a video.

    Frames are fetched by seeking to 0 ms, 1000 ms, 2000 ms, ... and reading
    one frame at each position, so this generator never accumulates frames.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        max_seconds: Optional cap on the number of one-second samples;
            ``None`` means "until the end of the video".

    Yields:
        ``Image.fromarray`` of the BGR->RGB converted frame.

    Raises:
        RuntimeError: if the capture cannot be opened. Because this is a
            generator, the error surfaces on the first iteration, not at
            call time.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise RuntimeError(f"Failed to open video: {video_path}")

        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Whole seconds of footage; None when no usable FPS is reported, in
        # which case only a failed read() ends the loop.
        duration_sec = int(total_frames / fps) if fps and fps > 0 else None

        sec = 0
        while True:
            if max_seconds is not None and sec >= max_seconds:
                break
            if duration_sec is not None and sec >= duration_sec:
                break

            cap.set(cv2.CAP_PROP_POS_MSEC, sec * 1000.0)
            ok, bgr = cap.read()
            if not ok:
                break

            rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
            yield Image.fromarray(rgb)
            sec += 1
    finally:
        # Release the capture on every exit path: normal exhaustion, an
        # exception, or the consumer abandoning/closing the generator early.
        cap.release()

def _summarize_chunk(chunk, start_s, end_s, prev_summary, max_new_tokens,
                     processor, model, label="Chunk", target_w=512):
    """Preview, downscale and summarise one chunk of frames; return the summary text.

    ``label`` only changes the log banners ("Chunk" / "Final Chunk").
    ``prev_summary`` (if truthy) is appended to the prompt as context.
    """
    t_chunk0 = time.perf_counter()
    print(f"\n[{ts()}] === {label} {start_s}s ~ {end_s}s START ===")

    show_thumb_grid(chunk, scale=0.10, cols=4)

    # Downscale frames wider than target_w, keeping the aspect ratio. The
    # height is clamped to 1 px so extremely wide frames cannot produce a
    # zero-height image.
    resized = []
    for im in chunk:
        w, h = im.size
        if w > target_w:
            im = im.resize((target_w, max(1, int(h * (target_w / w)))))
        resized.append(im)

    t0 = time.perf_counter()
    content = [{"type": "image", "image": im} for im in resized]
    t1 = time.perf_counter()
    print(f"[{ts()}] step2 build_image_content: {(t1 - t0)*1000:.1f} ms")

    t0 = time.perf_counter()
    prompt = (
        f"These are frames sampled at 1 FPS from {start_s}s to {end_s}s.\n"
        "Write a concise 2–3 sentence summary in English.\n"
        "Do NOT output any reasoning, analysis, chain-of-thought, or step-by-step thinking.\n"
        "Only output the final summary text.\n"
        "If a new character appears, briefly describe appearance and actions.\n"
        "If the character already appeared, refer to the prior description.\n"
        "Keep it under 70 words."
    )
    if prev_summary:
        prompt += f"\n\nPrevious chunk summary (context): {prev_summary}"

    content.append({"type": "text", "text": prompt})
    messages = [{"role": "user", "content": content}]
    t1 = time.perf_counter()
    print(f"[{ts()}] step3 build_prompt_messages: {(t1 - t0)*1000:.1f} ms")

    t0 = time.perf_counter()
    inputs = processor.apply_chat_template(
        messages,
        tokenize=True,
        add_generation_prompt=True,
        return_dict=True,
        return_tensors="pt",
    )
    t1 = time.perf_counter()
    print(f"[{ts()}] step4 apply_chat_template(tokenize): {(t1 - t0)*1000:.1f} ms")

    t0 = time.perf_counter()
    inputs = {k: v.to(model.device) for k, v in inputs.items()}
    sync()
    t1 = time.perf_counter()
    print(f"[{ts()}] step5 to_device(+sync): {(t1 - t0)*1000:.1f} ms")

    t0 = time.perf_counter()
    sync()
    with torch.inference_mode():
        out_ids = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            do_sample=False,
            num_beams=1,
            repetition_penalty=1.05,
        )
    sync()
    t1 = time.perf_counter()
    print(f"[{ts()}] step6 generate(+sync): {(t1 - t0):.3f} s")

    t0 = time.perf_counter()
    # Drop the prompt tokens so only the newly generated part is decoded.
    gen_ids = out_ids[:, inputs["input_ids"].shape[1]:]
    text = processor.batch_decode(
        gen_ids,
        skip_special_tokens=True,
        clean_up_tokenization_spaces=False
    )[0].strip()
    t1 = time.perf_counter()
    print(f"[{ts()}] step7 decode: {(t1 - t0)*1000:.1f} ms")

    print(f"\n=== {label} {start_s}s ~ {end_s}s RESULT ===")
    print(text)

    t_chunk1 = time.perf_counter()
    print(f"[{ts()}] === {label} total: {(t_chunk1 - t_chunk0):.3f} s ===")
    return text


def process_video_frames(frame_gen, chunk_size, max_new_tokens, processor, model):
    """Summarise a stream of frames chunk by chunk, printing timings and results.

    Frames are pulled from ``frame_gen`` one at a time and buffered; every
    ``chunk_size`` frames the buffer is summarised and then dropped, so at most
    one chunk of frames is held here. A shorter trailing chunk is summarised as
    the "Final Chunk". Each summary is fed into the next chunk's prompt as
    context.

    Args:
        frame_gen: Iterable of images exposing ``.size`` and ``.resize``.
            The second labels in logs and prompts are derived from the frame
            counter, i.e. they assume one frame per second.
        chunk_size: Number of frames per summarisation request; must be >= 1.
        max_new_tokens: Forwarded to ``model.generate``.
        processor: Object providing ``apply_chat_template`` and ``batch_decode``.
        model: Object providing ``device`` and ``generate``.

    Raises:
        ValueError: if ``chunk_size`` is smaller than 1. Such a value would
            never trigger a flush and would buffer the whole video into a
            single request.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")

    prev_summary = None
    t_all0 = time.perf_counter()
    print(f"[{ts()}] Start total")

    chunk_buffer = []
    current_frame_idx = 0

    for frame in frame_gen:
        chunk_buffer.append(frame)
        current_frame_idx += 1

        if len(chunk_buffer) == chunk_size:
            prev_summary = _summarize_chunk(
                chunk_buffer,
                current_frame_idx - chunk_size,
                current_frame_idx - 1,
                prev_summary, max_new_tokens, processor, model,
                label="Chunk",
            )
            # Start a fresh list so the processed frames can be freed.
            chunk_buffer = []

    if chunk_buffer:
        prev_summary = _summarize_chunk(
            chunk_buffer,
            current_frame_idx - len(chunk_buffer),
            current_frame_idx - 1,
            prev_summary, max_new_tokens, processor, model,
            label="Final Chunk",
        )

    t_all1 = time.perf_counter()
    print(f"\n[{ts()}] End total. Total elapsed: {(t_all1 - t_all0):.3f} s")

# --- Execution Code ---
video_path = "sample01.mp4"  # test run of the generator-based code on a ~700 MB file
# Number of frames grouped into one summarisation request.
chunk_size = 8
# Forwarded to model.generate as the per-chunk generation limit.
max_new_tokens = 160

# frame_generator is a generator function, so no frame is decoded here; frames
# are read lazily while process_video_frames iterates.
frames_gen = frame_generator(video_path)
process_video_frames(frames_gen, chunk_size, max_new_tokens, processor, model)


Output hidden; open in https://colab.research.google.com to view.

In [None]:
"""
영상 크기가 큰 경우(700mb 이상) ram 절약하기 위해 개선한 코드임

코드 수정한 프롬프트(제미나이 2.5 플래시):
마지막 셀 코드에 리스트에 frames을 리스트 저장이 아닌 yield 하고, 아래 for문도 generator를 인자값으로 받아서 실행할 수 있게 함수화 및 실행 코드로 바꿔줘, ram 절약 관점에서만 개선해주는 "리팩토링"이 되어야 해
"""
import cv2
from PIL import Image
from IPython.display import display
import time
from datetime import datetime
import torch

# Helper functions
def ts():
    """Return the current local time as a ``YYYY-MM-DD HH:MM:SS`` string (used as a log prefix)."""
    now = datetime.now()
    return f"{now:%Y-%m-%d %H:%M:%S}"

def sync():
    """Call ``torch.cuda.synchronize()`` when CUDA is available; otherwise do nothing.

    Used around timed sections so the measured wall-clock time includes the
    GPU work that was launched inside them.
    """
    if not torch.cuda.is_available():
        return
    torch.cuda.synchronize()


def frame_generator(video_path, max_seconds=None):
    """Lazily yield one RGB PIL image per second of a video.

    Frames are fetched by seeking to 0 ms, 1000 ms, 2000 ms, ... and reading
    one frame at each position, so this generator never accumulates frames.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        max_seconds: Optional cap on the number of one-second samples;
            ``None`` means "until the end of the video".

    Yields:
        ``Image.fromarray`` of the BGR->RGB converted frame.

    Raises:
        RuntimeError: if the capture cannot be opened. Because this is a
            generator, the error surfaces on the first iteration, not at
            call time.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise RuntimeError(f"Failed to open video: {video_path}")

        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Whole seconds of footage; None when no usable FPS is reported, in
        # which case only a failed read() ends the loop.
        duration_sec = int(total_frames / fps) if fps and fps > 0 else None

        sec = 0
        while True:
            if max_seconds is not None and sec >= max_seconds:
                break
            if duration_sec is not None and sec >= duration_sec:
                break

            cap.set(cv2.CAP_PROP_POS_MSEC, sec * 1000.0)
            ok, bgr = cap.read()
            if not ok:
                break

            rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
            yield Image.fromarray(rgb)
            sec += 1
    finally:
        # Release the capture on every exit path: normal exhaustion, an
        # exception, or the consumer abandoning/closing the generator early.
        cap.release()

def _summarize_chunk(chunk, start_s, end_s, prev_summary, max_new_tokens,
                     processor, model, label="Chunk", target_w=512):
    """Preview, downscale and summarise one chunk of frames; return the summary text.

    ``label`` only changes the log banners ("Chunk" / "Final Chunk").
    ``prev_summary`` (if truthy) is appended to the prompt as context.
    """
    t_chunk0 = time.perf_counter()
    print(f"\n[{ts()}] === {label} {start_s}s ~ {end_s}s START ===")

    show_thumb_grid(chunk, scale=0.10, cols=4)

    # Downscale frames wider than target_w, keeping the aspect ratio. The
    # height is clamped to 1 px so extremely wide frames cannot produce a
    # zero-height image.
    resized = []
    for im in chunk:
        w, h = im.size
        if w > target_w:
            im = im.resize((target_w, max(1, int(h * (target_w / w)))))
        resized.append(im)

    t0 = time.perf_counter()
    content = [{"type": "image", "image": im} for im in resized]
    t1 = time.perf_counter()
    print(f"[{ts()}] step2 build_image_content: {(t1 - t0)*1000:.1f} ms")

    t0 = time.perf_counter()
    prompt = (
        f"These are frames sampled at 1 FPS from {start_s}s to {end_s}s.\n"
        "Write a concise 2–3 sentence summary in English.\n"
        "Do NOT output any reasoning, analysis, chain-of-thought, or step-by-step thinking.\n"
        "Only output the final summary text.\n"
        "If a new character appears, briefly describe appearance and actions.\n"
        "If the character already appeared, refer to the prior description.\n"
        "Keep it under 70 words."
    )
    if prev_summary:
        prompt += f"\n\nPrevious chunk summary (context): {prev_summary}"

    content.append({"type": "text", "text": prompt})
    messages = [{"role": "user", "content": content}]
    t1 = time.perf_counter()
    print(f"[{ts()}] step3 build_prompt_messages: {(t1 - t0)*1000:.1f} ms")

    t0 = time.perf_counter()
    inputs = processor.apply_chat_template(
        messages,
        tokenize=True,
        add_generation_prompt=True,
        return_dict=True,
        return_tensors="pt",
    )
    t1 = time.perf_counter()
    print(f"[{ts()}] step4 apply_chat_template(tokenize): {(t1 - t0)*1000:.1f} ms")

    t0 = time.perf_counter()
    inputs = {k: v.to(model.device) for k, v in inputs.items()}
    sync()
    t1 = time.perf_counter()
    print(f"[{ts()}] step5 to_device(+sync): {(t1 - t0)*1000:.1f} ms")

    t0 = time.perf_counter()
    sync()
    with torch.inference_mode():
        out_ids = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            do_sample=False,
            num_beams=1,
            repetition_penalty=1.05,
        )
    sync()
    t1 = time.perf_counter()
    print(f"[{ts()}] step6 generate(+sync): {(t1 - t0):.3f} s")

    t0 = time.perf_counter()
    # Drop the prompt tokens so only the newly generated part is decoded.
    gen_ids = out_ids[:, inputs["input_ids"].shape[1]:]
    text = processor.batch_decode(
        gen_ids,
        skip_special_tokens=True,
        clean_up_tokenization_spaces=False
    )[0].strip()
    t1 = time.perf_counter()
    print(f"[{ts()}] step7 decode: {(t1 - t0)*1000:.1f} ms")

    print(f"\n=== {label} {start_s}s ~ {end_s}s RESULT ===")
    print(text)

    t_chunk1 = time.perf_counter()
    print(f"[{ts()}] === {label} total: {(t_chunk1 - t_chunk0):.3f} s ===")
    return text


def process_video_frames(frame_gen, chunk_size, max_new_tokens, processor, model):
    """Summarise a stream of frames chunk by chunk, printing timings and results.

    Frames are pulled from ``frame_gen`` one at a time and buffered; every
    ``chunk_size`` frames the buffer is summarised and then dropped, so at most
    one chunk of frames is held here. A shorter trailing chunk is summarised as
    the "Final Chunk". Each summary is fed into the next chunk's prompt as
    context.

    Args:
        frame_gen: Iterable of images exposing ``.size`` and ``.resize``.
            The second labels in logs and prompts are derived from the frame
            counter, i.e. they assume one frame per second.
        chunk_size: Number of frames per summarisation request; must be >= 1.
        max_new_tokens: Forwarded to ``model.generate``.
        processor: Object providing ``apply_chat_template`` and ``batch_decode``.
        model: Object providing ``device`` and ``generate``.

    Raises:
        ValueError: if ``chunk_size`` is smaller than 1. Such a value would
            never trigger a flush and would buffer the whole video into a
            single request.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")

    prev_summary = None
    t_all0 = time.perf_counter()
    print(f"[{ts()}] Start total")

    chunk_buffer = []
    current_frame_idx = 0

    for frame in frame_gen:
        chunk_buffer.append(frame)
        current_frame_idx += 1

        if len(chunk_buffer) == chunk_size:
            prev_summary = _summarize_chunk(
                chunk_buffer,
                current_frame_idx - chunk_size,
                current_frame_idx - 1,
                prev_summary, max_new_tokens, processor, model,
                label="Chunk",
            )
            # Start a fresh list so the processed frames can be freed.
            chunk_buffer = []

    if chunk_buffer:
        prev_summary = _summarize_chunk(
            chunk_buffer,
            current_frame_idx - len(chunk_buffer),
            current_frame_idx - 1,
            prev_summary, max_new_tokens, processor, model,
            label="Final Chunk",
        )

    t_all1 = time.perf_counter()
    print(f"\n[{ts()}] End total. Total elapsed: {(t_all1 - t_all0):.3f} s")

# --- Execution Code ---
video_path = "sample02.mp4"  # test run of the generator-based code on a ~700 MB file
# Number of frames grouped into one summarisation request.
chunk_size = 8
# Forwarded to model.generate as the per-chunk generation limit.
max_new_tokens = 160

# frame_generator is a generator function, so no frame is decoded here; frames
# are read lazily while process_video_frames iterates.
frames_gen = frame_generator(video_path)
process_video_frames(frames_gen, chunk_size, max_new_tokens, processor, model)


Output hidden; open in https://colab.research.google.com to view.

In [None]:
"""
영상 크기가 큰 경우(700mb 이상) ram 절약하기 위해 개선한 코드임

코드 수정한 프롬프트(제미나이 2.5 플래시):
마지막 셀 코드에 리스트에 frames을 리스트 저장이 아닌 yield 하고, 아래 for문도 generator를 인자값으로 받아서 실행할 수 있게 함수화 및 실행 코드로 바꿔줘, ram 절약 관점에서만 개선해주는 "리팩토링"이 되어야 해
"""
import cv2
from PIL import Image
from IPython.display import display
import time
from datetime import datetime
import torch

# Helper functions
def ts():
    """Return the current local time as a ``YYYY-MM-DD HH:MM:SS`` string (used as a log prefix)."""
    now = datetime.now()
    return f"{now:%Y-%m-%d %H:%M:%S}"

def sync():
    """Call ``torch.cuda.synchronize()`` when CUDA is available; otherwise do nothing.

    Used around timed sections so the measured wall-clock time includes the
    GPU work that was launched inside them.
    """
    if not torch.cuda.is_available():
        return
    torch.cuda.synchronize()


def frame_generator(video_path, max_seconds=None):
    """Lazily yield one RGB PIL image per second of a video.

    Frames are fetched by seeking to 0 ms, 1000 ms, 2000 ms, ... and reading
    one frame at each position, so this generator never accumulates frames.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        max_seconds: Optional cap on the number of one-second samples;
            ``None`` means "until the end of the video".

    Yields:
        ``Image.fromarray`` of the BGR->RGB converted frame.

    Raises:
        RuntimeError: if the capture cannot be opened. Because this is a
            generator, the error surfaces on the first iteration, not at
            call time.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise RuntimeError(f"Failed to open video: {video_path}")

        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Whole seconds of footage; None when no usable FPS is reported, in
        # which case only a failed read() ends the loop.
        duration_sec = int(total_frames / fps) if fps and fps > 0 else None

        sec = 0
        while True:
            if max_seconds is not None and sec >= max_seconds:
                break
            if duration_sec is not None and sec >= duration_sec:
                break

            cap.set(cv2.CAP_PROP_POS_MSEC, sec * 1000.0)
            ok, bgr = cap.read()
            if not ok:
                break

            rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
            yield Image.fromarray(rgb)
            sec += 1
    finally:
        # Release the capture on every exit path: normal exhaustion, an
        # exception, or the consumer abandoning/closing the generator early.
        cap.release()

def _summarize_chunk(chunk, start_s, end_s, prev_summary, max_new_tokens,
                     processor, model, label="Chunk", target_w=512):
    """Preview, downscale and summarise one chunk of frames; return the summary text.

    ``label`` only changes the log banners ("Chunk" / "Final Chunk").
    ``prev_summary`` (if truthy) is appended to the prompt as context.
    """
    t_chunk0 = time.perf_counter()
    print(f"\n[{ts()}] === {label} {start_s}s ~ {end_s}s START ===")

    show_thumb_grid(chunk, scale=0.10, cols=4)

    # Downscale frames wider than target_w, keeping the aspect ratio. The
    # height is clamped to 1 px so extremely wide frames cannot produce a
    # zero-height image.
    resized = []
    for im in chunk:
        w, h = im.size
        if w > target_w:
            im = im.resize((target_w, max(1, int(h * (target_w / w)))))
        resized.append(im)

    t0 = time.perf_counter()
    content = [{"type": "image", "image": im} for im in resized]
    t1 = time.perf_counter()
    print(f"[{ts()}] step2 build_image_content: {(t1 - t0)*1000:.1f} ms")

    t0 = time.perf_counter()
    prompt = (
        f"These are frames sampled at 1 FPS from {start_s}s to {end_s}s.\n"
        "Write a concise 2–3 sentence summary in English.\n"
        "Do NOT output any reasoning, analysis, chain-of-thought, or step-by-step thinking.\n"
        "Only output the final summary text.\n"
        "If a new character appears, briefly describe appearance and actions.\n"
        "If the character already appeared, refer to the prior description.\n"
        "Keep it under 70 words."
    )
    if prev_summary:
        prompt += f"\n\nPrevious chunk summary (context): {prev_summary}"

    content.append({"type": "text", "text": prompt})
    messages = [{"role": "user", "content": content}]
    t1 = time.perf_counter()
    print(f"[{ts()}] step3 build_prompt_messages: {(t1 - t0)*1000:.1f} ms")

    t0 = time.perf_counter()
    inputs = processor.apply_chat_template(
        messages,
        tokenize=True,
        add_generation_prompt=True,
        return_dict=True,
        return_tensors="pt",
    )
    t1 = time.perf_counter()
    print(f"[{ts()}] step4 apply_chat_template(tokenize): {(t1 - t0)*1000:.1f} ms")

    t0 = time.perf_counter()
    inputs = {k: v.to(model.device) for k, v in inputs.items()}
    sync()
    t1 = time.perf_counter()
    print(f"[{ts()}] step5 to_device(+sync): {(t1 - t0)*1000:.1f} ms")

    t0 = time.perf_counter()
    sync()
    with torch.inference_mode():
        out_ids = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            do_sample=False,
            num_beams=1,
            repetition_penalty=1.05,
        )
    sync()
    t1 = time.perf_counter()
    print(f"[{ts()}] step6 generate(+sync): {(t1 - t0):.3f} s")

    t0 = time.perf_counter()
    # Drop the prompt tokens so only the newly generated part is decoded.
    gen_ids = out_ids[:, inputs["input_ids"].shape[1]:]
    text = processor.batch_decode(
        gen_ids,
        skip_special_tokens=True,
        clean_up_tokenization_spaces=False
    )[0].strip()
    t1 = time.perf_counter()
    print(f"[{ts()}] step7 decode: {(t1 - t0)*1000:.1f} ms")

    print(f"\n=== {label} {start_s}s ~ {end_s}s RESULT ===")
    print(text)

    t_chunk1 = time.perf_counter()
    print(f"[{ts()}] === {label} total: {(t_chunk1 - t_chunk0):.3f} s ===")
    return text


def process_video_frames(frame_gen, chunk_size, max_new_tokens, processor, model):
    """Summarise a stream of frames chunk by chunk, printing timings and results.

    Frames are pulled from ``frame_gen`` one at a time and buffered; every
    ``chunk_size`` frames the buffer is summarised and then dropped, so at most
    one chunk of frames is held here. A shorter trailing chunk is summarised as
    the "Final Chunk". Each summary is fed into the next chunk's prompt as
    context.

    Args:
        frame_gen: Iterable of images exposing ``.size`` and ``.resize``.
            The second labels in logs and prompts are derived from the frame
            counter, i.e. they assume one frame per second.
        chunk_size: Number of frames per summarisation request; must be >= 1.
        max_new_tokens: Forwarded to ``model.generate``.
        processor: Object providing ``apply_chat_template`` and ``batch_decode``.
        model: Object providing ``device`` and ``generate``.

    Raises:
        ValueError: if ``chunk_size`` is smaller than 1. Such a value would
            never trigger a flush and would buffer the whole video into a
            single request.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")

    prev_summary = None
    t_all0 = time.perf_counter()
    print(f"[{ts()}] Start total")

    chunk_buffer = []
    current_frame_idx = 0

    for frame in frame_gen:
        chunk_buffer.append(frame)
        current_frame_idx += 1

        if len(chunk_buffer) == chunk_size:
            prev_summary = _summarize_chunk(
                chunk_buffer,
                current_frame_idx - chunk_size,
                current_frame_idx - 1,
                prev_summary, max_new_tokens, processor, model,
                label="Chunk",
            )
            # Start a fresh list so the processed frames can be freed.
            chunk_buffer = []

    if chunk_buffer:
        prev_summary = _summarize_chunk(
            chunk_buffer,
            current_frame_idx - len(chunk_buffer),
            current_frame_idx - 1,
            prev_summary, max_new_tokens, processor, model,
            label="Final Chunk",
        )

    t_all1 = time.perf_counter()
    print(f"\n[{ts()}] End total. Total elapsed: {(t_all1 - t_all0):.3f} s")

# --- Execution Code ---
video_path = "sample03.mp4"  # test run of the generator-based code on a ~700 MB file
# Number of frames grouped into one summarisation request.
chunk_size = 8
# Forwarded to model.generate as the per-chunk generation limit.
max_new_tokens = 160

# frame_generator is a generator function, so no frame is decoded here; frames
# are read lazily while process_video_frames iterates.
frames_gen = frame_generator(video_path)
process_video_frames(frames_gen, chunk_size, max_new_tokens, processor, model)


Output hidden; open in https://colab.research.google.com to view.

In [None]:
"""
영상 크기가 큰 경우(700mb 이상) ram 절약하기 위해 개선한 코드임

코드 수정한 프롬프트(제미나이 2.5 플래시):
마지막 셀 코드에 리스트에 frames을 리스트 저장이 아닌 yield 하고, 아래 for문도 generator를 인자값으로 받아서 실행할 수 있게 함수화 및 실행 코드로 바꿔줘, ram 절약 관점에서만 개선해주는 "리팩토링"이 되어야 해
"""
import cv2
from PIL import Image
from IPython.display import display
import time
from datetime import datetime
import torch

# Helper functions
def ts():
    """Return the current local time as a ``YYYY-MM-DD HH:MM:SS`` string (used as a log prefix)."""
    now = datetime.now()
    return f"{now:%Y-%m-%d %H:%M:%S}"

def sync():
    """Call ``torch.cuda.synchronize()`` when CUDA is available; otherwise do nothing.

    Used around timed sections so the measured wall-clock time includes the
    GPU work that was launched inside them.
    """
    if not torch.cuda.is_available():
        return
    torch.cuda.synchronize()


def frame_generator(video_path, max_seconds=None):
    """Lazily yield one RGB PIL image per second of a video.

    For each whole second ``sec`` (starting at 0) the capture is positioned at
    ``sec * 1000`` ms via ``CAP_PROP_POS_MSEC``, one frame is read, converted
    from BGR to RGB and yielded as a ``PIL.Image``. Only one decoded frame is
    produced at a time, so the caller decides how many to keep in memory.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        max_seconds: Optional upper bound on the number of seconds (frames)
            to yield. ``None`` means no explicit bound.

    Yields:
        PIL.Image: the frame read at each second.

    Raises:
        RuntimeError: if the video cannot be opened. Because this is a
            generator, the error surfaces on the first iteration.

    Iteration stops when ``max_seconds`` is reached, when ``sec`` reaches the
    duration computed as ``int(frame_count / fps)`` (only if fps is positive),
    or when a read fails.
    """
    cap = cv2.VideoCapture(video_path)
    # try/finally guarantees the capture is released even when the consumer
    # stops early (generator.close(), break + garbage collection) or an
    # exception propagates through the yield, not only on normal exhaustion.
    try:
        if not cap.isOpened():
            raise RuntimeError(f"Failed to open video: {video_path}")

        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Duration is unknown when the container reports no usable fps.
        duration_sec = int(total_frames / fps) if fps and fps > 0 else None

        sec = 0
        while True:
            if max_seconds is not None and sec >= max_seconds:
                break
            if duration_sec is not None and sec >= duration_sec:
                break

            # Seek by timestamp so exactly one frame per second is decoded.
            cap.set(cv2.CAP_PROP_POS_MSEC, sec * 1000.0)
            ok, bgr = cap.read()
            if not ok:
                break

            rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
            yield Image.fromarray(rgb)
            sec += 1
    finally:
        cap.release()

def _summarize_chunk(chunk, start_s, end_s, prev_summary, max_new_tokens,
                     processor, model, label="Chunk", target_w=512):
    """Run the full summarization pipeline on one chunk of frames and return the text.

    Steps (each timed and logged): show a thumbnail grid, downscale frames
    wider than ``target_w``, build the multi-image chat message, tokenize via
    the processor's chat template, move inputs to the model device, generate
    greedily, and decode only the newly generated tokens.

    Args:
        chunk: List of PIL images for this chunk.
        start_s: Index (in seconds) of the first frame, used in logs and prompt.
        end_s: Index (in seconds) of the last frame, used in logs and prompt.
        prev_summary: Summary of the previous chunk, appended to the prompt as
            context when truthy.
        max_new_tokens: Passed to ``model.generate``.
        processor: Object providing ``apply_chat_template`` and ``batch_decode``.
        model: Object providing ``device`` and ``generate``.
        label: Log label, ``"Chunk"`` or ``"Final Chunk"``.
        target_w: Frames wider than this are resized to this width, with the
            height scaled by the same ratio.

    Returns:
        str: The stripped summary text for this chunk.
    """
    t_chunk0 = time.perf_counter()
    print(f"\n[{ts()}] === {label} {start_s}s ~ {end_s}s START ===")

    # Thumbnails are rendered from the original (pre-resize) frames.
    show_thumb_grid(chunk, scale=0.10, cols=4)

    resized_chunk = []
    for im in chunk:
        w, h = im.size
        if w > target_w:
            new_h = int(h * (target_w / w))
            im = im.resize((target_w, new_h))
        resized_chunk.append(im)

    t0 = time.perf_counter()
    content = [{"type": "image", "image": im} for im in resized_chunk]
    t1 = time.perf_counter()
    print(f"[{ts()}] step2 build_image_content: {(t1 - t0)*1000:.1f} ms")

    t0 = time.perf_counter()
    prompt = (
        f"These are frames sampled at 1 FPS from {start_s}s to {end_s}s.\n"
        "Write a concise 2–3 sentence summary in English.\n"
        "Do NOT output any reasoning, analysis, chain-of-thought, or step-by-step thinking.\n"
        "Only output the final summary text.\n"
        "If a new character appears, briefly describe appearance and actions.\n"
        "If the character already appeared, refer to the prior description.\n"
        "Keep it under 70 words."
    )
    if prev_summary:
        prompt += f"\n\nPrevious chunk summary (context): {prev_summary}"

    content.append({"type": "text", "text": prompt})
    messages = [{"role": "user", "content": content}]
    t1 = time.perf_counter()
    print(f"[{ts()}] step3 build_prompt_messages: {(t1 - t0)*1000:.1f} ms")

    t0 = time.perf_counter()
    inputs = processor.apply_chat_template(
        messages,
        tokenize=True,
        add_generation_prompt=True,
        return_dict=True,
        return_tensors="pt",
    )
    t1 = time.perf_counter()
    print(f"[{ts()}] step4 apply_chat_template(tokenize): {(t1 - t0)*1000:.1f} ms")

    t0 = time.perf_counter()
    inputs = {k: v.to(model.device) for k, v in inputs.items()}
    sync()
    t1 = time.perf_counter()
    print(f"[{ts()}] step5 to_device(+sync): {(t1 - t0)*1000:.1f} ms")

    t0 = time.perf_counter()
    sync()
    with torch.inference_mode():
        out_ids = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            do_sample=False,
            num_beams=1,
            repetition_penalty=1.05,
        )
    sync()
    t1 = time.perf_counter()
    print(f"[{ts()}] step6 generate(+sync): {(t1 - t0):.3f} s")

    t0 = time.perf_counter()
    # Drop the prompt tokens so only the newly generated ids are decoded.
    gen_ids = out_ids[:, inputs["input_ids"].shape[1]:]
    text = processor.batch_decode(
        gen_ids,
        skip_special_tokens=True,
        clean_up_tokenization_spaces=False
    )[0].strip()
    t1 = time.perf_counter()
    print(f"[{ts()}] step7 decode: {(t1 - t0)*1000:.1f} ms")

    print(f"\n=== {label} {start_s}s ~ {end_s}s RESULT ===")
    print(text)

    t_chunk1 = time.perf_counter()
    print(f"[{ts()}] === {label} total: {(t_chunk1 - t_chunk0):.3f} s ===")
    return text


def process_video_frames(frame_gen, chunk_size, max_new_tokens, processor, model):
    """Consume frames from an iterable and print a model summary per chunk.

    Frames are buffered until ``chunk_size`` have been collected, then the
    chunk is summarized and the buffer is reset. Any leftover frames after the
    iterable is exhausted are summarized as a "Final Chunk". Each chunk's
    summary is fed into the next chunk's prompt as context.

    Args:
        frame_gen: Iterable yielding PIL images (one per second of video).
        chunk_size: Number of frames per summarization request.
        max_new_tokens: Generation budget passed to ``model.generate``.
        processor: Chat-template / decoding processor for the model.
        model: Vision-language model used for generation.

    Returns:
        None. Summaries and timings are printed.
    """
    prev_summary = None
    t_all0 = time.perf_counter()
    print(f"[{ts()}] Start total")

    chunk_buffer = []
    current_frame_idx = 0

    for frame in frame_gen:
        chunk_buffer.append(frame)
        current_frame_idx += 1

        if len(chunk_buffer) == chunk_size:
            # The per-chunk work lives in a helper so its images and tensors
            # become unreferenced on return, before the next chunk is buffered.
            prev_summary = _summarize_chunk(
                chunk_buffer,
                current_frame_idx - chunk_size,
                current_frame_idx - 1,
                prev_summary,
                max_new_tokens,
                processor,
                model,
                label="Chunk",
            )
            chunk_buffer = []

    # Leftover frames that did not fill a whole chunk.
    if chunk_buffer:
        prev_summary = _summarize_chunk(
            chunk_buffer,
            current_frame_idx - len(chunk_buffer),
            current_frame_idx - 1,
            prev_summary,
            max_new_tokens,
            processor,
            model,
            label="Final Chunk",
        )

    t_all1 = time.perf_counter()
    print(f"\n[{ts()}] End total. Total elapsed: {(t_all1 - t_all0):.3f} s")

# --- Execution Code ---
# Run the generator-based summarization pipeline end to end on one video.
video_path = "sample04.mp4"  # test run of the generator code on a 700 MB file
chunk_size = 8  # frames buffered before each summary request
max_new_tokens = 160  # generation budget handed to model.generate for every chunk

# `processor` and `model` are not created in this cell; they come from earlier cells.
frames_gen = frame_generator(video_path)
process_video_frames(frames_gen, chunk_size, max_new_tokens, processor, model)


Output hidden; open in https://colab.research.google.com to view.