# VLM Benchmark for Object Property Abstraction

This notebook implements a benchmark for evaluating Vision Language Models (VLMs) on object property abstraction and visual question answering (VQA) tasks. The benchmark includes three types of questions:

1. Direct Recognition
2. Property Inference
3. Counterfactual Reasoning

And three types of images:
- REAL
- ANIMATED
- AI GENERATED

## Setup and Imports

First, let's import the necessary libraries and set up our environment.

In [1]:
# Install required packages
# %pip install transformers torch Pillow tqdm bitsandbytes accelerate

In [None]:
# %pip install qwen-vl-utils flash-attn #--no-build-isolation
%pip install -q -U google-genai

In [3]:
# Import required libraries
import torch
import os
import json
from pathlib import Path
from PIL import Image
import gc
import re
from tqdm import tqdm
from typing import List, Dict, Any
from google import genai
from dotenv import load_dotenv

# Load settings from a .env file (if present) before anything reads the environment
load_dotenv()

# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")

  from .autonotebook import tqdm as notebook_tqdm


Using device: cuda


In [None]:
# Fail early if the Gemini API key is absent (or empty) in the environment
api_key = os.environ.get('GOOGLE_API_KEY')
if api_key is None or api_key == "":
    raise ValueError("GOOGLE_API_KEY not found in environment")

## Benchmark Tester Class

This class handles the evaluation of models against our benchmark.

In [4]:
class BenchmarkTester:
    """Run a Gemini model over the object-property benchmark and collect answers.

    The benchmark JSON is read as ``benchmark -> images``; each image entry is
    accessed through ``image_id``, ``image_type``, ``path`` and ``questions``,
    and each question through ``id``, ``question``, ``answer`` and
    ``property_category``.
    """

    # Expected reply shape: "<NUMBER> [<OBJECT1>, <OBJECT2>, ...]"
    _ANSWER_PATTERN = re.compile(r'(\d+)\s*\[(.*?)\]')

    # Appended verbatim to every benchmark question so replies are parseable.
    _FORMAT_INSTRUCTION = (
        " Your response MUST be in the following format and nothing else:\n"
        " <NUMBER> [<OBJECT1>, <OBJECT2>, <OBJECT3>, ...]"
    )

    def __init__(self, benchmark_path="/var/scratch/ave303/OP_bench/benchmark.json", data_dir="/var/scratch/ave303/OP_bench/"):
        """Load the benchmark definition and remember where the images live.

        Args:
            benchmark_path: Path to the benchmark JSON file.
            data_dir: Directory that the per-image ``path`` entries are
                relative to.
        """
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        with open(benchmark_path, 'r', encoding="utf-8") as f:
            self.benchmark = json.load(f)
        self.data_dir = data_dir
        self.api_key = os.getenv('GOOGLE_API_KEY')

    def clean_answer(self, answer):
        """Extract the count and the listed objects from a raw model answer.

        Args:
            answer: Raw response text. ``None`` (no text in the response) is
                treated like an empty answer instead of raising.

        Returns:
            A dict with ``"count"`` (the number as a string; ``"0"`` when no
            number is present) and ``"reasoning"`` (the object names from the
            bracketed list; empty when the expected format is not matched).
        """
        text = answer or ""
        match = self._ANSWER_PATTERN.search(text)
        if match:
            items = (item.strip() for item in match.group(2).split(','))
            return {
                "count": match.group(1),
                # Drop blanks so "0 []" or a trailing comma yields no empty names.
                "reasoning": [item for item in items if item],
            }

        # Fallback if the format isn't matched: first number in the text, if any.
        numbers = re.findall(r'\d+', text)
        return {
            "count": numbers[0] if numbers else "0",
            "reasoning": [],
        }

    def _ask_question(self, client, model_name, image, image_data, question):
        """Send one question about an uploaded image and build its result row."""
        print(f"Question: {question['question']}")
        response = client.models.generate_content(
            model=model_name,
            contents=[image, f"{question['question']}{self._FORMAT_INSTRUCTION}"],
        )
        cleaned_answer = self.clean_answer(response.text)
        return {
            "image_id": image_data["image_id"],
            "image_type": image_data["image_type"],
            "question_id": question["id"],
            "question": question["question"],
            "ground_truth": question["answer"],
            "model_answer": cleaned_answer["count"],
            "model_reasoning": cleaned_answer["reasoning"],
            "raw_answer": response.text,  # Keep raw answer for debugging
            "property_category": question["property_category"],
        }

    def _evaluate_image(self, client, model_name, image_data, image_path):
        """Upload one image, ask all of its questions, and delete the upload."""
        image = client.files.upload(file=image_path)
        image_results = []
        try:
            for question in image_data['questions']:
                try:
                    image_results.append(
                        self._ask_question(client, model_name, image, image_data, question)
                    )
                except Exception as e:
                    # Best effort: one failed question must not lose the others.
                    print(f"Error processing question: {str(e)}")
        finally:
            # Always remove the uploaded file, even if the loop above failed,
            # and never let a cleanup failure discard the collected answers.
            try:
                client.files.delete(name=image.name)
            except Exception as e:
                print(f"Warning: could not delete uploaded file {image.name}: {str(e)}")
        return image_results

    def evaluate_model(self, model_name, save_path, start_idx=0, batch_size=5):
        """Evaluate ``model_name`` on a slice of the benchmark images.

        Args:
            model_name: Model identifier passed to ``generate_content``.
            save_path: File the collected results are written to as JSON.
            start_idx: Index of the first benchmark image to process.
            batch_size: Maximum number of images to process from ``start_idx``.

        Returns:
            The list of per-question result dicts. Images that are missing on
            disk and questions that raise are skipped with a printed message.
            If the run aborts unexpectedly, the results gathered so far are
            written to ``f"{save_path}_error_state.json"``.
        """
        results = []
        print(f"\nEvaluating {model_name}...")
        print(f"Using device: {self.device}")

        # Force garbage collection before starting
        gc.collect()
        torch.cuda.empty_cache()

        try:
            # One client serves the whole run; it does not depend on the image.
            client = genai.Client(api_key=self.api_key)
            images = self.benchmark['benchmark']['images'][start_idx:start_idx + batch_size]
            total_images = len(images)

            for idx, image_data in enumerate(tqdm(images, desc="Processing images")):
                try:
                    print(f"\nProcessing image {idx+1}/{total_images}: {image_data['image_id']}")
                    image_path = Path(self.data_dir) / image_data['path']
                    if not image_path.exists():
                        print(f"Warning: Image not found at {image_path}")
                        continue

                    results.extend(
                        self._evaluate_image(client, model_name, image_data, image_path)
                    )
                except Exception as e:
                    print(f"Error processing image {image_data['image_id']}: {str(e)}")

            # Save final results
            if results:
                with open(save_path, 'w', encoding="utf-8") as f:
                    json.dump(results, f, indent=4)

        except Exception as e:
            print(f"An error occurred during evaluation: {str(e)}")
            if results:
                with open(f"{save_path}_error_state.json", 'w', encoding="utf-8") as f:
                    json.dump(results, f, indent=4)

        return results

## Test Gemini 2.0 Flash

Let's evaluate the `gemini-2.0-flash` model.

In [None]:
def test_Gemini2_0Flash():
    """Evaluate ``gemini-2.0-flash`` on up to five benchmark images from index 45.

    Any collected results are written to ``Gemini2_0Flash_results.json``;
    nothing is returned.
    """
    BenchmarkTester().evaluate_model(
        model_name="gemini-2.0-flash",
        save_path="Gemini2_0Flash_results.json",
        start_idx=45,
        batch_size=5,
    )

In [None]:
# def test_Gemini2_0Flash_loop():
#     for i in range(0, 45, 5):
#         tester = BenchmarkTester()
#         Gemini2_0Flash_results = tester.evaluate_model(
#             "gemini-2.0-flash",
#             f"Gemini2_0Flash_results{i}.json",
#             start_idx=i,
#             batch_size=5
#         )

## Run Evaluation

Now we can run our evaluation with the Gemini 2.0 Flash model:

In [None]:
# test_Gemma3()

In [None]:
# Kick off the Gemini 2.0 Flash evaluation defined in test_Gemini2_0Flash
test_Gemini2_0Flash()

In [None]:
# import json
# import os

# def merge_json_results(base_filename="Gemini2_0Flash_results", output_filename="Gemini2_0Flash_results1.json"):
#     """
#     Merges multiple JSON files generated by the Gemini2_0Flash_loop into a single file.
#     Assumes each JSON file contains a list of dictionaries.

#     Args:
#         base_filename (str): The base name of the individual JSON files (e.g., "Gemini2_0Flash_results").
#         output_filename (str): The name of the consolidated output JSON file.
#     """
#     all_merged_results = [] # Initialize as an empty LIST, not a dictionary

#     # Iterate with the same `i` values as your original loop (0, 5, ..., 45)
#     for j in range(0, 50, 5):
#         filename = f"{base_filename}{j}.json" # Constructs filename like Gemini2_0Flash_results0.json, etc.

#         if os.path.exists(filename):
#             with open(filename, 'r') as f:
#                 try:
#                     batch_results = json.load(f)
#                     # Check if the loaded data is indeed a list, if not, handle error
#                     if isinstance(batch_results, list):
#                         all_merged_results.extend(batch_results) # Extend the master list
#                     else:
#                         print(f"Warning: {filename} does not contain a JSON list. Skipping.")
#                 except json.JSONDecodeError:
#                     print(f"Warning: Could not decode JSON from {filename}. Skipping.")
#         else:
#             print(f"Warning: File {filename} not found. Skipping.")

#     # Save the merged results
#     with open(output_filename, 'w') as f:
#         json.dump(all_merged_results, f, indent=4) # indent for pretty printing

#     print(f"Successfully merged {len(all_merged_results)} results into {output_filename}")


# # After your `test_Gemini2_0Flash_loop()` function finishes running:
# # (Make sure your test_Gemini2_0Flash_loop() function runs first to create the files)
# # For example:
# # test_Gemini2_0Flash_loop()
# #
# # Then call the merge function:
# merge_json_results(output_filename="Gemini2_0Flash_results1.json")