In [None]:
import time
import asyncio
import pandas as pd
from typing import List, Dict, Any
from llama_index.llms.openai_like import OpenAILike
from llama_index.core.llms import ChatMessage

class ModelBenchmark:
    def __init__(self, model_configs: Dict[str, Dict[str, Any]]):
        """
        初始化模型基准测试类
        
        Args:
            model_configs: 模型配置字典，格式为 {
                "model_name_1": {"api_base": "http://localhost:8001", "api_key": "empty"},
                "model_name_2": {"api_base": "http://localhost:8002", "api_key": "empty"}
            }
        """
        self.models = {}
        for model_name, config in model_configs.items():
            self.models[model_name] = OpenAILike(
                model=model_name,
                api_base=config["api_base"],
                api_key=config.get("api_key", "sk-xxx"),
                is_chat_model=True,
                max_retries=3,
                timeout=100,
                additional_kwargs={"extra_body": {"chat_template_kwargs": {"enable_thinking": False}}}
            )
    
    def create_test_messages(self, prompt: str) -> List[ChatMessage]:
        """创建测试对话消息"""
        return [
            ChatMessage(role="system", content="你是一个有帮助的AI助手。"),
            ChatMessage(role="user", content=prompt)
        ]
    
    async def test_single_model(self, model_name: str, prompt: str, num_runs: int = 5) -> Dict[str, Any]:
        """测试单个模型的性能"""
        print(f"正在测试模型: {model_name}")
        
        results = {
            "model": model_name,
            "total_time": 0,
            "total_tokens": 0,
            "response_times": [],
            "throughputs": [],
            "success_count": 0
        }
        
        messages = self.create_test_messages(prompt)
        
        for i in range(num_runs):
            try:
                start_time = time.time()
                response = await self.models[model_name].acomplete(
                    prompt,
                    messages
                )
                
                end_time = time.time()
                elapsed_time = end_time - start_time
                
                # 计算token数量（近似值）
                tokens = len(response.text.split())
                
                results["total_time"] += elapsed_time
                results["total_tokens"] += tokens
                results["response_times"].append(elapsed_time)
                results["throughputs"].append(tokens / elapsed_time)
                results["success_count"] += 1
                
                print(f"  运行 {i+1}/{num_runs}: {elapsed_time:.2f}s, {tokens} tokens")
                
            except Exception as e:
                print(f"  运行 {i+1}/{num_runs} 失败: {str(e)}")
            
            # 在运行之间添加短暂延迟
            await asyncio.sleep(1)
        
        return results
    
    async def compare_models(self, prompts: List[str], num_runs: int = 5) -> pd.DataFrame:
        """比较多个模型的性能"""
        all_results = []
        
        for prompt in prompts:
            print(f"\n测试提示: '{prompt[:50]}...'")
            
            prompt_results = {"prompt": prompt}
            
            # 为每个模型运行测试
            tasks = []
            for model_name in self.models.keys():
                task = self.test_single_model(model_name, prompt, num_runs)
                tasks.append(task)
            
            model_results = await asyncio.gather(*tasks)
            
            # 处理结果
            for result in model_results:
                model_name = result["model"]
                if result["success_count"] > 0:
                    avg_time = result["total_time"] / result["success_count"]
                    avg_throughput = result["total_tokens"] / result["total_time"] if result["total_time"] > 0 else 0
                    
                    prompt_results[f"{model_name}_avg_time"] = avg_time
                    prompt_results[f"{model_name}_avg_throughput"] = avg_throughput
                    prompt_results[f"{model_name}_success_rate"] = result["success_count"] / num_runs
                else:
                    prompt_results[f"{model_name}_avg_time"] = None
                    prompt_results[f"{model_name}_avg_throughput"] = None
                    prompt_results[f"{model_name}_success_rate"] = 0
            
            all_results.append(prompt_results)
        
        return pd.DataFrame(all_results)
    
    def generate_report(self, results_df: pd.DataFrame) -> str:
        """生成性能比较报告"""
        report = []
        report.append("=" * 60)
        report.append("大语言模型推理速度比较报告")
        report.append("=" * 60)
        
        for index, row in results_df.iterrows():
            report.append(f"\n提示 {index + 1}: '{row['prompt'][:50]}...'")
            report.append("-" * 40)
            
            for model_name in self.models.keys():
                if f"{model_name}_avg_time" in row:
                    report.append(
                        f"{model_name}: "
                        f"{row[f'{model_name}_avg_time']:.2f}s, "
                        f"{row[f'{model_name}_avg_throughput']:.1f} tokens/s, "
                        f"成功率: {row[f'{model_name}_success_rate']:.0%}"
                    )
        
        # 计算总体平均值
        report.append("\n" + "=" * 60)
        report.append("总体性能比较")
        report.append("=" * 60)
        
        for model_name in self.models.keys():
            time_col = f"{model_name}_avg_time"
            throughput_col = f"{model_name}_avg_throughput"
            
            if time_col in results_df.columns:
                avg_time = results_df[time_col].mean()
                avg_throughput = results_df[throughput_col].mean()
                report.append(f"{model_name}: 平均响应时间 {avg_time:.2f}s, 平均吞吐量 {avg_throughput:.1f} tokens/s")
        
        return "\n".join(report)


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
    # 配置你的模型
model_configs = {
    "qwen3": {
        "api_base": "http://192.168.100.30:16001/v1",
        "api_key": "empty"
    },
    "qwen3-vllm": {
        "api_base": "http://192.168.100.30:16002/v1",
        "api_key": "empty"
    }
}

# 测试提示语
test_prompts = [
    "请解释一下机器学习的基本概念",
    "写一个关于人工智能的短故事",
    "如何提高编程技能？请给出具体建议",
    "用Python写一个快速排序算法",
    "谈谈气候变化对全球经济的影响"
]

# 创建基准测试实例
benchmark = ModelBenchmark(model_configs)

# 运行测试
print("开始模型性能比较测试...")
results = await benchmark.compare_models(test_prompts, num_runs=3)

# 生成报告
report = benchmark.generate_report(results)
print(report)

# 保存结果到CSV
results.to_csv("model_benchmark_results.csv", index=False)
print("\n详细结果已保存到 model_benchmark_results.csv")

开始模型性能比较测试...

测试提示: '请解释一下机器学习的基本概念...'
正在测试模型: qwen3
hhhhhhh
请解释一下机器学习的基本概念
[ChatMessage(role=<MessageRole.SYSTEM: 'system'>, additional_kwargs={}, blocks=[TextBlock(block_type='text', text='你是一个有帮助的AI助手。')]), ChatMessage(role=<MessageRole.USER: 'user'>, additional_kwargs={}, blocks=[TextBlock(block_type='text', text='请解释一下机器学习的基本概念')])]
qqqqq
正在测试模型: qwen3-vllm
hhhhhhh
请解释一下机器学习的基本概念
[ChatMessage(role=<MessageRole.SYSTEM: 'system'>, additional_kwargs={}, blocks=[TextBlock(block_type='text', text='你是一个有帮助的AI助手。')]), ChatMessage(role=<MessageRole.USER: 'user'>, additional_kwargs={}, blocks=[TextBlock(block_type='text', text='请解释一下机器学习的基本概念')])]
qqqqq
  运行 1/3: 25.80s, 174 tokens
hhhhhhh
请解释一下机器学习的基本概念
[ChatMessage(role=<MessageRole.SYSTEM: 'system'>, additional_kwargs={}, blocks=[TextBlock(block_type='text', text='你是一个有帮助的AI助手。')]), ChatMessage(role=<MessageRole.USER: 'user'>, additional_kwargs={}, blocks=[TextBlock(block_type='text', text='请解释一下机器学习的基本概念')])]
qqqqq
  运行 1/3: 30.91s, 