In [1]:
# 已移除 AIMessageChunk 合并猴补丁，避免递归重入问题。

In [2]:

import logging
logging.basicConfig(level=logging.DEBUG)


In [3]:
import os
import sys
import uuid
from typing import Any, Dict, List, Optional, TypedDict

import importlib.metadata as meta


print(meta.version("langchain"))

# 添加 backend 目录到 Python 路径，以便导入 app 模块
backend_dir = '/root/consult/backend'
if backend_dir not in sys.path:
    sys.path.insert(0, backend_dir)
    print(f"✅ 已将 {backend_dir} 添加到 Python 路径")

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langchain.agents import create_agent
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool as tool_dec
from app.utils.progress_broker import get_progress_broker
import time
import asyncio

1.0.2
✅ 已将 /root/consult/backend 添加到 Python 路径


In [4]:
os.environ["HF_ENDPOINT"]="https://hf-mirror.com"
os.environ["HF_HUB_DOWNLOAD_PROGRESS"] = "1"
os.environ['HF_HUB_OFFLINE'] = '1'  # 禁用 HuggingFace Hub 连接

In [5]:
from app.services.llamaindex_retriever import LlamaIndexRetriever


In [6]:
from app.utils.import_with_timeout import import_symbol_with_timeout
LlamaIndexRetriever = import_symbol_with_timeout(
    "app.services.llamaindex_retriever", "LlamaIndexRetriever", timeout_seconds=5.0
)

In [7]:
os.environ["HF_ENDPOINT"]="https://hf-mirror.com"
os.environ["HF_HUB_DOWNLOAD_PROGRESS"] = "1"
os.environ['HF_HUB_OFFLINE'] = '1'  # 禁用 HuggingFace Hub 连接
os.environ['HF_DATASETS_OFFLINE'] = '1'  # 禁用数据集下载
os.environ['TRANSFORMERS_OFFLINE'] = '1'  # 禁用 Transformers 在线功能
os.environ['HF_HUB_DISABLE_TELEMETRY'] = '1'  # 禁用遥测（避免联网）

In [8]:
os.environ['HF_HUB_DOWNLOAD_TIMEOUT'] = '1'

In [9]:
os.environ['LOCAL_BGE_MODEL_DIR'] = '/root/consult/backend/models/bge-large-zh-v1.5'

In [10]:
retriever_global = LlamaIndexRetriever.get_instance("global")

INFO:app.services.llamaindex_retriever:🔄 开始加载 LlamaIndex 模块（离线模式已启用）
   请在联网环境中预先下载，下载后会永久保存，无需重复下载。
   运行: python -m app.utils.download_nltk_data
INFO:app.services.llamaindex_retriever:✅ 所有 LlamaIndex 模块加载完成，总耗时: 4.355s
INFO:app.services.llamaindex_retriever:🔄 初始化共享嵌入模型（离线模式）: /root/consult/backend/models/bge-large-zh-v1.5
INFO:sentence_transformers.SentenceTransformer:Load pretrained SentenceTransformer: /root/consult/backend/models/bge-large-zh-v1.5
INFO:app.services.llamaindex_retriever:✅ 共享嵌入模型已缓存
INFO:app.services.llamaindex_retriever:🔄 创建新 LlamaIndexRetriever 实例: global
INFO:app.services.llamaindex_retriever:✅ LlamaIndexRetriever 实例已缓存: global


In [11]:
from app.services.web_search_service import get_web_search_service
web_search = get_web_search_service()


In [12]:
retriever_workspace = LlamaIndexRetriever.get_instance("global")


In [13]:
from dotenv import load_dotenv
load_dotenv("/root/consult/backend/.env")

True

In [14]:
api_key = os.getenv('THIRD_PARTY_API_KEY') 
api_base = os.getenv('THIRD_PARTY_API_BASE') 
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.2, openai_api_key=api_key, openai_api_base=api_base).with_config({"stream": False})

In [None]:
from langgraph.graph import StateGraph, START, END
from langgraph.graph import MessagesState
from langchain_core.messages import AnyMessage, ToolMessage, HumanMessage, AIMessage, SystemMessage
import logging
import logging, sys
from operator import add
from dataclasses import asdict

from typing import TypedDict, Annotated
logging.basicConfig(
    level=logging.INFO,
    stream=sys.stdout,
    format="%(levelname)s:%(name)s:%(message)s",
    force=True,  # 关键
)

class QBState(TypedDict):
    request: Dict[str, Any]
    workspace_id: str
    company_name: Optional[str]
    target_projects: List[str]
    known_info: Dict[str, Any]
    global_db_out: str
    messages: Annotated[list[AnyMessage], add]
    type: str
    retry_count: int
    max_retries: int

from langchain_core.tools import tool

    
class QuestionnaireBuilderWorkflow:
    def __init__(self, workspace_retriever, global_retriever, web_search_service, llm=None):
        self.workspace_retriever = workspace_retriever
        self.global_retriever = global_retriever
        self.web_search_service = web_search_service
        if llm is None:
            api_key = os.getenv('THIRD_PARTY_API_KEY') or os.getenv('OPENAI_API_KEY')
            api_base = os.getenv('THIRD_PARTY_API_BASE') or os.getenv('OPENAI_BASE_URL', 'https://api.openai.com/v1')
            self.llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.2, openai_api_key=api_key, openai_api_base=api_base).with_config({"stream": False})
        else:
            self.llm = llm.with_config({"stream": False}) if hasattr(llm, "with_config") else llm
        self.graph = self._build_graph()
        self.checkpointer = MemorySaver()
        self.compiled_graph = self.graph.compile(checkpointer=self.checkpointer)

    
    
    

    def _build_graph(self) -> StateGraph:
        
        @tool
        def search_global_db(query: str) -> str:
            """从全局数据库中检索相关数据，全局数据库包含各类政策信息与过去的申请案例，输入具体检索内容"""
            snippets: List[str] = []
            # 全局检索（同步包装）
            try:
                logging.info(f"[db_search_tool] 开始全局检索: query={query}")
                g_results = asyncio.run(self.global_retriever.retrieve(query=query, top_k=6, use_hybrid=True, use_compression=True))
                logging.info(f"[db_search_tool] 全局检索返回 {len(g_results)} 条结果")
                for r in g_results:
                    title = r.get("title") or r.get("name") or r.get("document_id") or "片段"
                    text = r.get("text") or r.get("content") or ""
                    snippets.append(f"[GLOBAL] {title}: {text[:300]}")
            except Exception as e:
                logging.info(f"[db_search_tool] 全局检索失败: {e}")
            return "\n".join(snippets[:12]) or "未找到有效片段"
        
        # 定义异步实现
        async def _search_web_async_impl(query: str) -> str:
            """异步搜索实现"""
            try:
                results = await self.web_search_service.search_web(query=query, num_results=6)
            except Exception as e:
                logging.info(f"[web_search_tool] 网络搜索失败: {e}")
                return "(网络搜索失败)"
            if not results:
                return "(未找到结果)"
            lines: List[str] = []
            for idx, r in enumerate(results[:8], 1):
                d = asdict(r)
                title = d.get("title") or r.get("name") or r.get("snippet_title") or "未知标题"
                url = d.get("url") or r.get("link") or r.get("source_url") or ""
                snippet = d.get("snippet") or r.get("content") or r.get("desc") or ""
                lines.append(f"[{idx}] {title}\n{url}\n{snippet[:200]}")
            print("lines","\n\n".join(lines))
            return "\n\n".join(lines)
        
        # 定义同步包装函数 - 确保在所有情况下都能工作
        def _search_web_sync_impl(query: str) -> str:
            """同步包装 - 用于满足 create_agent 的同步检查要求"""
            # 这个函数会在 create_agent 验证时被调用，也可能在实际执行时被调用
            # 我们需要确保它能够正确处理所有情况
            try:
                # 首先尝试获取当前运行的 loop
                loop = asyncio.get_running_loop()
                # 如果有运行中的 loop，在新线程中运行
                import threading
                result_container = [None]
                exception_container = [None]
                
                def run_in_thread():
                    try:
                        # 在新线程中创建新的 event loop
                        new_loop = asyncio.new_event_loop()
                        asyncio.set_event_loop(new_loop)
                        try:
                            result = new_loop.run_until_complete(_search_web_async_impl(query))
                            result_container[0] = result
                        finally:
                            new_loop.close()
                            asyncio.set_event_loop(None)
                    except Exception as e:
                        exception_container[0] = e
                
                thread = threading.Thread(target=run_in_thread, daemon=True)
                thread.start()
                thread.join(timeout=60)
                
                if thread.is_alive():
                    raise TimeoutError("搜索超时")
                if exception_container[0]:
                    raise exception_container[0]
                
                if result_container[0] is None:
                    raise RuntimeError("同步包装函数未返回结果")
                
                return result_container[0]
            except RuntimeError as e:
                # 没有运行中的 loop，直接使用 asyncio.run
                if "no running event loop" in str(e).lower() or "get_running_loop" in str(e):
                    try:
                        return asyncio.run(_search_web_async_impl(query))
                    except Exception as run_error:
                        logging.error(f"[search_web_sync] asyncio.run 失败: {run_error}")
                        # 如果 asyncio.run 也失败，返回错误信息
                        return f"(同步调用失败: {run_error})"
                else:
                    # 其他 RuntimeError，直接抛出
                    raise
        
        # 使用 StructuredTool.from_function 创建工具，同时提供同步和异步版本
        from langchain_core.tools import StructuredTool
        
        # 确保同步函数可以被正确访问
        # 检查同步函数是否存在
        if _search_web_sync_impl is None:
            raise ValueError("同步函数 _search_web_sync_impl 未定义")
        
        search_web = StructuredTool.from_function(
            func=_search_web_sync_impl,  # 同步版本（用于 create_agent 检查）
            coroutine=_search_web_async_impl,  # 异步版本（用于 agent.ainvoke 调用）
            name="search_web",
            description="从网络中检索相关数据；输入中文查询，返回若干条标题与链接摘要",
        )
        
        # 验证工具是否正确创建
        if search_web.func is None:
            raise ValueError(f"工具 {search_web.name} 的 func 属性为 None，无法同步调用")
        
        logging.info(f"[search_web工具] 工具创建成功，名称: {search_web.name}, func存在: {search_web.func is not None}")


        def db_search_node(state: QBState) -> QBState:

            logging.info(f"db_search_node")
            tp = ", ".join(state.get("target_projects") or [])
            known = state.get("known_info") 

            react_system_text = (
                "你是中国政策与补贴申报顾问。用户希望申请以下项目：{target_projects}\n"
                "已知信息（可能不完整）：\n{known_info}\n\n"
                "目标：输出面向实操的‘申报条件综述’。\n"
                "执行方式（严格遵守）：\n"
                "1) 如信息不足，循环调用 search_global_db 检索相关片段并综合要点；\n"
                "2) 每轮检索后判断是否仍存在关键空缺（资格/门槛/材料/流程/时间/例外情形）；\n"
                "3) 若仍有空缺，则继续调用 search_global_db；若已覆盖充分则停止循环；\n"
                "4) 最多调用工具 3 次，达到上限后必须仅输出 OK 并停止；\n"
                # "5) 完成后仅输出 OK 作为结束信号（禁止提前总结）。\n\n"
                "注意：优先引用明确口径与阈值，并标注来源类别。"
                
            ).format(
                target_projects=tp or "未指定",
                known_info=known or "无",
            )

            agent = create_agent(
                    model=self.llm,
                    tools=[search_global_db],
                    system_prompt=react_system_text,
                )
            message_content = state.get("messages")

            # 安全地提取消息内容
            def get_content(msg):
                if msg is None:
                    return ""
                if isinstance(msg, dict):
                    return msg.get("content", "")
                if hasattr(msg, "content"):
                    return getattr(msg, "content", "")
                if isinstance(msg, list) and len(msg) > 0:
                    return get_content(msg[0])
                return str(msg) if msg else ""
            logging.info(f"[db_search_tool] 调用开始检索")
            if message_content is not None and len(message_content) > 1:
                res_msg = agent.invoke({"messages": [{"role": "user", "content": "根据这些项目生成条件要求总览,已知总结结果：" + \
                get_content(message_content[-2]) + "，判断结果" + get_content(message_content[-1])}]})
            else:
                res_msg = agent.invoke({"messages": [{"role": "user", "content": "根据这些项目生成条件要求总览"}]})
            prev_msgs = res_msg.get("messages") or []
            logging.info(f"[db_search_tool] 模型检索完成")
            return {"messages": prev_msgs}
        
        async def web_search_node(state: QBState) -> QBState:
            """
            使用网络搜索工具对指定项目进行外部信息检索，返回若干检索要点。
            仅返回增量键以满足 LangGraph 的合并规则。
            """
            logging.info("[web_search_node] start")
            tp = ", ".join(state.get("target_projects") or [])
            known = state.get("known_info") or {}
            known_blob = "\n".join([f"- {k}: {v}" for k, v in list(known.items())[:12]]) if isinstance(known, dict) else str(known)

            system_text = (
                "你是网络检索与情报整合助手。目标：围绕‘{target_projects}’项目，为企业用户提供申报所需要点。\n"
                "检索与迭代规则（严格执行）：\n"
                "1) 先生成一批高质量中文检索词（3-8条），覆盖：申报条件/资格门槛/材料清单/办理流程/关键时间/例外情形/资金标准。必要时加入单位/地名限定。\n"
                "2) 强调优先从政府公开网站（如各级政府官网、部门官网、政务公开专栏、政策公告栏、gov.cn 域名等）进行检索，可以先检索官网公告或政策通知的具体发布位置，再深入细分栏目和部门。每轮按检索词调用 search_web 工具获取结果；优先来源顺序：官网/官媒（gov.cn/部门官网/平台）、政策文件与指南、地方主管部门、企查查/企信宝等工商信息、招股书/年报/公告、主流媒体；尽量附原始链接。\n"
                "3) 每轮结束自检信息空缺（上述各类要点是否覆盖、是否有明确阈值/关键口径/例外情形），若仍存在空缺，则生成更精准的新检索词（含实体/年份/地区/文号等），再进行下一轮；最多迭代 8 次。\n"
                "4) 达到 3 次上限后，必须仅输出：OK，并停止检索与输出；\n"
                "5) 输出时先列出‘检索词与命中概览’（按轮次/关键词列举命中情况与链接），再给出‘要点汇总’（分：资格门槛/材料/流程/时间/例外/资金标准），每点后标注来源编号。若信息已充分，请在最后单独一行仅输出：OK。\n"
                "注意：避免泛化与编造，尽量引用来源原文口径；同源重复合并；对工商主体信息可调用与企业相关的公开渠道（如：企查查、企信宝），谨慎标注身份字段。\n\n"
                "已知背景（供参考，可能不完整）：\n{known_info}"
            ).format(target_projects=tp or "未指定", known_info=known_blob or "无")

            agent = create_agent(
                model=self.llm,
                tools=[search_web],
                system_prompt=system_text,
            )

            # 生成一次性检索提示（也可根据需要改为多轮）
            message_content = state.get("messages")
            def get_content(msg):
                if msg is None:
                    return ""
                if isinstance(msg, dict):
                    return msg.get("content", "")
                if hasattr(msg, "content"):
                    return getattr(msg, "content", "")
                if isinstance(msg, list) and len(msg) > 0:
                    return get_content(msg[0])
                return str(msg) if msg else ""
            logging.info(f"[db_search_tool] 调用开始检索")
            if message_content is not None and len(message_content) > 1:
                res_msg = await agent.ainvoke({"messages": [{"role": "user", "content": "请检索并汇总要点，给出链接与简要摘录,已知总结结果：" + \
                get_content(message_content[-2]) + "，判断结果" + get_content(message_content[-1])}]})
            else:
                res_msg = await agent.ainvoke({"messages": [{"role": "user", "content": "请检索并汇总要点，给出链接与简要摘录"}]})
            prev_msgs = res_msg.get("messages") or []

            print(prev_msgs)    

            return {"messages": [prev_msgs]}


        def summery_node(state: QBState) -> QBState:

            logging.info(f"[summery_node] 开始总结")

            prompt = (
                "你是政策要求材料总结助手。用户希望申请以下项目：{target_projects}\n"
                "已知信息（可能不完整）：\n{known_info}\n\n"
                "你已经从数据库或网络中检索到了相关信息，请你基于这些信息，系统性、详细地总结申报条件要求和必要材料，涵盖但不限于以下方面：资格要求、申报门槛、必需材料、办理流程、关键时间节点、常见例外等。\n"
                "请充分整合所有获取到的政策条款、申报文件、案例等数据，对每项条件尽量详细具体，必要时举例说明，并按类别或逻辑结构清晰归纳，帮助用户快速把握核心要点和潜在难点。\n"
                "在总结后，请进一步思考和指出：\n"
                "1）根据当前信息，还有哪些可能的申报条件、材料或案例尚未被覆盖，建议检索哪些内容以补全信息？\n"
                "2）请列出你的补全建议点和你的分析思路，帮助后续继续完善。"
                "尽量细化到每个条件的具体内容要求"
            )

            # 直接从 QBState 获取 ToolMessage

            msgs = state.get("messages") or []
            tool_texts = [m.content for m in msgs if isinstance(m, ToolMessage)]
            tool_blob = "\n\n".join(tool_texts) if tool_texts else "（无工具输出）"
            # print("tool_blob",tool_blob)
            prompts = [
                SystemMessage(content=prompt),
                HumanMessage(content=tool_blob),
            ]
            res_msg = self.llm.invoke(prompts)
            logging.info(f"[summery_node] 总结完成")
            # print("res_msg",res_msg.content)
            return {"messages": [res_msg]}
        
        def Judger_node(state: QBState) -> QBState:
            """
            根据总结信息，判断是否需要进行检索，如果要检索具体哪些方案还要检索；受 max_retries 控制。
            """
            logging.info(f"[Judger_node] 开始判断")
            # 已达最大次数则直接放弃检索
            retry_count = int(state.get("retry_count") or 0)
            max_retries = int(state.get("max_retries") or 0)
            logging.info(f"retry_count: {retry_count}, max_retries: {max_retries}")
            if retry_count >= max_retries and max_retries > 0:
                logging.info(f"[Judger_node] 已达最大重检索次数: {retry_count}/{max_retries}")
                return {"type": "放弃检索"}

            prompt = (
                "你是申报条件判定助手。用户希望申请以下项目：{target_projects}\n"
                "已知信息（可能不完整）：\n{known_info}\n\n"
                "你已经获得了对申报条件与材料的总结（见下文），请根据这些内容判断：\n"
                "你的回答前四个字必须为“库中检索”或“网络检索”或“放弃检索”，用于直接表示是否还需要进一步检索信息；\n"
                "之后再给出具体建议方案和理由，但务必保证前四个字为“库中检索”或“网络检索”或“放弃检索”并直接作答；\n"
                "例如：\n"
                "库中检索，需要进一步检索学历要求、财务指标明细，因为…\n"
                "放弃检索，所有关键信息均已覆盖，无需补充。\n"
                "网络检索，需要进一步检索学历要求、财务指标明细，因为…\n"
                "请严格遵循：前四个字只能为“库中检索”或“网络检索”或“放弃检索”。"
                "若需要检索, 请列出需检索的具体方案（如：学历要求、财务指标明细等）及你的判定理由。\n"
            ).format(
                target_projects=state.get("target_projects") or "未指定",
                known_info=state.get("known_info") or "无"
            )

            logging.info("总结结果" + state.get("messages")[-1].content)
            # print(state.get("messages"))

            prompts = [
                SystemMessage(content=prompt),
                HumanMessage(content=state.get("messages")[-1].content),
            ]
            res_msg = self.llm.invoke(prompts)
            logging.info("[Judger_node] 模型判断完成")
            # 取前3字作为动作标记
            type_now = res_msg.content[0:4]
            print("type_now",type_now)
            print("res_msg",res_msg.content)
            logging.info(f"[Judger_node] 判断结果 {type_now}")
            # 若需要再次检索，则递增 retry_count
            if type_now in ("库中检索", "网络检索"):
                return {"messages": [res_msg], "type": type_now, "retry_count": retry_count + 1}
            else:
                return {"messages": [res_msg], "type": type_now}
        
        async def person_info_web_search_node(state: QBState) -> QBState:
            """
            使用网络搜索工具对指定项目进行外部信息检索，返回若干检索要点。
            仅返回增量键以满足 LangGraph 的合并规则。
            """
            logging.info("[web_search_node] start")
            company_name = ", ".join(state.get("company_name") or [])
            known = state.get("known_info") or {}
            known_blob = "\n".join([f"- {k}: {v}" for k, v in list(known.items())[:12]]) if isinstance(known, dict) else str(known)

            system_text = (
                "你是“主体信息核验”检索助手。任务：围绕以下“公司/个人”实体，检索公开渠道并给出可佐证材料与链接，验证其是否满足相关政策申报所需的主体条件与记录。\n"
                "检索与迭代规则（严格执行）：\n"
                "1) 先生成一批中文检索词（3-8条），覆盖：企业基础信息（名称/统一社会信用代码/注册地/高管股东）、资格资质/行政许可、行政处罚/信用记录、司法文书/裁判/执行、知识产权、公告/年报/招股书/招投标、主流媒体报道等；必要时加入实体/地区/年份/文号限定。\n"
                "2) 每轮按检索词调用 search_web 获取结果；优先来源顺序：政府/主管部门官网与平台（gov.cn、地方政务、信用平台、司法/裁判文书网）> 政策文件/公告/年报/招股书/招投标/公示 > 工商主体公开库（企查查、企信宝等）> 主流媒体/权威行业平台；尽量提供原始链接。\n"
                "3) 每轮完成后自检覆盖度：上述要点是否覆盖？是否有明确口径/阈值/例外？若仍有空缺，则生成更精准的新检索词（含实体/地区/年份/文号/关键词）并进行下一轮；最多迭代 8 次。\n"
                "4) 达到 8 次上限后，必须仅输出：OK，并停止检索与输出。\n"
                "输出格式（严格遵循）：\n"
                "- 检索词与命中概览（按轮次/关键词列出命中与链接，1-2 行要点）\n"
                "- 证据要点汇总（分项：基础信息、资格/许可、处罚/信用、司法/裁判、知识产权、公告/年报/招股/招投标、媒体报道等），每点后标注来源编号\n"
                "- 若信息已充分，请在最后单独一行仅输出：OK\n"
                "约束：严禁编造；尽量引用来源原文口径；同源重复合并；对个人敏感信息需最小化披露；如无公开记录，明确说明“未检出”；中文输出。\n\n"
                "已知背景（供参考，可能不完整）：\n{known_info}\n"
                "主体名称：\n{company_name}（如为公司/个人，请在检索词中加入其名称/别名/代码/地区等）"
            ).format(company_name=company_name or "未指定", known_info=known_blob or "无")

            agent = create_agent(
                model=self.llm,
                tools=[search_web],
                system_prompt=system_text,
            )

            # 生成一次性检索提示（也可根据需要改为多轮）
            message_content = state.get("messages")
            def get_content(msg):
                if msg is None:
                    return ""
                if isinstance(msg, dict):
                    return msg.get("content", "")
                if hasattr(msg, "content"):
                    return getattr(msg, "content", "")
                if isinstance(msg, list) and len(msg) > 0:
                    return get_content(msg[0])
                return str(msg) if msg else ""
            logging.info(f"[db_search_tool] 调用开始检索")
            if message_content is not None and len(message_content) > 1:
                res_msg = await agent.ainvoke({"messages": [{"role": "user", "content": "请检索并汇总要点，给出链接与简要摘录,已知总结结果：" + \
                get_content(message_content[-2]) + "，判断结果" + get_content(message_content[-1])}]})
            else:
                res_msg = await agent.ainvoke({"messages": [{"role": "user", "content": "请检索并汇总要点，给出链接与简要摘录"}]})
            prev_msgs = res_msg.get("messages") or []

            print("==============person===============")
            print("prev_msgs",prev_msgs)    
            print("message_content[-1]",message_content[-1])
            print("message_content[-2]",message_content[-2])
            
            return {"messages": [message_content[-2],message_content[-1],prev_msgs]}
        
        def analysis_node(state: QBState) -> QBState:

            logging.info(f"[analysis_node] 开始分析")

            prompt = (
                "你是“申报可行性评估”专家，面向具体申请人（公司/个人）与目标政策。\n"
                "目标：基于已检索/整理的信息，给出申请成功可能性的系统评估与改进建议，并产出一份进一步核实问卷。\n\n"

                "工作流程（严格遵循）：\n"
                "1) 要求与材料大纲：\n"
                "   - 资格条件（主体/行业/规模/资质/信用/地域/时间窗口）\n"
                "   - 硬性门槛（营收/投资/纳税/研发/人员/社保/不良记录等阈值）\n"
                "   - 佐证材料（营业执照、财务报表、纳税/社保证明、合同、专利/商标、获奖/荣誉等）\n"
                "   - 流程与关键时间点（申报节点、审核、异议、公示、发放）\n"
                "   - 例外情形与排除条款\n"
                "   为每条在括号中标注来源编号（若有）。\n\n"

                "2) 匹配与证据：将申请人信息与要求逐条比对，分组输出：\n"
                "   - 已满足（列证据点与来源编号）\n"
                "   - 基本满足但需补充（缺失/证据弱项）\n"
                "   - 未满足（关键差距）\n"
                "   - 不确定/模糊（待核实）\n\n"

                "3) 评分与结论：\n"
                "   - 从适配度(40)、合规与信用(20)、材料完备度(20)、流程与时间把控(10)、风险与不确定性(10) 五维度打分并给总分（0-100）。\n"
                "   - 给出一句话结论（可行/存在较大不确定/暂不具备），以及三条内的优先行动建议。\n\n"

                "4) 进一步核实问卷：用于向申请人收集关键信息与材料链接（可复制到表单）。\n"
                "   - 每个问题包含：提问、所需证据/证明、说明（为什么需要/判定依据）、期望格式（文件/截图/链接/编号）。\n"
                "   - 优先覆盖“不确定/模糊”和“基本满足但需补充”的条目，按优先级排序。\n"
                "   - 至少给出 8-15 个问题，禁用长篇开放题，尽量结构化。\n\n"

                "输出格式（严格遵循，中文）：\n"
                "## 一、要求与材料大纲（附来源编号）\n"
                "- ...\n\n"
                "## 二、匹配与证据\n"
                "### 2.1 已满足\n"
                "- 要求A：证据...（来源#1）\n"
                "### 2.2 基本满足但需补充\n"
                "- 要求B：缺失...（建议补充...）\n"
                "### 2.3 未满足\n"
                "- 要求C：差距...\n"
                "### 2.4 不确定/模糊\n"
                "- 要求D：原因...（需核实...）\n\n"
                "## 三、评分与结论（总分：X/100）\n"
                "- 适配度：x/40；合规与信用：x/20；材料完备度：x/20；流程与时间：x/10；风险与不确定性：x/10\n"
                "- 结论：...\n"
                "- 优先行动建议：1) ... 2) ... 3) ...\n\n"
                "## 四、进一步核实问卷\n"
                "1) [高优先级] 近两年纳税证明与社保缴纳清单\n"
                "   - 证据：税务/社保官方出具文件或截图\n"
                "   - 说明：核验硬性门槛与在地贡献\n"
                "   - 格式：PDF/官方链接\n"
                "2) ...（按此模板列 8-15 条）\n\n"

                "约束：不得编造；如无来源请标注“无明确来源”；仅提炼必要细节；可引用先前‘检索命中’与‘总结’的来源编号。"
            )

            # 直接从 QBState 获取 ToolMessage

            msgs = state.get("messages") or []
            print("msgs[-1]",msgs[-1])
            print("msgs[-2]",msgs[-2])
            print("msgs[-3]",msgs[-3])
            texts = [msg.content for msg in msgs[-1]]+[msgs[-2], msgs[-3]]
            print("texts",len(texts))
            blob = "\n\n".join(texts) if texts else "（无工具输出）"
            print("blob",blob)


            agent = create_agent(
                model=self.llm,
                tools=[],  # 或不传，视你的封装实现而定
                system_prompt=prompt,
            )
            res_msg = agent.invoke(HumanMessage(content=blob))
            logging.info(f"[analysis_node] 分析完成")
            print("res_msg",res_msg)
            return {"messages": [res_msg]}



        def router_func(state: QBState):
            if state["type"] == "库中检索":
                return "db_search"
            elif state["type"] == "网络检索":
                return "web_search"
            else:
                return "person_info_web_search"
            
        graph = StateGraph(QBState)      
        graph.add_node("db_search", db_search_node)
        graph.add_node("web_search", web_search_node)
        graph.add_node("summery", summery_node)
        graph.add_node("judger", Judger_node)
        graph.add_node("person_info_web_search", person_info_web_search_node)
        graph.add_node("analysis", analysis_node)



        graph.add_edge(START, "db_search")
        graph.add_edge("db_search", "summery")
        graph.add_edge("summery", "judger")
        graph.add_edge("web_search", "summery")
        graph.add_edge("summery", "judger")
        graph.add_conditional_edges("judger", router_func, ["db_search","web_search","person_info_web_search"])
        graph.add_edge("person_info_web_search", "analysis")
        graph.add_edge("analysis", END)
    
        return graph

    def run(self, request: Dict[str, Any], phase: Optional[str] = None) -> Dict[str, Any]:
        initial: QBState = {
            "request": request,
            "workspace_id": (request.get("workspace_id") or "global"),
            "company_name": request.get("company_name"),
            "target_projects": request.get("target_projects", []),
            "known_info": request.get("known_info", {}),
            "global_db_out": "",
            "type": "",
            "messages": [],
            "retry_count": int(request.get("retry_count", 0)),
            "max_retries": int(request.get("max_retries", 2)),
        }
        
        result: QBState = self.compiled_graph.invoke(
            initial,
            config={"configurable": {"thread_id": str(uuid.uuid4())}, "recursion_limit": 100}
        )

        # 相位裁剪已移除，直接返回
        return result
    

In [16]:
workflow = QuestionnaireBuilderWorkflow(
                    workspace_retriever=retriever_workspace,
                    global_retriever=retriever_global,
                    web_search_service=web_search,
                    llm=None
                )
request_context = {
                    "workspace_id": "global",
                    "company_name": "紫荆思源有限公司",
                    "target_projects": ["前海十二条"],
                    "known_info": {},
                }
result = await workflow.run(request_context)
print(result)

INFO:root:[search_web工具] 工具创建成功，名称: search_web, func存在: True
INFO:root:db_search_node
INFO:root:[db_search_tool] 调用开始检索


INFO:httpx:HTTP Request: POST https://api.qingyuntop.top/v1/chat/completions "HTTP/1.1 200 OK"
INFO:root:[db_search_tool] 开始全局检索: query=前海十二条 申报条件
INFO:llama_index.core.indices.loading:Loading all indices.
INFO:app.services.llamaindex_retriever:✅ 加载索引: global


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

INFO:root:[db_search_tool] 全局检索返回 6 条结果
INFO:httpx:HTTP Request: POST https://api.qingyuntop.top/v1/chat/completions "HTTP/1.1 200 OK"
INFO:root:[db_search_tool] 开始全局检索: query=前海十二条 申报条件 资格 材料 流程


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

INFO:root:[db_search_tool] 全局检索返回 6 条结果
INFO:httpx:HTTP Request: POST https://api.qingyuntop.top/v1/chat/completions "HTTP/1.1 200 OK"
INFO:root:[db_search_tool] 开始全局检索: query=前海十二条 申报条件 例外情形


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

INFO:root:[db_search_tool] 全局检索返回 6 条结果
INFO:httpx:HTTP Request: POST https://api.qingyuntop.top/v1/chat/completions "HTTP/1.1 200 OK"
INFO:root:[db_search_tool] 模型检索完成
INFO:root:[summery_node] 开始总结
INFO:httpx:HTTP Request: POST https://api.qingyuntop.top/v1/chat/completions "HTTP/1.1 200 OK"
INFO:root:[summery_node] 总结完成
INFO:root:[Judger_node] 开始判断
INFO:root:retry_count: 0, max_retries: 2
INFO:root:总结结果### 申报条件要求和必要材料总结

#### 一、资格要求
1. **申请单位资格**：
   - 申请单位必须在前海合作区内注册并实际经营。
   - 需具备合法的营业执照及相关经营资质。

2. **人员要求**：
   - 申请单位需在前海合作区内有满足生产经营需要的从业人员，并需提供相关社保缴纳证明。

3. **场地要求**：
   - 申请单位需在前海合作区内有固定的生产经营场所，面积需具体填写。

#### 二、申报门槛
- 申请单位必须遵守《十二条措施》及申报指南中关于“实际经营”的相关规定。
- 所有提交的材料必须真实、准确和完整，且需接受审核。

#### 三、必需材料
1. **基本材料**：
   - 营业执照复印件。
   - 申请单位法定代表人身份证明。
   - 实际经营承诺书（需签字盖章）。

2. **经营场所证明**：
   - 固定生产经营场所的租赁合同或产权证明。

3. **人员社保证明**：
   - 在前海合作区内缴纳基本养老保险等社保的证明材料。

4. **其他辅助材料**：
   - 相关的合同协议、经营计划书等。

#### 四、办理流程
1. **登录申报平台**：
   - 访问前海企业服务一体化服务平台（https://qhsk.sz.gov.cn），进行法人或账号密码登录。

2. **填写申请信息**

TypeError: No synchronous function provided to "web_search".
Either initialize with a synchronous function or invoke via the async API (ainvoke, astream, etc.)