# Qwen2.5-VL

Source: https://huggingface.co/Qwen/Qwen2.5-VL-7B-Instruct

In [1]:
from transformers import Qwen2_5_VLForConditionalGeneration, AutoTokenizer, AutoProcessor
from qwen_vl_utils import process_vision_info
import torch

# default: Load the model on the available device(s)
# model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
#     "Qwen/Qwen2.5-VL-7B-Instruct", torch_dtype="auto", device_map="auto"
# )

# We recommend enabling flash_attention_2 for better acceleration and memory saving, especially in multi-image and video scenarios.
model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
    "Qwen/Qwen2.5-VL-7B-Instruct",
    torch_dtype=torch.bfloat16,
    attn_implementation="flash_attention_2",
    device_map="cuda:0",
)

# default processer
processor = AutoProcessor.from_pretrained("Qwen/Qwen2.5-VL-7B-Instruct", use_fast=True)

  from .autonotebook import tqdm as notebook_tqdm
Loading checkpoint shards: 100%|██████████| 5/5 [00:12<00:00,  2.59s/it]
You have video processor config saved in `preprocessor.json` file which is deprecated. Video processor configs should be saved in their own `video_preprocessor.json` file. You can rename the file or load and save the processor back which renames it automatically. Loading from `preprocessor.json` will be removed in v5.0.


In [6]:
messages = [
    {
        "role": "user",
        "content": [
            {
                "type": "image",
                "image": "file://data/scan.jpg",
            },
            {"type": "text", "text": "Transcribe the handwritten text in this picture."},
        ],
    }
]

# Preparation for inference
text = processor.apply_chat_template(
    messages, tokenize=False, add_generation_prompt=True
)
image_inputs, video_inputs = process_vision_info(messages)
inputs = processor(
    text=[text],
    images=image_inputs,
    videos=video_inputs,
    padding=True,
    return_tensors="pt",
)
inputs = inputs.to("cuda")

# Inference: Generation of the output
generated_ids = model.generate(**inputs, max_new_tokens=128)
generated_ids_trimmed = [
    out_ids[len(in_ids) :] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
]
output_text = processor.batch_decode(
    generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
)
print(output_text)

  return F.conv3d(


["Une lettre, écrite à la main\n\nCette lettre ressemblera à un petit roman, un poème en prose, ou je ne sais pas. Ce que je sais, c'est que je parle suffisamment de jour pour très certainement ne pas avoir besoin de t'égailler la nuit. Mais comme tu aimes lire et qu'il arrive parfois que tu aimes m'écouter et me lire, je me suis dit que j'allais t'écrire. Je n'ai pas d'idée précise au départ. Tu auras donc devant toi : moi, juste moi et un fil sans fin de"]


In [8]:
[[{},{}] for p in [1,2,3,4]]

[[{}, {}], [{}, {}], [{}, {}], [{}, {}]]

In [16]:
from os import listdir 
from os.path import join 

image_folder = "data/insect"
system_prompt = ("You are a classification model. "
                "You only gives two possible outputs: "
                "'Yes' if the input image contains an insect visiting the "
                "flower. 'No' if the input image does not contain an insect "
                "or if the insect in the image is not on the flower. "
                "Be careful: say 'No' if the insect is not ON the flower "
                "but next to it. Say 'Yes' if the insect is hiding behind the flower. Some insect are small and hides.")
messages = [
    [{
        "role": "system",
        "content": system_prompt,
    },
    {
        "role": "user",
        "content": [
            {"type": "image", "image": f"file://{join(image_folder, p)}"}
        ],
    }] for p in sorted(listdir(image_folder))
]

# Preparation for inference
texts = [
    processor.apply_chat_template(msg, tokenize=False, add_generation_prompt=True)
    for msg in messages
]
image_inputs, video_inputs = process_vision_info(messages)
inputs = processor(
    text=texts,
    images=image_inputs,
    videos=video_inputs,
    padding=True,
    return_tensors="pt",
)
inputs = inputs.to("cuda")

# Batch Inference
generated_ids = model.generate(**inputs, max_new_tokens=128)
generated_ids_trimmed = [
    out_ids[len(in_ids) :] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
]
output_texts = processor.batch_decode(
    generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
)
print(output_texts)

['No', 'No', 'No', 'No', 'Yes', 'Yes', 'Yes', 'No']


In [20]:
for i,o in zip(sorted(listdir(image_folder)), output_texts):
    print(i,o)

WSCT9000.JPG No
WSCT9001.JPG No
WSCT9002.JPG No
WSCT9003.JPG No
WSCT9005.JPG Yes
WSCT9018.JPG Yes
WSCT9029.JPG Yes
WSCT9045.JPG No


In [None]:
from transformers import TextIteratorStreamer
from qwen_vl_utils import process_vision_info
import os
from threading import Thread

# --- The Main Chat Interface Class for API Usage ---
class QwenVLChat:
    """
    A versatile chat class for Qwen2.5-VL that supports both blocking (API-style)
    and streaming (generator-style) responses.
    """

    def __init__(self, model, processor, system_prompt=None):
        self.model = model
        self.processor = processor
        self.history = []
        self.system_msg=None
        if system_prompt is not None and isinstance(system_prompt, str):
            self.system_msg={"role": "system", "content": system_prompt}
        self.device = model.device
        print("QwenVLChat initialized. Ready to chat.")

    def _prepare_inputs_history(self, prompt: str = None, image_path: str = None):
        """Internal method to prepare model inputs and update history."""
        # Eventually add system prompt
        msg = []
        if self.system_msg is not None:
            msg.append(self.system_msg)

        # Create the user message content
        content = []
        if image_path: # TODO: adapt to URLs
            if not os.path.exists(image_path):
                print(f"Warning: Image path not found: {image_path}. Ignoring image.")
            else:
                content.append({"type": "image", "image": f"file://{image_path}"})
        if prompt is not None:
            content.append({"type": "text", "text": prompt})

        # Append the new user message to the conversation history
        msg.append({"role": "user", "content": content})

        # Append to the history
        self.history.append(msg)

        # Process the entire history for the model
        texts = [self.processor.apply_chat_template(
            msg, tokenize=False, add_generation_prompt=True
        ) for msg in self.history]
        image_inputs, video_inputs = process_vision_info(self.history)
        
        inputs = self.processor(
            text=texts,
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            return_tensors="pt",
        ).to(self.device)
        
        return inputs
    
    def _prepare_inputs(self, prompt: str = None, image_path: str = None):
        """Internal method to prepare model inputs and update history."""
        # Eventually add system prompt
        msg = []
        if self.system_msg is not None:
            msg.append(self.system_msg)

        # Create the user message content
        content = []
        if image_path: # TODO: adapt to URLs
            if not os.path.exists(image_path):
                print(f"Warning: Image path not found: {image_path}. Ignoring image.")
            else:
                content.append({"type": "image", "image": f"file://{image_path}"})
        if prompt is not None:
            content.append({"type": "text", "text": prompt})

        # Append the new user message to the conversation history
        msg.append({"role": "user", "content": content})

        # Process the entire history for the model
        texts = [self.processor.apply_chat_template(
            msg, tokenize=False, add_generation_prompt=True
        )]
        image_inputs, video_inputs = process_vision_info(msg)
        
        inputs = self.processor(
            text=texts,
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            return_tensors="pt",
        ).to(self.device)
        
        return inputs

    def generate_response(self, prompt: str = None, image_path: str = None, generation_args: dict = None) -> str:
        """
        Generates a complete, blocking response. This is the most robust method
        and handles all cases, including image-only inputs.
        It is not limited by token count and generates until the model stops.
        
        Args:
            prompt (str, optional): The text prompt.
            image_path (str, optional): The path to the image file.
            generation_args (dict, optional): Override generation parameters.
        
        Returns:
            str: The full text response from the model.
        """
        print(self.history)
        inputs = self._prepare_inputs(prompt, image_path)

        # Set a high max_new_tokens to allow for long responses, mimicking "unlimited" generation.
        # The model will stop on its own when it generates an EOS token.
        gen_kwargs = {
            "max_new_tokens": 4096,
            "do_sample": True,
            "temperature": 0.7,
            "top_p": 0.9,
            **(generation_args or {})
        }

        # Generate the token IDs
        generated_ids = self.model.generate(**inputs, **gen_kwargs)
        # generated_ids = self.model.generate(**inputs, max_new_tokens=128)

        # Trim the input tokens from the generated output
        generated_ids_trimmed = [
            out_ids[len(in_ids) :] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
        ]

        # Decode the trimmed token IDs to get the final text
        # We access [0] because batch_decode returns a list
        response_text = processor.batch_decode(
            generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
        )
        
        # Now that generation is successful, update the history
        self.history.append({"role": "assistant", "content": response_text})
        
        return response_text

    def chat(self, prompt: str = None, image_path: str = None, generation_args: dict = None) -> str:
        """
        A simple, blocking chat method.

        Args:
            prompt (str): The text prompt from the user.
            image_path (str, optional): The path to the image file.
            generation_args (dict, optional): Arguments for model.generate().

        Returns:
            str: The complete response from the model.
        """
        # Simply call the streaming method and concatenate the results.
        assert prompt is not None or image_path is not None, "Please provide either a prompt or an image path."
        response_tokens = []
        for token in self.chat_stream(prompt, image_path, generation_args):
            response_tokens.append(token)
            
        return "".join(response_tokens)

    def chat_stream(self, prompt: str = None, image_path: str = None, generation_args: dict = None):
        """
        A streaming chat method that yields tokens as they are generated.

        Args:
            prompt (str): The text prompt from the user.
            image_path (str, optional): The path to the image file.
            generation_args (dict, optional): Arguments for model.generate().

        Yields:
            str: The next token generated by the model.
        """
        assert prompt is not None or image_path is not None, "Please provide either a prompt or an image path."
        inputs = self._prepare_inputs(prompt, image_path)
        streamer = TextIteratorStreamer(self.processor.tokenizer, skip_prompt=True, skip_special_tokens=True)

        # Default generation arguments
        gen_args = {
            "max_new_tokens": 2048,
            "do_sample": True,
            "temperature": 0.7,
            "top_p": 0.9,
            **inputs,
            "streamer": streamer,
        }
        if generation_args:
            gen_args.update(generation_args)
        
        # Run generation in a separate thread to not block the main thread
        thread = Thread(target=self.model.generate, kwargs=gen_args)
        thread.start()

        # Yield tokens as they become available and build the full response
        full_response = []
        for new_token in streamer:
            yield new_token
            full_response.append(new_token)
        
        thread.join() # Ensure the generation thread is finished

        # After streaming is complete, add the full assistant response to history
        self.history.append({"role": "assistant", "content": "".join(full_response)})

    def get_history(self):
        """Returns the current conversation history."""
        return self.history

    def clear_history(self):
        """Resets the conversation history."""
        self.history = []
        print("Conversation history cleared.")


In [None]:
# --- Initialize Chatbot ---
chatbot = QwenVLChat(model, processor)

# --- Example API Interaction ---
image_file = "data/scan.jpg" # IMPORTANT: Make sure this file exists
if not os.path.exists(image_file):
    print(f"\nERROR: The image '{image_file}' was not found.")
    print("Please download or create a sample image and place it in the 'data' directory.")
    exit()

# --- DEMO 1: Using the simple, blocking `chat()` API ---
print("\n--- DEMO 1: Simple Blocking API Call ---")
prompt1 = "Transcribe the handwritten text in this picture."
print(f"User: {prompt1}")

# The call blocks until the full response is ready
response1 = chatbot.chat(prompt=prompt1, image_path=image_file)

print(f"Assistant: {response1}")
print("----------------------------------------\n")

# --- DEMO 2: Using the `chat_stream()` API for real-time output ---
print("--- DEMO 2: Streaming API Call ---")
prompt2 = "Based on the text you transcribed, please write a short summary."
print(f"User: {prompt2}")
print("Assistant (streaming): ", end="", flush=True)

# The call returns a generator immediately. We loop through it.
full_response_streamed = ""
for token in chatbot.chat_stream(prompt=prompt2):
    print(token, end="", flush=True)
    full_response_streamed += token
print("\n----------------------------------------\n")

# --- DEMO 3: Verify conversation history ---
print("--- DEMO 3: Verifying Conversation History ---")
print("The conversation history is maintained across both blocking and streaming calls.")

# A final blocking call that relies on the context from the previous turns
prompt3 = "What was the very first thing I asked you to do?"
print(f"User: {prompt3}")
response3 = chatbot.chat(prompt=prompt3)
print(f"Assistant: {response3}")
print("----------------------------------------\n")

In [3]:
system_prompt = ("You are a classification model. "
                "You only gives two possible outputs: "
                "'Yes' if the input image contains an insect visiting the "
                "flower. 'No' if the input image does not contain an insect "
                "or if the insect in the image is not on the flower.")
insect_detector = QwenVLChat(model, processor, system_prompt)

QwenVLChat initialized. Ready to chat.


In [5]:
image_file = "data/insect/WSCT9000.JPG"
response1 = insect_detector.generate_response(image_path=image_file)

print(f"Assistant: {response1}")
print("----------------------------------------\n")

[{'role': 'assistant', 'content': ['No']}]
[{'role': 'assistant', 'content': ['No']}]
Assistant: ['No']
----------------------------------------



In [None]:
folder_path = "data/insect"
filenames = 

OOM below:

In [1]:
from transformers import pipeline
import torch

pipe = pipeline("image-text-to-text", model="Qwen/Qwen2.5-VL-7B-Instruct", torch_dtype=torch.bfloat16,
    attn_implementation="flash_attention_2",
    device_map="cuda:0", use_fast=True)
messages = [
    {
        "role": "user",
        "content": [
            {"type": "image", "url": "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/p-blog/candy.JPG"},
            {"type": "text", "text": "What animal is on the candy?"}
        ]
    },
]
pipe(text=messages)

  from .autonotebook import tqdm as notebook_tqdm
Loading checkpoint shards: 100%|██████████| 5/5 [00:04<00:00,  1.04it/s]
Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.
You have video processor config saved in `preprocessor.json` file which is deprecated. Video processor configs should be saved in their own `video_preprocessor.json` file. You can rename the file or load and save the processor back which renames it automatically. Loading from `preprocessor.json` will be removed in v5.0.
Device set to use cuda:0


OutOfMemoryError: CUDA out of memory. Tried to allocate 115.33 GiB. GPU 

# InternVL3

Source: https://huggingface.co/OpenGVLab/InternVL3-8B

In [None]:
import torch
from transformers import AutoTokenizer, AutoModel
path = "OpenGVLab/InternVL3-8B"
model = AutoModel.from_pretrained(
    path,
    torch_dtype=torch.bfloat16,
    low_cpu_mem_usage=True,
    use_flash_attn=True,
    trust_remote_code=True).eval().cuda()


In [None]:
import math
import numpy as np
import torch
import torchvision.transforms as T
from decord import VideoReader, cpu
from PIL import Image
from torchvision.transforms.functional import InterpolationMode
from transformers import AutoModel, AutoTokenizer

IMAGENET_MEAN = (0.485, 0.456, 0.406)
IMAGENET_STD = (0.229, 0.224, 0.225)

def build_transform(input_size):
    MEAN, STD = IMAGENET_MEAN, IMAGENET_STD
    transform = T.Compose([
        T.Lambda(lambda img: img.convert('RGB') if img.mode != 'RGB' else img),
        T.Resize((input_size, input_size), interpolation=InterpolationMode.BICUBIC),
        T.ToTensor(),
        T.Normalize(mean=MEAN, std=STD)
    ])
    return transform

def find_closest_aspect_ratio(aspect_ratio, target_ratios, width, height, image_size):
    best_ratio_diff = float('inf')
    best_ratio = (1, 1)
    area = width * height
    for ratio in target_ratios:
        target_aspect_ratio = ratio[0] / ratio[1]
        ratio_diff = abs(aspect_ratio - target_aspect_ratio)
        if ratio_diff < best_ratio_diff:
            best_ratio_diff = ratio_diff
            best_ratio = ratio
        elif ratio_diff == best_ratio_diff:
            if area > 0.5 * image_size * image_size * ratio[0] * ratio[1]:
                best_ratio = ratio
    return best_ratio

def dynamic_preprocess(image, min_num=1, max_num=12, image_size=448, use_thumbnail=False):
    orig_width, orig_height = image.size
    aspect_ratio = orig_width / orig_height

    # calculate the existing image aspect ratio
    target_ratios = set(
        (i, j) for n in range(min_num, max_num + 1) for i in range(1, n + 1) for j in range(1, n + 1) if
        i * j <= max_num and i * j >= min_num)
    target_ratios = sorted(target_ratios, key=lambda x: x[0] * x[1])

    # find the closest aspect ratio to the target
    target_aspect_ratio = find_closest_aspect_ratio(
        aspect_ratio, target_ratios, orig_width, orig_height, image_size)

    # calculate the target width and height
    target_width = image_size * target_aspect_ratio[0]
    target_height = image_size * target_aspect_ratio[1]
    blocks = target_aspect_ratio[0] * target_aspect_ratio[1]

    # resize the image
    resized_img = image.resize((target_width, target_height))
    processed_images = []
    for i in range(blocks):
        box = (
            (i % (target_width // image_size)) * image_size,
            (i // (target_width // image_size)) * image_size,
            ((i % (target_width // image_size)) + 1) * image_size,
            ((i // (target_width // image_size)) + 1) * image_size
        )
        # split the image
        split_img = resized_img.crop(box)
        processed_images.append(split_img)
    assert len(processed_images) == blocks
    if use_thumbnail and len(processed_images) != 1:
        thumbnail_img = image.resize((image_size, image_size))
        processed_images.append(thumbnail_img)
    return processed_images

def load_image(image_file, input_size=448, max_num=12):
    image = Image.open(image_file).convert('RGB')
    transform = build_transform(input_size=input_size)
    images = dynamic_preprocess(image, image_size=input_size, use_thumbnail=True, max_num=max_num)
    pixel_values = [transform(image) for image in images]
    pixel_values = torch.stack(pixel_values)
    return pixel_values

def split_model(model_name, world_size=None):
    device_map = {}
    world_size = torch.cuda.device_count() if world_size is None else world_size
    config = AutoConfig.from_pretrained(model_path, trust_remote_code=True)
    num_layers = config.llm_config.num_hidden_layers
    # Since the first GPU will be used for ViT, treat it as half a GPU.
    num_layers_per_gpu = math.ceil(num_layers / (world_size - 0.5))
    num_layers_per_gpu = [num_layers_per_gpu] * world_size
    num_layers_per_gpu[0] = math.ceil(num_layers_per_gpu[0] * 0.5)
    layer_cnt = 0
    for i, num_layer in enumerate(num_layers_per_gpu):
        for j in range(num_layer):
            device_map[f'language_model.model.layers.{layer_cnt}'] = i
            layer_cnt += 1
    device_map['vision_model'] = 0
    device_map['mlp1'] = 0
    device_map['language_model.model.tok_embeddings'] = 0
    device_map['language_model.model.embed_tokens'] = 0
    device_map['language_model.output'] = 0
    device_map['language_model.model.norm'] = 0
    device_map['language_model.model.rotary_emb'] = 0
    device_map['language_model.lm_head'] = 0
    device_map[f'language_model.model.layers.{num_layers - 1}'] = 0

    return device_map

In [None]:
tokenizer = AutoTokenizer.from_pretrained(path, trust_remote_code=True, use_fast=False)

In [None]:
# set the max number of tiles in `max_num`
pixel_values = load_image('data/what.jpg', max_num=12).to(torch.bfloat16).cuda()
generation_config = dict(max_new_tokens=1024, do_sample=True)

# pure-text conversation (纯文本对话)
question = 'Hello, who are you?'
response, history = model.chat(tokenizer, None, question, generation_config, history=None, return_history=True)
print(f'User: {question}\nAssistant: {response}')

In [None]:
question = '<image>\nPlease describe the image shortly.'
response = model.chat(tokenizer, pixel_values, question, generation_config)
print(f'User: {question}\nAssistant: {response}')

In [None]:
pixel_values = load_image('data/scan.jpg', max_num=12).to(torch.bfloat16).cuda()
question = '<image>\nPlease transcribe the handwritten text in this picture.'
response = model.chat(tokenizer, pixel_values, question, generation_config)
print(f'User: {question}\nAssistant: {response}')