# GPT-5 mini 在 VLM4D 数据集上的测试

这个 notebook 测试 OpenAI 的 GPT-5 mini 模型在 VLM4D 数据集上的表现。

**数据集:**
- `real_mc.json`: 真实视频数据 (1371 个问题)
- `synthetic_mc.json`: 合成视频数据

**问题类型:** 多项选择题，关于视频中物体的运动方向、轨迹等

**参数设置:** 与 VLM4D 官方保持一致

## 1. 安装依赖

In [None]:
# 如果在 Colab 运行，取消注释下面的行
# !pip install openai aiolimiter tqdm python-dotenv opencv-python pydantic

## 2. 配置

In [None]:
import os
import json
import asyncio
import base64
import hashlib
import cv2
import requests
import random
import numpy as np
from string import Template
from tqdm.asyncio import tqdm_asyncio
from tqdm import tqdm
import aiolimiter
from pydantic import BaseModel
from openai import AsyncOpenAI, OpenAIError

# Configuration: take the OpenAI API key from the environment.
# To hard-code a key instead, uncomment the override below and fill it in.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
# OPENAI_API_KEY = "sk-xxx"

# Fail fast when no key is available (variable unset or empty).
if OPENAI_API_KEY is None or OPENAI_API_KEY == "":
    raise ValueError("请设置 OPENAI_API_KEY 环境变量或直接在代码中填入")

# Echo only the first 10 characters as a confirmation.
key_preview = OPENAI_API_KEY[:10]
print(f"API Key 已配置: {key_preview}...")

## 3. 常量和 Prompt 模板

与 VLM4D 官方 `utils/constant.py` 保持一致

In [None]:
# Constants (from VLM4D utils/constant.py)
MAX_TOKENS = 1024
GENERATION_TEMPERATURE = 1.0

# Prompt templates (from VLM4D utils/constant.py).
# These are string.Template objects: $question and $optionized_str are filled
# in by substitute(), and "$$" is the Template escape for a literal "$".
MULTI_CHOICE_COT_PROMPT = Template("""
Question: $question
$optionized_str

Answer the given multiple-choice question step by step. Begin by explaining your reasoning process clearly. In the last sentence of your response, you must conclude by stating the final answer using the following format: 'Therefore, the final answer is: $$LETTER' (without quotes), where $$LETTER must be only one of the options (A or B or C or D). Think step by step before answering.""")

# Direct-output variant: asks for the option letter only, with no reasoning.
MULTI_CHOICE_DO_PROMPT = Template("""
Question: $question
$optionized_str

Do not generate any intermediate reasoning process. Answer directly with the option letter from the given choices.
""")

# System prompt for the evaluator model (from VLM4D utils/eval_utils.py)
EVAL_INSTRUCTION = """Your task is to evaluate whether the model's final answer is correct by comparing it to the ground-truth answer provided for the given question.

You should first extract the final answer from the model's response, and then compare the extracted answer with the choice that matches the ground-truth answer to determine its correctness.
Output your response in the following structured format:
{
    "extracted_answer": // str value "A" "B" "C" "D", followed by a colon and the corresponding answer text, e.g., "A: Answer A text". If the model's response does not contain a valid choice and reasoning, then "No Valid Answer".
    "correct": // boolean value, True if the extracted answer matches the ground-truth answer (correct choice), False otherwise ("No Valid Answer" is also considered False).
}
"""

# Notebook status message: the prompt templates above have been defined.
print("Prompt 模板已加载 (与 VLM4D 官方一致)")

## 4. 工具函数

与 VLM4D 官方 `utils/video_process.py` 保持一致

In [None]:
# 视频下载和处理 (来自 VLM4D utils/video_process.py)
def download_video(video_url, video_tmp_dir="video_cache", timeout=60):
    """Download a video into the local cache and return where it lives.

    The cache key is the MD5 hex digest of ``video_url``; the file is stored
    as ``<video_tmp_dir>/<md5>/video.mp4``. If that file already exists no
    network request is made.

    Args:
        video_url: URL of the video to fetch.
        video_tmp_dir: Root directory of the on-disk cache.
        timeout: Timeout in seconds passed to ``requests.get``.

    Returns:
        A ``(video_path, video_id)`` tuple.

    Raises:
        requests.RequestException: If the request fails or the server answers
            with an HTTP error status.
    """
    video_id = hashlib.md5(video_url.encode()).hexdigest()
    video_subdir = os.path.join(video_tmp_dir, video_id)
    os.makedirs(video_subdir, exist_ok=True)

    video_path = os.path.join(video_subdir, "video.mp4")
    if not os.path.exists(video_path):
        # Fetch first and validate the status so an error page is never
        # stored (and then reused forever) as if it were the video.
        response = requests.get(video_url, timeout=timeout)
        response.raise_for_status()

        # Write to a temporary name and rename on success: an interrupted
        # write must not leave a truncated file under the cached name.
        partial_path = video_path + ".part"
        try:
            with open(partial_path, "wb") as f:
                f.write(response.content)
            os.replace(partial_path, video_path)
        finally:
            if os.path.exists(partial_path):
                os.remove(partial_path)

    return video_path, video_id


def read_video(video_path, total_frames):
    """Decode a video and return ``total_frames`` frames as base64 JPEG strings.

    Every frame is decoded and JPEG-encoded, then a subset is selected:
    evenly spaced indices from the first to the last frame when
    ``total_frames`` is not 1, or a single pseudo-random frame (fixed seed 42,
    so the choice is reproducible) when ``total_frames`` is 1.

    Args:
        video_path: Path of the video file to read.
        total_frames: Number of frames to return.

    Returns:
        A list of base64-encoded JPEG frames.

    Raises:
        ValueError: If the file cannot be opened or yields no frames.
    """
    video = cv2.VideoCapture(video_path)
    if not video.isOpened():
        raise ValueError(f"Could not open video file: {video_path}")

    try:
        base64_frames = []
        while True:
            success, frame = video.read()
            if not success:
                break
            _, buffer = cv2.imencode('.jpg', frame)
            base64_frames.append(base64.b64encode(buffer).decode('utf-8'))
    finally:
        video.release()

    if not base64_frames:
        # Without this guard the index selection below fails with an opaque
        # IndexError / "a must be non-empty" error.
        raise ValueError(f"No frames could be decoded from video file: {video_path}")

    if total_frames == 1:
        # Use a private, seeded generator: seeding the global `random` module
        # has no effect on numpy's RNG and clobbers unrelated random state.
        selected_indices = [random.Random(42).randrange(len(base64_frames))]
    else:
        # Evenly spaced indices covering the whole clip.
        selected_indices = np.linspace(0, len(base64_frames) - 1, total_frames, dtype=int)

    return [base64_frames[i] for i in selected_indices]


def prepare_base64frames(video_url, total_frames, video_tmp_dir="video_cache"):
    """Return the sampled base64 frames for a video, using an on-disk cache.

    The video is fetched through ``download_video``; the sampled frames are
    stored as JSON in ``<video_tmp_dir>/<video_id>/<total_frames>_frames/``
    and read back from there on later calls.
    """
    local_path, cache_key = download_video(video_url, video_tmp_dir)

    frames_dir = os.path.join(video_tmp_dir, cache_key, f"{total_frames}_frames")
    frames_json = os.path.join(frames_dir, "base64frames.json")

    if not os.path.exists(frames_json):
        # Cache miss: sample the frames, then persist them for next time.
        frames = read_video(local_path, total_frames)
        os.makedirs(frames_dir, exist_ok=True)
        with open(frames_json, "w") as sink:
            json.dump(frames, sink)
        return frames

    # Cache hit: reuse the frames saved by an earlier call.
    with open(frames_json, "r") as source:
        return json.load(source)


# Notebook status message: the video helper functions above have been defined.
print("工具函数已加载 (与 VLM4D 官方一致)")

## 5. 准备输入

与 VLM4D 官方 `utils/prepare_input.py` 保持一致

In [None]:
def prepare_qa_text_input(query, prompt):
    """Render the question and its options into a text content part.

    Args:
        query: Dict with ``question_type``, ``question`` and ``choices``
            (a mapping from option key to option text).
        prompt: Template exposing ``substitute(question=..., optionized_str=...)``.

    Returns:
        ``{"type": "text", "text": <rendered prompt>}``.

    Raises:
        ValueError: If the question type is not ``"multiple-choice"``.
    """
    kind = query["question_type"]
    if kind != "multiple-choice":
        raise ValueError(f"Invalid question type: {kind}")

    # One "KEY: text" line per option, in the dict's own order.
    option_lines = []
    for letter, text in query['choices'].items():
        option_lines.append(f"{letter}: {text}")

    rendered = prompt.substitute(
        question=query['question'],
        optionized_str="\n".join(option_lines),
    )
    return {"type": "text", "text": rendered}


def prepare_multi_image_input(base64frames):
    """Wrap each base64 JPEG frame in an ``image_url`` content part.

    Every frame becomes a ``data:image/jpeg;base64,...`` URL; input order is
    preserved.
    """
    image_parts = []
    for encoded in base64frames:
        data_url = f"data:image/jpeg;base64,{encoded}"
        image_parts.append({"type": "image_url", "image_url": {"url": data_url}})
    return image_parts


def prepare_qa_inputs(queries, total_frames, prompt, video_tmp_dir="video_cache"):
    """Build one single-turn chat message list per query.

    With ``total_frames >= 1`` the user content is the sampled video frames
    followed by the question text; with ``total_frames == 0`` it is the
    question text alone.

    Raises:
        ValueError: If ``total_frames`` is neither ``>= 1`` nor ``== 0``
            (raised while processing the first query).
    """
    conversations = []
    for item in tqdm(queries, desc="准备输入"):
        text_part = prepare_qa_text_input(item, prompt)

        if total_frames >= 1:
            frames = prepare_base64frames(item['video'], total_frames, video_tmp_dir)
            # Images come first, the question text last.
            content = prepare_multi_image_input(frames) + [text_part]
        elif total_frames == 0:
            content = [text_part]
        else:
            raise ValueError(f"Invalid total_frames: {total_frames}")

        conversations.append([{"role": "user", "content": content}])
    return conversations


# Notebook status message: the input-preparation functions above have been defined.
print("输入准备函数已加载 (与 VLM4D 官方一致)")

## 6. OpenAI API 调用

与 VLM4D 官方 `utils/api_utils.py` 保持一致

In [None]:
async def _throttled_openai_chat_completion_acreate(
    client,
    model,
    messages,
    temperature,
    max_tokens,
    top_p,
    limiter,
):
    """Issue one chat-completion request under the rate limiter, with retries.

    Up to 10 attempts are made. Rate-limit errors wait 10-20 s before the next
    attempt, other errors wait 5-10 s, and bad requests are not retried.

    Args:
        client: Object exposing ``chat.completions.create`` as a coroutine.
        model: Model name.
        messages: Chat messages for this request.
        temperature: Sampling temperature.
        max_tokens: Forwarded as ``max_completion_tokens``.
        top_p: Nucleus-sampling parameter.
        limiter: Async context manager used to throttle requests.

    Returns:
        The API response, or ``None`` if the request was rejected as a bad
        request or every attempt failed.
    """
    async with limiter:
        for _ in range(10):
            try:
                return await client.chat.completions.create(
                    model=model,
                    messages=messages,
                    temperature=temperature,
                    # The GPT-5 family takes max_completion_tokens.
                    max_completion_tokens=max_tokens,
                    top_p=top_p,
                )
            except Exception as e:
                error_text = str(e).lower()
                # API status errors carry the HTTP code; the message of a 400
                # says "invalid_request_error", not "bad_request", so relying
                # on the text alone would retry a hopeless request 10 times.
                status_code = getattr(e, "status_code", None)
                if status_code == 429 or "rate_limit" in error_text:
                    print("Rate limit exceeded, retrying...")
                    await asyncio.sleep(random.randint(10, 20))
                elif status_code == 400 or "bad_request" in error_text:
                    print(f"Bad request: {e}")
                    return None
                else:
                    print(f"Error: {e}")
                    await asyncio.sleep(random.randint(5, 10))
        return None


async def generate_from_openai_chat_completion(
    client,
    messages,
    engine_name,
    temperature=1.0,
    max_tokens=512,
    top_p=1.0,
    requests_per_minute=150,
):
    """Send every message list to the chat API and collect the reply texts.

    Requests run concurrently but are throttled to ``requests_per_minute``.

    Args:
        client: Async OpenAI client.
        messages: List of chat message lists, one per request.
        engine_name: Model name.
        temperature: Sampling temperature.
        max_tokens: Completion token budget per request.
        top_p: Nucleus-sampling parameter.
        requests_per_minute: Throttle applied across all requests.

    Returns:
        A list of strings aligned with ``messages``. A request that failed or
        produced no text contributes ``""``.
    """
    # One permit every `delay` seconds spaces the requests out evenly.
    delay = 60.0 / requests_per_minute
    limiter = aiolimiter.AsyncLimiter(1, delay)

    async_responses = [
        _throttled_openai_chat_completion_acreate(
            client,
            model=engine_name,
            messages=message,
            temperature=temperature,
            max_tokens=max_tokens,
            top_p=top_p,
            limiter=limiter,
        )
        for message in messages
    ]

    responses = await tqdm_asyncio.gather(*async_responses, desc="API 调用")

    outputs = []
    for response in responses:
        if response is None:
            # The request was rejected or every retry failed.
            print("Error extracting response: the API call returned no response")
            outputs.append("")
            continue
        try:
            content = response.choices[0].message.content
        except (AttributeError, IndexError, TypeError) as e:
            print(f"Error extracting response: {e}")
            content = None
        # `content` can be None (no text produced); keep outputs all-string so
        # every failure is represented the same way downstream.
        outputs.append(content if content is not None else "")

    return outputs


# Notebook status message: the API-call functions above have been defined.
print("API 调用函数已加载 (与 VLM4D 官方一致)")

## 7. 评估函数

与 VLM4D 官方 `utils/eval_utils.py` 保持一致

In [None]:
# Structured verdict returned by the evaluator model; passed as
# `response_format` to the parse call in `_throttled_eval_call`.
class EvaluationOutput(BaseModel):
    # Answer extracted from the model's response (format described in EVAL_INSTRUCTION).
    extracted_answer: str
    # Whether the extracted answer matches the ground truth.
    correct: bool


def prepare_evaluation_message(example, response):
    """Build the evaluator chat messages for one example.

    Args:
        example: Dict with ``question_type``, ``question``, ``choices`` and
            ``answer``.
        response: The model response to be judged.

    Returns:
        A ``[system, user]`` message list, or ``None`` when the question type
        is not ``"multiple-choice"``.
    """
    if example["question_type"] != "multiple-choice":
        return None

    options_block = "\n".join(
        f"{key}: {value}" for key, value in example['choices'].items()
    )
    # Three sections separated by blank lines: question + options, ground
    # truth, and the response under evaluation.
    sections = [
        f"Question: {example['question']}\n\nOptions:\n{options_block}",
        f"Ground Truth Answer: {example['answer']}",
        f"Model Response to the Question: {response}",
    ]

    return [
        {"role": "system", "content": EVAL_INSTRUCTION},
        {"role": "user", "content": "\n\n".join(sections)},
    ]


async def _throttled_eval_call(client, model, messages, limiter):
    """Run one structured evaluation request under the rate limiter, with retries.

    Up to 10 attempts are made. Rate-limit errors wait 10-20 s before the next
    attempt, other errors wait 5-10 s, and bad requests are not retried.

    Args:
        client: Async OpenAI client.
        model: Evaluator model name.
        messages: Evaluator chat messages.
        limiter: Async context manager used to throttle requests.

    Returns:
        The parsed ``EvaluationOutput``, or ``None`` if the request was
        rejected or every attempt failed.
    """
    async with limiter:
        for _ in range(10):
            try:
                response = await client.beta.chat.completions.parse(
                    model=model,
                    messages=messages,
                    temperature=1.0,
                    # Reasoning models such as the default evaluator reject
                    # the deprecated `max_tokens`; `max_completion_tokens`
                    # is the current name for the completion budget.
                    max_completion_tokens=1024,
                    top_p=1.0,
                    response_format=EvaluationOutput,
                )
                return response.choices[0].message.parsed
            except Exception as e:
                error_text = str(e).lower()
                status_code = getattr(e, "status_code", None)
                if status_code == 429 or "rate_limit" in error_text:
                    await asyncio.sleep(random.randint(10, 20))
                elif status_code == 400:
                    # An invalid request fails identically on every attempt.
                    print(f"Eval error: {e}")
                    return None
                else:
                    print(f"Eval error: {e}")
                    await asyncio.sleep(random.randint(5, 10))
        return None


async def get_acc_async(examples, client, eval_model="o4-mini"):
    """Judge every example's response with the evaluator model.

    Args:
        examples: Sequence of dicts with ``id``, ``question``, ``choices``,
            ``answer``, ``question_type`` and ``response``.
        client: Async OpenAI client.
        eval_model: Name of the evaluator model.

    Returns:
        ``(accuracy, results)``: the fraction of examples judged correct and
        one result dict per example. An example with no evaluation output
        counts as incorrect. Empty input yields ``(0.0, [])``.
    """
    if not examples:
        # Nothing to evaluate; also avoids dividing by zero below.
        return 0.0, []

    evaluation_messages = [
        prepare_evaluation_message(example, example['response'])
        for example in examples
    ]

    # Evaluation requests are throttled to 1000 per minute.
    delay = 60.0 / 1000
    limiter = aiolimiter.AsyncLimiter(1, delay)

    async def _evaluate(message):
        # `prepare_evaluation_message` returns None for unsupported question
        # types; sending None to the API would only burn retries.
        if message is None:
            return None
        return await _throttled_eval_call(client, eval_model, message, limiter)

    tasks = [_evaluate(msg) for msg in evaluation_messages]
    outputs = await tqdm_asyncio.gather(*tasks, desc="评估中")

    # Tally the verdicts.
    count = 0
    results = []
    for example, output in zip(examples, outputs):
        result = {
            "id": example["id"],
            "question": example["question"],
            "choices": example["choices"],
            "response": example["response"],
            "ground_truth_answer": example["answer"],
        }
        if output is None:
            result["extracted_answer"] = ""
            result["correct"] = False
            print(f"Error: no evaluation output for example {example['id']}")
        else:
            result["extracted_answer"] = output.extracted_answer
            result["correct"] = output.correct

        results.append(result)
        count += result["correct"]

    return count / len(examples), results


# Notebook status message: the evaluation functions above have been defined.
print("评估函数已加载 (与 VLM4D 官方一致)")

## 8. 加载数据

In [None]:
# Data location (adjust to your environment).
# Local run:
DATA_DIR = "../VLM4D-main/data"

# On Colab, uncomment the lines below instead:
# !git clone https://github.com/ShijieZhou-UCLA/VLM4D.git
# DATA_DIR = "VLM4D/data"

# Load both question sets.
real_mc_path = os.path.join(DATA_DIR, "real_mc.json")
synthetic_mc_path = os.path.join(DATA_DIR, "synthetic_mc.json")

# Read explicitly as UTF-8 so decoding does not depend on the platform's
# default encoding (the result files below are written as UTF-8 as well).
with open(real_mc_path, "r", encoding="utf-8") as f:
    real_mc_data = json.load(f)

with open(synthetic_mc_path, "r", encoding="utf-8") as f:
    synthetic_mc_data = json.load(f)

print(f"Real MC 数据: {len(real_mc_data)} 个问题")
print(f"Synthetic MC 数据: {len(synthetic_mc_data)} 个问题")

# Show one example question.
print("\n示例问题:")
print(json.dumps(real_mc_data[1], indent=2, ensure_ascii=False))

## 9. 运行测试

参数与 VLM4D 官方脚本一致:
- 模型: `gpt-5-mini` (GPT-5 mini)
- 帧数: 32 (与 gpt-4o 一致)
- Prompt: COT 或 direct-output

In [None]:
# Run configuration (mirrors VLM4D's run_api_models.sh).
MODEL_NAME = "gpt-5-mini"  # model id of GPT-5 mini
TOTAL_FRAMES = 32  # same frame count as used for gpt-4o
PROMPT_TYPE = "cot"  # "cot" or "direct-output"
MAX_SAMPLES = -1  # number of samples to test; -1 means all of them

# Pick the prompt template: anything other than "cot" selects direct output.
if PROMPT_TYPE == "cot":
    prompt_template = MULTI_CHOICE_COT_PROMPT
else:
    prompt_template = MULTI_CHOICE_DO_PROMPT

# Pick the data subset: a positive MAX_SAMPLES keeps only the first N items.
if MAX_SAMPLES > 0:
    test_data = real_mc_data[:MAX_SAMPLES]
else:
    test_data = real_mc_data

summary_lines = [
    "配置:",
    f"  模型: {MODEL_NAME}",
    f"  帧数: {TOTAL_FRAMES}",
    f"  Prompt: {PROMPT_TYPE}",
    f"  测试样本: {len(test_data)}",
]
print("\n".join(summary_lines))

In [None]:
# Download the videos and build one chat message list per question.
# Videos and sampled frames are cached on disk by the helpers, under the
# default "video_cache" directory.
print("准备输入消息 (下载视频并提取帧)...")
all_messages = prepare_qa_inputs(test_data, TOTAL_FRAMES, prompt_template)

In [None]:
# 调用 API (与 VLM4D 官方 model_inference/azure_gpt.py 一致)
print("调用 OpenAI API...")
client = AsyncOpenAI(api_key=OPENAI_API_KEY)

# 计算请求速率 (与 VLM4D 官方一致)
base_rate = 100  # 默认 base rate
requests_per_minute = int(base_rate / (TOTAL_FRAMES ** 0.5))
print(f"请求速率: {requests_per_minute}/min")

responses = await generate_from_openai_chat_completion(
    client=client,
    messages=all_messages,
    engine_name=MODEL_NAME,
    temperature=GENERATION_TEMPERATURE,
    max_tokens=MAX_TOKENS,
    top_p=1.0,
    requests_per_minute=requests_per_minute
)

print(f"\n获得 {len(responses)} 个响应")

# 将响应添加到数据中
for query, response in zip(test_data, responses):
    query["response"] = response

In [None]:
# Preview the first few responses (at most three).
print("示例响应:")
for idx, raw in enumerate(responses[:3]):
    sample = test_data[idx]
    print(f"\n--- 问题 {idx+1} ---")
    print(f"Q: {sample['question']}")
    print(f"选项: {sample['choices']}")
    print(f"正确答案: {sample['answer']}")

    # Substitute a placeholder for an empty response, and cut long ones
    # off after 500 characters.
    shown = raw if raw else "(无响应)"
    if len(shown) > 500:
        print(f"模型回答: {shown[:500]}...")
    else:
        print(f"模型回答: {shown}")

In [None]:
# 评估 (与 VLM4D 官方 acc_evaluation.py 一致)
print("评估响应...")
accuracy, results = await get_acc_async(test_data, client, eval_model="o4-mini")

print(f"\n" + "=" * 50)
print(f"准确率: {accuracy:.2%} ({int(accuracy * len(results))}/{len(results)})")
print("=" * 50)

In [None]:
# Show the per-question verdicts for the first ten results.
print("\n详细结果 (前10个):")
for number, entry in enumerate(results[:10], start=1):
    if entry['correct']:
        mark = "✓"
    else:
        mark = "✗"
    question_preview = entry['question'][:50]
    print(f"{mark} Q{number}: {question_preview}...")
    print(f"   正确: {entry['ground_truth_answer']} | 提取: {entry['extracted_answer']}")

## 10. 保存结果

In [None]:
# Save the results (same directory layout as the official VLM4D scripts).
# Both JSON files share one name derived from the model and frame count.
result_name = f"{MODEL_NAME}_{TOTAL_FRAMES}frame.json"

# Raw model outputs: the test questions with the responses attached.
output_dir = f"../outputs/real_mc_{PROMPT_TYPE}"
os.makedirs(output_dir, exist_ok=True)
output_file = os.path.join(output_dir, result_name)
with open(output_file, "w", encoding="utf-8") as f:
    json.dump(test_data, f, indent=4, ensure_ascii=False)
print(f"模型输出已保存到: {output_file}")

# Per-question evaluation results.
eval_dir = f"../processed_outputs/real_mc_{PROMPT_TYPE}"
os.makedirs(eval_dir, exist_ok=True)
eval_file = os.path.join(eval_dir, result_name)
with open(eval_file, "w", encoding="utf-8") as f:
    json.dump(results, f, indent=4, ensure_ascii=False)
print(f"评估结果已保存到: {eval_file}")

# Append the accuracy to the shared log file.
log_file = "../processed_outputs/evaluation_results.txt"
log_entry = (
    f"\nEvaluating output directory: {output_dir}\n"
    + "=" * 50 + "\n"
    + f"Accuracy of {MODEL_NAME}_{TOTAL_FRAMES}frame.json: {accuracy}\n"
)
with open(log_file, "a") as f:
    f.write(log_entry)
print(f"准确率已记录到: {log_file}")

## 11. 运行 Synthetic 数据集（可选）

In [None]:
# 取消注释以运行 synthetic 数据集测试

# # 准备 synthetic 数据
# print("准备 Synthetic 数据集...")
# synthetic_messages = prepare_qa_inputs(synthetic_mc_data, TOTAL_FRAMES, prompt_template)
# 
# # 调用 API
# print("调用 API...")
# synthetic_responses = await generate_from_openai_chat_completion(
#     client=client,
#     messages=synthetic_messages,
#     engine_name=MODEL_NAME,
#     temperature=GENERATION_TEMPERATURE,
#     max_tokens=MAX_TOKENS,
#     top_p=1.0,
#     requests_per_minute=requests_per_minute
# )
# 
# # 添加响应
# for query, response in zip(synthetic_mc_data, synthetic_responses):
#     query["response"] = response
# 
# # 评估
# print("评估中...")
# synthetic_accuracy, synthetic_results = await get_acc_async(synthetic_mc_data, client)
# 
# print(f"\nSynthetic 数据集准确率: {synthetic_accuracy:.2%}")
# 
# # 保存
# synth_output_dir = f"../outputs/synthetic_mc_{PROMPT_TYPE}"
# os.makedirs(synth_output_dir, exist_ok=True)
# synth_output_file = os.path.join(synth_output_dir, f"{MODEL_NAME}_{TOTAL_FRAMES}frame.json")
# with open(synth_output_file, "w", encoding="utf-8") as f:
#     json.dump(synthetic_mc_data, f, indent=4, ensure_ascii=False)
# print(f"Synthetic 结果已保存到: {synth_output_file}")