Cell 1: 环境设置和导入

In [None]:
# 环境设置
import sys
import os
from pathlib import Path

# 添加项目路径
project_root = Path.cwd().parent  # 根据您的项目结构调整
sys.path.append(str(project_root))

# 导入必要的库
import numpy as np
import json
from datetime import datetime
from typing import List, Dict, Optional
import traceback

# 导入工作流相关模块
from app.core.data.workflow import DataQaWorkflow, WorkflowConfig
from app.models import (
    DataQACompletionRequest,
    DataQAChatCompletionResponse,
    ChatMessage
)
from czce_ai.llm.chat import LLMChat as LLMModel
from czce_ai.utils.log import logger

# 设置日志级别
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

print("环境设置完成")
print(f"项目根目录: {project_root}")
print(f"Python路径: {sys.path}")

Cell 2: 初始化LLM客户端和工作流

In [None]:
# 初始化LLM模型（根据您的实际配置调整）
from app.core.components import qwen3_llm, qwen3_thinking_llm

# 创建LLM实例
ans_llm = qwen3_llm  # 答案生成LLM
ans_thinking_llm = qwen3_thinking_llm  # 思考模式LLM
query_llm = qwen3_llm  # 查询优化LLM

# 配置参数
workflow_config = WorkflowConfig(
    history_round=1,  # 历史对话轮数
    follow_up_round=1,  # 追问轮数
    reranking_threshold=0.2,
    collection="hybrid_sql",
    enable_entity_recognition=True,
    enable_reranker=True
)

# 初始化工作流
workflow = DataQaWorkflow(
    ans_llm=ans_llm,
    ans_thinking_llm=ans_thinking_llm,
    query_llm=query_llm,
    history_round=1,
    reranking_threshold=0.2,
    config=workflow_config,
    knowledge_id="3cc33ed2-21fb-4452-9e10-528867bd5f99",  # 您的知识库ID
    bucket_name="czce-ai-dev",
    collection="hybrid_sql",
    use_cache=True  # 使用FAQ缓存
)

print("工作流初始化完成")
print(f"FAQ数据已加载: {len(workflow.faq_data)}条")

Cell 3: 构造测试数据

In [None]:
# 基于FAQ知识库构造不同相似度的测试查询

# 测试数据集
test_queries = {
    # 高相似度查询 (>=0.85) - 这些应该直接从FAQ返回结果
    "high_similarity": [
        "查询郑商所当前主力合约的代码、结算价及成交量",  # 完全匹配FAQ
        "筛选当前主力合约结算价超过2000元/吨的品种",  # 完全匹配FAQ
        "查询郑商所当前投机成交量占总成交量占比超过50%的品种",  # 几乎完全匹配
    ],
    
    # 中等相似度查询 (0.7-0.85) - 这些会作为参考添加到prompt
    "medium_similarity": [
        "查询郑商所主力合约的价格和成交情况",  # 与FAQ类似但不完全相同
        "郑商所当前的主要合约信息",  # 简化版本
        "主力合约结算价大于2000的品种有哪些",  # 表述略有不同
    ],
    
    # 低相似度查询 (<0.7) - 这些会走完整的表格定位和SQL生成流程
    "low_similarity": [
        "统计上个月的期货交易总额",  # 与FAQ差异较大
        "分析期货市场的风险指标",  # 完全不同的查询
        "计算所有交易所的平均手续费",  # 新的查询类型
    ],
    
    # 需要追问的查询（信息不足）
    "follow_up_needed": [
        "查询成交量",  # 缺少具体品种和时间
        "统计持仓量",  # 缺少详细信息
        "最近的交易数据",  # 模糊查询
    ],
}

print("测试数据构造完成")
print(f"高相似度查询: {len(test_queries['high_similarity'])}个")
print(f"中等相似度查询: {len(test_queries['medium_similarity'])}个")
print(f"低相似度查询: {len(test_queries['low_similarity'])}个")
print(f"需要追问的查询: {len(test_queries['follow_up_needed'])}个")

Cell 4: 测试单个函数 - modify_query

In [None]:
def test_modify_query():
    """测试问题改写功能"""
    print("=" * 50)
    print("测试 modify_query 函数")
    print("=" * 50)
    
    test_messages = [
        ChatMessage(role="user", content="查询棉花的成交量和价格")
    ]
    
    try:
        # 创建请求对象
        request = DataQACompletionRequest(
            messages=test_messages,
            model="test",
            created=int(datetime.now().timestamp())
        )
        
        # 提取消息
        input_messages = workflow._extract_input_messages(request)
        
        # 执行问题改写
        optimized_query = workflow.modify_query(
            input_messages=input_messages,
            enable_follow_up=True
        )
        
        print(f"原始查询: {test_messages[0].content}")
        print(f"改写后查询: {optimized_query.rewritten_query}")
        print(f"是否充分: {optimized_query.is_sufficient}")
        
        return optimized_query
        
    except Exception as e:
        print(f"错误: {e}")
        traceback.print_exc()
        return None

# 执行测试
result = test_modify_query()

Cell 5: 测试单个函数 - entity_recognition

In [None]:
def test_entity_recognition():
    """测试实体识别功能"""
    print("=" * 50)
    print("测试 entity_recognition 函数")
    print("=" * 50)
    
    test_queries = [
        "查询FG2509的成交量",
        "棉花CF2409合约的价格",
        "苹果期货AP2501的持仓量",
        "查询白糖SR的交易情况"
    ]
    
    for query in test_queries:
        try:
            enhanced = workflow.entity_recognition(query)
            print(f"原始: {query}")
            print(f"增强: {enhanced}")
            print("-" * 30)
        except Exception as e:
            print(f"错误处理 '{query}': {e}")
    
    return True

# 执行测试
test_entity_recognition()

Cell 6: 测试单个函数 - semantic_search_faq

In [None]:
def test_semantic_search_faq():
    """测试FAQ语义搜索功能"""
    print("=" * 50)
    print("测试 semantic_search_faq 函数")
    print("=" * 50)
    
    # 测试不同相似度的查询
    test_cases = [
        ("查询郑商所当前主力合约的代码、结算价及成交量", "高相似度"),
        ("查询主力合约的价格信息", "中等相似度"),
        ("计算年度总收益", "低相似度"),
    ]
    
    for query, expected_similarity in test_cases:
        try:
            # 先进行实体识别
            enhanced_query = workflow.entity_recognition(query)
            
            # 执行FAQ搜索
            results = workflow.semantic_search_faq(enhanced_query, top_k=3)
            
            print(f"查询: {query}")
            print(f"增强查询: {enhanced_query}")
            print(f"期望相似度: {expected_similarity}")
            
            if results:
                for i, result in enumerate(results[:2], 1):  # 只显示前2个结果
                    print(f"结果{i}:")
                    print(f"问题: {result['question'][:50]}...")
                    print(f"相似度: {result['similarity']:.3f}")
                    print(f"表: {result['table']}")
            else:
                print("未找到相关FAQ")
            
            print("-" * 30)
            
        except Exception as e:
            print(f"错误: {e}")
            traceback.print_exc()
    
    return True

# 执行测试
test_semantic_search_faq()

SQL提取与展示

In [None]:
# SQL提取与展示
import re

def show_sql(content: str) -> None:
    """
    目的：从一段文本 content 中提取并完整打印 SQL 语句。
    提取策略：
            1. 优先匹配围栏代码块：```sql 或无语言标注的三引号代码块。
            2. 如果上一步没有匹配到，再兜底匹配以 WITH/SELECT 开头、到分号或文本结尾的片段。
    去重：strip 去空白后，用 seen 集合和有序列表 uniq 实现“去重且保留原顺序”。
    展示：逐段打印 SQL，两侧加分隔线；若没有抓到，打印内容前 300 个字符预览。
    """
    blocks = re.findall(r"```[ \t]*(?:sql)?\s*([\s\S]*?)```", content, flags=re.IGNORECASE)
    if not blocks:
        blocks = re.findall(r"(?is)\b(?:WITH|SELECT)\b[\s\S]*?(?:;|$)", content)

    seen, uniq = set(), []
    for b in (s.strip() for s in blocks):
        if b and b not in seen:
            seen.add(b)
            uniq.append(b)

    if not uniq:
        print("未检测到SQL。内容预览：", content[:300])
        return

    for b in uniq:
        print("\n" + "-"*80)
        print(b)
        print("-"*80)

Cell 7: 端到端测试 - 高相似度路径 (>=0.9)

In [None]:
def test_e2e_high_similarity():
    """
    端到端测试：高相似度FAQ路径
    User Query -> Step1-3 -> SQL Output (FAQ直接返回)
    """
    print("=" * 80)
    print("端到端测试：高相似度FAQ路径 (>=0.9)")
    print("=" * 80)
    
    # 使用与FAQ完全匹配的查询
    user_query = "查询郑商所当前主力合约的代码、结算价及成交量"
    print(f"用户查询: {user_query}")
    print("-" * 80)
    
    messages = [ChatMessage(role="user", content=user_query)]
    request = DataQACompletionRequest(
        messages=messages,
        model="test",
        created=int(datetime.now().timestamp()),
        follow_up_num=0,  # 不启用追问
        knowledge_base_ids=["3cc33ed2-21fb-4452-9e10-528867bd5f99"],  # 使用FAQ知识库ID
        use_reranker=True
    )
    
    try:
        # 执行完整流程
        print("执行do_generate函数...")
        response = workflow.do_generate(
            request=request,
            enable_follow_up=False,  # 不启用追问
            thinking=True  # 启用思考模式
        )
        
        # 展示执行路径
        print("\n执行路径:")
        for i, step in enumerate(response.steps, 1):
            print(f"  Step {step.number}: {step.name} [{step.key}]")
            if step.key == "modify_query":
                print(f"    ➜ 改写后: {step.prompt}")
            elif step.key == "query_entity_recognition":
                print(f"    ➜ 实体识别后: {step.prompt}")
            elif step.key == "semantic_search_faq":
                print(f"    ➜ FAQ匹配: {step.prompt}")
        
        # 展示最终SQL输出
        print("\n 最终输出:")
        if response.choices:
            content = response.choices[0].message.content
            show_sql(content)

        # 验证路径
        print("\n验证结果:")
        if len(response.steps) == 3 and response.model == "faq":
            print("高相似度FAQ快速路径验证成功！")
            print("执行步骤数: 3 (Step1→Step2→Step3)")
            print("直接从FAQ返回SQL，未调用LLM")
        else:
            print(f"未走预期路径，实际步骤数: {len(response.steps)}")
        
        return response
        
    except Exception as e:
        print(f"错误: {e}")
        traceback.print_exc()
        return None

# 执行测试
high_sim_response = test_e2e_high_similarity()

In [None]:
from tqdm import tqdm

from app.core.data.workflow_batch_faqs_test_utils import (
    load_faq_questions, 
    extract_sql, 
    compare_sql, 
    save_results,
    print_statistics
)

def test_e2e_high_similarity():
    """
    端到端测试：252个问题的批量测试
    """
    print("=" * 60)
    print("端到端测试：252个问题批量测试")
    print("=" * 60)
    
    # 加载252个问题
    try:
        questions = load_faq_questions()
    except Exception as e:
        print(f"加载问题失败: {e}")
        return None
    
    # 存储结果
    results = []
    
    # 使用tqdm显示进度
    for question_data in tqdm(questions, desc="测试进度"):
        user_query = question_data['question']
        
        # ===== 保持原有的核心结构 =====
        messages = [ChatMessage(role="user", content=user_query)]
        request = DataQACompletionRequest(
            messages=messages,
            model="test",
            created=int(datetime.now().timestamp()),
            follow_up_num=0,
            knowledge_base_ids=["3cc33ed2-21fb-4452-9e10-528867bd5f99"],
            use_reranker=True
        )
        
        try:
            # 执行工作流
            response = workflow.do_generate(
                request=request,
                enable_follow_up=False,
                thinking=False
            )
            
            # 提取生成的SQL
            generated_sql = ""
            if response.choices:
                content = response.choices[0].message.content
                generated_sql = extract_sql(content)
            
            # 检查FAQ快速路径
            is_faq_path = (len(response.steps) == 3 and response.model == "faq")
            
            # 混合策略比较
            is_match, match_type, match_score = compare_sql(
                question_data['expected_sql'], 
                generated_sql
            )
            
            # 记录结果
            result = {
                'question_id': question_data['id'],
                'original_question': question_data['question'],
                'expected_sql': question_data['expected_sql'],
                'generated_sql': generated_sql,
                'is_match': is_match,
                'match_type': match_type,
                'match_score': match_score,
                'is_faq_path': is_faq_path
            }
            results.append(result)
            
        except Exception as e:
            # 记录错误
            error_result = {
                'question_id': question_data['id'],
                'original_question': question_data['question'],
                'expected_sql': question_data['expected_sql'],
                'generated_sql': "",
                'is_match': False,
                'match_type': "error",
                'match_score': 0.0,
                'is_faq_path': False
            }
            results.append(error_result)
    
    # 保存结果
    save_results(results)
    
    # 打印统计
    print_statistics(results)
    
    return results

# 执行测试
batch_test_results = test_e2e_high_similarity()

Cell 8: 端到端测试 - 中等相似度路径 (0.7-0.85)

In [None]:
def test_e2e_medium_similarity():
    """
    端到端测试：中等相似度FAQ路径
    User Query -> Step1-6 -> SQL Output (FAQ作为参考)
    """
    print("=" * 80)
    print("端到端测试：中等相似度FAQ路径 (0.7-0.85)")
    print("=" * 80)
    
    # 使用与FAQ相似但不完全匹配的查询
    user_query = "查询郑商所主力合约的价格和交易情况"
    print(f"用户查询: {user_query}")
    print("-" * 80)
    
    messages = [ChatMessage(role="user", content=user_query)]
    request = DataQACompletionRequest(
        messages=messages,
        model="test",
        created=int(datetime.now().timestamp()),
        follow_up_num=0,
        knowledge_base_ids=["3cc33ed2-21fb-4452-9e10-528867bd5f99"],  # 使用FAQ知识库ID
        use_reranker=True
    )
    
    try:
        print("执行do_generate函数...")
        response = workflow.do_generate(
            request=request,
            enable_follow_up=False,
            thinking=False
        )
        
        # 展示完整执行路径
        print("\n执行路径:")
        for i, step in enumerate(response.steps, 1):
            print(f"  Step {step.number}: {step.name} [{step.key}]")
            
            # 展示每个步骤的详细信息
            if step.key == "modify_query":
                print(f"    ➜ 改写: {step.prompt[:50]}...")
            elif step.key == "query_entity_recognition":
                print(f"    ➜ 实体识别: {step.prompt[:50]}...")
            elif step.key == "semantic_search_faq":
                print(f"    ➜ FAQ搜索: {step.prompt}")
            elif step.key == "locate_table":
                # 解析表格定位结果
                if isinstance(step.prompt, list) and len(step.prompt) > 0:
                    print(f"    ➜ 定位到 {len(step.prompt)} 个表")
                    for idx, table in enumerate(step.prompt[:2], 1):  # 只显示前2个
                        if isinstance(table, dict):
                            print(f"       表{idx}: {table.get('table_name', 'N/A')} (分数: {table.get('score', 0):.3f})")
            elif step.key == "generate_prompt":
                print(f"    ➜ 生成提示词长度: {len(str(step.prompt))} 字符")
                if "参考示例" in str(step.prompt):
                    print(f"       ✓ 包含FAQ参考")
            elif step.key == "generate_sql":
                print(f"    ➜ SQL生成完成")
        
        # 展示最终SQL输出
        print("\n最终SQL输出:")
        if response.choices:
            show_sql(response.choices[0].message.content)

        # 验证路径
        print("\n验证结果:")
        expected_steps = ["modify_query", "query_entity_recognition", "semantic_search_faq", 
                         "locate_table", "generate_prompt", "generate_sql"]
        actual_steps = [step.key for step in response.steps]
        
        if len(response.steps) == 6:
            print("中等相似度完整路径验证成功！")
            print("执行步骤数: 6 (完整流程)")
            print("FAQ作为参考增强了prompt")
            print("最终通过LLM生成SQL")
        else:
            print(f"步骤数不匹配，预期6个，实际{len(response.steps)}个")
            
        return response
        
    except Exception as e:
        print(f"错误: {e}")
        traceback.print_exc()
        return None

# 执行测试
medium_sim_response = test_e2e_medium_similarity()

Cell 9: 端到端测试 - 低相似度路径 (<0.7)

In [None]:
def test_e2e_low_similarity():
    """
    端到端测试：低相似度路径
    User Query -> Step1-6 -> SQL Output (无FAQ参考)
    """
    print("=" * 80)
    print("端到端测试：低相似度路径 (<0.7)")
    print("=" * 80)
    
    # 使用与FAQ差异较大的查询
    user_query = "统计上个月所有交易所的期货总交易额和平均手续费"
    print(f"用户查询: {user_query}")
    print("-" * 80)
    
    messages = [ChatMessage(role="user", content=user_query)]
    request = DataQACompletionRequest(
        messages=messages,
        model="test",
        created=int(datetime.now().timestamp()),
        follow_up_num=0,
        knowledge_base_ids=["3cc33ed2-21fb-4452-9e10-528867bd5f99"],  # 使用FAQ知识库ID
        use_reranker=True
    )
    
    try:
        print("执行do_generate函数...")
        response = workflow.do_generate(
            request=request,
            enable_follow_up=False,
            thinking=False
        )
        
        # 展示完整执行路径
        print("\n行路径:")
        has_faq_reference = False
        
        for i, step in enumerate(response.steps, 1):
            print(f"  Step {step.number}: {step.name} [{step.key}]")
            
            if step.key == "modify_query":
                print(f"    ➜ 改写: {step.prompt[:60]}...")
            elif step.key == "query_entity_recognition":
                print(f"    ➜ 实体识别: {step.prompt[:60]}...")
            elif step.key == "semantic_search_faq":
                print(f"    ➜ FAQ搜索: {step.prompt}")
                if "未找到" in str(step.prompt) or "相似度" in str(step.prompt):
                    if "0.7" not in str(step.prompt) or float(str(step.prompt).split("：")[-1]) < 0.7:
                        print(f"       ✓ FAQ相似度低于0.7，不作为参考")
            elif step.key == "locate_table":
                if isinstance(step.prompt, list):
                    print(f"    ➜ 定位到 {len(step.prompt)} 个相关表")
            elif step.key == "generate_prompt":
                prompt_str = str(step.prompt)
                if "参考示例" in prompt_str:
                    has_faq_reference = True
                    print(f"    ➜ 提示词包含FAQ参考")
                else:
                    print(f"    ➜ 提示词未包含FAQ参考（相似度过低）")
            elif step.key == "generate_sql":
                print(f"    ➜ LLM生成SQL完成")
        
        # 展示最终SQL输出
        print("\n最终SQL输出:")
        if response.choices:
            content = response.choices[0].message.content
            # 提取并显示SQL
            print("  " + "-" * 76)
            if "SELECT" in content.upper():
                # 简单展示包含SELECT的部分
                lines = content.split('\n')
                sql_lines = [line for line in lines if 'SELECT' in line.upper() or 'FROM' in line.upper() 
                            or 'WHERE' in line.upper() or 'GROUP' in line.upper()]
                for line in sql_lines[:5]:
                    print(f"  {line.strip()}")
            else:
                print(f"  {content[:200]}...")
            print("  " + "-" * 76)
        
        # 验证路径
        print("\n验证结果:")
        if len(response.steps) == 6 and not has_faq_reference:
            print("低相似度完整路径验证成功！")
            print("执行步骤数: 6 (完整流程)")
            print("FAQ相似度过低，未作为参考")
            print("纯粹通过表格定位和LLM生成SQL")
        else:
            print(f"验证需要确认，步骤数: {len(response.steps)}")
            
        return response
        
    except Exception as e:
        print(f"错误: {e}")
        traceback.print_exc()
        return None

# 执行测试
low_sim_response = test_e2e_low_similarity()

Cell 10: 端到端测试 - 追问路径

In [None]:
def test_e2e_follow_up():
    """
    端到端测试：追问路径
    User Query(不充分) -> 追问 -> User补充 -> Step1-6 -> SQL Output
    """
    print("=" * 80)
    print("端到端测试：追问路径")
    print("=" * 80)
    
    # 第一轮：信息不充分的查询
    user_query_1 = "查询成交量"
    print(f"第1轮用户查询: {user_query_1}")
    print("-" * 80)
    
    messages = [ChatMessage(role="user", content=user_query_1)]
    request_1 = DataQACompletionRequest(
        messages=messages,
        model="test",
        created=int(datetime.now().timestamp()),
        follow_up_num=0,
        knowledge_base_ids=["3cc33ed2-21fb-4452-9e10-528867bd5f99"],  # 使用FAQ知识库ID
        use_reranker=True
    )
    
    try:
        print("第1轮执行...")
        response_1 = workflow.do_generate(
            request=request_1,
            enable_follow_up=True,
            thinking=True  # 启用思考模式
        )
        
        # 检查是否触发追问
        if response_1.choices[0].message.is_follow_up:
            print(f"触发追问: {response_1.choices[0].message.content}")
            print("\n" + "-" * 80)
            
            # 第二轮：用户补充信息
            user_query_2 = "查询棉花CF品种2025年7月2日的成交量"
            print(f"\n第2轮用户补充: {user_query_2}")
            print("-" * 80)
            
            # 添加历史对话
            messages.append(response_1.choices[0].message)
            messages.append(ChatMessage(role="user", content=user_query_2))
            
            request_2 = DataQACompletionRequest(
                messages=messages,
                model="test",
                created=int(datetime.now().timestamp()),
                follow_up_num=1,
                knowledge_base_ids=["3cc33ed2-21fb-4452-9e10-528867bd5f99"],  # 使用FAQ知识库ID
                use_reranker=True
            )
            
            print("第2轮执行...")
            response_2 = workflow.do_generate(
                request=request_2,
                enable_follow_up=True,
                thinking=False
            )
            
            # 展示完整执行路径
            print("\n最终执行路径:")
            for step in response_2.steps:
                print(f"  Step {step.number}: {step.name} [{step.key}]")
                if step.key == "modify_query":
                    print(f"    ➜ 整合后查询: {step.prompt[:60]}...")
            
            # 展示最终SQL
            print("\n最终SQL输出:")
            if response_2.choices:
                content = response_2.choices[0].message.content
                show_sql(content)
            
            print("\n验证结果:")
            print("追问流程验证成功！")
            print(f"第1轮：触发追问（信息不足）")
            print(f"第2轮：执行完整查询（{len(response_2.steps)}个步骤）")
            print("最终生成SQL")
            
            return response_2
        else:
            print("未触发追问，可能查询被优化器补充了信息")
            return response_1
            
    except Exception as e:
        print(f"错误: {e}")
        traceback.print_exc()
        return None

# 执行测试
follow_up_response = test_e2e_follow_up()

Cell 11: 端到端测试 - Thinking模式

In [None]:
def test_e2e_thinking_mode():
    """
    端到端测试：Thinking模式对比
    同一查询分别用普通模式和Thinking模式执行
    """
    print("=" * 80)
    print("端到端测试：Thinking模式对比")
    print("=" * 80)
    
    # 使用一个复杂查询
    user_query = "分析棉花期货CF品种最近7天的价格走势，计算日均成交量和波动率"
    print(f"用户查询: {user_query}")
    print("-" * 80)
    
    messages = [ChatMessage(role="user", content=user_query)]
    request = DataQACompletionRequest(
        messages=messages,
        model="test",
        created=int(datetime.now().timestamp()),
        follow_up_num=0
    )
    
    try:
        # 1. 普通模式
        print("\n普通模式执行:")
        print("执行do_generate函数...")
        response_normal = workflow.do_generate(
            request=request,
            enable_follow_up=False,
            thinking=False
        )
        
        print("  执行步骤:")
        for step in response_normal.steps:
            print(f"    - {step.name}")
        print(f"  Token使用: {response_normal.usage.total_tokens}")
        
        # 2. Thinking模式
        print("\nThinking模式执行:")
        print("执行do_generate函数...")
        response_thinking = workflow.do_generate(
            request=request,
            enable_follow_up=False,
            thinking=True
        )
        
        print("  执行步骤:")
        for step in response_thinking.steps:
            print(f"    - {step.name}")
        print(f"  Token使用: {response_thinking.usage.total_tokens}")
        
        # 展示SQL对比
        print("\nSQL输出对比:")
        
        print("\n普通模式SQL:")
        print("  " + "-" * 76)
        if response_normal.choices:
            content_normal = response_normal.choices[0].message.content
            # 提取SQL展示
            if "SELECT" in content_normal.upper():
                lines = content_normal.split('\n')
                for line in lines[:5]:
                    if any(kw in line.upper() for kw in ['SELECT', 'FROM', 'WHERE', 'GROUP', 'ORDER']):
                        print(f"  {line.strip()}")
        print("  " + "-" * 76)
        
        print("\nThinking模式SQL:")
        print("  " + "-" * 76)
        if response_thinking.choices:
            content_thinking = response_thinking.choices[0].message.content
            # 检查reasoning内容
            if response_thinking.choices[0].message.reasoning_content:
                print(f"  [包含推理过程，长度: {len(response_thinking.choices[0].message.reasoning_content)}]")
            # 提取SQL展示
            if "SELECT" in content_thinking.upper():
                lines = content_thinking.split('\n')
                for line in lines[:5]:
                    if any(kw in line.upper() for kw in ['SELECT', 'FROM', 'WHERE', 'GROUP', 'ORDER']):
                        print(f"  {line.strip()}")
        print("  " + "-" * 76)
        
        # 验证对比
        print("\n验证结果:")
        token_diff = response_thinking.usage.total_tokens - response_normal.usage.total_tokens
        print(f"两种模式都成功生成SQL")
        print(f"Thinking模式多使用 {token_diff} 个tokens")
        if response_thinking.choices[0].message.reasoning_content:
            print(f"Thinking模式包含推理内容")
        print(f"步骤数相同: {len(response_normal.steps)} 个")
        
        return response_thinking
        
    except Exception as e:
        print(f"错误: {e}")
        traceback.print_exc()
        return None

# 执行测试
thinking_response = test_e2e_thinking_mode()

Cell 12: 综合测试总结

In [None]:
def test_e2e_comprehensive_summary():
    """
    综合测试总结：汇总所有端到端测试结果
    """
    print("=" * 80)
    print("端到端测试综合总结")
    print("=" * 80)
    
    # 定义测试场景和对应的查询
    test_scenarios = {
        "高相似度(>=0.85)": {
            "queries": [
                "查询郑商所当前主力合约的代码、结算价及成交量",
                "筛选当前主力合约结算价超过2000元/吨的品种"
            ],
            "expected_steps": 3,
            "expected_path": "Step1→Step2→Step3(FAQ直接返回)"
        },
        "中等相似度(0.7-0.85)": {
            "queries": [
                "查询郑商所主力合约的价格信息",
                "主力合约结算价大于2000的品种有哪些"
            ],
            "expected_steps": 6,
            "expected_path": "Step1→Step2→Step3→Step4→Step5(FAQ参考)→Step6"
        },
        "低相似度(<0.7)": {
            "queries": [
                "统计上个月的期货交易总额",
                "计算所有交易所的平均手续费"
            ],
            "expected_steps": 6,
            "expected_path": "Step1→Step2→Step3→Step4→Step5→Step6"
        }
    }
    
    # 执行测试并收集结果
    test_results = []
    
    for scenario_name, scenario_config in test_scenarios.items():
        print(f"\n测试场景: {scenario_name}")
        print(f"   预期步骤数: {scenario_config['expected_steps']}")
        print(f"   预期路径: {scenario_config['expected_path']}")
        print("-" * 80)
        
        for query in scenario_config['queries'][:1]:  # 每个场景测试1个查询
            print(f"   查询: {query[:50]}...")
            
            try:
                request = DataQACompletionRequest(
                    messages=[ChatMessage(role="user", content=query)],
                    model="test",
                    created=int(datetime.now().timestamp()),
                    follow_up_num=0
                )
                
                response = workflow.do_generate(
                    request=request,
                    enable_follow_up=False,
                    thinking=False
                )
                
                # 检查结果
                actual_steps = len(response.steps)
                has_sql = False
                
                if response.choices:
                    content = response.choices[0].message.content
                    has_sql = "SELECT" in content.upper() or "sql" in content.lower()
                
                # 记录结果
                result = {
                    "scenario": scenario_name,
                    "query": query[:50],
                    "expected_steps": scenario_config['expected_steps'],
                    "actual_steps": actual_steps,
                    "steps_match": actual_steps == scenario_config['expected_steps'],
                    "has_sql": has_sql,
                    "success": actual_steps == scenario_config['expected_steps'] and has_sql
                }
                
                test_results.append(result)
                
                # 打印结果
                status = "SUCCESSFUL" if result['success'] else "WARNING"
                print(f"   {status} 步骤数: {actual_steps}/{scenario_config['expected_steps']}, SQL生成: {has_sql}")
                
            except Exception as e:
                print(f"错误: {str(e)[:50]}")
                test_results.append({
                    "scenario": scenario_name,
                    "query": query[:50],
                    "success": False,
                    "error": str(e)
                })
    
    # 汇总统计
    print("\n" + "=" * 80)
    print("测试统计")
    print("=" * 80)
    
    total_tests = len(test_results)
    successful_tests = sum(1 for r in test_results if r.get('success', False))
    
    print(f"总测试数: {total_tests}")
    print(f"成功数: {successful_tests}")
    print(f"成功率: {successful_tests/total_tests*100:.1f}%")
    
    # 分场景统计
    print("\n分场景统计:")
    for scenario in test_scenarios.keys():
        scenario_results = [r for r in test_results if r['scenario'] == scenario]
        scenario_success = sum(1 for r in scenario_results if r.get('success', False))
        print(f"  {scenario}: {scenario_success}/{len(scenario_results)} 成功")
    
    print("\n端到端测试完成!")
    print("所有步骤验证:")
    print("Step1: modify_query (问题改写)")
    print("Step2: entity_recognition (实体识别)")
    print("Step3: semantic_search_faq (FAQ语义搜索)")
    print("Step4: locate_table (表格定位)")
    print("Step5: generate_single_table_prompt (生成提示词)")
    print("Step6: generate_sql (生成SQL)")
    
    return test_results

# 执行综合测试
final_results = test_e2e_comprehensive_summary()