In [None]:
import sys
from pathlib import Path
import os

# Enter the lab checkout; every path below is derived from this directory.
# NOTE(review): the saved output of this cell shows /workspace/RL-VLM-Lab rather
# than ./nanovlm-lab/ -- confirm the checkout directory name.
# NOTE(review): this cell ends with os.chdir(nanovlm_root), so re-running it in
# the same kernel resolves the relative %cd from inside nanovlm/.
%cd ./nanovlm-lab/
project_root = Path.cwd()

nanovlm_root = project_root / "nanovlm"

# Make both roots importable. Each insert targets index 0, so when both paths
# are added here the project root ends up ahead of the nanoVLM root on sys.path.
if str(nanovlm_root) not in sys.path:
    sys.path.insert(0, str(nanovlm_root))  # Inserted first in time
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))  # Inserted second, so it lands at index 0

# Change working directory to nanovlm so relative imports work. Relative paths
# used by later cells are therefore resolved against nanovlm/ as well.
os.chdir(nanovlm_root)

# Echo the resolved locations for a quick sanity check.
print(f"Project root: {project_root}")
print(f"NanoVLM root: {nanovlm_root}")
print(f"Current directory: {os.getcwd()}")

/workspace/RL-VLM-Lab
Project root: /workspace/RL-VLM-Lab
NanoVLM root: /workspace/RL-VLM-Lab/nanovlm
Current directory: /workspace/RL-VLM-Lab/nanovlm


In [2]:
from datasets import load_dataset
# Dataset identifier passed to load_dataset; the split expression keeps only
# the first 5% of the train split.
dataset_id = 'lmms-lab/multimodal-open-r1-8k-verified'
dataset = load_dataset(dataset_id, split='train[:5%]')


In [3]:
# Normalize every image to RGB mode
def convert_to_rgb(example):
    """Ensure ``example["image"]`` is in RGB mode.

    The example is updated in place and returned. An image whose ``mode`` is
    already ``"RGB"`` is stored back unchanged; any other image is replaced by
    the result of ``image.convert("RGB")``.
    """
    picture = example["image"]
    example["image"] = picture if picture.mode == "RGB" else picture.convert("RGB")
    return example

# Apply convert_to_rgb to each example of the dataset.
dataset = dataset.map(convert_to_rgb)

def convert_fn(batch):
    """Build the ``texts`` column for a batch of examples.

    Every ``problem`` / ``solution`` pair becomes a single-element list holding
    one dict: ``"user"`` is the fixed reasoning-format instruction followed by
    ``". "`` and the problem text, ``"assistant"`` is the solution prefixed with
    ``"Answer: "``.

    Returns:
        ``{"texts": [...]}`` with one entry per example in the batch.
    """
    # The instruction is the same for every example, so it is built once.
    instruction = (
        "A conversation between User and Assistant. The user asks a question, and the Assistant solves it. The assistant "
        "first thinks about the reasoning process in the mind and then provides the user with the answer. The reasoning "
        "process and answer are enclosed within <think> </think> and <answer> </answer> tags, respectively, i.e., "
        "<think> reasoning process here </think><answer> answer here </answer>"
    )

    conversations = [
        [{"user": instruction + ". " + question, "assistant": f"Answer: {solution}"}]
        for question, solution in zip(batch["problem"], batch["solution"])
    ]
    return {"texts": conversations}
# Add the "texts" column: convert_fn receives whole batches (batched=True) and
# the work is spread over 4 processes.
dataset = dataset.map(
    convert_fn,
    batched=True,
    num_proc=4,
)


In [4]:
# Hold out 20% of the examples as the test split, using a fixed seed.
split_dataset = dataset.train_test_split(seed=42, test_size=0.2)
train_dataset, test_dataset = split_dataset['train'], split_dataset['test']

In [5]:
# Display the first training example to check the available columns.
train_dataset[0]

{'image': <PIL.PngImagePlugin.PngImageFile image mode=RGB size=716x200>,
 'problem': 'Based on the image, determine the constant term after combining all the polynomial expressions representing the side lengths of the triangle. Choose the correct answer from the options provided.\n\nChoices:\nA. 3\nB. 5\nC. 8\nD. 13',
 'solution': "<think>Let's examine the polynomial expressions given for each side of the triangle. The side labeled \\(4x^2 + x\\) does not have a constant term. The side labeled \\(2x + 3\\) has a constant term of 3. The side labeled \\(4x^3 + 2x^2 + 5\\) has a constant term of 5. To find the total constant term, we need to add the constant terms from these expressions. So, we add 3 and 5 together. 3 + 5 = 8</think>\n\n<answer>The correct answer is C</answer>",
 'original_question': 'According to the question shown in the image, please first perform reasoning, then finally select the right answer from the choices, e.g., Answer: xxx.\nQuestion: Based on the image, find th

In [6]:
def extract_prompt(example):
    """Return ``{'prompt': ...}`` taken from the first ``texts`` entry.

    The prompt is the ``'user'`` value of ``example['texts'][0]``. An empty
    string is used when ``texts`` is missing or empty, or when that first entry
    has no ``'user'`` key.
    """
    conversations = example.get('texts')
    if not conversations or len(conversations) == 0:
        return {'prompt': ''}
    first_turn = conversations[0]
    return {'prompt': first_turn.get('user', '')}

# Add a "prompt" column to both splits, using 4 processes for each map call.
train_dataset = train_dataset.map(extract_prompt,num_proc=4)
test_dataset = test_dataset.map(extract_prompt,num_proc=4)

In [7]:
# Drop the same three columns from both splits.
unused_columns = ["problem", "original_question", "texts"]
train_dataset = train_dataset.remove_columns(unused_columns)
test_dataset = test_dataset.remove_columns(unused_columns)

In [8]:
from data.datasets import VQADataset
from data.collators import VQACollator
from data.data_utils import synchronized_dataloader_step
from data.advanced_datasets import ConstantLengthDataset
from data.processors import get_image_processor, get_tokenizer

import models.config as config
from models.vision_language_model import VisionLanguageModel

# Libraries
import math
import time
import torch
from tqdm import tqdm
import torch.optim as optim
import matplotlib.pyplot as plt
from dataclasses import dataclass, field
from torch.utils.data import DataLoader
from datasets import load_dataset, concatenate_datasets, get_dataset_config_names

#Otherwise, the tokenizer will throw a warning
import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Pick the compute device: CUDA when available, otherwise Apple MPS when
# available, otherwise the CPU.
if torch.cuda.is_available():
    device = "cuda"
elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Using device: {device}")

# Seed torch's random number generators with a fixed value.
torch.manual_seed(0)
torch.cuda.manual_seed_all(0)

# To reload the modules if you change something in the code
%reload_ext autoreload
%autoreload 2

Using device: cuda


In [9]:
from nanovlm.data.processors import get_tokenizer, get_image_processor
from nanovlm.models.vision_language_model import VisionLanguageModel
from rlvlm.nanovlm_grpo_trainer import NanoVLMGRPOTrainer, create_nanovlm_grpo_dataset,NanoVLMGRPOConfig
from datasets import load_dataset
from peft import LoraConfig, get_peft_model

# Load the pretrained nanoVLM checkpoint
model = VisionLanguageModel.from_pretrained("lusxvr/nanoVLM-230M-8k")

# Build the tokenizer and image processor from the settings stored in the
# model config
tokenizer = get_tokenizer(model.cfg.lm_tokenizer, model.cfg.vlm_extra_tokens, model.cfg.lm_chat_template)
image_processor = get_image_processor(model.cfg.max_img_size, model.cfg.vit_img_size, model.cfg.resize_to_max_side_len)
# LoRA adapter settings: rank 16, alpha 16, dropout 0.1, DoRA disabled,
# "gaussian" adapter initialisation, applied to the modules named in
# target_modules.
peft_config = LoraConfig(
    r=16,
    lora_alpha=16,
    lora_dropout=0.1,
    target_modules=['down_proj','o_proj','k_proj','q_proj','gate_proj','up_proj','v_proj'],
    use_dora=False,
    init_lora_weights="gaussian"
)
# Wrap the model with the adapters and print the trainable-parameter summary.
peft_model = get_peft_model(model, peft_config)
peft_model.print_trainable_parameters()


Resize to max side len: True
trainable params: 4,331,520 || all params: 232,395,456 || trainable%: 1.8639


In [10]:
# Reward functions
import re
from typing import Optional, List
def format_reward(completions: List[str], **kwargs) -> List[float]:
    """
    Score each completion on whether it contains the expected tag layout.

    A completion scores 1.0 when a ``<think>...</think>`` section followed by an
    ``<answer>...</answer>`` section occurs somewhere in it, with nothing but
    optional whitespace between the two sections. The tag contents may span
    several lines. Because the pattern is searched for rather than anchored,
    text before or after the matching span does not affect the score.

    Args:
        completions: List of completion strings
        **kwargs: Additional arguments (ignored)

    Returns:
        One reward per completion: 1.0 if the layout is present, 0.0 otherwise
    """
    tag_layout = re.compile(r"<think>.*?</think>\s*<answer>.*?</answer>", re.DOTALL)
    return [1.0 if tag_layout.search(text) else 0.0 for text in completions]


def _normalize_expected_answer(answer) -> str:
    """Reduce a reference answer to the token that completions are compared against."""
    text = str(answer).strip().lower()

    # A reference solution may itself be wrapped in <answer> tags. Compare
    # against the tag content only, so letters of the tag name are never
    # mistaken for a choice.
    tagged = re.search(r"<answer>(.*?)</answer>", text, re.DOTALL)
    if tagged:
        text = tagged.group(1).strip()

    # "answer is c" / "answer: c". The trailing \b keeps phrases such as
    # "answer choices" from being read as choice "c".
    labelled = re.search(r"answer\s*(?:is|:)?\s*([a-d])\b", text)
    if labelled:
        return labelled.group(1)

    # Otherwise use the last stand-alone choice letter, if there is one.
    standalone = re.findall(r"\b[a-d]\b", text)
    if standalone:
        return standalone[-1]

    # Not a multiple-choice answer: compare against the full text.
    return text


def _score_answer(completion: str, expected, partial_credit: bool, partial_value: float) -> float:
    """Score one completion against an already-normalized expected answer."""
    # An empty or missing reference can never be matched.
    if not expected:
        return 0.0

    found = re.search(r"<answer>(.*?)</answer>", completion, re.DOTALL)
    if not found:
        return 0.0

    candidate = found.group(1).strip().lower()
    if candidate == expected:
        return 1.0

    # Partial credit needs the expected answer as a whole token. A plain
    # substring test would accept the "c" inside "correct" or the "8" in "18".
    whole_token = r"(?<!\w)" + re.escape(expected) + r"(?!\w)"
    if partial_credit and re.search(whole_token, candidate):
        return partial_value
    return 0.0


def accuracy_reward(
    completions: List[str],
    answer: Optional[str] = None,
    partial_credit: bool = True,
    partial_credit_threshold: float = 0.5,
    **kwargs
) -> List[float]:
    """
    Reward function for answer accuracy using regex matching.

    Scoring:
    - 1.0: Text inside the completion's <answer> tags equals the expected answer
    - partial_credit_threshold: Expected answer appears inside that text as a
      whole token (only when partial_credit is True)
    - 0.0: No match, no <answer> tags, or no usable expected answer

    The expected answer is normalized first: it is lower-cased, the content of
    its own <answer> tags is used when present, and a choice letter (a-d) is
    extracted from phrases like "Answer: C" / "The answer is C" or from a
    stand-alone letter. If no choice letter is found, the whole text is used.

    NOTE(review): the dataset built earlier in this notebook keeps a `solution`
    column and no `answer` column -- confirm that the trainer passes the
    reference under the `answer` keyword, otherwise every reward here is 0.0.

    Args:
        completions: List of completion strings
        answer: Expected answer. Either a single value applied to every
            completion, or a list/tuple with one entry per completion
            (completions without a corresponding entry score 0.0)
        partial_credit: Whether to give partial credit if the answer is found
            within the response (default: True)
        partial_credit_threshold: Reward value for partial matches (default: 0.5)
        **kwargs: Additional arguments (ignored)

    Returns:
        List with one reward per completion; empty if completions is None or empty
    """
    if not completions:
        return []
    if answer is None:
        return [0.0] * len(completions)

    if isinstance(answer, (list, tuple)):
        # One reference per completion; pad so that zip() covers every completion.
        expected_answers = [
            None if reference is None else _normalize_expected_answer(reference)
            for reference in answer
        ]
        expected_answers += [None] * (len(completions) - len(expected_answers))
    else:
        # A single reference is normalized once and shared by all completions.
        expected_answers = [_normalize_expected_answer(answer)] * len(completions)

    return [
        _score_answer(completion, expected, partial_credit, partial_credit_threshold)
        for completion, expected in zip(completions, expected_answers)
    ]

In [11]:
# Both splits are wrapped with the same column mapping.
grpo_columns = dict(prompt_column="prompt", image_column="image")
train_dataset = create_nanovlm_grpo_dataset(dataset=train_dataset, **grpo_columns)
test_dataset = create_nanovlm_grpo_dataset(dataset=test_dataset, **grpo_columns)

In [12]:
# Training configuration for the GRPO run.
grpo_config = NanoVLMGRPOConfig(
    output_dir="./nanovlm-grpo-output",  # Relative path, resolved against the current working directory
    per_device_train_batch_size=1,
    gradient_accumulation_steps=2,
    learning_rate=5e-6,
    num_train_epochs=2,
    num_generations=3,  # Generate 3 completions per prompt
    max_prompt_length=2048,
    max_completion_length=512,
    beta=0.1,  # KL penalty
    temperature=1.0,
    logging_steps=3,
    eval_strategy="steps",
    eval_steps=3,
    log_completions=False,
    # Loss settings (loss_type is "grpo" here, not a DAPO variant)
    loss_type="grpo",
    importance_sampling_level="token",
    mask_truncated_completions=True,
    # Mixed precision: fp16 enabled, bf16 disabled
    fp16=True,
    bf16=False
)

In [13]:
# Build the trainer around the LoRA-wrapped model. The two reward functions
# are passed together as a list.
trainer = NanoVLMGRPOTrainer(
    model=peft_model,
    reward_funcs=[format_reward,accuracy_reward],
    args=grpo_config,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    tokenizer=tokenizer,
    image_processor=image_processor,
)

In [14]:
# Release cached GPU memory that is currently unused before training starts.
torch.cuda.empty_cache()

In [None]:
# Start training.
trainer.train()

Step,Training Loss,Validation Loss


In [33]:
import argparse
import torch
from PIL import Image
from peft import PeftModel

from models.vision_language_model import VisionLanguageModel
from data.processors import get_tokenizer, get_image_processor, get_image_string
# Seed torch and pick the compute device: CUDA, then Apple MPS, then CPU.
torch.manual_seed(0)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(0)
    device = torch.device("cuda")
elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")
source = "./nanovlm-grpo-output"
print(f"Loading weights from: {source}")

# 1. Load the base model
base_model = VisionLanguageModel.from_pretrained("lusxvr/nanoVLM-230M-8k").to(device)

# 2. Load LoRA adapters on top
model = PeftModel.from_pretrained(
    base_model,
    source,  # Your output_dir with adapter_config.json
)

# 3. Get tokenizer and image processor
tokenizer = get_tokenizer(model.cfg.lm_tokenizer, model.cfg.vlm_extra_tokens, model.cfg.lm_chat_template)
# Use the resize setting stored in the model config, as the training cell does,
# so that images are preprocessed the same way at inference time. Configs that
# lack the attribute fall back to False.
image_processor = get_image_processor(
    model.cfg.max_img_size,
    model.cfg.vit_img_size,
    getattr(model.cfg, "resize_to_max_side_len", False),
)

# 4. Set to eval mode
model.eval()
# Now you can use model for inference!

Using device: cuda
Resize to max side len: True


In [None]:
# Pick one test example and run its image through the image processor, which
# returns the processed image together with a split ratio.
idx = 10
img = test_dataset[idx]["image"].convert('RGB')
processed_image, splitted_image_ratio = image_processor(img)
if not hasattr(tokenizer, "global_image_token") and splitted_image_ratio[0]*splitted_image_ratio[1] == len(processed_image) - 1:
    # If the tokenizer doesn't have a global image token, but the processor generated it, remove it
    processed_image = processed_image[1:]

# Text placed in front of the prompt to stand in for the image.
# NOTE(review): presumably the image placeholder tokens -- confirm in get_image_string.
image_string = get_image_string(tokenizer, [splitted_image_ratio], model.cfg.mp_image_token_length)

# Wrap the prompt as a single user turn and tokenize it with the chat template.
messages = [{"role": "user", "content": image_string + test_dataset[idx]['prompt']}]
encoded_prompt = tokenizer.apply_chat_template([messages], tokenize=True, add_generation_prompt=True)
tokens = torch.tensor(encoded_prompt).to(device)
img_t = processed_image.to(device)

print("\nInput:\n ", test_dataset[idx]['prompt'], "\n\nOutput:")

# Generate three completions for the same prompt and print each decoded result.
for i in range(3):
    gen = model.generate(tokens, img_t, max_new_tokens=512)
    out = tokenizer.batch_decode(gen, skip_special_tokens=True)[0]
    print(f"  >> Generation {i+1}: {out}")