Ref: https://huggingface.co/HuggingFaceTB/SmolVLM-Instruct

Model Page: https://huggingface.co/HuggingFaceTB/SmolVLM-256M-Instruct

In [None]:
import os, re
#os.environ["CUDA_VISIBLE_DEVICES"] = "1"
os.environ["TOKENIZERS_PARALLELISM"] = "false"

In [None]:
print(os.getenv("CONDA_DEFAULT_ENV"))

In [None]:
import json
import torch
import pickle
import logging
import datasets
import numpy as np
from PIL import Image
from tqdm.auto import tqdm
from datasets import Dataset
from datasets import load_dataset
from torchvision import transforms
from torch.utils.data import DataLoader
from sentence_transformers import SentenceTransformer, util

### Set Up Logger

In [None]:
# Re-running this cell would otherwise stack handlers on the root logger and
# print every record several times, so drop whatever is already attached.
root_logger = logging.getLogger()
for stale_handler in list(root_logger.handlers):
    root_logger.removeHandler(stale_handler)

# Route INFO-and-above records to a stream handler so they show up in the
# cell output (switch the level to DEBUG for more verbosity).
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

logger = logging.getLogger(__name__)
logger.info("Logging is set up in the notebook!")

### Load the Dataset

In [None]:
dataset = load_dataset("dutta18/Quantity-Reasoning-VQA-23K")

In [None]:
dataset = dataset['train']

In [None]:
dataset

### Load COT Think Data

In [None]:
# Collect the generated chain-of-thought string of every record in the JSONL
# reasoning-trace file, in file order (one JSON object per line).
think_data = list()

# Read as UTF-8 explicitly so decoding does not depend on the machine's locale.
with open('/home/aritrad/MOE-Directory/COT-Data-Qty-23K/Quantity-Reasoning-VQA-23K-Reasoning-Trace.jsonl', 'r', encoding='utf-8') as file:
    for line in file:
        # A blank line (e.g. a trailing newline at the end of the file) is not
        # a record; json.loads would raise on it.
        if not line.strip():
            continue
        record = json.loads(line)
        think_data.append(record['generated_cot'])

In [None]:
len(think_data)

In [None]:
print(think_data[0])

### Merge with Dataset

In [None]:
dataset = dataset.add_column("cot_think_data", think_data)

In [None]:
dataset

In [None]:
# Check a single sample

dataset[0]

### Converting Output Number Words to Numeric Strings

In [None]:
# Maps the English words "zero" .. "twenty" to their integer value; the
# position of each word in the list below is the number it spells.
num_map = {
    word: value
    for value, word in enumerate(
        [
            "zero", "one", "two", "three", "four", "five", "six",
            "seven", "eight", "nine", "ten", "eleven", "twelve",
            "thirteen", "fourteen", "fifteen", "sixteen", "seventeen",
            "eighteen", "nineteen", "twenty",
        ]
    )
}

In [None]:
def convert_answer(example):
    """Rewrite a number-word answer as a numeric string.

    The answer is stripped and lower-cased, then looked up in ``num_map``.
    A known number word ("six") becomes its digit string ("6"). Anything
    that is not in ``num_map`` (for example an answer that is already
    "6") is kept as the cleaned text instead of being replaced by the
    literal string "None", which would throw the label away.

    Args:
        example: A dataset row with a string ``"answer"`` field.

    Returns:
        A dict with the single key ``"answer"`` holding the converted string.
    """
    word = example["answer"].strip().lower()
    if word in num_map:
        return {"answer": str(num_map[word])}
    # Unknown label: keep it rather than emitting "None".
    return {"answer": word}

In [None]:
dataset = dataset.map(convert_answer)

In [None]:
# Check a single sample
# Mismatch: 10, 100, 105, 600
dataset[200]

In [None]:
dataset

### Rejection Sampling

In [None]:
def normalize_text(text):
    """
    Standardizes an answer so two answers can be compared as strings.

    Steps:
    1. ``None`` becomes the empty string.
    2. The value is converted to ``str``, stripped and lower-cased.
    3. Every character that is neither a word character nor whitespace is
       removed (so '6.' -> '6'), then the result is stripped again.
    4. A number word found in ``num_map`` ('six') becomes its digit
       string ('6').

    Always returns a ``str``; returning the raw ``int`` from ``num_map``
    would make 'six' compare unequal to '6'.
    """
    if text is None:
        return ""

    # Basic cleanup
    text = str(text).strip().lower()

    # Drop punctuation (e.g. "6." -> "6"); strip again because removing a
    # character can leave whitespace at the edges ("6 ." -> "6 ").
    text = re.sub(r'[^\w\s]', '', text).strip()

    # Convert number words to digit strings using the map
    if text in num_map:
        text = str(num_map[text])

    return text

In [None]:
def rejection_sampling_filter(example):
    """
    Decide whether a row's chain-of-thought agrees with its label.

    The text inside the first <answer>...</answer> pair of
    example['cot_think_data'] is compared, after normalize_text, with
    example['answer']. Returns True on a match, False on a mismatch or
    when no <answer> tag pair is present.
    """
    reference, trace = example['answer'], example['cot_think_data']

    # Tags are matched case-insensitively; DOTALL lets the captured text
    # span newlines.
    tagged = re.search(
        r"<answer>(.*?)</answer>",
        trace,
        re.DOTALL | re.IGNORECASE,
    )
    if tagged is None:
        # No answer tag at all: reject the sample.
        return False

    expected = normalize_text(reference)
    produced = normalize_text(tagged.group(1))
    return expected == produced

In [None]:
# --- MAIN EXECUTION ---
print(f"Original Dataset Size: {len(dataset)}")

# Apply the Rejection Sampling
# load_from_cache_file=False ensures we re-run logic if we changed code
filtered_dataset = dataset.filter(rejection_sampling_filter, load_from_cache_file=False)

In [None]:
# Print Statistics

original_count = len(dataset)
filtered_count = len(filtered_dataset)
rejected_count = original_count - filtered_count

In [None]:
print(f"\n--- Rejection Sampling Results ---\n")
print(f"Original: {original_count}")
print(f"Kept:     {filtered_count}")
print(f"Rejected: {rejected_count}")
print(f"Retention Rate: {(filtered_count/original_count)*100:.2f}%")

In [None]:
dataset = filtered_dataset

In [None]:
dataset

### Split Into Train, Test & Val

In [None]:
from datasets import DatasetDict

In [None]:
# 1. First hold out 25% of the data: train (75%) and temp (25%)
train_test = dataset.train_test_split(test_size=0.25, seed=42)

# 2. Split the temp set 40/60: its 'train' part (10% of all data) is used as
#    validation and its 'test' part (15% of all data) as the test set
test_val = train_test['test'].train_test_split(test_size=0.6, seed=42)

In [None]:
splits = {
    'train': train_test['train'],
    'validation': test_val['train'],
    'test': test_val['test'],
}

dataset_dict = DatasetDict(splits)

In [None]:
train_set, val_set, test_set = dataset_dict['train'], dataset_dict['validation'], dataset_dict['test']

In [None]:
print(f'Length of the train set: {len(train_set)}, Val set: {len(val_set)} and Test Set: {len(test_set)}')

In [None]:
# Check A Particular Sample For Reproducibility

print(train_set[100]['question'])
print(val_set[100]['question'])
print(test_set[100]['question'])

### Importing Models

In [None]:
DEVICE = 'cuda'

In [None]:
from peft import PeftModel
from transformers.image_utils import load_image
from transformers import AutoModelForImageTextToText, BitsAndBytesConfig, Idefics3ForConditionalGeneration, AutoProcessor, AutoModelForVision2Seq

In [None]:
model_id = "HuggingFaceTB/SmolVLM-256M-Instruct"
QLORA_finetuned_model_path = '/home/aritrad/main/SmolVLM-2B/RL/chkpts/best-smolvlm-256M-qty-chkpt-32' 

In [None]:
# Load the base checkpoint in bfloat16 and let device_map='auto' decide
# where the weights are placed.
# NOTE(review): the attention backend is passed as `_attn_implementation`
# (leading underscore); confirm the installed transformers version honours
# this spelling rather than only `attn_implementation`.
base_model = AutoModelForImageTextToText.from_pretrained(
    model_id, 
    dtype=torch.bfloat16, 
    _attn_implementation="flash_attention_2",
    device_map = 'auto',
)

# The processor is loaded from the same model id as the base model.
processor = AutoProcessor.from_pretrained(model_id)
processor.tokenizer.padding_side = "left"     # For batched generation.

In [None]:
# Load the QLORA-trained model.

peft_model = PeftModel.from_pretrained(base_model, QLORA_finetuned_model_path)

In [None]:
len(test_set)

### Prepare Chat Messages

In [None]:
def collate_fn(examples):
    """Build one processor batch from a list of dataset rows.

    Each row contributes a single-turn user message (one image placeholder
    followed by the row's question) rendered through the processor's chat
    template without a generation prompt, plus its image wrapped in a
    one-element list. The processor pads the batch and returns PyTorch
    tensors; ``pixel_values`` is then cast to bfloat16.
    """

    def _render_prompt(question):
        # One user turn: image slot first, then the question text.
        chat = [
            {
                "role": "user",
                "content": [
                    {"type": "image"},
                    {"type": "text", "text": question},
                ],
            },
        ]
        rendered = processor.apply_chat_template(chat, add_generation_prompt=False)
        return rendered.strip()

    prompts = [_render_prompt(row["question"]) for row in examples]
    image_groups = [[row["image"]] for row in examples]

    # Tokenize the prompts and preprocess the images together.
    batch = processor(text=prompts, images=image_groups, return_tensors="pt", padding=True)

    # Cast the image tensor explicitly to bfloat16.
    batch["pixel_values"] = batch["pixel_values"].to(torch.bfloat16)
    return batch

### Create Batches

In [None]:
# Create Test Dataloader.

batch_ = 8
test_loader = DataLoader(test_set, batch_size=batch_, shuffle=False, collate_fn=collate_fn)

### Generate Answers

In [None]:
# Generate one sampled completion per test example and keep only the newly
# generated text (the prompt tokens are sliced off before decoding).
decoded_outputs=list()

for batch in tqdm(test_loader):

    # Use the notebook-wide DEVICE constant instead of a second hard-coded
    # device string.
    batch = {key: value.to(DEVICE) for key, value in batch.items()}

    # The tokenizer pads on the left, so every row's prompt occupies the
    # first `prompt_length` positions and generated tokens start right after.
    prompt_length = batch["input_ids"].shape[-1]

    with torch.no_grad():
        outputs = peft_model.generate(
            **batch,
            max_new_tokens=256,
            do_sample=True,                # enable sampling
            temperature=0.6,               # randomness factor
            num_return_sequences=1,
            repetition_penalty=1.05,       # Slight penalty to prevent <think><think> loops
        )

    model_generated_output_only = outputs[:, prompt_length:]
    decoded_output = processor.batch_decode(
        model_generated_output_only,
        skip_special_tokens=True,
        clean_up_tokenization_spaces=False
    )
    decoded_outputs.extend(decoded_output)

In [None]:
idx = 2000
print(decoded_outputs[idx].split('Assistant:')[1].strip().replace('*', ''))

In [None]:
print(test_set[idx]['cot_think_data'].replace('*', ''))

In [None]:
test_set[idx]['image']

In [None]:
len(decoded_outputs)

### Parse Outputs

In [None]:
generated_outputs = list()
groundTruth_outputs = list()

In [None]:
def process_clean_trace(rawOutputs, outputList):
    """Split each response into its (thought, answer) parts.

    For every item of ``rawOutputs`` the text after the first ``<think>``
    up to the next ``</think>`` and the text after the first ``<answer>``
    up to the next ``</answer>`` are extracted and stripped, then appended
    to ``outputList`` as a tuple. A response that lacks an opening tag, or
    that is not a string, is recorded as ``('NULL', 'NULL')`` so that
    ``outputList`` stays aligned with ``rawOutputs``.

    Args:
        rawOutputs: Iterable of response strings.
        outputList: List that receives one tuple per response (mutated in place).

    Returns:
        None.
    """
    for response_text in rawOutputs:

        try:
            thought_part = response_text.split('<think>')[1].strip().split("</think>")[0].strip()
            answer_part = response_text.split("<answer>")[1].split("</answer>")[0].strip()
        except (IndexError, AttributeError, TypeError):
            # Poorly formatted output: IndexError when an opening tag is
            # missing, AttributeError/TypeError when the item is not a str.
            # Only these are caught so that Ctrl-C and real bugs still surface.
            outputList.append( ('NULL', 'NULL') )
        else:
            outputList.append( (thought_part, answer_part) )

In [None]:
process_clean_trace(decoded_outputs, generated_outputs)

In [None]:
groundTruthCOT = test_set['cot_think_data']

In [None]:
process_clean_trace(groundTruthCOT, groundTruth_outputs)

In [None]:
len(groundTruth_outputs), len(generated_outputs)

In [None]:
generated_outputs[0]

### Calculate Properly Formatted Output Percentage

In [None]:
def calculateFormattedOutputPercent(targetList):
    """Return the percentage of entries that were parsed successfully.

    An entry counts as malformed when its first element is the string
    'NULL'. An empty ``targetList`` yields 0.0 instead of raising
    ZeroDivisionError.

    Args:
        targetList: Sequence of (thought, answer) tuples.

    Returns:
        A float in the range 0-100.
    """
    if not targetList:
        return 0.0

    malformed = sum(1 for item in targetList if item[0] == 'NULL')
    return ((len(targetList) - malformed) / len(targetList)) * 100

In [None]:
calculateFormattedOutputPercent(generated_outputs)

In [None]:
calculateFormattedOutputPercent(groundTruth_outputs)

### Cosine Similarity Function

In [None]:
from sentence_transformers import SentenceTransformer, util
sbert = SentenceTransformer('all-mpnet-base-v2', device = 'cuda')

In [None]:
def findCosSim(word1:str, word2:str) -> float:
    """Return the cosine similarity of two texts, rounded to 2 decimals.

    Both inputs are embedded with the module-level ``sbert`` model and the
    embeddings are compared with ``util.pytorch_cos_sim``.

    Args:
        word1: First text.
        word2: Second text.

    Returns:
        The similarity score as a float (not an int, as previously annotated).
    """
    # Compute the embeddings
    embedding1 = sbert.encode(word1, convert_to_tensor=True)
    embedding2 = sbert.encode(word2, convert_to_tensor=True)

    # Compute cosine similarity; .item() unwraps the single tensor entry.
    cosine_score = util.pytorch_cos_sim(embedding1, embedding2)
    return round(cosine_score.item(), 2)

### Cosine Accuracy - COT Chain

In [None]:
# One 0/1 score per test example: 1 when the generated reasoning is close
# (cosine similarity > 0.8) to the reference reasoning, else 0.
COTCosineAccuracy = []

for generated, reference in tqdm(zip(generated_outputs, groundTruth_outputs), total=len(generated_outputs)):
    generated_thought, reference_thought = generated[0], reference[0]

    # 'NULL' marks a trace that could not be parsed. Comparing 'NULL' with
    # 'NULL' would give a similarity of 1.0 and count a malformed
    # generation as a hit, so such pairs are scored 0 without embedding.
    if 'NULL' in (generated_thought, reference_thought):
        COTCosineAccuracy.append(0)
        continue

    cos_sim = findCosSim(generated_thought, reference_thought)
    COTCosineAccuracy.append(1 if cos_sim > 0.8 else 0)

In [None]:
round( ( sum(COTCosineAccuracy) / len(COTCosineAccuracy) ) * 100, 2)

### Exact-Match Accuracy - Short Answers

In [None]:
generated_ouputs_short_answers = [generated_outputs[idx][1] for idx in range(len(generated_outputs))]

In [None]:
groundTruth_outputs_short_answers = [groundTruth_outputs[idx][1] for idx in range(len(groundTruth_outputs))]

In [None]:
groundTruth_outputs_short_answers[500:505]

In [None]:
generated_ouputs_short_answers[500:505]

In [None]:
# Exact-match accuracy (in percent) of the generated short answers.
# A generated answer of 'NULL' means the output could not be parsed; it is
# never counted as correct, even if the reference is also 'NULL'.
short_answer_hits = sum(
    1
    for truth, predicted in zip(groundTruth_outputs_short_answers, generated_ouputs_short_answers)
    if predicted != 'NULL' and truth.strip() == predicted.strip()
)
# Guard against an empty prediction list instead of dividing by zero.
accuracy = ( short_answer_hits / len(generated_ouputs_short_answers) ) * 100 if generated_ouputs_short_answers else 0.0

In [None]:
print(f'Final Short Answer Accuracy: {round(accuracy, 2)} %')