-
Notifications
You must be signed in to change notification settings - Fork 163
11_01_Agent_Runtime
核心实现:cli/runtime.py::AgentRuntime。
它把一次用户请求拆成一个 provider-agnostic(不绑定具体模型供应商)的事件循环。这里保留两张图:
- 简版图:用于快速理解 Agent Runtime(智能体运行时)的主循环,适合作为本节的入口视图。
- 详版图:用于展开工程细节,说明记忆、压缩、工具审批、后台任务、大结果落盘、错误回灌和循环保护。
flowchart TD
U["用户消息"] --> M["记忆召回 / 系统提示"]
M --> C["上下文检查与压缩"]
C --> L["provider.chat_stream"]
L --> K["消费 chunk 流式片段"]
K -->|text_delta| T["流式渲染"]
K -->|thinking_delta| R["推理内容记录"]
K -->|tool_calls| A["追加 assistant tool message"]
A --> B["按工具并发安全性分批"]
B --> E["执行工具"]
E --> O["tool observation 回灌 messages"]
O --> C
K -->|无工具调用| G["Loop Guard 循环守卫检查必需工具"]
G -->|漏调| P["注入 retry user message"]
P --> C
G -->|完成| D["done event / 保存日志"]
简版图只表达一件事:Agent 不是一次模型问答,而是“模型推理 → 工具执行 → 观察结果回灌 → 再次推理”的闭环。先看这张图可以快速建立整体理解,再继续阅读后面的工程细节。
详版图沿着三条线展开:
- 模型前处理:识别必需工具、召回记忆、必要时压缩上下文。
- 工具执行治理:最大轮数、并发/串行分批、高风险审批、后台任务、死循环检测。
- 模型后处理:工具结果或错误回灌、大结果落盘、漏调必需工具时纠偏、最终保存日志。
flowchart TD
U["用户消息"] --> X["resolve_turn_expectation<br/>识别本轮必需工具"]
X --> M["build_memory_context<br/>召回相关记忆"]
M --> I["prepend_memory_context<br/>临时注入 relevant-memories"]
I --> C{"估算 token(令牌)是否超过<br/>压缩阈值"}
C -->|是| C0["flush_memory_before_compaction<br/>压缩前提取持久偏好"]
C0 --> C1["复制 head 并运行<br/>shrink_stale_tool_results"]
C1 --> C2["compact_messages<br/>生成 500 字以内摘要"]
C -->|否| L
C2 --> L["provider.chat_stream<br/>模型流式调用"]
L --> TO["_iter_with_timeout<br/>60 秒无 chunk 则超时"]
TO --> K["消费 chunk<br/>流式片段"]
K -->|text_delta| T["RuntimeEvent:text_delta<br/>TUI 流式渲染"]
K -->|thinking_delta| R["RuntimeEvent:thinking_delta<br/>记录推理内容"]
K -->|usage| U1["RuntimeEvent:usage<br/>累计 token 用量"]
K -->|tool_calls| A["追加 assistant tool message<br/>保留 tool_call_id"]
A --> MR{"超过 MAX_TOOL_ROUNDS=15"}
MR -->|是| ML["done:工具调用轮次超限"]
MR -->|否| B["partition_tool_calls<br/>按并发安全性分批"]
B --> BC{"concurrency_safe"}
BC -->|是| P1["ThreadPoolExecutor<br/>最多 5 线程并发"]
BC -->|否| P2["串行执行<br/>避免写操作互相踩踏"]
P1 --> DL
P2 --> DL
DL{"doom-loop 检测<br/>最近 6 次内重复 3 次"}
DL -->|触发| AB["追加 tool_error<br/>中止本轮工具批次"]
DL -->|未触发| AP{"requires_approval<br/>是否高风险工具"}
AP -->|是| CF["确认弹窗<br/>允许 / 总是允许 / 修改 / 拒绝"]
AP -->|否| BG
CF --> BG{"background<br/>是否后台任务"}
BG -->|是| BT["BackgroundTaskManager<br/>返回 task_id"]
BG -->|否| EX["ToolRegistry.execute<br/>执行确定性 Python 函数"]
EX --> ER{"工具是否报错"}
ER -->|是| TE["tool_error<br/>错误回灌给模型"]
ER -->|否| TR["tool_result<br/>结果回灌给模型"]
BT --> TR
TE --> FMT
TR --> FMT["format_tool_result_for_context<br/>结果超过 8,000 字符落盘"]
FMT --> O["追加 tool observation<br/>写回 messages"]
AB --> O
O --> C
K -->|无工具调用| G{"Loop Guard<br/>必需工具是否漏调"}
G -->|漏调且重试小于 2| RT["注入 retry user message<br/>要求直接调用必需工具"]
RT --> C
G -->|漏调且重试耗尽| W["追加不可靠警告"]
W --> D
G -->|未漏调| D["done event<br/>保存 chat log / scratchpad"]
ML --> D
详版图不是要求一次性读完,而是作为工程细节索引。需要理解大结果处理时,可以看 format_tool_result_for_context 这条线;需要理解模型漏调工具时,可以看 Loop Guard 这条线;需要理解重复工具调用保护时,可以看 doom-loop 这条线。
run_stream() 只产出统一的 RuntimeEvent(运行时事件)字典。TUI(终端图形界面)、headless wrapper(无界面兼容封装)、sub-agent(子智能体)和测试都消费这些事件,而不是各自重新实现模型调用和工具循环。
两张图共同强调两个设计重点:
- 模型前后都由确定性代码兜底:进入模型前先做记忆召回和必要的上下文压缩;模型返回后再做工具分批、审批、后台任务、大结果落盘和错误回灌。工具 observation(观察结果)写入后尽量保持稳定,避免破坏 prompt cache(提示词缓存)。
- 结束条件不是“模型说完了”:无工具调用时还要经过 Loop Guard(循环守卫)检查;漏调必需工具会注入 retry user message(纠偏用户消息),最多 2 次,仍失败才带警告结束。
主要事件:
| 事件 | 来源 | 用途 |
|---|---|---|
text_delta |
Provider(模型适配层)流式文本 | TUI(终端图形界面)逐行渲染 |
thinking_delta / thinking
|
reasoning chunk(推理片段)/ round(轮次)结束 | 推理展示和 scratchpad(运行追踪文件)记录 |
tool_calls |
模型请求调用工具 | 清除临时流式文字,进入工具执行 |
tool_start |
工具开始 | 展示工具名、参数、spinner(加载动画) |
tool_result / tool_error
|
工具结束 | 回灌 observation,更新执行摘要 |
model_start |
多轮工具调用的新一轮模型推理 | 恢复“思考中”状态 |
compaction |
上下文被压缩 | 提示消息条数变化 |
retry |
Loop Guard(循环守卫)触发 | 强制模型补调用必需工具 |
usage |
Provider(模型适配层)token(模型令牌)统计 | 状态栏计费/用量 |
done |
最终回答 | 保存 chat log(聊天日志),结束本轮 |
Provider(模型适配层)接口定义在 cli/providers/base.py:
class LLMProvider:
def chat(self, messages, tools, system_prompt="") -> dict: ...
def chat_stream(self, messages, tools, system_prompt="") -> Generator[dict, None, None]: ...
@property
def name(self) -> str: ...Provider(模型适配层)的职责是把不同模型 SDK(软件开发工具包)的响应转换成统一 chunk(流式片段):
-
text_delta:普通文本。 -
thinking_delta:DeepSeek 等模型返回的 reasoning(推理内容)。 -
tool_calls:标准化为{"id", "name", "args"}。 -
usage:输入/输出 token(模型令牌),以及可用时的 cache read/write token(缓存读写令牌)。
为了降低 API 的输入(Input)开销,缩短首字响应延迟(TTFT),Claude Provider (cli/providers/claude.py) 实现了高度精准的滑动 Prompt 缓存(Prompt Caching)机制。
-
静态基底缓存 (Static System Prompt Caching):系统提示词作为前置静态上下文,被包装为 Block 格式并标记
"cache_control": {"type": "ephemeral"}。由于系统提示词在整个会话中保持不变,它构成了缓存的稳定起点。 - **滑动边界推进 (Sliding Boundary Promotion) :在每次向 Claude 发起 API 请求时,系统自动识别当前消息序列的 ** 最后一条消息 ,并在其最后一个 Content Block 上附加
"cache_control": {"type": "ephemeral"}。- 随着对话轮次的不断累加,这个缓存控制标记会随最后一轮消息 向后滑动推进 。
- 这种设计使 LLM 服务端能够缓存当前轮次之前 100% 的历史会话内容。后续对话发起时,只需计算并写入最新一轮增量消息的 Token,前序的几万 Token 均可通过缓存读取,最高可减免 90% 的输入 Token 计费并大幅降低时延。
在底层消息格式构建函数 _build_messages() 中,系统根据消息角色 (role) 分别实现了细粒度的缓存注入:
-
用户消息 (User):将文本内容包装为 Block 列表,在
text块上注入缓存控制标记:{ "role": "user", "content": [{"type": "text", "text": msg["content"], "cache_control": {"type": "ephemeral"}}] } -
助手消息 (Assistant):合并
content中的文本块与tool_use块,在最后一个内容 Block 上追加缓存标记:content[-1]["cache_control"] = {"type": "ephemeral"}
-
工具返回结果 (Tool):在转换后的
tool_result内容 Block 上附加缓存标记:content_block = { "type": "tool_result", "tool_use_id": msg.get("tool_call_id", ""), "content": result, } content_block["cache_control"] = {"type": "ephemeral"}
在流式响应解析 chat_stream() 中,系统通过监听 Claude SDK 的 message_start 事件,从其 usage 元数据字段中精准提取缓存相关的计数指标:
-
cache_read_input_tokens(缓存读取命中数):代表成功从服务端缓存中直接复用的 Token 数量(费率极低)。 -
cache_creation_input_tokens(缓存创建写入数):代表本轮新增写入缓存的 Token 数量(费率等同于普通输入 Token)。
这些指标被统一包装在返回的 usage 信息中并同步更新至 TUI 状态栏,为用户提供直观的成本控制和延迟优化统计反馈。
当前 CLI(命令行)主要 provider(模型适配层):
| Provider(模型适配层) | 文件 | 说明 |
|---|---|---|
| Claude | cli/providers/claude.py |
Claude tool_use / tool_result 双向转换及滑动 Prompt 缓存 |
| OpenAI-compatible(OpenAI 兼容接口) | cli/providers/openai.py |
OpenAI、DeepSeek、Qwen、Kimi 等兼容端点 |
| Gemini | cli/providers/gemini.py |
Gemini function calling 适配 |
| Fallback(故障切换) | cli/providers/fallback.py |
限流、超时、服务端错误时切换备用 provider(模型适配层) |
OpenAI-compatible provider(OpenAI 兼容模型适配层)有三类兼容处理:
- 首次调用携带
stream_options、tool_choice、frequency_penalty。 - 第三方端点不支持时逐步去掉不兼容参数。
- 对把工具调用输出成
<tool_call>...</tool_call>文本的模型做兜底解析。
流式读取由 _iter_with_timeout() 包装:生产者 daemon thread(守护线程)负责读 stream(流式响应),主线程从 queue(队列)取 chunk(流式片段);60 秒无新 chunk 就抛出超时,避免 TCP 半开连接把 TUI 卡死。
模型元数据由 cli/model_registry.py 和 cli/model_metadata.py 维护,覆盖 context window(上下文窗口)、reasoning(推理能力)和成本显示:
-
wyckoff model cost <id> --context-window N可以显式保存模型上下文窗口。 - 未显式配置时,
infer_context_window()会按模型名推断常见窗口;未知模型默认按 64K token(令牌)处理。 -
FallbackProvider(故障切换模型适配层)按默认模型和 fallback(备用模型)顺序尝试,只在限流、超时、网络错误和服务端错误等可恢复异常上切换;配置错误不会被静默吞掉。
-
缓存控制 Block 注入:
在向 LLM 终点(如 Claude)发起请求前,系统会对会话历史进行扫描。缓存标记
cache_control: {"type": "ephemeral"}不会加在所有消息上,而是根据 Anthropic 的收费与机制规则,仅附加在以下 3 个关键节点的最后一个内容 Block上:- 静态 System Prompt 底部。
- 用户消息 (User Message) 的文本 Block。
- 助手消息 (Assistant Message) 的最后一个 Block (如有
tool_use),或工具执行结果 (Tool Message) 的tool_result块上。
-
增量匹配与滑动效率:
由于大语言模型服务端仅对前缀匹配成功的 Token 进行缓存复用,且最小写入增量通常是 2048/4096 Tokens。当会话消息以
[User -> Assistant -> Tool -> Assistant -> Tool -> User]的顺序增加时,滑动标记使得最近一轮之前的 100% 历史都处于缓存激活状态。系统通过维持这一稳定前缀,为大容量会话减少了近 90% 的首字时延(TTFT)与 Token 成本。
返回 系列索引
- Home
- 01_Product_Overview
- 02_Finance_Wyckoff_Method
- 03_Finance_Quantitative_Metrics
- 04_Finance_Sector_Rotation_Regime
- 05_Finance_Risk_Management
- 06_Backtest_Methodology
- 07_Backtest_Simple_vs_Compound
- 08_Research_Strategy_Decay
- 09_Tech_Architecture
- 10_Tech_LLM_RAG_Integration
- 12_Tech_Actions_Operations
- 13_Tech_Python_Engineering