Enhance A2A protocol support and fix various bugs#2822
Conversation
…2776) * 🐛 Bugfix: skill deletion failed occasionally * ♻️ Simplify how run_skill_script() receives additional parameters * 🧪 Add test files * 🧪 Fix test files * ♻️ Update system prompt to better support models with lower parameters * ♻️ Update system prompt to better support models with lower parameters * ♻️ Remove unnecessary rely on re to reduce the risk of DoS * ♻️ Remove unnecessary rely on re to reduce the risk of DoS
…ent discovery (#2773) * Feat: Add A2A protocol support for agent publishing and external agent discovery - A2A Server: Enable publishing platform agents as A2A agents with configurable endpoints, including database schema for agent registration and management - A2A Client: Support discovering and invoking external A2A agents through agent discovery modal, HTTP client, and agent adapter integration - Database: Add `a2a_agent` and `a2a_agent_endpoint` tables with foreign key relationships to existing agent tables - SDK: Implement `A2AAgentProxy` for SDK-level A2A agent invocation support - Frontend: Add A2A Server Settings Panel, Agent Discovery Modal, and corresponding hooks (`useA2AServerAgents`, `useExternalAgents`) - Backend API: Add `a2a_client_app.py` and `a2a_server_app.py` endpoints with service layer implementations - i18n: Update English and Chinese localization files with A2A-related translations - System Prompts: Update manager system prompt templates to support A2A protocol context * Potential fix for pull request finding 'CodeQL / Information exposure through an exception' Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com> * Potential fix for pull request finding 'CodeQL / Information exposure through an exception' Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com> * Bugfix: temporarily not support grpc for external 2a2 agent * Bugfix: 解决在 AgentConfig 类中(第36-52行)引用了 ExternalA2AAgentConfig,但后者在第100行才定义。这构成了循环引用/前向引用 * delete debug logger * delete unsafe log * Bugfix: 使用 A2A 1.0 封套格式检测流式终止状态 * Bugfix: 删除对a2a 0.3版本对kind字段的兼容 * Bugfix: 修复了只读模式下关联代理信息完全隐藏的问题 * Bugfix: 当 supported_interfaces 同时包含两种协议接口且 jsonrpcInterface 有自定义 url 时,之前会错误地使用 REST 的 url,现在会正确使用 JSON-RPC 的 url * Bugfix: 当触发 aiohttp.ClientError 或通用 Exception 时,代码会抛出 NameError: name 'agent_id' is not defined,根本走不到 return 语句 * Bugfix: 确保了 HTTP 连接的正确释放,避免在高负载场景下出现连接泄漏问题 * Bugfix: 修复数据库允许 task_id 为 NULL,但 ORM 层强制要求非空的问题 * update sql, make sure sql is consistent with orm modal * delete unused code * 把详情日志降级为 debug 级别,只在 INFO 保留计数 * Refactor this function to reduce its Cognitive Complexity from 30 to the 15 allowed. * Use "Annotated" type hints for FastAPI dependency injection * fix code analysis * Replace the unused local variable "user_id" with "_" * Bugfix: 修复代码圈复杂度&定义常量避免重复 * 修复圈复杂度 * 修复sonarlcoud问题 * 修复报错 * 修复循环依赖 * 修复单元测试问题 * 修复单元测试 * 新增northbound单元测试 * 修复代码报错 * 优化前端展示数据样式 * 修改报错 * 新增单元测试 * 新增测试用例 * 新增单元测试 * 新增单元测试 * 新增单元测试 * 新增测试用例 * 删除无法识别的UT * 新增单元测试 * 现在 MinioClient 在导入时不会立即连接 MinIO,只在实际调用方法时才会初始化连接。 * 修改 MinioClient,在懒加载时同时设置 storage_config 属性 --------- Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
This PR expands A2A protocol support end-to-end (backend A2A client/server, SDK external agent wrappers, and frontend management UI), while also addressing several bugs and prompt-format updates.
Changes:
- Add A2A server endpoints (northbound), A2A client management APIs, and DB models for external agents/relations/tasks.
- Extend SDK agent configuration to support external A2A sub-agents and update code-tag parsing/rendering.
- Update frontend to manage A2A discovery, publishing-as-A2A, and display A2A server settings; plus minor bug fixes/tests.
Reviewed changes
Copilot reviewed 72 out of 85 changed files in this pull request and generated 15 comments.
Show a summary per file
| File | Description |
|---|---|
| test/backend/database/test_skill_db.py | Adds coverage for deleting already-deleted skills. |
| test/backend/database/test_client.py | Updates MinioClient tests for lazy init; resets singleton flags. |
| test/backend/consts/test_a2a_models.py | Adds unit tests for new A2A Pydantic models. |
| test/backend/app/test_agent_app.py | Updates publish-version test to include publish_as_a2a. |
| test/backend/agents/test_create_agent_info.py | Adds tests for external A2A agent config extraction/building. |
| sdk/nexent/skills/skill_manager.py | Changes script params to raw CLI string; uses shlex.split. |
| sdk/nexent/memory/init.py | Exposes memory service APIs via package init. |
| sdk/nexent/core/tools/run_skill_script_tool.py | Aligns tool signature with raw CLI params string. |
| sdk/nexent/core/agents/nexent_agent.py | Wraps external A2A agents as managed agents via proxy wrapper. |
| sdk/nexent/core/agents/core_agent.py | Switches execution/display code parsing to <code> / <DISPLAY:...>. |
| sdk/nexent/core/agents/agent_model.py | Adds external agent config model + protocol constants; extends AgentConfig. |
| frontend/types/agentConfig.ts | Adds is_a2a_server flag to agent type. |
| frontend/stores/agentConfigStore.ts | Stores external sub-agent IDs and exposes updater. |
| frontend/services/modelService.ts | Adds model_factory support to model create/update payloads. |
| frontend/services/api.ts | Adds API endpoint definitions for A2A client/server management. |
| frontend/services/agentVersionService.ts | Extends publish response to include A2A agent/card info. |
| frontend/services/agentConfigService.ts | Maps backend is_a2a_server to frontend model. |
| frontend/public/locales/zh/common.json | Adds zh translations for A2A discovery/settings/publish and related errors. |
| frontend/public/locales/en/common.json | Adds en translations for A2A discovery/settings/publish and related errors. |
| frontend/hooks/agent/useExternalAgents.ts | Adds hook to query available external A2A agents. |
| frontend/hooks/agent/useA2AServerAgents.ts | Adds hooks/util to query A2A server agents & check agent status. |
| frontend/components/ui/markdownRenderer.tsx | Converts custom code tags to markdown fences before rendering. |
| frontend/app/[locale]/tenant-resources/components/resources/ModelList.tsx | Switches connectivity check to verifyCustomModel. |
| frontend/app/[locale]/models/components/model/ModelEditDialog.tsx | Removes tenantId-specific connectivity branch; uses unified verification. |
| frontend/app/[locale]/models/components/model/ModelAddDialog.tsx | Removes tenantId-specific connectivity branch; uses unified verification. |
| frontend/app/[locale]/chat/streaming/taskWindow.tsx | Adds streaming-safe conversion/extraction for new custom code tags. |
| frontend/app/[locale]/chat/streaming/chatStreamFinalMessage.tsx | Converts custom code tags to markdown fences for final message. |
| frontend/app/[locale]/agents/versions/AgentVersionPubulishModal.tsx | Adds “Publish as A2A Agent” switch + post-publish A2A preview modal. |
| frontend/app/[locale]/agents/components/agentManage/AgentList.tsx | Adds A2A settings modal entry point for A2A server agents. |
| frontend/app/[locale]/agents/components/agentConfig/CollaborativeAgent.tsx | Adds selection/removal of external A2A sub-agents alongside internal ones. |
| frontend/app/[locale]/agents/components/a2a/A2AServerSettingsPanel.tsx | New UI panel showing A2A endpoint/card URLs & usage hints. |
| frontend/app/[locale]/agents/components/AgentConfigComp.tsx | Adds entry point to A2A discovery modal and external agents invalidation. |
| docker/docker-compose.prod.yml | Exposes northbound port 5013 for external A2A access. |
| docker/.env.example | Documents NORTHBOUND_EXTERNAL_URL for public agent-card URLs. |
| backend/utils/a2a_http_client.py | New aiohttp-based A2A HTTP client with retries + SSE streaming. |
| backend/services/vectordatabase_service.py | Uses timezone-aware UTC timestamps (timezone.utc). |
| backend/services/agent_version_service.py | Adds publish_as_a2a option and returns A2A card preview data. |
| backend/services/agent_service.py | Adds is_a2a_server enrichment in agent list. |
| backend/services/a2a_agent_adapter.py | New adapter mapping internal agent IO to A2A protocol structures/events. |
| backend/prompts/utils/prompt_generate_zh.yaml | Updates execution/display code format to <code> / <DISPLAY>. |
| backend/prompts/utils/prompt_generate_en.yaml | Updates execution/display code format to <code> / <DISPLAY>. |
| backend/prompts/skill_creation_simple_en.yaml | Adds a new prompt template for simple skill creation. |
| backend/prompts/manager_system_prompt_template_zh.yaml | Updates tool/script calling instructions + adds external agent guidance. |
| backend/prompts/manager_system_prompt_template_en.yaml | Updates code formatting + adds external agent guidance. |
| backend/prompts/managed_system_prompt_template_zh.yaml | Updates code formatting + external agent guidance; trims prompt content. |
| backend/prompts/managed_system_prompt_template_en.yaml | Updates code formatting + external agent guidance. |
| backend/database/skill_db.py | Prevents deleting already-deleted skills by filtering delete_flag != 'Y'. |
| backend/database/db_models.py | Adds DB models for A2A configs, external agents, relations, server agents, tasks/messages/artifacts. |
| backend/database/client.py | Makes MinioClient lazily initialize storage client on first use. |
| backend/consts/model.py | Adds model_factory to model requests; adds publish_as_a2a to publish request. |
| backend/consts/exceptions.py | Adds A2A-specific exceptions (TaskNotFound/UnsupportedOperation). |
| backend/consts/const.py | Adds NORTHBOUND_EXTERNAL_URL and bumps app version. |
| backend/consts/a2a_models.py | Introduces Pydantic models for A2A protocol & management APIs. |
| backend/apps/remote_mcp_app.py | Removes unused user_id binding in remote MCP tool list endpoint. |
| backend/apps/northbound_base_app.py | New northbound base app wiring A2A discovery + JSON-RPC/REST endpoints. |
| backend/apps/northbound_app.py | Exports _get_northbound_context for reuse by A2A routes. |
| backend/apps/config_app.py | Registers A2A client/server routers into the main FastAPI app. |
| backend/apps/agent_app.py | Passes publish_as_a2a into publish service. |
| backend/apps/a2a_server_app.py | Adds internal management endpoints for A2A server registrations/settings. |
| backend/apps/a2a_client_app.py | Adds endpoints for external agent discovery, management, relations, and Nacos configs. |
| backend/agents/create_agent_info.py | Adds external A2A sub-agent loading into agent config and templates. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| ) -> aiohttp.ClientResponse: | ||
| """Execute HTTP request with automatic retry on transient failures. | ||
|
|
||
| Args: | ||
| method: HTTP method | ||
| url: Target URL | ||
| read_response: If True, return (status, body_text). If False, return response object. | ||
| **kwargs: Additional arguments for the request | ||
| """ | ||
| last_exception = None | ||
|
|
||
| for attempt in range(self.max_retries): | ||
| try: | ||
| async with await self._session.request(method, url, **kwargs) as response: | ||
| if response.status < 500 and not read_response: | ||
| return response | ||
| body = await response.read() |
There was a problem hiding this comment.
aiohttp.ClientSession.request(...) should not be awaited before async with (use async with self._session.request(...) as response:). Also, returning response from inside an async with will return a closed response object after context exit, and the function annotation claims aiohttp.ClientResponse but returns tuples in other branches—please refactor to return a consistent type (e.g., always (status, body_bytes)), or avoid async with when you intend to return the live response.
| ) -> aiohttp.ClientResponse: | |
| """Execute HTTP request with automatic retry on transient failures. | |
| Args: | |
| method: HTTP method | |
| url: Target URL | |
| read_response: If True, return (status, body_text). If False, return response object. | |
| **kwargs: Additional arguments for the request | |
| """ | |
| last_exception = None | |
| for attempt in range(self.max_retries): | |
| try: | |
| async with await self._session.request(method, url, **kwargs) as response: | |
| if response.status < 500 and not read_response: | |
| return response | |
| body = await response.read() | |
| ) -> aiohttp.ClientResponse | tuple[int, bytes]: | |
| """Execute HTTP request with automatic retry on transient failures. | |
| Args: | |
| method: HTTP method | |
| url: Target URL | |
| read_response: If True, return ``(status, body_bytes)``. | |
| If False, return an open ``aiohttp.ClientResponse`` and the caller | |
| is responsible for consuming and closing it. | |
| **kwargs: Additional arguments for the request | |
| """ | |
| last_exception = None | |
| for attempt in range(self.max_retries): | |
| try: | |
| if not read_response: | |
| response = await self._session.request(method, url, **kwargs) | |
| if response.status < 500: | |
| return response | |
| body = await response.read() | |
| await response.release() | |
| if attempt < self.max_retries - 1: | |
| wait_time = RETRY_BACKOFF_FACTOR * (2 ** attempt) | |
| logger.warning( | |
| f"HTTP {response.status} for {url}, " | |
| f"retrying in {wait_time}s (attempt {attempt + 1}/{self.max_retries})" | |
| ) | |
| await asyncio.sleep(wait_time) | |
| continue | |
| return (response.status, body) | |
| async with self._session.request(method, url, **kwargs) as response: | |
| body = await response.read() |
| for attempt in range(self.max_retries): | ||
| try: | ||
| async with await self._session.request(method, url, **kwargs) as response: | ||
| if response.status < 500 and not read_response: | ||
| return response | ||
| body = await response.read() | ||
| if response.status < 500: | ||
| return (response.status, body) |
There was a problem hiding this comment.
aiohttp.ClientSession.request(...) should not be awaited before async with (use async with self._session.request(...) as response:). Also, returning response from inside an async with will return a closed response object after context exit, and the function annotation claims aiohttp.ClientResponse but returns tuples in other branches—please refactor to return a consistent type (e.g., always (status, body_bytes)), or avoid async with when you intend to return the live response.
| async def post_stream( | ||
| self, | ||
| url: str, | ||
| payload: Dict[str, Any], | ||
| headers: Optional[Dict[str, str]] = None | ||
| ) -> AsyncIterator[Dict[str, Any]]: |
There was a problem hiding this comment.
post_stream does not use an async context manager for the response (or otherwise ensure response.release()/close), which can leak connections and exhaust the connector under load. Consider async with self._session.post(...) as response: and ensure the response is properly closed on normal completion and exceptions; also consider reusing the retry/timeout strategy used elsewhere for transient disconnects.
| response = await self._session.post( | ||
| url, | ||
| json=payload, | ||
| headers=headers | ||
| ) | ||
| response.raise_for_status() | ||
|
|
||
| async for line in response.content: | ||
| decoded = line.decode('utf-8').strip() | ||
| if decoded.startswith("data: "): | ||
| data_str = decoded[6:].strip() | ||
| if data_str: | ||
| import json | ||
| try: | ||
| yield json.loads(data_str) | ||
| except json.JSONDecodeError: | ||
| logger.warning(f"Failed to parse SSE data: {data_str}") |
There was a problem hiding this comment.
post_stream does not use an async context manager for the response (or otherwise ensure response.release()/close), which can leak connections and exhaust the connector under load. Consider async with self._session.post(...) as response: and ensure the response is properly closed on normal completion and exceptions; also consider reusing the retry/timeout strategy used elsewhere for transient disconnects.
| response = await self._session.post( | |
| url, | |
| json=payload, | |
| headers=headers | |
| ) | |
| response.raise_for_status() | |
| async for line in response.content: | |
| decoded = line.decode('utf-8').strip() | |
| if decoded.startswith("data: "): | |
| data_str = decoded[6:].strip() | |
| if data_str: | |
| import json | |
| try: | |
| yield json.loads(data_str) | |
| except json.JSONDecodeError: | |
| logger.warning(f"Failed to parse SSE data: {data_str}") | |
| async with self._session.post( | |
| url, | |
| json=payload, | |
| headers=headers | |
| ) as response: | |
| response.raise_for_status() | |
| async for line in response.content: | |
| decoded = line.decode('utf-8').strip() | |
| if decoded.startswith("data: "): | |
| data_str = decoded[6:].strip() | |
| if data_str: | |
| import json | |
| try: | |
| yield json.loads(data_str) | |
| except json.JSONDecodeError: | |
| logger.warning(f"Failed to parse SSE data: {data_str}") |
| if headers: | ||
| request_headers.update(headers) | ||
|
|
||
| logger.info(f"A2A POST request: url={url}, payload={payload}") |
There was a problem hiding this comment.
Logging full request payloads at INFO can leak sensitive user content and credentials (e.g., API keys in headers/metadata). Consider logging only high-level fields (method/url/request-id) or redacting sensitive keys, and downgrade verbose payload logging to DEBUG with explicit redaction.
| These endpoints are for internal use only (user authentication required). | ||
| They are NOT registered to any FastAPI app - only used by internal code. |
There was a problem hiding this comment.
This module docstring claims the router is not registered to any FastAPI app, but backend/apps/config_app.py includes a2a_server_router. Please update the docstring to reflect actual registration/usage to prevent operational confusion.
| These endpoints are for internal use only (user authentication required). | |
| They are NOT registered to any FastAPI app - only used by internal code. | |
| These endpoints are intended for internal, authenticated management use. | |
| This module defines the router used to expose these endpoints via FastAPI app registration. |
| default=[] | ||
| ) | ||
| external_a2a_agents: List["ExternalA2AAgentConfig"] = Field( | ||
| description="External A2A agents called via HTTP requests", | ||
| default=[] |
There was a problem hiding this comment.
Using default=[] for list fields is error-prone and can trigger shared-mutable-default issues (and is discouraged by Pydantic conventions). Prefer default_factory=list for both managed_agents and external_a2a_agents.
| default=[] | |
| ) | |
| external_a2a_agents: List["ExternalA2AAgentConfig"] = Field( | |
| description="External A2A agents called via HTTP requests", | |
| default=[] | |
| default_factory=list | |
| ) | |
| external_a2a_agents: List["ExternalA2AAgentConfig"] = Field( | |
| description="External A2A agents called via HTTP requests", | |
| default_factory=list |
| base_url = str(request.base_url).rstrip("/") | ||
|
|
||
| card = a2a_server_service.get_agent_card( | ||
| endpoint_id=endpoint_id, | ||
| base_url=base_url, | ||
| use_northbound=True | ||
| ) |
There was a problem hiding this comment.
Using request.base_url will generate incorrect public URLs when the service is behind a reverse proxy (host/port mismatch), which this PR explicitly addresses via NORTHBOUND_EXTERNAL_URL. Consider using NORTHBOUND_EXTERNAL_URL (or honoring X-Forwarded-Host / X-Forwarded-Proto via FastAPI proxy headers middleware) when building the Agent Card base URL.
| const convertToMarkdownCodeFences = (content: string): string => { | ||
| // Step 1: Handle complete <DISPLAY:language>...</DISPLAY> blocks | ||
| content = content.replace(/<DISPLAY:(\w+)>([\s\S]*?)<\/DISPLAY>/g, (_match, language, code) => { | ||
| return `\`\`\`${language}\n${code.trim()}\n\`\`\``; | ||
| }); | ||
|
|
||
| // Step 2: Handle complete <code>...</code> blocks | ||
| content = content.replace(/<code>([\s\S]*?)<\/code>/g, (_match, code) => { | ||
| return `\`\`\`python\n${code.trim()}\n\`\`\``; | ||
| }); |
There was a problem hiding this comment.
The custom-tag → markdown-fence conversion is duplicated across taskWindow.tsx, chatStreamFinalMessage.tsx, and markdownRenderer.tsx. Centralize this into a shared utility (e.g., frontend/lib/markdown/convertCustomCodeTags.ts) to prevent drift and inconsistencies in streaming vs. non-streaming rendering.
| class A2AHttpClient: | ||
| """HTTP client for A2A protocol communication.""" |
There was a problem hiding this comment.
This introduces a non-trivial retrying HTTP/SSE client (timeouts, backoff, streaming parsing), but no unit tests are added alongside it. Please add tests that cover: retry behavior on transient aiohttp errors, handling of 5xx vs <5xx responses, JSON decode errors, and SSE parsing (data: lines, empty lines, malformed JSON).
No description provided.