In [None]:
import json
import os
import random
import re
from collections import Counter

import torch
from tqdm import tqdm

# for qwen2.5-vl
from transformers import Qwen2_5_VLForConditionalGeneration, AutoTokenizer, AutoProcessor
from qwen_vl_utils import process_vision_info

In [None]:
# Single source of truth for the checkpoint so the model weights and the
# processor (tokenizer + image preprocessing) can never point at different repos.
MODEL_ID = "Qwen/Qwen2.5-VL-7B-Instruct"

# We recommend enabling flash_attention_2 for better acceleration and memory saving,
# especially in multi-image and video scenarios.
model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
    MODEL_ID,
    torch_dtype=torch.bfloat16,
    attn_implementation="flash_attention_2",
    device_map="auto",  # device placement is decided at load time
)

# Default processor for the same checkpoint.
processor = AutoProcessor.from_pretrained(MODEL_ID)


In [None]:
# Path to the folder containing one subfolder per bird class.
images_folder = '/CUB_200_2011/images'

# Maps bird name -> list of image paths found in that bird's subfolder.
bird_images = {}

# os.listdir() returns entries in arbitrary order. Both listings are sorted so
# that any later "first N images" selection is reproducible across runs and machines.
for folder in sorted(os.listdir(images_folder)):
    folder_path = os.path.join(images_folder, folder)

    # Skip hidden entries and stray files (e.g. .DS_Store, .ipynb_checkpoints):
    # they are not class folders and listing a plain file would raise.
    if folder.startswith(".") or not os.path.isdir(folder_path):
        continue

    # The bird name is the text after the last "." in the folder name
    # (presumably folders look like "<id>.<Bird_Name>" — verify against the dataset).
    bird_name = folder.split(".")[-1]

    bird_images[bird_name] = [
        os.path.join(folder_path, image_file)
        for image_file in sorted(os.listdir(folder_path))
        if not image_file.startswith(".")  # ignore hidden sidecar files
    ]

# Number of bird classes indexed.
print(len(bird_images))

In [None]:
# Spot-check: display the first stored image path for one bird class.
bird_images['Black_footed_Albatross'][0] 

In [7]:
# Keys view over bird_images (the bird names); as a view it reflects later changes to the dict.
bird_names = bird_images.keys() 

In [None]:
# Question file to evaluate. Swap the file name to change the setting:
#   "new_cub_with_class_descriptions.json"  -> "with class name" setting
#   "new_cub_class_descriptions.json"       -> "without class name" setting (active)
# The encoding is pinned to UTF-8 so decoding does not depend on the machine's locale.
with open("new_cub_class_descriptions.json", "r", encoding="utf-8") as file:
    json_data = json.load(file)

# Number of records loaded.
print(len(json_data))

In [None]:
# Peek at the first loaded record to inspect its fields.
json_data[0]

In [None]:
# Only meaningful when the JSON file carries "Medium" and "Hard" difficulty labels.
# Records labelled "Medium" go to medium_data; every other record is treated as hard.
medium_data = [entry for entry in json_data if entry['difficulty'] == "Medium"]
hard_data = [entry for entry in json_data if entry['difficulty'] != "Medium"]

# Report the size of each split.
print(len(medium_data))
print(len(hard_data))

In [8]:
def get_answer(image_path, query, max_new_tokens=128):
    """Ask the vision-language model a text question about a single image.

    Uses the notebook-level ``processor`` and ``model`` objects.

    Args:
        image_path: Image reference placed in the chat message's "image" field.
        query: Text prompt sent alongside the image.
        max_new_tokens: Upper bound on the number of generated tokens
            (defaults to 128, the value previously hard-coded).

    Returns:
        The decoded text of the newly generated tokens only (the prompt tokens
        are stripped), with special tokens removed.
    """
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "image", "image": image_path},
                {"type": "text", "text": query},
            ],
        }
    ]

    # Render the chat template as a string; tokenization happens in processor().
    text = processor.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=True
    )
    # Only image inputs are used here; the video half of the pair is ignored.
    image_inputs, _ = process_vision_info(messages)
    inputs = processor(
        text=[text],
        images=image_inputs,
        padding=True,
        return_tensors="pt",
    )
    # Follow the model's actual placement (it is loaded with device_map="auto")
    # instead of assuming a hard-coded "cuda" device.
    inputs = inputs.to(model.device)

    generated_ids = model.generate(**inputs, max_new_tokens=max_new_tokens)

    # generate() returns prompt + continuation; keep only the continuation.
    generated_ids_trimmed = [
        out_ids[len(in_ids):]
        for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
    ]
    output_text = processor.batch_decode(
        generated_ids_trimmed,
        skip_special_tokens=True,
        clean_up_tokenization_spaces=False,
    )[0]

    return output_text

In [None]:
def extract_predicted_answer(model_output):
    """Return the first standalone option letter ('A'-'D') in the output, or None.

    A plain substring test is not enough: "Answer: B" contains an 'A', and
    "A. Dark brown" contains a 'D'. Only a capital letter that stands alone as a
    word counts, and the first such letter wins.
    """
    match = re.search(r"\b([ABCD])\b", model_output)
    return match.group(1) if match else None


# Split to evaluate: json_data for the easy part, or medium_data / hard_data.
eval_items = hard_data

# At most this many images are queried per question.
IMAGES_PER_QUESTION = 5

# Order in which options are listed in the prompt.
# Use ['D', 'C', 'B', 'A'] for position bias checking.
OPTION_ORDER = ['A', 'B', 'C', 'D']

# Counters for distribution
true_distribution = Counter()
predicted_distribution = Counter()

results = []

# Wrapping the list itself (not enumerate) lets tqdm show a total and an ETA.
for item in tqdm(eval_items):
    mcq_id = item['mcq_id']
    question = item['question']
    options = item['options']
    correct_answer = item['correct_answer']

    # mcq_id is used as the key into bird_images; skip questions with no images.
    if mcq_id not in bird_images or not bird_images[mcq_id]:
        print(f"No image for {mcq_id}")
        continue

    image_paths = bird_images[mcq_id][:IMAGES_PER_QUESTION]

    # Format the prompt: question followed by one "<letter>. <text>" line per option.
    formatted_prompt = f"{question}\n"
    for k in OPTION_ORDER:
        formatted_prompt += f"{k}. {options[k]}\n"

    # Final prompt. The wording and whitespace are kept exactly as before so that
    # model inputs (and therefore results) stay comparable with earlier runs.
    prompt = f""" Your answer or response must ONLY be a single index ('A', 'B', 'C', 'D'). Do not response with any other text. 

    {formatted_prompt}

    Answer: ('A', 'B', 'C', 'D')"""

    # Run the model once per image of this bird.
    for image_path in image_paths:
        model_output = get_answer(image_path, prompt)
        predicted_answer = extract_predicted_answer(model_output)

        # Update counters (unparseable outputs are not counted as predictions).
        true_distribution[correct_answer] += 1
        if predicted_answer:
            predicted_distribution[predicted_answer] += 1

        results.append({
            'mcq_id': mcq_id,
            'image_path': image_path,
            'prompt': prompt,
            'model_output': model_output,
            'predicted_answer': predicted_answer,
            'correct_answer': correct_answer,
            'is_correct': predicted_answer == correct_answer
        })

# Accuracy summary: unparseable outputs stay in the denominator and count as wrong.
correct = sum(r['is_correct'] for r in results if r['predicted_answer'] is not None)
total = len(results)
if total:
    print(f"Accuracy: {correct}/{total} = {correct / total:.2%}")
else:
    # Nothing was evaluated (empty split or no matching images); avoid dividing by zero.
    print("Accuracy: n/a (no questions were evaluated)")

# Print distributions
print("True Option Distribution:", dict(true_distribution))
print("Predicted Option Distribution:", dict(predicted_distribution))