In [1]:
import torch
import os
import re
import json
import warnings
warnings.filterwarnings("ignore")
import numpy as np
from PIL import Image
from tqdm import tqdm
from datasets import load_dataset
from transformers import AutoProcessor, PaliGemmaForConditionalGeneration
from peft import PeftModel, PeftConfig


import logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# Select the compute device: GPU when available, otherwise CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

from huggingface_hub import login

# SECURITY: never commit an access token to a notebook. The token is read from
# the HF_TOKEN environment variable instead. Any token that was previously
# hard-coded in this cell must be treated as leaked and revoked.
hf_token = os.environ.get("HF_TOKEN")
if hf_token:
    login(token=hf_token)
else:
    # Without a token we rely on credentials cached by an earlier login;
    # downloading a gated model will fail if none are available.
    logger.warning(
        "HF_TOKEN is not set; skipping explicit Hugging Face login and "
        "relying on cached credentials."
    )

# # --- Model configuration (fine-tuned adapter variant) ---
# # Change this to your own checkpoint path or Hugging Face model ID.
# # MODEL_PATH = "./path/to/your/checkpoint"
# # Load the adapter configuration
# adapter_path = "/content/drive/MyDrive/VLLM/paligemma2-sat-grpo-v2/checkpoint-700"
# peft_config = PeftConfig.from_pretrained(adapter_path)

# # Load the base model
# base_model = PaliGemmaForConditionalGeneration.from_pretrained(
#     peft_config.base_model_name_or_path,
#     torch_dtype="auto",
#     device_map="auto"
# )

# # Attach the adapter
# model = PeftModel.from_pretrained(base_model, adapter_path)
# model.eval()
# processor = AutoProcessor.from_pretrained(peft_config.base_model_name_or_path)

# Baseline: the pretrained PaliGemma checkpoint, moved to `device` and put in
# eval mode, together with its matching processor.
model_id = "google/paligemma-3b-pt-448"
model = PaliGemmaForConditionalGeneration.from_pretrained(model_id).to(device).eval()
processor = AutoProcessor.from_pretrained(model_id)


ModuleNotFoundError: No module named 'datasets'

In [None]:

def extract_choice_answer(text):
    """Pull a multiple-choice letter out of free-form text.

    The text is lower-cased and stripped, then searched first for a
    parenthesised letter such as ``(a)`` and, failing that, for the first
    standalone single-letter word.

    Returns:
        The matched letter (lower case), or the lower-cased, stripped text
        itself when neither pattern matches.
    """
    cleaned = text.lower().strip()
    candidate_patterns = (
        r'\(([a-z])\)',  # parenthesised form: (A), (B), (C)
        r'\b([a-z])\b',  # bare form: A, B, C
    )

    # Patterns are tried in priority order; the first hit wins.
    for regex in candidate_patterns:
        found = re.search(regex, cleaned)
        if found is not None:
            return found.group(1).lower()

    return cleaned

def extract_gt_answer(gt_text):
    """Return the reference answer contained in a ground-truth string.

    The input is lower-cased and stripped. If it contains an
    ``<answer>...</answer>`` span (on a single line), the content of the first
    such span is returned; otherwise the whole cleaned string is returned.
    """
    lowered = gt_text.lower().strip()
    tagged = re.search(r"<answer>\s*(.*?)\s*</answer>", lowered)
    payload = lowered if tagged is None else tagged.group(1)
    return payload.strip().lower()

def normalize_answer(text, task_type=None):
    """Bring an answer string into a canonical form for comparison.

    Args:
        text: Raw answer text; any falsy value yields ``""``.
        task_type: Optional CV-Bench task name. For 'Count', 'Relation',
            'Distance' and 'Depth' the answer is reduced to its choice letter
            when one in a-e can be extracted.

    Returns:
        The normalised answer: a choice letter for the tasks above when one is
        found, otherwise the lower-cased, stripped last line of ``text``.
    """
    if not text:
        return ""

    cleaned = text.lower().strip()

    # For multi-line text only the final line is kept.
    if "\n" in cleaned:
        cleaned = cleaned.split("\n")[-1].strip()

    # CV-Bench tasks that are answered with a multiple-choice letter.
    choice_tasks = ('Count', 'Relation', 'Distance', 'Depth')
    if task_type in choice_tasks:
        letter = extract_choice_answer(cleaned)
        if letter and letter.lower() in "abcde":
            return letter.lower()

    # Further normalisation rules could be added here, e.g. taking the first
    # number found in the text, or mapping to "yes" / "no".

    return cleaned

def evaluate_batch(model, processor, batch, batch_size=None, max_new_tokens=None):
    """Run greedy generation for one batch and return the decoded answers.

    Args:
        model: Generation model exposing ``generate`` and ``device``.
        processor: Matching processor; called to build the inputs and used to
            decode the outputs. Its ``tokenizer`` supplies the EOS/PAD ids.
        batch: Mapping with an ``"image"`` list, the questions under
            ``"problem"`` (preferred) or ``"prompt"``, and the references under
            ``"solution"`` (preferred) or ``"answer"``.
        batch_size: Unused; kept so existing positional callers keep working.
        max_new_tokens: Optional cap on generated tokens. When ``None`` the
            model's own generation defaults apply.

    Returns:
        ``(responses, gt_answers, questions)`` where ``responses`` holds only
        the newly generated text per sample and ``gt_answers`` are the
        references converted to lower-case strings.
    """
    images_raw = batch["image"]  # list of PIL images
    questions = batch["problem"] if "problem" in batch else batch["prompt"]
    gt_answers_raw = batch["solution"] if "solution" in batch else batch["answer"]
    gt_answers = [str(ans).lower() for ans in gt_answers_raw]

    # Build the inputs on the model's own device rather than on a notebook
    # global, so the function also works when the model is placed elsewhere.
    inputs = processor(
        text=questions, images=images_raw, return_tensors="pt", padding=True
    ).to(model.device)
    prompt_len = inputs["input_ids"].shape[1]

    gen_kwargs = {
        "do_sample": False,
        "eos_token_id": processor.tokenizer.eos_token_id,
        "pad_token_id": processor.tokenizer.pad_token_id,
    }
    if max_new_tokens is not None:
        gen_kwargs["max_new_tokens"] = max_new_tokens

    with torch.no_grad():
        outputs = model.generate(**inputs, **gen_kwargs)

    # The returned sequences start with the (padded) prompt tokens. Decode only
    # what follows; otherwise an empty generation would leave the prompt's last
    # option line to be mistaken for the model's answer downstream.
    generated = outputs[:, prompt_len:]
    responses_batch = processor.batch_decode(generated, skip_special_tokens=True)
    return responses_batch, gt_answers, questions

def compute_accuracy(responses, ground_truths, task_type=None):
    """Compute the fraction of responses that match their ground truth.

    Both sides are normalised with ``normalize_answer``; the ground truth is
    first passed through ``extract_gt_answer``. A pair counts as correct when
    the normalised strings are equal.

    Args:
        responses: Model outputs, one string per sample.
        ground_truths: Reference answers, same length as ``responses``.
        task_type: Optional task name forwarded to ``normalize_answer``.

    Returns:
        Accuracy in [0, 1]; ``0.0`` when there are no samples.

    Raises:
        AssertionError: If the two sequences differ in length.
    """
    assert len(responses) == len(ground_truths), (
        "responses and ground_truths must have the same length"
    )

    total = len(responses)
    if total == 0:
        # Nothing to score; avoid a ZeroDivisionError.
        return 0.0

    correct = sum(
        normalize_answer(pred, task_type)
        == normalize_answer(extract_gt_answer(gt), task_type)
        for pred, gt in zip(responses, ground_truths)
    )
    return correct / total

def evaluate_cvbench(model, processor, batch_size=100, tasks=None):
    """Evaluate a model on the CV-Bench test split, task by task.

    Args:
        model: Model passed through to ``evaluate_batch``.
        processor: Processor passed through to ``evaluate_batch``.
        batch_size: Number of samples per generation batch; tune to the
            available GPU memory. Must be at least 1.
        tasks: Task names to evaluate. Defaults to
            ``['Count', 'Relation', 'Distance', 'Depth']``.

    Returns:
        Dict mapping each evaluated task to its accuracy, plus ``"overall"``,
        the unweighted mean of the per-task accuracies. Returns ``{}`` when the
        dataset cannot be loaded.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    print("\n=== Evaluating CVBench dataset ===")

    try:
        ds = load_dataset("nyu-visionx/CV-Bench")
        print(f"Loaded CVBench dataset with {len(ds['test'])} test samples")
    except Exception as e:
        # Best effort: report the problem and hand back an empty result.
        print(f"Error loading CVBench dataset: {e}")
        return {}

    if tasks is None:
        tasks = ['Count', 'Relation', 'Distance', 'Depth']

    # Group the samples by task in a single pass over the split, instead of
    # re-scanning the whole dataset once per task.
    task_data = {task: [] for task in tasks}
    for item in ds['test']:
        bucket = task_data.get(item['task'])
        if bucket is not None:
            bucket.append(item)
    for task in tasks:
        print(f"Found {len(task_data[task])} samples for {task} task")

    results = {}

    for task in tasks:
        print(f"\n--- Evaluating {task} task ---")
        task_samples = task_data[task]

        if not task_samples:
            print(f"Skipping {task} task: not enough samples")
            continue

        responses = []
        ground_truths = []

        # Every sample is evaluated; the last batch may simply be smaller than
        # batch_size, so no samples are dropped from the reported accuracy.
        for i in tqdm(range(0, len(task_samples), batch_size)):
            batch = task_samples[i:i + batch_size]

            batch_dict = {
                "image": [item["image"] for item in batch],
                # The prompt is prefixed with the '<image>' placeholder token.
                "prompt": ['<image>' + item["prompt"] for item in batch],
                "answer": [item["answer"] for item in batch],
            }

            responses_batch, gt_answers, _ = evaluate_batch(
                model, processor, batch_dict, batch_size
            )

            responses.extend(responses_batch)
            ground_truths.extend(gt_answers)

            # Periodically report the running accuracy.
            if i % 100 == 0:
                accuracy = compute_accuracy(responses, ground_truths, task)
                print(f"{task} Accuracy: {accuracy:.2%}")

        # Final accuracy for this task.
        accuracy = compute_accuracy(responses, ground_truths, task)
        print(f"{task} Accuracy: {accuracy:.2%}")
        results[task] = accuracy

    # Overall score: unweighted mean over the evaluated tasks (not weighted by
    # the number of samples per task).
    if results:
        overall_accuracy = sum(results.values()) / len(results)
        print(f"\nCVBench Overall Accuracy: {overall_accuracy:.2%}")
        results["overall"] = overall_accuracy

    return results

def save_results(results, output_file="cvbench_results.json"):
    """Write the evaluation results to a UTF-8 JSON file and report the path.

    Args:
        results: JSON-serialisable results object.
        output_file: Destination path; the file is overwritten.
    """
    with open(output_file, mode='w', encoding='utf-8') as handle:
        # ensure_ascii=False keeps non-ASCII characters readable in the file.
        json.dump(results, fp=handle, ensure_ascii=False, indent=2)
    print(f"Results saved to {output_file}")

In [None]:
# Run the CV-Bench evaluation with the model and processor loaded earlier.
results = evaluate_cvbench(model, processor)

# Persist the results to the default JSON file.
save_results(results)

# Print a summary. Plain values are printed directly; a nested dict is
# expanded one level, with one line per task.
print("\n=== Summary of Results ===")
for dataset, result in results.items():
    if not isinstance(result, dict):
        print(f"{dataset}: {result:.2%}")
        continue
    print(f"{dataset} results:")
    for task, acc in result.items():
        print(f"  {task}: {acc:.2%}")


=== Evaluating CVBench dataset ===


README.md:   0%|          | 0.00/6.14k [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


test_2d.parquet:   0%|          | 0.00/185M [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


test_3d.parquet:   0%|          | 0.00/220M [00:00<?, ?B/s]

Generating test split:   0%|          | 0/2638 [00:00<?, ? examples/s]

Loaded CVBench dataset with 2638 test samples
Found 788 samples for Count task
Found 650 samples for Relation task
Found 600 samples for Distance task
Found 600 samples for Depth task

--- Evaluating Count task ---


  0%|          | 0/15 [01:47<?, ?it/s]


KeyboardInterrupt: 