From 0822ad29f212f9856c6339b44d4fdc32a4fd0f0d Mon Sep 17 00:00:00 2001 From: braisedpork1964 <497494458@qq.com> Date: Tue, 28 Apr 2026 05:11:27 +0000 Subject: [PATCH 1/3] remove `AsyncPolicyAgent` --- lagent/agents/__init__.py | 7 +- lagent/agents/internclaw_agent.py | 27 +-- lagent/apps/main.py | 57 +++--- lagent/schema.py | 78 ++------ project_template/agents/config.py | 13 +- tests/data/agents/e2e-agent/config.py | 9 +- tests/test_agent_service_e2e.py | 188 +++++++++--------- tests/test_agents/test_compact_integration.py | 68 ++++--- tests/test_agents/test_framework_agents.py | 9 +- tests/test_agents/test_internclaw_e2e.py | 145 ++++++++------ tests/test_services/test_agent_loader.py | 11 +- workspace/agents/default_agent/config.py | 4 +- 12 files changed, 310 insertions(+), 306 deletions(-) diff --git a/lagent/agents/__init__.py b/lagent/agents/__init__.py index 1a200725..9a74ab50 100644 --- a/lagent/agents/__init__.py +++ b/lagent/agents/__init__.py @@ -11,11 +11,7 @@ StreamingSequential, ) from .compact_agent import AsyncCompactAgent, estimate_token_count -from .internclaw_agent import ( - AsyncEnvAgent, - AsyncPolicyAgent, - InternClawAgent, -) +from .internclaw_agent import AsyncEnvAgent, InternClawAgent __all__ = [ 'Agent', @@ -31,6 +27,5 @@ 'AsyncCompactAgent', 'estimate_token_count', 'AsyncEnvAgent', - 'AsyncPolicyAgent', 'InternClawAgent', ] diff --git a/lagent/agents/internclaw_agent.py b/lagent/agents/internclaw_agent.py index 0a9c4e20..1d00d710 100644 --- a/lagent/agents/internclaw_agent.py +++ b/lagent/agents/internclaw_agent.py @@ -56,23 +56,6 @@ def _convert_tool_schema(action_description: dict, name_pattern: str = '{}') -> return tools -class AsyncPolicyAgent(AsyncAgent): - - async def forward(self, *message, **kwargs): - formatted_messages, tools = self.aggregator.aggregate( - self.memory, self.name, self.output_format, self.template - ) - llm_response = await self.llm.chat(formatted_messages, tools=tools, **kwargs) - # message = AgentMessage( - # sender=self.name, - # content=llm_response.get('content') or '', - # tool_calls=llm_response.get('tool_calls') or [], - # reasoning_content=llm_response.get('reasoning_content'), - # ) - # return message - return llm_response - - class AsyncEnvAgent(AsyncAgent): def __init__(self, actions, skills: SkillsLoader = None, long_term_memory=None, **kwargs): super().__init__(**kwargs) @@ -168,13 +151,11 @@ async def _inner_func(tool_call): result_dict['tool_call_id'] = tc.get('id', '') if resp.valid != ActionValidCode.OPEN: result_dict['errmsg'] = ( - f'Tool Call Error: {resp.errmsg} in tool call ' - f'{json.dumps(tc, ensure_ascii=False)}' + f'Tool Call Error: {resp.errmsg} in tool call ' f'{json.dumps(tc, ensure_ascii=False)}' ) elif resp.state != ActionStatusCode.SUCCESS: result_dict['errmsg'] = ( - f'Tool Call Error: {resp.errmsg} in tool call ' - f'{json.dumps(tc, ensure_ascii=False)}' + f'Tool Call Error: {resp.errmsg} in tool call ' f'{json.dumps(tc, ensure_ascii=False)}' ) if resp.state == ActionStatusCode.ARGS_ERROR: reward = -1 @@ -352,7 +333,7 @@ async def main(): # ── 4. Policy agent ── aggregator = InternClawContextBuilder(workspace, tools=None) - policy = AsyncPolicyAgent( + policy = AsyncAgent( llm=model, aggregator=aggregator, hooks=[logger_hook], @@ -374,7 +355,7 @@ async def main(): ) # ── 7. Consolidate agent (standard InternClawAgent) ── - consolidate_policy = AsyncPolicyAgent( + consolidate_policy = AsyncAgent( name='consolidate_policy', llm=model, template=CONSOLIDATION_PROMPT, diff --git a/lagent/apps/main.py b/lagent/apps/main.py index 7e0031e6..5a7618f6 100644 --- a/lagent/apps/main.py +++ b/lagent/apps/main.py @@ -1,8 +1,8 @@ import asyncio import logging +import os import signal import sys -import os # 将 Lagent 项目根目录插入 SYS PATH,确保支持正确引用 sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))) @@ -13,12 +13,9 @@ from lagent.services.cron import CronService from lagent.services.heartbeat import HeartbeatService - # 配置工程化日志格式(生产级别格式) logging.basicConfig( - level=logging.INFO, - format="%(asctime)s | %(levelname)-7s | %(name)-20s | %(message)s", - datefmt="%H:%M:%S" + level=logging.INFO, format="%(asctime)s | %(levelname)-7s | %(name)-20s | %(message)s", datefmt="%H:%M:%S" ) logger = logging.getLogger("interclaw.main") @@ -27,12 +24,15 @@ def main(): logger.info("Initializing Interclaw Agent OS...") from pathlib import Path - from lagent.agents.aggregator.context import InternClawContextBuilder - from lagent.llms.model import AsyncAPIClient, ModelConfig, SampleParameters - from lagent.actions.filesystem import ReadFileAction, WriteFileAction, EditFileAction + + from lagent.actions.filesystem import EditFileAction, ReadFileAction, WriteFileAction from lagent.actions.shell import ShellAction - from lagent.agents.internclaw_agent import InternClawAgent, AsyncPolicyAgent, AsyncEnvAgent, get_tool_prompt + from lagent.agents import AsyncAgent + from lagent.agents.aggregator.context import InternClawContextBuilder + from lagent.agents.internclaw_agent import AsyncEnvAgent, InternClawAgent, get_tool_prompt from lagent.hooks.logger import MessageLogger + from lagent.llms.model import AsyncAPIClient, ModelConfig, SampleParameters + model_name = "/mnt/shared-storage-user/puyudelivery/user/puyudilivery/ckpts/xtuner_saved_model/interns1_1_mini_official/interns1_1_mini_sft_based_cpt_bs512_epoch1_maxlr3e-5_minlr1e-6_max16k-hf/20260207101512/hf-4374" # model_name = "gpt-4o-2024-08-06" api_base = "http://10.102.218.28:23333/v1/" @@ -42,20 +42,22 @@ def main(): proxies = None model = AsyncAPIClient( - model=ModelConfig(model=model_name, base_url=api_base, api_key=api_key, proxy=proxies), - sample_params=SampleParameters(temperature=0.7, top_p=1.0, top_k=50), - timeout=600, - max_retry=5, - sleep_interval=5, - extra_body=extra_body, - ) + model=ModelConfig(model=model_name, base_url=api_base, api_key=api_key, proxy=proxies), + sample_params=SampleParameters(temperature=0.7, top_p=1.0, top_k=50), + timeout=600, + max_retry=5, + sleep_interval=5, + extra_body=extra_body, + ) workspace = "/mnt/shared-storage-user/llmit/user/liukuikun/workspace/lagent/workspace" - actions = [ReadFileAction(workspace=workspace), WriteFileAction(workspace=workspace), EditFileAction(workspace=workspace), ShellAction(working_dir=workspace)] + actions = [ + ReadFileAction(workspace=workspace), + WriteFileAction(workspace=workspace), + EditFileAction(workspace=workspace), + ShellAction(working_dir=workspace), + ] aggregator = InternClawContextBuilder(Path(workspace), tools=get_tool_prompt(actions)) - policy = AsyncPolicyAgent( - llm=model, - aggregator=aggregator, - hooks=[MessageLogger()]) + policy = AsyncAgent(llm=model, aggregator=aggregator, hooks=[MessageLogger()]) env = AsyncEnvAgent(actions=actions) agent = InternClawAgent(policy_agent=policy, env_agent=env) @@ -64,11 +66,13 @@ def main(): # 3. 动态注册外部通道 (External Channels) # app自动负责把总线 bus 依赖注入给这些组件 - app.register_channel(FeishuChannel, - app_id=os.getenv("FEISHU_APP_ID", "cli_a92538846ff99cd2"), - app_secret=os.getenv("FEISHU_APP_SECRET", "EpDAW3TCnqUpyUltr1Q5WfR27j0vX13F"), - encrypt_key=os.getenv("FEISHU_ENCRYPT_KEY", ""), - verification_token=os.getenv("FEISHU_VERIFICATION_TOKEN", "")) + app.register_channel( + FeishuChannel, + app_id=os.getenv("FEISHU_APP_ID", "cli_a92538846ff99cd2"), + app_secret=os.getenv("FEISHU_APP_SECRET", "EpDAW3TCnqUpyUltr1Q5WfR27j0vX13F"), + encrypt_key=os.getenv("FEISHU_ENCRYPT_KEY", ""), + verification_token=os.getenv("FEISHU_VERIFICATION_TOKEN", ""), + ) # 如果有飞书 credentials,可动态注册飞书 channel # app.register_channel(FeishuChannel, app_id="TODO_APP_ID", app_secret="TODO_APP_SECRET") @@ -80,5 +84,6 @@ def main(): # 5. 启动总控制器 app.run() + if __name__ == "__main__": main() diff --git a/lagent/schema.py b/lagent/schema.py index ed6fa6cb..552709ee 100644 --- a/lagent/schema.py +++ b/lagent/schema.py @@ -91,15 +91,15 @@ class AgentStatusCode(IntEnum): class AgentMessage(BaseModel): content: Any + sender: str = 'user' thinking: Optional[str] = None content_ids: Optional[List[int]] = Field(default=None, repr=False) content_logprobs: Optional[List[float]] = Field(default=None, repr=False) thinking_ids: Optional[List[int]] = Field(default=None, repr=False) thinking_logprobs: Optional[List[float]] = Field(default=None, repr=False) - raw_content: Optional[str] = None + raw_content: Optional[str] = Field(default=None, repr=False) raw_content_ids: Optional[List[int]] = Field(default=None, repr=False) raw_content_logprobs: Optional[List[float]] = Field(default=None, repr=False) - sender: str = 'user' tool_calls: Optional[List[dict]] = None tool_calls_ids: Optional[List[str]] = None formatted: Optional[Any] = None @@ -115,71 +115,37 @@ class AgentMessage(BaseModel): @classmethod def from_model_response(cls, response: Union[ChatCompletion, dict], sender: str) -> "AgentMessage": """Convert model response (ChatCompletion object or model_dump dict) to AgentMessage.""" - if isinstance(response, dict): - choice = response['choices'][0] - msg = choice['message'] - finish_reason = choice.get('finish_reason') - # tool_calls_raw = msg.get('tool_calls') # list of dicts or None - return cls( - sender=sender, - content=msg.get('content') or "", - thinking=msg.get('reasoning_content'), - raw_content=msg.get('raw_content'), - content_ids=msg.get('content_ids'), - content_logprobs=msg.get('content_logprobs'), - thinking_ids=msg.get('reasoning_content_ids'), - thinking_logprobs=msg.get('reasoning_content_logprobs'), - raw_content_ids=msg.get('raw_content_ids'), - raw_content_logprobs=msg.get('raw_content_logprobs'), - extra_info=msg.get('extra_info') or {}, - tool_calls=msg.get('tool_calls'), - # tool_calls=[tc['function'] for tc in tool_calls_raw] if tool_calls_raw else None, - # tool_calls_ids=[tc['id'] for tc in tool_calls_raw] if tool_calls_raw else None, - stream_state=( - ModelStatusCode.SESSION_OUT_OF_LIMIT if finish_reason == 'length' else ModelStatusCode.END - ), - finish_reason=finish_reason, - ) - # ChatCompletion object (or subclass) - chat_message = response.choices[0].message - tool_calls = chat_message.tool_calls and [tool_call.model_dump() for tool_call in chat_message.tool_calls] + if not isinstance(response, dict): + response = response.model_dump() + + choice = response['choices'][0] + msg = choice['message'] + finish_reason = choice.get('finish_reason') return cls( sender=sender, - content=chat_message.content or "", - thinking=getattr(chat_message, 'reasoning_content', None), - raw_content=getattr(chat_message, 'raw_content', None), - content_ids=getattr(chat_message, 'content_ids', None), - content_logprobs=getattr(chat_message, 'content_logprobs', None), - thinking_ids=getattr(chat_message, 'reasoning_content_ids', None), - thinking_logprobs=getattr(chat_message, 'reasoning_content_logprobs', None), - raw_content_ids=getattr(chat_message, 'raw_content_ids', None), - raw_content_logprobs=getattr(chat_message, 'raw_content_logprobs', None), - extra_info=getattr(chat_message, 'extra_info', {}) or {}, - tool_calls=tool_calls, - # tool_calls=[tool_call['function'] for tool_call in tool_calls] if tool_calls else None, - # tool_calls_ids=[tool_call['id'] for tool_call in tool_calls] if tool_calls else None, - stream_state=( - ModelStatusCode.SESSION_OUT_OF_LIMIT - if response.choices[0].finish_reason == 'length' - else ModelStatusCode.END - ), - finish_reason=response.choices[0].finish_reason, + content=msg.get('content') or "", + thinking=msg.get('reasoning_content'), + raw_content=msg.get('raw_content'), + content_ids=msg.get('content_ids'), + content_logprobs=msg.get('content_logprobs'), + thinking_ids=msg.get('reasoning_content_ids'), + thinking_logprobs=msg.get('reasoning_content_logprobs'), + raw_content_ids=msg.get('raw_content_ids'), + raw_content_logprobs=msg.get('raw_content_logprobs'), + extra_info=msg.get('extra_info') or {}, + tool_calls=msg.get('tool_calls'), + stream_state=(ModelStatusCode.SESSION_OUT_OF_LIMIT if finish_reason == 'length' else ModelStatusCode.END), + finish_reason=finish_reason, ) def to_model_request(self, role: str = 'assistant') -> dict: """Convert AgentMessage to model request dict.""" msg = {'role': role, 'content': self.content} - # tool_calls = [ - # {'id': tool_call_id, 'function': tool_call, 'type': 'function'} - # for tool_call, tool_call_id in zip(self.tool_calls or [], self.tool_calls_ids or []) - # ] - # if tool_calls: - # msg['tool_calls'] = tool_calls for key in [ 'tool_calls', - 'raw_content', 'content_ids', 'content_logprobs', + 'raw_content', 'raw_content_ids', 'raw_content_logprobs', 'extra_info', diff --git a/project_template/agents/config.py b/project_template/agents/config.py index b2b09ca0..521b60da 100644 --- a/project_template/agents/config.py +++ b/project_template/agents/config.py @@ -19,14 +19,11 @@ When unset, defaults to ``create_object(agent_config)``. """ -from lagent.agents.internclaw_agent import ( - AsyncEnvAgent, - AsyncPolicyAgent, - InternClawAgent, -) -from lagent.llms.model import AsyncAPIClient -from lagent.agents.aggregator.context import InternClawContextBuilder from lagent.actions.mcp_client import AsyncMCPClientSandbox +from lagent.agents import AsyncAgent +from lagent.agents.aggregator.context import InternClawContextBuilder +from lagent.agents.internclaw_agent import AsyncEnvAgent, InternClawAgent +from lagent.llms.model import AsyncAPIClient # ── Metadata ────────────────────────────────────────────────────────── @@ -60,7 +57,7 @@ agent_config = dict( type=InternClawAgent, policy_agent=dict( - type=AsyncPolicyAgent, + type=AsyncAgent, llm=llm, aggregator=dict(type=InternClawContextBuilder), name="policy", diff --git a/tests/data/agents/e2e-agent/config.py b/tests/data/agents/e2e-agent/config.py index 8c7256bf..212f99e1 100644 --- a/tests/data/agents/e2e-agent/config.py +++ b/tests/data/agents/e2e-agent/config.py @@ -6,12 +6,9 @@ from pathlib import Path +from lagent.agents import AsyncAgent from lagent.agents.aggregator.context import InternClawContextBuilder -from lagent.agents.internclaw_agent import ( - AsyncEnvAgent, - AsyncPolicyAgent, - InternClawAgent, -) +from lagent.agents.internclaw_agent import AsyncEnvAgent, InternClawAgent from lagent.llms.model import AsyncAPIClient name = "e2e-agent" @@ -35,7 +32,7 @@ agent_config = dict( type=InternClawAgent, policy_agent=dict( - type=AsyncPolicyAgent, + type=AsyncAgent, llm=llm, aggregator=dict(type=InternClawContextBuilder, workspace=Path("/tmp")), name="policy", diff --git a/tests/test_agent_service_e2e.py b/tests/test_agent_service_e2e.py index 76c16163..6be1d171 100644 --- a/tests/test_agent_service_e2e.py +++ b/tests/test_agent_service_e2e.py @@ -29,36 +29,26 @@ """ from __future__ import annotations - import argparse import asyncio import json import logging -import sys import os -from pathlib import Path +import sys from dataclasses import dataclass +from pathlib import Path from typing import Any # Ensure lagent is importable sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) -from lagent.agents.agent import AsyncAgent -from lagent.services.agent_loader import AgentSpec -from lagent.services.agent import ( - AgentEntry, - AgentService, - AgentStatus, -) +from lagent.actions.compact import COMPACT_PROMPT, AsyncCompactAction, CompactAction, estimate_token_count from lagent.actions.subagent import AgentAction, AsyncAgentAction -from lagent.actions.compact import ( - AsyncCompactAction, - CompactAction, - estimate_token_count, - COMPACT_PROMPT, -) +from lagent.agents.agent import AsyncAgent from lagent.memory.memory import BaseMemoryStore, ClaudeCodeMemory from lagent.schema import ActionReturn, ActionStatusCode, AgentMessage +from lagent.services.agent import AgentEntry, AgentService, AgentStatus +from lagent.services.agent_loader import AgentSpec logging.basicConfig( level=logging.INFO, @@ -71,6 +61,7 @@ # Helpers / Mocks # ===================================================================== + class MockLLM: """A mock LLM that returns canned responses.""" @@ -124,9 +115,7 @@ def __init__(self, llm=None, name="test-agent", **kwargs): ) async def forward(self, *messages, **kwargs): - formatted, tools = self.aggregator.aggregate( - self.memory, self.name, self.output_format, self.template - ) + formatted, tools = self.aggregator.aggregate(self.memory, self.name, self.output_format, self.template) resp = await self.llm.chat(formatted, tools=tools, **kwargs) return AgentMessage( sender=self.name, @@ -139,6 +128,7 @@ async def forward(self, *messages, **kwargs): # Level 1: Unit Tests (no network) # ===================================================================== + async def test_agent_service_basic(): """AgentService: register, spawn (with mock agent), list, query.""" print("\n" + "=" * 60) @@ -158,11 +148,13 @@ async def test_agent_service_basic(): type=f"{SimpleTestAgent.__module__}.{SimpleTestAgent.__qualname__}", llm=dict( type=f"{MockLLM.__module__}.{MockLLM.__qualname__}", - responses=[{ - "role": "assistant", - "content": "Echo: placeholder", - "tool_calls": [], - }], + responses=[ + { + "role": "assistant", + "content": "Echo: placeholder", + "tool_calls": [], + } + ], ), name="echo-agent", ), @@ -211,11 +203,13 @@ async def test_agent_action_wiring(): type=f"{SimpleTestAgent.__module__}.{SimpleTestAgent.__qualname__}", llm=dict( type=f"{MockLLM.__module__}.{MockLLM.__qualname__}", - responses=[{ - "role": "assistant", - "content": "Done: something", - "tool_calls": [], - }], + responses=[ + { + "role": "assistant", + "content": "Done: something", + "tool_calls": [], + } + ], ), name="helper", ), @@ -256,11 +250,15 @@ async def test_compact_action_unit(): # Create a policy agent with mock LLM that returns a summary policy = SimpleTestAgent( - llm=MockLLM([{ - "role": "assistant", - "content": "## Summary\nUser asked to test compact.\n## Pending\nNothing.", - "tool_calls": [], - }]), + llm=MockLLM( + [ + { + "role": "assistant", + "content": "## Summary\nUser asked to test compact.\n## Pending\nNothing.", + "tool_calls": [], + } + ] + ), name="policy", ) # Feed some history into policy's memory @@ -303,11 +301,15 @@ async def test_claude_code_memory_unit(): svc = AgentService() policy = SimpleTestAgent( - llm=MockLLM([{ - "role": "assistant", - "content": "Compacted summary here.", - "tool_calls": [], - }]), + llm=MockLLM( + [ + { + "role": "assistant", + "content": "Compacted summary here.", + "tool_calls": [], + } + ] + ), name="policy", ) policy.memory.add(AgentMessage(sender="user", content="test")) @@ -358,11 +360,13 @@ async def test_spawn_with_state(): type=f"{SimpleTestAgent.__module__}.{SimpleTestAgent.__qualname__}", llm=dict( type=f"{MockLLM.__module__}.{MockLLM.__qualname__}", - responses=[{ - "role": "assistant", - "content": "Worker done", - "tool_calls": [], - }], + responses=[ + { + "role": "assistant", + "content": "Worker done", + "tool_calls": [], + } + ], ), name="worker", ), @@ -398,11 +402,13 @@ async def test_agent_spec_create(): type=f"{SimpleTestAgent.__module__}.{SimpleTestAgent.__qualname__}", llm=dict( type=f"{MockLLM.__module__}.{MockLLM.__qualname__}", - responses=[{ - "role": "assistant", - "content": "PyConfig agent works!", - "tool_calls": [], - }], + responses=[ + { + "role": "assistant", + "content": "PyConfig agent works!", + "tool_calls": [], + } + ], ), name="pyconfig-test", ), @@ -484,6 +490,7 @@ async def test_agent_service_persistence(): # Level 2: Integration Tests (real LLM) # ===================================================================== + async def test_real_llm_compact(): """Integration: CompactAction with a real LLM endpoint. @@ -497,9 +504,9 @@ async def test_real_llm_compact(): print("TEST: Real LLM compact (integration)") print("=" * 60) - from lagent.llms.model import AsyncAPIClient, ModelConfig, SampleParameters + from lagent.agents import AsyncAgent from lagent.agents.aggregator.context import InternClawContextBuilder - from lagent.agents.internclaw_agent import AsyncPolicyAgent + from lagent.llms.model import AsyncAPIClient, ModelConfig, SampleParameters base_url = os.environ.get("LLM_BASE_URL", "http://35.220.164.252:3888/v1") api_key = os.environ.get("LLM_API_KEY", " ") @@ -520,26 +527,32 @@ async def test_real_llm_compact(): aggregator = InternClawContextBuilder(workspace, tools=None) # Build policy agent - policy = AsyncPolicyAgent( + policy = AsyncAgent( llm=model, aggregator=aggregator, name="policy", ) # Simulate a conversation - policy.memory.add(AgentMessage( - sender="user", - content="Please help me write a Python function that calculates fibonacci numbers.", - )) - policy.memory.add(AgentMessage( - sender="policy", - content="Sure! Here's a recursive fibonacci function:\n\n```python\ndef fib(n):\n if n <= 1: return n\n return fib(n-1) + fib(n-2)\n```", - tool_calls=[], - )) - policy.memory.add(AgentMessage( - sender="user", - content="Can you make it iterative and add memoization?", - )) + policy.memory.add( + AgentMessage( + sender="user", + content="Please help me write a Python function that calculates fibonacci numbers.", + ) + ) + policy.memory.add( + AgentMessage( + sender="policy", + content="Sure! Here's a recursive fibonacci function:\n\n```python\ndef fib(n):\n if n <= 1: return n\n return fib(n-1) + fib(n-2)\n```", + tool_calls=[], + ) + ) + policy.memory.add( + AgentMessage( + sender="user", + content="Can you make it iterative and add memoization?", + ) + ) # Create AgentService + CompactAction svc = AgentService() @@ -552,9 +565,7 @@ async def test_real_llm_compact(): ) # Check should_compact - token_est = estimate_token_count( - [{"content": "x" * 200}] # simulate some tokens - ) + token_est = estimate_token_count([{"content": "x" * 200}]) # simulate some tokens print(f" Token estimate: {token_est}, threshold: {compact.threshold_tokens}") assert compact.should_compact(token_est) or token_est < compact.threshold_tokens @@ -580,9 +591,9 @@ async def test_real_llm_agent_service_spawn(): print("TEST: Real LLM AgentService spawn (integration)") print("=" * 60) - from lagent.llms.model import AsyncAPIClient, ModelConfig, SampleParameters + from lagent.agents import AsyncAgent from lagent.agents.aggregator.context import InternClawContextBuilder - from lagent.agents.internclaw_agent import AsyncPolicyAgent + from lagent.llms.model import AsyncAPIClient, ModelConfig, SampleParameters base_url = os.environ.get("LLM_BASE_URL", "http://35.220.164.252:3888/v1") api_key = os.environ.get("LLM_API_KEY", " ") @@ -598,7 +609,7 @@ async def test_real_llm_agent_service_spawn(): model_cfg = ModelConfig(model=model_name, base_url=base_url, api_key=api_key, proxy=proxy) sample_params = SampleParameters(temperature=0.7, top_p=1.0, top_k=50) - # We can't use full PyConfig here because AsyncPolicyAgent needs + # We can't use full PyConfig here because AsyncAgent needs # a ContextBuilder which needs a workspace path. So we use a # custom factory instead. async def real_llm_factory(spec: AgentSpec, task: str): @@ -610,7 +621,7 @@ async def real_llm_factory(spec: AgentSpec, task: str): sleep_interval=2, ) aggregator = InternClawContextBuilder(workspace, tools=None) - agent = AsyncPolicyAgent( + agent = AsyncAgent( llm=model, aggregator=aggregator, name=spec.name, @@ -648,14 +659,11 @@ async def test_real_llm_full_pipeline(): print("TEST: Full pipeline with real LLM (integration)") print("=" * 60) - from lagent.llms.model import AsyncAPIClient, ModelConfig, SampleParameters + from lagent.agents import AsyncAgent from lagent.agents.aggregator.context import InternClawContextBuilder - from lagent.agents.internclaw_agent import ( - AsyncPolicyAgent, - AsyncEnvAgent, - InternClawAgent, - ) + from lagent.agents.internclaw_agent import AsyncEnvAgent, InternClawAgent from lagent.hooks.logger import MessageLogger + from lagent.llms.model import AsyncAPIClient, ModelConfig, SampleParameters base_url = os.environ.get("LLM_BASE_URL", "http://35.220.164.252:3888/v1") api_key = os.environ.get("LLM_API_KEY", " ") @@ -678,7 +686,7 @@ async def test_real_llm_full_pipeline(): svc = AgentService() # Step 2: Create PolicyAgent - policy = AsyncPolicyAgent( + policy = AsyncAgent( llm=model, aggregator=aggregator, name="policy", @@ -722,6 +730,7 @@ async def test_real_llm_full_pipeline(): except Exception as exc: print(f" ⚠️ Agent failed: {exc}") import traceback + traceback.print_exc() print(" 🎉 Full pipeline: DONE\n") @@ -731,6 +740,7 @@ async def test_real_llm_full_pipeline(): # Level 3: Full E2E (real LLM + sandbox) # ===================================================================== + async def test_e2e_with_sandbox(): """Full E2E: InternClawAgent with sandbox MCP actions. @@ -741,17 +751,13 @@ async def test_e2e_with_sandbox(): print("TEST: Full E2E with sandbox (e2e)") print("=" * 60) - from lagent.llms.model import AsyncAPIClient, ModelConfig, SampleParameters - from lagent.agents.aggregator.context import InternClawContextBuilder - from lagent.agents.internclaw_agent import ( - AsyncPolicyAgent, - AsyncEnvAgent, - InternClawAgent, - ) - from lagent.skills.skills import SkillsLoader, SandboxSkillsBackend - from lagent.memory.memory import SandboxMemoryBackend from lagent.actions.mcp_client import AsyncMCPClientSandbox + from lagent.agents import AsyncAgent + from lagent.agents.aggregator.context import InternClawContextBuilder + from lagent.agents.internclaw_agent import AsyncEnvAgent, InternClawAgent from lagent.hooks.logger import MessageLogger + from lagent.llms.model import AsyncAPIClient, ModelConfig, SampleParameters + from lagent.skills.skills import SandboxSkillsBackend, SkillsLoader base_url = os.environ.get("LLM_BASE_URL", "http://35.220.164.252:3888/v1") api_key = os.environ.get("LLM_API_KEY", " ") @@ -774,6 +780,7 @@ async def test_e2e_with_sandbox(): # Discover workspace home_path = await shell_action.run(command='pwd') import json as _json + cwd = _json.loads(home_path.result[0]['content'])['cwd'] workspace_path = os.path.join(cwd, 'workspace') print(f" Workspace: {workspace_path}") @@ -784,7 +791,7 @@ async def test_e2e_with_sandbox(): # AgentService + CompactAction svc = AgentService() - policy = AsyncPolicyAgent( + policy = AsyncAgent( llm=model, aggregator=aggregator, name="policy", @@ -821,6 +828,7 @@ async def test_e2e_with_sandbox(): except Exception as exc: print(f" ⚠️ E2E test failed: {exc}") import traceback + traceback.print_exc() finally: try: @@ -835,6 +843,7 @@ async def test_e2e_with_sandbox(): # Runner # ===================================================================== + async def run_unit_tests(): """Run all unit tests (no network required).""" print("\n" + "#" * 60) @@ -887,6 +896,7 @@ def main(): # Default to unit tests if nothing specified if not any([args.unit, args.integration, args.e2e, args.all]): args.unit = True + async def run_all(): if args.unit or args.all: await run_unit_tests() diff --git a/tests/test_agents/test_compact_integration.py b/tests/test_agents/test_compact_integration.py index 2b9c01b3..5fe7da94 100644 --- a/tests/test_agents/test_compact_integration.py +++ b/tests/test_agents/test_compact_integration.py @@ -15,18 +15,17 @@ import tempfile from pathlib import Path -from lagent.agents.compact_agent import AsyncCompactAgent, estimate_token_count -from lagent.agents.internclaw_agent import ( - AsyncEnvAgent, AsyncPolicyAgent, InternClawAgent, -) +from lagent.actions.save_memory import SaveMemoryAction +from lagent.agents import AsyncAgent from lagent.agents.aggregator.context import InternClawContextBuilder +from lagent.agents.compact_agent import AsyncCompactAgent, estimate_token_count +from lagent.agents.internclaw_agent import AsyncEnvAgent, InternClawAgent from lagent.memory import Memory, OpenClawMemoryProvider -from lagent.actions.save_memory import SaveMemoryAction from lagent.schema import AgentMessage - # ── Helpers ─────────────────────────────────────────────────────── + class MockLLM: """Mock LLM that returns predictable responses.""" @@ -47,6 +46,7 @@ async def chat(self, messages, **kwargs): # ── Test 1: Provider + EnvAgent get_info injection ──────────────── + async def test_provider_injects_into_env_info(): """Verify provider.get_info() content appears in env_info['memory'].""" with tempfile.TemporaryDirectory() as tmpdir: @@ -66,6 +66,7 @@ async def test_provider_injects_into_env_info(): # ── Test 2: CompactAgent with formatted_messages ────────────────── + async def test_compact_formats_and_summarizes(): """Verify CompactAgent correctly formats list[dict] input and produces summary.""" received = {} @@ -107,6 +108,7 @@ async def chat(self, messages, **kwargs): # ── Test 3: ContextBuilder handles compact_boundary ─────────────── + async def test_context_builder_with_compact_boundary(): """Verify ContextBuilder skips messages before boundary and prepends summary.""" with tempfile.TemporaryDirectory() as tmpdir: @@ -126,10 +128,14 @@ async def test_context_builder_with_compact_boundary(): mem.add(AgentMessage(sender='user', content='msg2', role='user')) mem.add(AgentMessage(sender='agent', content='msg3', role='assistant')) # This message carries the compact info - mem.add(AgentMessage( - sender='user', content='msg4 (after compact)', role='user', - env_info=env_info_with_compact, - )) + mem.add( + AgentMessage( + sender='user', + content='msg4 (after compact)', + role='user', + env_info=env_info_with_compact, + ) + ) mem.add(AgentMessage(sender='agent', content='msg5', role='assistant')) messages, tools = builder.aggregate(mem, name='agent') @@ -173,6 +179,7 @@ async def test_context_builder_without_compact(): # ── Test 4: Provider + SaveMemoryAction + ContextBuilder ────────── + async def test_provider_action_contextbuilder_flow(): """Full flow: write via action → read via provider → inject into ContextBuilder.""" with tempfile.TemporaryDirectory() as tmpdir: @@ -193,10 +200,14 @@ async def test_provider_action_contextbuilder_flow(): # Build context with provider info builder = InternClawContextBuilder(workspace) mem = Memory() - mem.add(AgentMessage( - sender='user', content='What do I like?', role='user', - env_info={'memory': info}, - )) + mem.add( + AgentMessage( + sender='user', + content='What do I like?', + role='user', + env_info={'memory': info}, + ) + ) messages, tools = builder.aggregate(mem, name='agent') system_prompt = messages[0]['content'] @@ -205,12 +216,14 @@ async def test_provider_action_contextbuilder_flow(): # ── Test 5: Full InternClawAgent loop with compact ──────────────── + async def test_internclaw_compact_triggers(): """Verify compact triggers during InternClawAgent loop when tokens exceed threshold.""" compact_called = {'count': 0} class PolicyLLM: """Simulates policy: returns tool_calls for first N turns, then stops.""" + def __init__(self): self._turn = 0 @@ -239,7 +252,8 @@ async def chat(self, messages, **kwargs): ) from lagent.agents.aggregator import DefaultAggregator - policy = AsyncPolicyAgent(llm=PolicyLLM(), aggregator=DefaultAggregator()) + + policy = AsyncAgent(llm=PolicyLLM(), aggregator=DefaultAggregator()) # Minimal env that just passes through env = AsyncEnvAgent(actions=[]) @@ -254,19 +268,21 @@ async def chat(self, messages, **kwargs): result = await agent("Start a conversation about memory refactoring") # Compact should have been called at least once - assert compact_called['count'] > 0, \ - f"Compact should have triggered, but was called {compact_called['count']} times" + assert ( + compact_called['count'] > 0 + ), f"Compact should have triggered, but was called {compact_called['count']} times" # ── Real model test ─────────────────────────────────────────────── + async def test_real_compact_with_provider(): """Integration: real LLM + provider + compact.""" from lagent.llms.model import AsyncAPIClient, ModelConfig, SampleParameters model_name = "gpt-5.4-mini" api_base = "http://35.220.164.252:3888/v1" - api_key = "" + api_key = "" proxy = "http://100.100.72.89:8899" model = AsyncAPIClient( @@ -299,7 +315,10 @@ async def test_real_compact_with_provider(): formatted_messages = [ {'role': 'system', 'content': f"You are helpful.\n\nMemory:\n{env_info.get('long_term', '')}"}, {'role': 'user', 'content': 'Help me design the memory system'}, - {'role': 'assistant', 'content': 'Based on the project context, I see you prefer minimal abstractions. Let me propose a design.'}, + { + 'role': 'assistant', + 'content': 'Based on the project context, I see you prefer minimal abstractions. Let me propose a design.', + }, {'role': 'user', 'content': 'Yes, Memory should be a pure list, no LTM base class'}, {'role': 'assistant', 'content': 'Agreed. Provider=read, Action=write, both independent.'}, {'role': 'user', 'content': 'What about compact?'}, @@ -320,18 +339,21 @@ async def test_real_compact_with_provider(): assert content and len(content) > 50, "Summary should be substantial" content_lower = content.lower() - assert any(w in content_lower for w in ['memory', 'compact', 'provider']), \ - "Summary should mention key topics from the conversation" + assert any( + w in content_lower for w in ['memory', 'compact', 'provider'] + ), "Summary should mention key topics from the conversation" # Verify the provider's memory content influenced the summary - assert any(w in content_lower for w in ['minimal', 'abstraction', 'lagent']), \ - "Summary should reflect project context from provider" + assert any( + w in content_lower for w in ['minimal', 'abstraction', 'lagent'] + ), "Summary should reflect project context from provider" print(" Real compact + provider: OK") # ── Runner ──────────────────────────────────────────────────────── + async def main(): run_real = True diff --git a/tests/test_agents/test_framework_agents.py b/tests/test_agents/test_framework_agents.py index cb680e72..f1b6c48c 100644 --- a/tests/test_agents/test_framework_agents.py +++ b/tests/test_agents/test_framework_agents.py @@ -4,10 +4,11 @@ import tempfile from pathlib import Path -from lagent.agents.internclaw_agent import AsyncEnvAgent, AsyncPolicyAgent, InternClawAgent +from lagent.agents import AsyncAgent from lagent.agents.compact_agent import AsyncCompactAgent -from lagent.memory.openclaw_provider import OpenClawMemoryProvider +from lagent.agents.internclaw_agent import AsyncEnvAgent, InternClawAgent from lagent.memory.claude_code_provider import ClaudeCodeMemoryProvider +from lagent.memory.openclaw_provider import OpenClawMemoryProvider class MockLLM: @@ -18,7 +19,7 @@ async def chat(self, messages, **kwargs): async def test_internclaw_with_compact(): llm = MockLLM() agent = InternClawAgent( - policy_agent=AsyncPolicyAgent(llm=llm), + policy_agent=AsyncAgent(llm=llm), env_agent=AsyncEnvAgent(actions=[]), compact_agent=AsyncCompactAgent(llm=llm), consolidate_agent=None, @@ -32,7 +33,7 @@ async def test_internclaw_without_compact(): """Claude Code style — no compact, no consolidate.""" llm = MockLLM() agent = InternClawAgent( - policy_agent=AsyncPolicyAgent(llm=llm), + policy_agent=AsyncAgent(llm=llm), env_agent=AsyncEnvAgent(actions=[]), max_turn=10, ) diff --git a/tests/test_agents/test_internclaw_e2e.py b/tests/test_agents/test_internclaw_e2e.py index 6938dc33..7ad0e6ae 100644 --- a/tests/test_agents/test_internclaw_e2e.py +++ b/tests/test_agents/test_internclaw_e2e.py @@ -16,19 +16,18 @@ import tempfile from pathlib import Path -from lagent.agents.compact_agent import AsyncCompactAgent, estimate_token_count -from lagent.agents.internclaw_agent import ( - AsyncEnvAgent, AsyncPolicyAgent, InternClawAgent, -) +from lagent.actions.save_memory import AsyncSaveMemoryAction, SaveMemoryAction +from lagent.agents import AsyncAgent from lagent.agents.aggregator.context import InternClawContextBuilder from lagent.agents.aggregator.default_aggregator import DefaultAggregator +from lagent.agents.compact_agent import AsyncCompactAgent, estimate_token_count +from lagent.agents.internclaw_agent import AsyncEnvAgent, InternClawAgent from lagent.memory import Memory, OpenClawMemoryProvider -from lagent.actions.save_memory import SaveMemoryAction, AsyncSaveMemoryAction from lagent.schema import AgentMessage - # ── Test 1: Consolidate agent writes to LTM ────────────────────── + async def test_consolidate_agent_writes_memory(): """Consolidate agent = InternClawAgent with SaveMemoryAction. @@ -37,26 +36,31 @@ async def test_consolidate_agent_writes_memory(): class ConsolidateLLM: """Mock LLM that calls save_memory tool.""" + async def chat(self, messages, **kwargs): return { 'content': '', - 'tool_calls': [{ - 'id': 'call_1', - 'function': { - 'name': 'AsyncSaveMemoryAction', - 'arguments': json.dumps({ - 'history_entry': '[2026-04-09 12:00] Discussed memory refactoring. Decided to split compact and LTM.', - 'memory_update': '# Facts\n- User prefers minimal abstractions\n- Memory is a pure list\n- CompactAgent is a standard AsyncAgent', - }), - }, - }], + 'tool_calls': [ + { + 'id': 'call_1', + 'function': { + 'name': 'AsyncSaveMemoryAction', + 'arguments': json.dumps( + { + 'history_entry': '[2026-04-09 12:00] Discussed memory refactoring. Decided to split compact and LTM.', + 'memory_update': '# Facts\n- User prefers minimal abstractions\n- Memory is a pure list\n- CompactAgent is a standard AsyncAgent', + } + ), + }, + } + ], } with tempfile.TemporaryDirectory() as tmpdir: workspace = Path(tmpdir) save_action = AsyncSaveMemoryAction(workspace) - consolidate_policy = AsyncPolicyAgent(llm=ConsolidateLLM()) + consolidate_policy = AsyncAgent(llm=ConsolidateLLM()) consolidate_env = AsyncEnvAgent(actions=[save_action]) consolidate_agent = InternClawAgent( @@ -90,6 +94,7 @@ async def chat(self, messages, **kwargs): # ── Test 2: Consolidate + Provider round-trip ───────────────────── + async def test_consolidate_then_provider_reads(): """Consolidate writes → provider reads back the same content.""" @@ -97,16 +102,20 @@ class ConsolidateLLM: async def chat(self, messages, **kwargs): return { 'content': '', - 'tool_calls': [{ - 'id': 'call_1', - 'function': { - 'name': 'AsyncSaveMemoryAction', - 'arguments': json.dumps({ - 'history_entry': '[2026-04-09] Round-trip test', - 'memory_update': '# Memory\n- Round-trip test passed', - }), - }, - }], + 'tool_calls': [ + { + 'id': 'call_1', + 'function': { + 'name': 'AsyncSaveMemoryAction', + 'arguments': json.dumps( + { + 'history_entry': '[2026-04-09] Round-trip test', + 'memory_update': '# Memory\n- Round-trip test passed', + } + ), + }, + } + ], } with tempfile.TemporaryDirectory() as tmpdir: @@ -114,7 +123,7 @@ async def chat(self, messages, **kwargs): # Consolidate writes consolidate = InternClawAgent( - policy_agent=AsyncPolicyAgent(llm=ConsolidateLLM()), + policy_agent=AsyncAgent(llm=ConsolidateLLM()), env_agent=AsyncEnvAgent(actions=[AsyncSaveMemoryAction(workspace)]), max_turn=1, finish_condition=None, @@ -130,6 +139,7 @@ async def chat(self, messages, **kwargs): # ── Test 3: Full pipeline mock — compact + consolidate ──────────── + async def test_full_pipeline_compact_and_consolidate(): """Full InternClawAgent pipeline: policy loops → compact triggers → consolidate writes LTM → compact compresses context. @@ -146,10 +156,12 @@ async def chat(self, messages, **kwargs): if self._turn <= 2: return { 'content': f'Working on turn {self._turn}...', - 'tool_calls': [{ - 'id': f'call_{self._turn}', - 'function': {'name': 'test_tool', 'arguments': '{}'}, - }], + 'tool_calls': [ + { + 'id': f'call_{self._turn}', + 'function': {'name': 'test_tool', 'arguments': '{}'}, + } + ], } return {'content': 'All done.'} @@ -158,16 +170,20 @@ async def chat(self, messages, **kwargs): consolidate_called['count'] += 1 return { 'content': '', - 'tool_calls': [{ - 'id': 'cons_1', - 'function': { - 'name': 'AsyncSaveMemoryAction', - 'arguments': json.dumps({ - 'history_entry': '[2026-04-09] Consolidated', - 'memory_update': '# Consolidated memory', - }), - }, - }], + 'tool_calls': [ + { + 'id': 'cons_1', + 'function': { + 'name': 'AsyncSaveMemoryAction', + 'arguments': json.dumps( + { + 'history_entry': '[2026-04-09] Consolidated', + 'memory_update': '# Consolidated memory', + } + ), + }, + } + ], } class CompactLLM: @@ -181,7 +197,7 @@ async def chat(self, messages, **kwargs): # Consolidate agent consolidate = InternClawAgent( - policy_agent=AsyncPolicyAgent(llm=ConsolidateLLM()), + policy_agent=AsyncAgent(llm=ConsolidateLLM()), env_agent=AsyncEnvAgent(actions=[AsyncSaveMemoryAction(workspace)]), max_turn=1, finish_condition=None, @@ -196,7 +212,7 @@ async def chat(self, messages, **kwargs): # Main agent agent = InternClawAgent( - policy_agent=AsyncPolicyAgent( + policy_agent=AsyncAgent( llm=PolicyLLM(), aggregator=DefaultAggregator(), ), @@ -222,6 +238,7 @@ async def chat(self, messages, **kwargs): # ── Test 4: ContextBuilder with compact + provider together ────── + async def test_context_builder_full_assembly(): """ContextBuilder assembles: system prompt (with LTM) + compact summary + recent messages.""" with tempfile.TemporaryDirectory() as tmpdir: @@ -247,10 +264,14 @@ async def test_context_builder_full_assembly(): 'conversation_summary': '## Summary\nDiscussed Python preferences.', 'compact_boundary': 4, } - mem.add(AgentMessage( - sender='user', content='new msg after compact', role='user', - env_info=env_info_with_compact, - )) + mem.add( + AgentMessage( + sender='user', + content='new msg after compact', + role='user', + env_info=env_info_with_compact, + ) + ) mem.add(AgentMessage(sender='agent', content='response after compact', role='assistant')) messages, tools = builder.aggregate(mem, name='agent') @@ -275,6 +296,7 @@ async def test_context_builder_full_assembly(): # ── Test 5: Multiple compact rounds ────────────────────────────── + async def test_multiple_compact_rounds(): """Verify compact can trigger multiple times in a long session.""" compact_count = {'n': 0} @@ -288,10 +310,12 @@ async def chat(self, messages, **kwargs): if self._turn <= 6: return { 'content': f'Turn {self._turn} ' + 'x' * 100, # padding for tokens - 'tool_calls': [{ - 'id': f'c_{self._turn}', - 'function': {'name': 'noop', 'arguments': '{}'}, - }], + 'tool_calls': [ + { + 'id': f'c_{self._turn}', + 'function': {'name': 'noop', 'arguments': '{}'}, + } + ], } return {'content': 'Done.'} @@ -307,7 +331,7 @@ async def chat(self, messages, **kwargs): ) agent = InternClawAgent( - policy_agent=AsyncPolicyAgent( + policy_agent=AsyncAgent( llm=PolicyLLM(), aggregator=DefaultAggregator(), ), @@ -322,13 +346,14 @@ async def chat(self, messages, **kwargs): # ── Real model test ─────────────────────────────────────────────── + async def test_real_full_pipeline(): """Real model: consolidate + compact + provider full round-trip.""" from lagent.llms.model import AsyncAPIClient, ModelConfig, SampleParameters model_name = "gpt-5.4-mini" api_base = "http://35.220.164.252:3888/v1" - api_key = "" + api_key = "" proxy = "http://100.100.72.89:8899" model = AsyncAPIClient( @@ -351,7 +376,7 @@ async def test_real_full_pipeline(): # Consolidate agent with real model consolidate = InternClawAgent( - policy_agent=AsyncPolicyAgent(llm=model), + policy_agent=AsyncAgent(llm=model), env_agent=AsyncEnvAgent(actions=[AsyncSaveMemoryAction(workspace)]), max_turn=1, finish_condition=None, @@ -394,9 +419,12 @@ async def test_real_full_pipeline(): {'role': 'assistant', 'content': 'Done, no state tracking'}, ] - compact_result = await compact(AgentMessage( - sender='test', content=formatted_messages, - )) + compact_result = await compact( + AgentMessage( + sender='test', + content=formatted_messages, + ) + ) content = compact_result.content if isinstance(content, dict): @@ -411,6 +439,7 @@ async def test_real_full_pipeline(): # ── Runner ──────────────────────────────────────────────────────── + async def main(): run_real = True diff --git a/tests/test_services/test_agent_loader.py b/tests/test_services/test_agent_loader.py index cb0ef800..cb38e33f 100644 --- a/tests/test_services/test_agent_loader.py +++ b/tests/test_services/test_agent_loader.py @@ -15,17 +15,16 @@ from lagent.services.agent_loader import AgentLoader, AgentSpec, _import_module_from_path - # ── Fixtures ───────────────────────────────────────────────────────── CONFIG_STANDARD = """\ from lagent.agents.internclaw_agent import ( AsyncEnvAgent, - AsyncPolicyAgent, InternClawAgent, ) from lagent.llms.model import AsyncAPIClient +from lagent.agents import AsyncAgent from lagent.agents.aggregator.context import InternClawContextBuilder name = "code-reviewer" @@ -47,7 +46,7 @@ agent_config = dict( type=InternClawAgent, policy_agent=dict( - type=AsyncPolicyAgent, + type=AsyncAgent, llm=llm, aggregator=dict(type=InternClawContextBuilder), name="policy", @@ -64,10 +63,10 @@ CONFIG_WITH_BUILD = """\ from lagent.agents.internclaw_agent import ( AsyncEnvAgent, - AsyncPolicyAgent, InternClawAgent, ) from lagent.llms.model import AsyncAPIClient +from lagent.agents import AsyncAgent from lagent.agents.aggregator.context import InternClawContextBuilder name = "translator" @@ -87,7 +86,7 @@ agent_config = dict( type=InternClawAgent, policy_agent=dict( - type=AsyncPolicyAgent, + type=AsyncAgent, llm=llm, aggregator=dict(type=InternClawContextBuilder), name="policy", @@ -269,6 +268,7 @@ async def test_spec_config_has_internclaw_structure(workspace): # Top-level: InternClawAgent from lagent.agents.internclaw_agent import InternClawAgent + assert cfg["type"] is InternClawAgent # Nested: policy_agent with LLM @@ -317,6 +317,7 @@ def mock_build(config): @pytest.mark.asyncio async def test_spec_acreate_uses_async_build(): """acreate() handles async build functions.""" + async def async_build(config): return type("MockAgent", (), {"name": "async-built"})() diff --git a/workspace/agents/default_agent/config.py b/workspace/agents/default_agent/config.py index 688fe31b..bf546ec4 100644 --- a/workspace/agents/default_agent/config.py +++ b/workspace/agents/default_agent/config.py @@ -58,7 +58,7 @@ # ── Policy Agent ── policy_agent = dict( - type="lagent.agents.internclaw_agent.AsyncPolicyAgent", + type="lagent.agents.AsyncAgent", llm=model, aggregator=dict( type="lagent.agents.aggregator.context.InternClawContextBuilder", @@ -94,7 +94,7 @@ consolidate_agent = dict( type="lagent.agents.internclaw_agent.InternClawAgent", policy_agent=dict( - type="lagent.agents.internclaw_agent.AsyncPolicyAgent", + type="lagent.agents.AsyncAgent", name="consolidate_policy", llm=model, template=CONSOLIDATION_PROMPT, From a51312f6bb597b07c2c3e3c797db33ae9a9329d0 Mon Sep 17 00:00:00 2001 From: braisedpork1964 <497494458@qq.com> Date: Tue, 28 Apr 2026 05:12:42 +0000 Subject: [PATCH 2/3] fix type --- .../agents/aggregator/default_aggregator.py | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/lagent/agents/aggregator/default_aggregator.py b/lagent/agents/aggregator/default_aggregator.py index 0e28434b..4da71ed1 100644 --- a/lagent/agents/aggregator/default_aggregator.py +++ b/lagent/agents/aggregator/default_aggregator.py @@ -7,13 +7,14 @@ class DefaultAggregator: - def aggregate(self, - messages: Memory, - name: str, - parser: StrParser = None, - system_instruction: str = None, - tools: List[Dict] = None, - ) -> Tuple[List[Dict[str, str]], Optional[List[Dict]]]: + def aggregate( + self, + messages: Memory, + name: str, + parser: StrParser = None, + system_instruction: str = None, + tools: List[Dict] = None, + ) -> Tuple[List[Dict[str, str]], Optional[List[Dict]]]: _message = [] messages = messages.get_memory() if system_instruction: @@ -38,12 +39,17 @@ def aggregate(self, ) ) else: - if len(_message) > 0 and _message[-1]['role'] == 'user': + if ( + len(_message) > 0 + and _message[-1]['role'] == 'user' + and isinstance(_message[-1]['content'], str) + and isinstance(user_message, str) + ): _message[-1]['content'] += user_message _message[-1]['extra_info'] = extra_info else: _message.append(dict(role='user', content=user_message, extra_info=extra_info)) - + latest_env_info = None for message in messages: if getattr(message, 'env_info', None) is not None: @@ -52,7 +58,7 @@ def aggregate(self, tools_to_use = tools if latest_env_info and latest_env_info.get("tools"): tools_to_use = latest_env_info.get("tools") - + return _message, tools_to_use @staticmethod From 79fa628f7d34d906c1c6a67545667dc308e77c21 Mon Sep 17 00:00:00 2001 From: braisedpork1964 <497494458@qq.com> Date: Tue, 28 Apr 2026 05:13:23 +0000 Subject: [PATCH 3/3] improve `FunctionCallAgent` --- lagent/agents/fc_agent.py | 164 ++++++++++++++++++++++++++++++-------- 1 file changed, 132 insertions(+), 32 deletions(-) diff --git a/lagent/agents/fc_agent.py b/lagent/agents/fc_agent.py index 50037504..f450fb65 100644 --- a/lagent/agents/fc_agent.py +++ b/lagent/agents/fc_agent.py @@ -1,17 +1,22 @@ import asyncio import json +import logging +import platform from copy import deepcopy from dataclasses import asdict -from typing import Dict, List, Literal, Optional, Union +from typing import Any, Dict, List, Literal, Optional, Protocol, Union from tenacity import retry, retry_if_result, stop_after_attempt, wait_fixed from lagent.actions import AsyncActionExecutor from lagent.hooks import Hook from lagent.schema import ActionReturn, ActionStatusCode, ActionValidCode, AgentMessage, AgentStatusCode +from lagent.skills.skills import SkillsLoader from lagent.utils import create_object, truncate_text from .agent import AsyncAgent +logger = logging.getLogger("lagent.agents.fc_agent") + DEFAULT_TOOL_TEMPLATE = """# Tools You may call one or more functions to assist with the user query. @@ -27,7 +32,9 @@ """ -def get_tool_prompt(actions: list, exclude_arguments: list = None, template: str = DEFAULT_TOOL_TEMPLATE) -> str: +def get_tool_prompt( + actions: list, exclude_arguments: list = None, to_string: bool = True, template: str = DEFAULT_TOOL_TEMPLATE +) -> Union[str, List[dict]]: exclude_arguments = exclude_arguments or ['session_id'] def _convert_tool_schema(action_description: dict, name_pattern: str = '{}') -> dict: @@ -57,52 +64,123 @@ def _convert_tool_schema(action_description: dict, name_pattern: str = '{}') -> tools.append(_convert_tool_schema(api, f"{action.name}.{{}}")) else: tools.append(_convert_tool_schema(action_desc)) + if not to_string: + return tools return template.format(tools='\n'.join([json.dumps(tool, ensure_ascii=False) for tool in tools])) class FunctionCallAgent(AsyncAgent): def __init__( self, - select_agent: Union[Dict, AsyncAgent], + policy_agent: Union[Dict, AsyncAgent], env_agent: Union[Dict, AsyncAgent], - finish_condition: callable = lambda x, _: x and not x.tool_calls, + compact_agent: Optional[Dict] = None, + consolidate_agent: Optional[Dict] = None, + finish_condition: callable = lambda m, _: m and not m.tool_calls, max_turn: Optional[int] = None, + initialize_input: bool = True, name: Optional[str] = None, ): super().__init__(name=name) - self.select_agent = create_object(select_agent) + self.policy_agent = create_object(policy_agent) self.env_agent = create_object(env_agent) + self.compact_agent = create_object(compact_agent) + self.consolidate_agent = create_object(consolidate_agent) self.finish_condition = finish_condition self.max_turn = max_turn + self.initialize_input = initialize_input - async def forward(self, env_message: AgentMessage, session_id: str | int, **kwargs): - selection_message: AgentMessage = None + async def forward(self, env_message: AgentMessage, **kwargs): + policy_message: AgentMessage = None current_turn = 0 - while (self.finish_condition is None or not self.finish_condition(selection_message, env_message)) and ( + if self.initialize_input: + env_message = await self.env_agent(env_message, **kwargs) + + while (self.finish_condition is None or not self.finish_condition(policy_message, env_message)) and ( self.max_turn is None or current_turn < self.max_turn ): - selection_message = await self.select_agent(env_message, session_id=session_id, **kwargs) - if selection_message.stream_state == AgentStatusCode.SERVER_ERR: + policy_message = await self.policy_agent(env_message, **kwargs) + if policy_message.stream_state == AgentStatusCode.SERVER_ERR: raise ValueError("Rollout response error: state is neither completed nor aborted!") - if selection_message.stream_state == AgentStatusCode.SESSION_OUT_OF_LIMIT: + if policy_message.stream_state == AgentStatusCode.SESSION_OUT_OF_LIMIT: for _ in range(2): # remove the last two messages - self.select_agent.memory.get(session_id).delete(-1) + self.policy_agent.memory.delete(-1) return AgentMessage( sender=self.name, - content='Exceeded context length limit', - finish_reason=selection_message.finish_reason, + content=policy_message.content, + finish_reason=policy_message.finish_reason, ) - if selection_message.finish_reason == 'abort': - return AgentMessage(sender=self.name, content='Aborted request', finish_reason='abort') - env_message = await self.env_agent(selection_message, session_id=session_id) + if policy_message.finish_reason == 'abort': + return AgentMessage(sender=self.name, content=policy_message.content, finish_reason='abort') + + # Orchestrator manages memory + await self._maybe_manage_memory(policy_message, env_message) + + env_message = await self.env_agent(policy_message) current_turn += 1 + if policy_message is not None: + return AgentMessage(sender=self.name, content=policy_message.content, finish_reason='stop') return AgentMessage(sender=self.name, content="Finished", finish_reason='stop') + async def _maybe_manage_memory(self, policy_message: AgentMessage, env_message: AgentMessage) -> None: + """Orchestrate compact and consolidate. + + Orchestrator calls policy's aggregator to get formatted_messages, + checks should_compact, and if triggered: + 1. Runs consolidate_agent (optional) + 2. Runs compact_agent to produce summary + 3. Injects summary + boundary into env_message + ContextBuilder reads these on the next turn. + """ + if not self.compact_agent: + return + + from lagent.agents.compact_agent import estimate_token_count + + state = self.get_messages() + formatted_messages, tools = state['policy_agent.messages'], state['policy_agent.tools'] + compact_input = AgentMessage( + sender=self.name, + content=formatted_messages, + extra_info={'context_tokens': estimate_token_count(formatted_messages, tools)}, + ) + if not (hasattr(self.compact_agent, 'should_compact') and self.compact_agent.should_compact(compact_input)): + return + + # 1. Consolidate first (preserve info before compacting) + if self.consolidate_agent: + try: + await self.consolidate_agent(compact_input) + self.consolidate_agent.reset(recursive=True) + logger.info("Consolidation completed") + except Exception: + logger.exception("Consolidation failed, continuing with compact") + # 2. Compact — inject summary + boundary into env_message + try: + summary_msg = await self.compact_agent(compact_input) + self.compact_agent.reset(recursive=True) + if summary_msg and summary_msg.content: + if env_message.env_info is None: + env_message.env_info = {} + env_message.env_info['conversation_summary'] = summary_msg.content + env_message.env_info['compact_boundary'] = len(self.policy_agent.memory.memory) + logger.info("Compact summary injected (%d chars)", len(summary_msg.content)) + except Exception: + logger.exception("Compact failed") + + +class MemoryProvider(Protocol): + async def get_info(self) -> dict: + """Return long-term memory info for EnvAgent's env_info. The content and format are flexible, but should be concise.""" + ... + class EnvAgent(AsyncAgent): def __init__( self, - actions: list, + actions, + skills: Optional[SkillsLoader] = None, + long_term_memory: Optional[MemoryProvider] = None, stateful_tools: List[str] = None, max_tool_response_length: int = None, tool_response_truncate_side: Literal['left', 'right', 'middle'] = 'middle', @@ -110,7 +188,14 @@ def __init__( name: Optional[str] = None, ): super().__init__(name=name) - self.actions = AsyncActionExecutor(actions, hooks=action_hooks) + if isinstance(actions, AsyncActionExecutor): + for action_hook in action_hooks or []: + actions.register_hook(create_object(action_hook)) + self.actions = actions + else: + self.actions = AsyncActionExecutor(actions, hooks=action_hooks) + self.skills = create_object(skills) + self.long_term_memory = create_object(long_term_memory) self.stateful_tools = stateful_tools or [] self.max_tool_response_length = max_tool_response_length self.tool_response_truncate_side = tool_response_truncate_side @@ -124,33 +209,49 @@ def __init__( retry_error_callback=lambda retry_state: retry_state.outcome.result(), ) - async def forward(self, selection_message: AgentMessage, session_id: str | int, **kwargs): - if not selection_message.tool_calls: - return AgentMessage(sender=self.name, content='No tool call') + async def get_env_info(self) -> Dict[str, Any]: + env_info = {'skills': '', 'active_skills': '', 'memory': '', 'tools': [], 'runtime': {}} + if self.skills is not None: + env_info['skills'] = await self.skills.build_skills_summary() + always_skills = await self.skills.get_always_skills() + if always_skills: + env_info['active_skills'] = await self.skills.load_skills_for_context(always_skills) + if self.long_term_memory is not None: + env_info['memory'] = await self.long_term_memory.get_info() + if self.actions: + env_info['tools'] = get_tool_prompt(list(self.actions.actions.values()), to_string=False) + for name in ['system', 'machine', 'python_version']: + env_info['runtime'][name] = getattr(platform, name)() + return env_info + + async def forward(self, message: AgentMessage, **kwargs): + if not message.tool_calls: + return AgentMessage(sender=self.name, content=message.content, env_info=await self.get_env_info()) tool_responses = await asyncio.gather( - *[ - self._retry_mechanism(self.execute_tool)(tool_call, session_id) - for tool_call in selection_message.tool_calls - ] + *[self._retry_mechanism(self.execute_tool)(tool_call) for tool_call in message.tool_calls] ) - for tool_call_id, tool_response in zip(selection_message.tool_calls_ids, tool_responses): + for tool_call_id, tool_response in zip(message.tool_calls_ids, tool_responses): tool_response.tool_call_id = tool_call_id res = tool_response.format_result() if self.max_tool_response_length is not None and len(res) > self.max_tool_response_length: res = truncate_text(res, max_num=self.max_tool_response_length, side=self.tool_response_truncate_side) tool_response.result = [{'type': 'text', 'content': res}] - return AgentMessage(sender=self.name, content=[asdict(resp) for resp in tool_responses]) + return AgentMessage( + sender=self.name, content=[asdict(resp) for resp in tool_responses], env_info=await self.get_env_info() + ) - async def execute_tool(self, tool_call: dict, session_id: str | int) -> ActionReturn: + async def execute_tool(self, tool_call: dict) -> ActionReturn: + tool_call = deepcopy(tool_call) try: + if 'function' in tool_call: + tool_call = tool_call['function'] if tool_call['name'].split('.', 1)[0] not in self.actions: return ActionReturn(valid=ActionValidCode.INVALID, errmsg=f'Tool {tool_call["name"]} Not Found') if isinstance(tool_call['arguments'], str): tool_call['arguments'] = json.loads(tool_call['arguments']) if tool_call['name'] in self.stateful_tools: - tool_call = deepcopy(tool_call) - tool_call['arguments']['session_id'] = session_id + tool_call['arguments']['session_id'] = str(id(self)) except Exception as e: return ActionReturn(valid=ActionValidCode.INVALID, errmsg=f'Invalid tool call format: {str(e)}') tool_response: ActionReturn = ( @@ -158,7 +259,6 @@ async def execute_tool(self, tool_call: dict, session_id: str | int) -> ActionRe AgentMessage( sender='assistant', content=dict(name=tool_call['name'], parameters=tool_call['arguments']) ), - session_id=session_id, ) ).content return tool_response