In [4]:
import torch
from torch.utils.data import Dataset
from transformers import AutoProcessor
from PIL import Image
import os
import json

class Qwen2VLDataset(Dataset):
    def __init__(self, data_file, processor, image_size=(360, 420)):
        """
        Initialize the dataset.
        Args:
        - data_file (str): Path to the JSON file with your dataset
        - processor (AutoProcessor): HuggingFace processor for tokenizing
        - image_size (tuple): Resize images to this size (default is (360, 420))
        """
        with open(data_file, 'r') as f:
            self.data = json.load(f)  # Load dataset from JSON file

        self.processor = processor
        self.image_size = image_size  # Resize image to fit within limits

    def __len__(self):
        """Returns the total number of samples in the dataset."""
        return len(self.data)

    def __getitem__(self, idx):
        """
        Args:
            idx (int): Index for the dataset.
        
        Returns:
            dict: Tokenized input and labels
        """
        item = self.data[idx]
        messages = item['messages']
        
        # Extract user input (image/video + prompt)
        user_message = messages[0]
        content = user_message['content']
        image_or_video = content[0]  # Image or video input
        text_prompt = content[1]['text']  # The text prompt (question to describe the media)

        # Process image/video input
        if image_or_video['type'] == 'image':
            image_path = image_or_video['image']
            image = Image.open(image_path).convert("RGB")
            image = image.resize(self.image_size)  # Resize to match model limits
            image_input = self.processor(images=image, return_tensors="pt").pixel_values
        elif image_or_video['type'] == 'video':
            video_path = image_or_video['image']
            # Handle video processing logic (optional)
            # For simplicity, assume we're using one frame from the video or use a special processor
            video_input = self.processor(video=video_path, return_tensors="pt").pixel_values
            image_input = video_input  # You can decide how to use the video frames
            
        # Prepare the text (description from assistant)
        assistant_message = messages[1]
        assistant_content = assistant_message['content']
        text_description = assistant_content  # Target text

        # Tokenize the inputs (text + image/video)
        inputs = self.processor(
            text=[text_prompt], 
            images=image_input, 
            padding=True, 
            return_tensors="pt"
        )

        # Labels (text output)
        labels = self.processor.batch_encode_plus(
            [text_description], 
            return_tensors="pt", 
            padding=True
        ).input_ids

        # Move tensors to device (use GPU if available)
        inputs = {key: value.squeeze().to("cuda") for key, value in inputs.items()}
        labels = labels.squeeze().to("cuda")

        return {'input_ids': inputs['input_ids'], 'attention_mask': inputs['attention_mask'], 'labels': labels}

# Example Usage:
# Initialize the processor and dataset
model_name = "Qwen/Qwen2-VL-2B-Instruct"
processor = AutoProcessor.from_pretrained(model_name)
train_dataset = Qwen2VLDataset(data_file="qwen2_dataset.json", processor=processor)

# Check the first sample in the dataset
sample = train_dataset[0]
print(sample)

RuntimeError: Failed to import transformers.models.qwen2_vl.processing_qwen2_vl because of the following error (look up to see its traceback):
No module named 'transformers.models.qwen2_vl.processing_qwen2_vl'

In [None]:
training_args = TrainingArguments(
    output_dir="./qwen2vl-lora",
    per_device_train_batch_size=1,
    gradient_accumulation_steps=4,
    logging_steps=10,
    save_steps=100,
    num_train_epochs=3,
    fp16=True,
    save_total_limit=2,
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=your_dataset,
    tokenizer=processor,
)
trainer.train()


In [13]:
messages

[{'role': 'user',
  'content': [{'type': 'video',
    'image': 'C:/Users/kmano/Downloads/nn.png',
    'max_pixels': 151200,
    'fps': 1.0},
   {'type': 'text',
    'text': 'Consider yourself as a airforce pilot who is operating a drone at this moment, explain this event.'}]}]

In [14]:
text = processor.apply_chat_template(
    messages, tokenize=False, add_generation_prompt=True
)
image_inputs, video_inputs = process_vision_info(messages)
inputs = processor(
    text=[text],
    images=image_inputs,
    videos=video_inputs,
    padding=True,
    return_tensors="pt",
)
inputs = inputs.to("cuda")
generated_ids = model.generate(**inputs, max_new_tokens=512)
generated_ids_trimmed = [
    out_ids[len(in_ids) :] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
]
output_text = processor.batch_decode(
    generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
)
print("".join(output_text))

As an airforce pilot operating a drone, you are currently navigating a snowy urban environment. The drone is equipped with a traffic lights manager system, which is currently in the process of adjusting the traffic lights at a roundabout. The system is using a timed traffic lights manager to ensure that vehicles are moving smoothly and safely through the intersection.

The roundabout is marked with a red circle, indicating that the traffic lights are currently in the red phase. The drone is monitoring the traffic lights and adjusting them accordingly to maintain order and prevent any accidents. The system is designed to adapt to changing traffic conditions and ensure that the roads are clear and safe for all vehicles.

As an airforce pilot, your primary responsibility is to ensure the safety and efficiency of your drone operations. By monitoring the traffic lights and adjusting them as necessary, you are helping to maintain a smooth flow of traffic and prevent any accidents.


In [10]:
print("".join(output_text))

As an airforce pilot maneuvering a drone, you are currently at a crossroads with a traffic light that is currently red. The traffic light is positioned in the center of the intersection, and there are no vehicles or pedestrians visible in the immediate vicinity. The traffic light is likely controlling the flow of traffic at this intersection, ensuring that vehicles and pedestrians can safely navigate the intersection.

In this scenario, you would need to monitor the traffic light closely and adjust your drone's position accordingly to maintain a safe distance from the traffic light. You would need to ensure that your drone is not obstructing the traffic light or causing any delays for vehicles and pedestrians. Additionally, you would need to be aware of any changes in the traffic light's signal, such as a change to green or yellow, and adjust your drone's position accordingly to maintain a safe distance from the intersection.


In [11]:
torch.cuda.empty_cache()