Check GPU and CUDA version

In [1]:
# Show the attached GPU, its driver version and the CUDA version it supports.
!nvidia-smi

Tue Mar 25 16:52:42 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.54.15              Driver Version: 550.54.15      CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  Tesla T4                       Off |   00000000:00:04.0 Off |                    0 |
| N/A   45C    P8             12W /   70W |       0MiB /  15360MiB |      0%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                

Install packages

In [2]:
%pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu124
%pip install yt-dlp datasets transformers pyav
%pip install -U flash-attn --no-build-isolation

Looking in indexes: https://download.pytorch.org/whl/cu124


Import packages

In [3]:
import gc
import time
import os
import torch
import yt_dlp
from datasets import load_dataset
from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor

Clear GPU memory

In [4]:
def clear_memory():
    """Drop large notebook globals and release cached GPU memory.

    Removes the well-known heavyweight names (model, processor, inputs, ...)
    from this module's global namespace if they are present, runs the garbage
    collector, and, when CUDA is available, empties PyTorch's CUDA cache.
    Finally prints the allocated and reserved GPU memory in GB.

    Safe to call on a CPU-only runtime: the CUDA calls are skipped and the
    reported figures are 0.00 GB.

    Returns:
        None
    """
    stale_names = ("inputs", "model", "processor", "trainer", "peft_model", "bnb_config")
    for name in stale_names:
        # pop() with a default is a no-op when the name was never defined.
        globals().pop(name, None)

    # Collect first so tensors that just lost their last reference are freed
    # before the CUDA cache is emptied. No sleeps are needed: gc.collect(),
    # synchronize() and empty_cache() all block until they are done.
    gc.collect()

    if torch.cuda.is_available():
        torch.cuda.synchronize()
        torch.cuda.empty_cache()
        allocated_bytes = torch.cuda.memory_allocated()
        reserved_bytes = torch.cuda.memory_reserved()
    else:
        # torch.cuda.synchronize() raises without a CUDA device, so skip it.
        allocated_bytes = reserved_bytes = 0

    print(f"GPU allocated memory: {allocated_bytes / 1024**3:.2f} GB")
    print(f"GPU reserved memory: {reserved_bytes / 1024**3:.2f} GB")

clear_memory()

GPU allocated memory: 0.00 GB
GPU reserved memory: 0.00 GB


Load dataset and model

In [None]:
# Checkpoint used for both the model weights and the matching processor.
MODEL_ID = "Qwen/Qwen2.5-VL-3B-Instruct"

dataset = load_dataset("lmms-lab/AISG_Challenge")
device = "cuda" if torch.cuda.is_available() else "cpu"

# FlashAttention-2 kernels require an Ampere-or-newer GPU (compute capability >= 8.0).
# Older cards such as the Tesla T4 (7.5), and CPU runs, fall back to PyTorch's
# built-in scaled-dot-product attention instead of failing at load/generate time.
supports_flash_attn = device == "cuda" and torch.cuda.get_device_capability()[0] >= 8
attn_implementation = "flash_attention_2" if supports_flash_attn else "sdpa"

model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
    MODEL_ID,
    torch_dtype=torch.float16,
    attn_implementation=attn_implementation,
    device_map=device)
processor = AutoProcessor.from_pretrained(MODEL_ID)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

Download video function

In [30]:
def _video_id_from_url(youtube_url):
    """Return the video id embedded in a YouTube URL.

    Handles ``watch?v=<id>`` links (id taken from the query string) as well as
    path-style links such as ``/shorts/<id>`` or ``youtu.be/<id>``; trailing
    slashes and unrelated query parameters are ignored.

    Raises:
        ValueError: if no id can be derived from the URL.
    """
    from urllib.parse import parse_qs, urlparse

    parsed = urlparse(youtube_url)
    watch_ids = parse_qs(parsed.query).get("v")
    if watch_ids:
        return watch_ids[0]

    segments = [segment for segment in parsed.path.split("/") if segment]
    if not segments:
        raise ValueError(f"Cannot derive a video id from URL: {youtube_url!r}")
    return segments[-1]


def download_video(youtube_url):
    """Download a YouTube video into ``./videos`` (once) and return its path and title.

    The file is cached as ``./videos/<video id>.mp4``. When that file already
    exists only the metadata is fetched, so the title is still looked up online
    on every call.

    Args:
        youtube_url: URL of the video to fetch.

    Returns:
        tuple: ``(video_path, title)``; ``title`` falls back to
        ``"Unknown Title"`` when the metadata has no title.

    Raises:
        ValueError: if no video id can be derived from ``youtube_url``.
    """
    # Derive the id first so a malformed URL fails before touching the disk.
    video_id = _video_id_from_url(youtube_url)

    video_dir = "./videos"
    os.makedirs(video_dir, exist_ok=True)
    video_path = f"{video_dir}/{video_id}.mp4"

    already_downloaded = os.path.exists(video_path)
    if already_downloaded:
        ydl_opts = {
            'quiet': True
        }
    else:
        ydl_opts = {
            'format': 'best',
            'outtmpl': video_path,
            'nooverwrites': False
        }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        # Metadata-only lookup when the file is cached, full download otherwise.
        info = ydl.extract_info(youtube_url, download=not already_downloaded)

    return video_path, info.get("title", "Unknown Title")

Process sample

In [31]:
def _generate_reply(conversation, max_new_tokens=128):
    """Run one chat turn through the notebook-level ``model`` and return its text.

    Args:
        conversation: chat messages in the structure accepted by
            ``processor.apply_chat_template``.
        max_new_tokens: upper bound on the number of generated tokens.

    Returns:
        str: decoded reply for the first (only) sequence in the batch.
    """
    inputs = processor.apply_chat_template(
        conversation,
        video_fps=1,
        add_generation_prompt=True,
        tokenize=True,
        return_dict=True,
        return_tensors="pt",
    ).to(model.device)

    output_ids = model.generate(**inputs, max_new_tokens=max_new_tokens)
    # Keep only the ids after the prompt length, i.e. the newly generated tokens.
    new_token_ids = [
        full_ids[len(prompt_ids):]
        for prompt_ids, full_ids in zip(inputs.input_ids, output_ids)
    ]
    return processor.batch_decode(
        new_token_ids,
        skip_special_tokens=True,
        clean_up_tokenization_spaces=True,
    )[0]


def process_test_case(example):
    """Answer one dataset question about a video and print the result.

    Works in two passes over the same video: first the model is asked for a
    summary, then the question is asked again with the video title and that
    summary added to the prompt.

    Args:
        example: mapping with at least the keys ``"youtube_url"``,
            ``"question"`` and ``"question_prompt"``.

    Returns:
        None. The video URL, the question and the generated answer are printed.
    """
    video_url = example["youtube_url"]
    question = example["question"]
    question_prompt = example["question_prompt"]
    video_path, title = download_video(video_url)

    # Pass 1: summary of the video, used as extra context in pass 2.
    summary_conversation = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": "Summarize the video in details."},
                {"type": "video", "path": video_path},
            ],
        },
    ]
    video_summary = _generate_reply(summary_conversation)

    # Pass 2: the actual question, grounded with the title and the summary.
    answer_conversation = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": f"Video title: {title}\nVideo summary: {video_summary}\nQuestion:\n{question}\n{question_prompt}"},
                {"type": "video", "path": video_path},
            ],
        },
    ]
    generated_answer = _generate_reply(answer_conversation)

    print(f"Video URL: {video_url}")
    print(f"Question:\n{question}\n{question_prompt}")
    print(f"Answer: {generated_answer}")

Run a test case

In [32]:
# Question id of the single test case to run.
TEST_QID = "0008-0"

# perf_counter() is monotonic, so the elapsed time is unaffected by clock adjustments.
start_time = time.perf_counter()
matching_rows = dataset['test'].filter(lambda x: x['qid'] == TEST_QID)
if len(matching_rows) == 0:
    # Fail with a clear message instead of an IndexError on the empty result.
    raise ValueError(f"No test example with qid {TEST_QID!r}")
sample = matching_rows[0]
process_test_case(sample)
end_time = time.perf_counter()
print(f"Time taken: {end_time - start_time} seconds")

ImportError: You chose backend=pyav for loading the video but the required library is not found in your environment Make sure to install pyav before loading the video.