# 阶段2：宏观经济SubAgent原型（动态工具架构）

**目标**：实现宏观经济分析SubAgent，验证动态工具选择架构

**架构**：LangChain Agent + Middleware（动态工具过滤）

**核心设计**：
- 工具描述向量索引：将AKShare宏观接口描述入库
- 动态工具检索：根据用户查询语义检索最相关工具
- Middleware过滤：Agent仅使用检索到的工具

**优势**：
- AKShare有100+宏观接口，预制工具无法覆盖所有场景
- 动态选择避免工具过多导致模型上下文过载


## Step 1：环境初始化

加载配置、初始化Embedding模型


In [None]:
# 环境初始化
import os
import logging
from pathlib import Path
from dotenv import load_dotenv

# 加载环境变量
load_dotenv("../config/.env")

# 配置日志
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)

# 导入项目模块
from analyst_chain.knowledge.constants import (
    Domain,
    STRUCTURED_JSON_DIR,
    VECTOR_DB_DIR,
    EMBEDDING_MODEL,
    PROCESSED_DATA_DIR
)
from analyst_chain.utils.embeddings_utils import get_embeddings

# 工具索引存储路径
TOOL_INDEX_DIR = PROCESSED_DATA_DIR / "knowledge" / "tool_index"

print(f"[环境] EMBEDDING_MODEL: {EMBEDDING_MODEL}")
print(f"[环境] TOOL_INDEX_DIR: {TOOL_INDEX_DIR}")



## Step 2：扫描AKShare宏观接口

使用Python反射机制扫描所有`macro_china_*`接口，提取函数名、描述、参数


In [None]:
# 扫描AKShare宏观接口（Python反射）
import akshare as ak
import inspect
from typing import List, Dict, Any

def scan_akshare_macro_tools() -> List[Dict[str, Any]]:
    """扫描AKShare所有macro_china_开头的函数

    Returns:
        工具描述列表，每个包含：
        - name: 函数名
        - description: docstring第一行
        - full_docstring: 完整docstring
        - parameters: 参数信息
    """
    tools = []
    for name in dir(ak):
        if not name.startswith("macro_china_"):
            continue
        func = getattr(ak, name)
        if not callable(func):
            continue

        # 提取docstring
        docstring = inspect.getdoc(func) or "暂无描述"
        short_desc = docstring.split('\n')[0]

        # 提取函数签名（参数）
        try:
            sig = inspect.signature(func)
            params = {}
            for param_name, param in sig.parameters.items():
                param_info = {"type": "string"}
                if param.default != inspect.Parameter.empty:
                    param_info["default"] = str(param.default)
                params[param_name] = param_info
        except (ValueError, TypeError):
            params = {}

        tools.append({
            "name": name,
            "description": short_desc,
            "full_docstring": docstring,
            "parameters": params
        })

    return tools

# 执行扫描
AKSHARE_MACRO_TOOLS = scan_akshare_macro_tools()
print(f"[扫描完成] 发现 {len(AKSHARE_MACRO_TOOLS)} 个宏观经济接口")

# 显示前5个接口
print("\n前5个接口示例：")
for tool in AKSHARE_MACRO_TOOLS[:5]:
    params_str = ", ".join(tool["parameters"].keys()) if tool["parameters"] else "无参数"
    print(f"  - {tool['name']}: {tool['description'][:50]}... ({params_str})")

## Step 3：构建工具描述向量索引

将工具描述转为Document，构建Chroma向量库


In [None]:
# 构建工具描述向量索引
from langchain_core.documents import Document
from langchain_chroma import Chroma

def build_tool_index(tools: List[Dict], persist_dir: Path) -> Chroma:
    """构建工具描述向量库

    Args:
        tools: 工具描述列表
        persist_dir: 向量库存储路径

    Returns:
        Chroma向量库实例
    """
    # 创建存储目录
    persist_dir.mkdir(parents=True, exist_ok=True)

    # 构建文档
    documents = []
    for tool in tools:
        # 内容：描述 + 完整docstring（提供更多语义信息）
        content = f"{tool['description']}\n{tool['full_docstring']}"
        doc = Document(
            page_content=content,
            metadata={
                "name": tool["name"],
                "description": tool["description"],
                "has_params": len(tool["parameters"]) > 0
            }
        )
        documents.append(doc)

    # 初始化Embedding
    embeddings = get_embeddings(model_name=EMBEDDING_MODEL)

    # 构建向量库
    vectorstore = Chroma.from_documents(
        documents=documents,
        embedding=embeddings,
        collection_name="akshare_macro_tools",
        persist_directory=str(persist_dir)
    )

    return vectorstore

# 构建向量索引
print("[构建] 正在构建工具描述向量索引...")
tool_vectorstore = build_tool_index(AKSHARE_MACRO_TOOLS, TOOL_INDEX_DIR)
print(f"[完成] 向量索引已构建，包含 {len(AKSHARE_MACRO_TOOLS)} 个工具描述")

## Step 4：实现工具检索器

根据用户查询检索最相关的AKShare接口


In [None]:
# 实现工具检索器
class ToolRetriever:
    """工具检索器：根据查询检索最相关的AKShare接口"""

    def __init__(self, vectorstore: Chroma):
        """初始化检索器

        Args:
            vectorstore: 工具描述向量库
        """
        self.vectorstore = vectorstore

    def retrieve(self, query: str, k: int = 3) -> List[Dict[str, Any]]:
        """检索最相关的工具

        Args:
            query: 用户查询
            k: 返回工具数量

        Returns:
            工具列表，每个包含name, description, score
        """
        results = self.vectorstore.similarity_search_with_score(query, k=k)
        return [
            {
                "name": doc.metadata["name"],
                "description": doc.metadata["description"],
                "score": score
            }
            for doc, score in results
        ]

    def get_tool_names(self, query: str, k: int = 3) -> List[str]:
        """仅返回工具名称列表"""
        tools = self.retrieve(query, k)
        return [t["name"] for t in tools]

# 初始化检索器
tool_retriever = ToolRetriever(tool_vectorstore)

# 测试检索功能
test_queries = [
    "社会融资规模增量是多少？",
    "最近GDP增长率如何？",
    "当前通胀水平怎么样？",
    "LPR利率有什么变化？"
]

print("=" * 80)
print("[测试] 工具检索准确性")
print("=" * 80)
for query in test_queries:
    results = tool_retriever.retrieve(query, k=2)
    print(f"\n查询：{query}")
    for r in results:
        print(f"  - {r['name']}: {r['description'][:40]}... (score: {r['score']:.3f})")

## Step 5：定义动态AKShare工具

将AKShare接口封装为LangChain工具，支持动态调用


In [None]:
# 定义动态AKShare工具
from langchain_core.tools import tool
import pandas as pd

def create_akshare_tool(func_name: str, description: str):
    """为AKShare函数创建LangChain工具

    Args:
        func_name: AKShare函数名
        description: 工具描述

    Returns:
        LangChain工具函数
    """
    @tool(name=func_name, description=description)
    def akshare_tool(**kwargs) -> str:
        """动态调用AKShare接口"""
        func = getattr(ak, func_name, None)
        if not func:
            return f"未知接口：{func_name}"
        try:
            result = func(**kwargs)
            if isinstance(result, pd.DataFrame):
                return result.tail(20).to_string()
            return str(result)
        except Exception as e:
            return f"调用失败：{e}"

    return akshare_tool

# 为所有宏观接口创建工具
ALL_MACRO_TOOLS = {}
for tool_info in AKSHARE_MACRO_TOOLS:
    tool_func = create_akshare_tool(tool_info["name"], tool_info["description"])
    ALL_MACRO_TOOLS[tool_info["name"]] = tool_func

print(f"[完成] 已创建 {len(ALL_MACRO_TOOLS)} 个AKShare工具")

# 添加知识库检索工具
from analyst_chain.tools.knowledge_retriever import KnowledgeRetriever

knowledge_retriever = KnowledgeRetriever(
    domain=Domain.MACRO_ECONOMY,
    structured_json_dir=STRUCTURED_JSON_DIR,
    vector_db_dir=VECTOR_DB_DIR,
    embedding_model=EMBEDDING_MODEL
)

@tool(name="search_knowledge", description="检索宏观经济知识库，获取理论知识、分析框架和历史案例")
def search_knowledge(query: str) -> str:
    """检索宏观经济知识库"""
    return knowledge_retriever.vector_search(query, k=3)

print("[完成] 知识检索工具已创建")

## Step 6：创建Agent + 动态工具选择

使用LangChain ReAct Agent + 自定义工具选择逻辑


In [None]:
# 创建Agent + 动态工具选择
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent

# 初始化LLM
llm = ChatOpenAI(
    model="deepseek-chat",
    api_key=os.getenv("DEEPSEEK_API_KEY"),
    base_url=os.getenv("DEEPSEEK_BASE_URL"),
    streaming=True,
    temperature=0.7,
)

# 系统提示词
SYSTEM_PROMPT = """你是宏观经济分析专家，擅长：
- 分析GDP、CPI、PMI等宏观经济指标
- 判断经济周期阶段（复苏/过热/滞胀/衰退）
- 结合经济理论解读数据
- 提供投资建议

工作流程：
1. 理解用户问题
2. 调用合适的工具获取数据
3. 结合知识库进行分析
4. 给出专业、清晰的回答

输出要求：
- 数据准确，引用来源
- 分析专业，运用理论
- 逻辑清晰，结构完整
- 结论明确，便于理解"""

def get_dynamic_tools(query: str, k: int = 3) -> list:
    """根据查询动态选择工具

    Args:
        query: 用户查询
        k: 选择的工具数量

    Returns:
        工具列表（检索到的AKShare工具 + 知识检索工具）
    """
    # 检索最相关的AKShare工具
    relevant_tool_names = tool_retriever.get_tool_names(query, k=k)
    relevant_tools = [ALL_MACRO_TOOLS[name] for name in relevant_tool_names if name in ALL_MACRO_TOOLS]

    # 始终包含知识检索工具
    relevant_tools.append(search_knowledge)

    return relevant_tools

def create_dynamic_agent(query: str):
    """创建带动态工具的Agent

    Args:
        query: 用户查询

    Returns:
        LangGraph ReAct Agent
    """
    tools = get_dynamic_tools(query)
    agent = create_react_agent(
        model=llm,
        tools=tools,
        prompt=SYSTEM_PROMPT
    )
    return agent

print("[完成] Agent创建函数已定义")
print(f"[说明] 每次查询会动态选择最相关的工具（避免工具过多导致上下文过载）")

## Step 7：单问题测试

测试动态工具选择 + Agent执行效果


In [None]:
# 单问题测试
import time
from langchain_core.messages import AIMessageChunk, ToolMessage, HumanMessage

def run_query(query: str) -> str:
    """运行单个查询，显示动态工具选择和执行过程

    Args:
        query: 用户查询

    Returns:
        Agent回答
    """
    print("=" * 80)
    print(f"[查询] {query}")
    print("-" * 80)

    # 显示动态选择的工具
    selected_tools = get_dynamic_tools(query)
    tool_names = [t.name for t in selected_tools]
    print(f"[动态工具选择] 选中 {len(tool_names)} 个工具：{', '.join(tool_names)}")
    print("-" * 80)

    # 创建Agent并执行
    start_time = time.time()
    agent = create_dynamic_agent(query)

    response_text = ""
    tool_call_count = 0

    for chunk in agent.stream(
        {"messages": [HumanMessage(content=query)]},
        stream_mode="values"
    ):
        # 获取最后一条消息
        if "messages" in chunk:
            last_msg = chunk["messages"][-1]
            if hasattr(last_msg, 'content') and last_msg.content:
                response_text = last_msg.content

    elapsed = time.time() - start_time
    print(f"\n[回答]\n{response_text}")
    print("-" * 80)
    print(f"[耗时] {elapsed:.2f}s")
    print("=" * 80)

    return response_text

# 测试：社融数据
test_query = "最近的社会融资规模增量是多少？"
run_query(test_query)

## Step 8：批量测试（10个问题）

按难度递增测试：基础查询 → 周期判断 → 投资策略 → 综合分析


In [None]:
# 10个测试场景（按难度递增）
test_questions = [
    # 基础数据查询（简单）
    "2024年GDP增长率是多少？",
    "当前的通胀水平如何？",
    "最新的PMI数据是多少？",

    # 周期判断（中等）
    "当前经济处于什么周期？",
    "经济周期转折的信号是什么？",
    "什么指标变化会预示周期转折？",

    # 投资策略（困难）
    "根据当前经济周期，应该配置什么资产？",
    "投资时钟当前处于哪个阶段？",

    # 综合分析（最难）
    "给出当前宏观经济的整体判断",
    "从宏观角度看，周期性行业投资机会如何？"
]

print(f"[测试] 共{len(test_questions)}个测试问题，按难度递增")
for i, q in enumerate(test_questions, 1):
    difficulty = "简单" if i <= 3 else "中等" if i <= 6 else "困难" if i <= 8 else "最难"
    print(f"{i:2d}. [{difficulty}] {q}")


In [None]:
# 批量测试（取消注释运行）
import json
from datetime import datetime

# 运行批量测试
"""
results = []

for i, question in enumerate(test_questions, 1):
    print(f"\n{'='*80}")
    print(f"[测试 {i}/{len(test_questions)}] {question}")
    print('='*80)

    start_time = time.time()

    try:
        # 获取动态工具
        selected_tools = get_dynamic_tools(question)
        tool_names = [t.name for t in selected_tools]
        print(f"[工具] {', '.join(tool_names)}")

        # 创建Agent并执行
        agent = create_dynamic_agent(question)
        result = agent.invoke({"messages": [HumanMessage(content=question)]})
        response_text = result["messages"][-1].content
        elapsed = time.time() - start_time

        results.append({
            "question_id": i,
            "question": question,
            "response": response_text,
            "tools_used": tool_names,
            "time_taken": round(elapsed, 2),
            "status": "success",
            "score": None  # 待手动评分
        })

        print(f"[回答] {response_text[:200]}...")
        print(f"[耗时] {elapsed:.2f}s")

    except Exception as e:
        elapsed = time.time() - start_time
        results.append({
            "question_id": i,
            "question": question,
            "error": str(e),
            "time_taken": round(elapsed, 2),
            "status": "failed"
        })
        print(f"[错误] {e}")

# 保存结果
output_path = Path("../data/outputs/stage2_test_results.json")
output_path.parent.mkdir(parents=True, exist_ok=True)

test_report = {
    "test_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    "total_questions": len(test_questions),
    "success_count": sum(1 for r in results if r["status"] == "success"),
    "avg_time": sum(r["time_taken"] for r in results) / len(results),
    "results": results
}

with open(output_path, "w", encoding="utf-8") as f:
    json.dump(test_report, f, ensure_ascii=False, indent=2)

print(f"\n{'='*80}")
print(f"[完成] 测试报告已保存到: {output_path}")
print(f"[统计] 成功: {test_report['success_count']}/{test_report['total_questions']} | 平均耗时: {test_report['avg_time']:.2f}s")
"""

print("[说明] 取消上方三引号注释以运行批量测试")


## 评分标准

**数据准确性**（30分）：
- 引用正确来源（"根据AKShare最新数据..."）
- 数据时间明确（"2024年11月..."）
- 数值准确无误

**分析专业性**（40分）：
- 运用理论框架（"根据经济周期理论..."）
- 分析逻辑清晰（数据→趋势→原因→影响）
- 结合知识库内容

**结论清晰度**（30分）：
- 给出明确判断（"当前处于XX周期"）
- 提出可行建议（"建议配置XX资产"）
- 易于理解（非专业人士能看懂）

**完成标准**：平均评分>=80 + 平均响应<30秒
