In [None]:
!pip install -q -U num2words

# DriveLM

In [None]:
from datasets import load_dataset

# Load the DriveLM dataset from the Hugging Face Hub.
drive_lm = load_dataset("MehdiJmlkh/DriveLM")
# Bare expression: lets the notebook display the loaded object.
drive_lm

In [None]:
# Load the nuScenes dataset from the Hugging Face Hub; its rows are looked up
# by each DriveLM sample's "nuscenes_index" further down in the notebook.
nuscenes = load_dataset("MehdiJmlkh/nuscenes")
# Bare expression: lets the notebook display the loaded object.
nuscenes

In [None]:
import json
from datasets import Dataset


class DriveLM:
    """Pairs each DriveLM sample with the nuScenes record it points to.

    Indexing returns the DriveLM sample with one extra ``"images"`` entry
    holding ``nuscenes[sample["nuscenes_index"]]``.
    """

    def __init__(self, drive_lm, nuscenes):
        """Keep references to the two underlying collections.

        Args:
            drive_lm: Indexable collection whose items are mappings that
                contain a ``"nuscenes_index"`` key.
            nuscenes: Indexable collection addressed by that index.
        """
        self.drive_lm, self.nuscenes = drive_lm, nuscenes

    def __len__(self):
        """Return the number of DriveLM samples."""
        return len(self.drive_lm)

    def __getitem__(self, idx):
        """Return sample ``idx`` with its nuScenes record under ``"images"``."""
        record = self.drive_lm[idx]
        # The item returned by the underlying collection is extended in place.
        record['images'] = self.nuscenes[record["nuscenes_index"]]
        return record

# Map each split name to a DriveLM wrapper that joins the DriveLM split with
# the nuScenes split of the same name. Only the "test" split is wrapped here.
dataset = {
    split: DriveLM(drive_lm[split], nuscenes[split])
    for split in ("test",)
}

# SmolVLM

In [None]:
import torch
from transformers import AutoProcessor, AutoModelForImageTextToText
from PIL import Image
import num2words
import torch
import torch.nn as nn
from transformers import ViTModel, AutoImageProcessor
from transformers.modeling_outputs import BaseModelOutput
import matplotlib.pyplot as plt
import numpy as np
from functools import partial
from torchvision.transforms import functional as F_transforms
from huggingface_hub import PyTorchModelHubMixin

In [None]:
# Hub checkpoint used for both the processor and the model.
model_path = "HuggingFaceTB/SmolVLM2-2.2B-Instruct"

processor = AutoProcessor.from_pretrained(model_path)
model = AutoModelForImageTextToText.from_pretrained(
    model_path, torch_dtype=torch.bfloat16, device_map="cuda"
)

# Image-processor settings: longest-edge limit of 384, image splitting off,
# resizing on.
processor.image_processor.max_image_size["longest_edge"] = 384
processor.image_processor.do_image_splitting = False
processor.image_processor.do_resize = True

# Freeze every model parameter; this notebook only runs inference.
for param in model.parameters():
    param.requires_grad_(False)

# Test

In [None]:
from torch.utils.data import Dataset

class VLMQADataset(Dataset):
    """Turns DriveLM samples into chat-formatted, tokenized model inputs.

    Each item is a chat conversation whose user turn interleaves, for every
    camera in ``CAMERAS``, the camera name and its image, followed by the
    scene description and the question. In training mode an assistant turn
    with the reference answer is appended and a ``labels`` tensor is added in
    which everything before the answer is set to ``-100``.

    Args:
        dataset: Indexable, sized collection of samples. Every sample must
            provide ``"images"`` (mapping camera name -> image),
            ``"scene_description"`` and ``"question"``; ``"answer"`` is only
            required when ``is_train`` is true.
        is_train: When false, only the user turn is encoded and a generation
            prompt is requested from the chat template.
        processor: Optional processor exposing ``apply_chat_template`` and
            ``tokenizer``. When omitted, the module-level ``processor`` is
            looked up at call time (the original behaviour).
    """

    # Camera views, in the order they are interleaved into the user message.
    CAMERAS = (
        'CAM_FRONT_LEFT', 'CAM_FRONT', 'CAM_FRONT_RIGHT',
        'CAM_BACK_LEFT', 'CAM_BACK', 'CAM_BACK_RIGHT',
    )
    # Token whose first occurrence is used to locate the end of the user turn.
    END_OF_UTTERANCE = '<end_of_utterance>'
    # Number of tokens skipped after that marker before the labels begin.
    # NOTE(review): presumably the tokenized assistant-role prefix of the chat
    # template; confirm against the tokenizer if the template changes.
    NUM_ASSISTANT_IDS = 4

    # The annotation is a string so this class does not require ``DriveLM`` to
    # be defined at class-creation time.
    def __init__(self, dataset: "DriveLM", is_train=True, processor=None):
        self.dataset = dataset
        self.is_train = is_train
        self._processor = processor

    def __len__(self):
        """Return the number of samples in the wrapped dataset."""
        return len(self.dataset)

    def __getitem__(self, idx):
        """Encode sample ``idx``.

        Returns:
            The processor output for the conversation. In training mode it
            additionally carries ``"labels"``: a clone of ``"input_ids"`` with
            every position before the answer replaced by ``-100``.

        Raises:
            ValueError: In training mode, if the end-of-utterance token does
                not occur in the encoded conversation.
        """
        item = self.dataset[idx]

        user_msg = []
        for camera in self.CAMERAS:
            user_msg.extend([
                {"type": "text", "text": camera},
                {"type": "image", "image": item["images"][camera]}
            ])
        user_msg.append({
            "type": "text",
            "text":f"Scene description:{item['scene_description']} Question: {item['question']}"
        })

        conversation = [{"role": "user", "content": user_msg}]

        if not self.is_train:
            # Inference never touches item["answer"], so samples without a
            # reference answer can be encoded.
            return self.__apply_chat_template(conversation, True)

        conversation.append({
            "role": "assistant",
            "content": [{"type": "text", "text": item["answer"]}]
        })
        inputs = self.__apply_chat_template(conversation)

        label_start_idx = self.__get_label_start_idx(inputs)

        # input_ids is indexed batch-first here; mask the whole prompt so only
        # the positions from label_start_idx onwards keep their token ids.
        labels = inputs['input_ids'].clone()
        labels[:, :label_start_idx] = -100
        inputs['labels'] = labels

        return inputs

    def __resolve_processor(self):
        """Return the injected processor, else the module-level ``processor``."""
        if self._processor is not None:
            return self._processor
        return processor

    def __apply_chat_template(self, conversation, add_generation_prompt=False):
        """Tokenize ``conversation`` with the processor's chat template."""
        return self.__resolve_processor().apply_chat_template(
            conversation,
            add_generation_prompt=add_generation_prompt,
            tokenize=True,
            return_dict=True,
            return_tensors="pt"
        )

    def __get_label_start_idx(self, inputs):
        """Return the index in ``input_ids[0]`` where the labels start.

        That is the position of the first end-of-utterance token, plus one to
        step past it, plus ``NUM_ASSISTANT_IDS``.
        """
        tokenizer = self.__resolve_processor().tokenizer
        utterance_id = tokenizer.convert_tokens_to_ids(self.END_OF_UTTERANCE)
        utterance_idx = inputs['input_ids'][0].tolist().index(utterance_id)

        return utterance_idx + self.NUM_ASSISTANT_IDS + 1


In [None]:
# Inference view of the test split: is_train=False encodes only the user turn
# and requests a generation prompt, so no labels are produced.
test_dataset = VLMQADataset(dataset["test"], is_train=False)

In [None]:
import matplotlib.pyplot as plt

def display_sample_and_output(index, model):
    """Show one test sample and the model's answer for it.

    Plots the six camera images of ``dataset["test"][index]`` in a 2x3 grid,
    prints every string-valued field of the sample, then generates up to 32
    new tokens from ``test_dataset[index]`` and prints the text that follows
    the last ``"Assistant: "`` in the decoded output.

    Args:
        index: Position of the sample in the test split.
        model: Model exposing ``generate``; inputs are moved to ``"cuda"``.
    """
    sample = dataset["test"][index]

    plt.figure(figsize=(20, 8))
    cameras = ['CAM_FRONT_LEFT', 'CAM_FRONT', 'CAM_FRONT_RIGHT', 'CAM_BACK_LEFT', 'CAM_BACK', 'CAM_BACK_RIGHT']
    # Subplot positions are 1-based, hence start=1.
    for position, camera in enumerate(cameras, start=1):
        plt.subplot(2, 3, position)
        plt.imshow(sample['images'][camera])
        plt.axis('off')
        plt.title(camera)
    plt.tight_layout()
    plt.show()

    # Print only the textual fields (exact ``str`` instances).
    for key, value in sample.items():
        if type(value) is not str:
            continue
        print(f"{key}: {value}")

    model_inputs = test_dataset[index].to("cuda", dtype=torch.bfloat16)
    output = model.generate(**model_inputs, max_new_tokens=32)
    generated_texts = processor.batch_decode(output, skip_special_tokens=True)
    answer = generated_texts[0].split("Assistant: ")[-1]
    print("model answer: " + answer)

In [None]:
# Qualitative spot check on test sample 300.
display_sample_and_output(300, model)

In [None]:
from tqdm import tqdm

# One record per evaluated sample: selected text fields plus the prediction.
test_results = []

# Text fields are read straight from the underlying DriveLM split. Indexing
# dataset["test"] would also fetch the six camera images, which
# test_dataset[index] fetches again below, loading every image twice.
test_metadata = dataset["test"].drive_lm
result_fields = ("scene_description", "nuscenes_index", "task", "question", "answer")

# Every 10th sample of the test split is evaluated.
for index in tqdm(range(0, len(dataset["test"]), 10), desc="Generating predictions"):
    sample = test_metadata[index]

    output = model.generate(
        **test_dataset[index].to("cuda", dtype=torch.bfloat16),
        max_new_tokens=32
    )
    generated_texts = processor.batch_decode(output, skip_special_tokens=True)
    # Keep only the text after the last "Assistant: " in the decoded output.
    pred = generated_texts[0].split("Assistant: ")[-1]

    test_results.append({
        **{field: sample[field] for field in result_fields},
        "prediction": pred
    })

In [None]:
from datasets import Dataset

# `Dataset` here is datasets.Dataset: it is re-imported just above because the
# name was rebound to torch.utils.data.Dataset earlier in the notebook.
hf_dataset = Dataset.from_list(test_results)
# Uploads the predictions to the Hugging Face Hub as a public repository
# (private=False).
# NOTE(review): the model loaded in this notebook is the checkpoint named by
# `model_path`; confirm the commit message matches the evaluated weights.
hf_dataset.push_to_hub(
    "MehdiJmlkh/SmolDriver-Results",
    private=False,
    commit_message="Fine-tune 11000 steps"
)