Skip to content

feat: Agent-to-Agent async communication (notify / task_delegate / consult)#361

Merged
wisdomqin merged 23 commits intodataelement:releasefrom
39499740:feat/async-a2a-msg-type
Apr 12, 2026
Merged

feat: Agent-to-Agent async communication (notify / task_delegate / consult)#361
wisdomqin merged 23 commits intodataelement:releasefrom
39499740:feat/async-a2a-msg-type

Conversation

@39499740
Copy link
Copy Markdown
Contributor

Overview

Closes #310

Implements three distinct agent-to-agent (A2A) communication modes for the send_message_to_agent tool, controlled by a tenant-level feature flag (a2a_async_enabled).


Problem

Previously, all A2A messages used synchronous consult mode — the calling agent blocks until the target agent replies. This made it impossible for agents to:

  • Send one-way notifications without waiting
  • Delegate tasks and receive results asynchronously via callbacks

Solution

Three Communication Modes

Mode Behavior Use Case
notify Fire-and-forget. Returns immediately. Target agent is woken asynchronously. Announcements, FYI messages
task_delegate Async with callback. Returns immediately. Target agent processes the task, then the calling agent is woken with the result. Delegating work to another agent
consult Synchronous. Blocks until target replies. (Existing behavior) Questions requiring immediate answers

Feature Flag (a2a_async_enabled)

  • Location: Tenant (company) level — tenants.a2a_async_enabled column
  • Default: false — all agents use synchronous consult mode (identical to pre-PR behavior)
  • UI: Toggle in Company Settings → Company Info tab with BETA badge
  • Enable flow: Confirmation dialog listing known issues before enabling
  • Disable flow: Instant, no confirmation needed — restores stable synchronous behavior

When the flag is false, notify and task_delegate are silently downgraded to consult, ensuring zero behavior change for existing deployments.


Key Changes

Backend

File Change
app/services/agent_tools.py _send_message_to_agent now branches on msg_type. Added _append_focus_item, _create_on_message_trigger, _wake_agent_async helpers.
app/services/trigger_daemon.py DEDUP skip for A2A messages, chain depth protection (max 3), regex post-processing for user notifications, _cleanup_stale_invoke_cache, SQL wildcard sanitization
app/api/websocket.py Added max_tool_rounds_override param to call_llm — limits a2a_wake sessions to 2 tool rounds (saves tokens)
app/api/tenants.py TenantOut and TenantUpdate schemas support a2a_async_enabled
app/models/tenant.py New a2a_async_enabled column (Boolean, default False)
app/services/tool_seeder.py Updated send_message_to_agent tool definition with msg_type parameter

Frontend

File Change
pages/EnterpriseSettings.tsx Toggle switch in Company Info tab with BETA badge and confirmation dialog
i18n/en.json English translations for toggle UI and confirmation dialog
i18n/zh.json Chinese translations

Infrastructure

File Change
restart.sh Added alembic upgrade head before backend start — schema migrations now run automatically on restart
alembic/versions/add_a2a_async_enabled.py Migration: adds tenants.a2a_async_enabled column

Security Measures

  1. Authorization: a2a_async_enabled update restricted to org_admin / platform_admin with tenant ownership check
  2. Tenant isolation: A2A messages restricted to same-tenant agents
  3. Relationship enforcement: Bidirectional agent relationship required before any A2A message
  4. SQL injection prevention: Wildcard sanitization (%, _) on all ilike clauses
  5. Chain depth protection: Max 3 recursive A2A wake chains, with TTL-based decay
  6. Dedup window: Prevents wake storms within configurable time window
  7. Tool rounds capping: a2a_wake sessions limited to 2 tool rounds (cannot be bypassed — override only reduces, never increases)
  8. Regex scope: Internal term stripping only applies to a2a_wait_* triggers, not user/heartbeat/cron triggers
  9. Safe defaults: Feature flag defaults to false; all fallback paths restore synchronous behavior

Tests

13 test cases in tests/test_a2a_msg_type.py:

Test What it verifies
test_notify_returns_immediately notify returns without blocking
test_notify_wakes_target_agent target agent is woken via _wake_agent_async
test_task_delegate_returns_immediately task_delegate returns without blocking
test_task_delegate_creates_focus_item focus item created for target agent
test_task_delegate_creates_callback_trigger on_message trigger created for callback
test_task_delegate_wakes_target_agent target agent is woken
test_consult_returns_target_reply consult blocks and returns target reply
test_default_msg_type_is_notify missing msg_type defaults to notify
test_invalid_msg_type_falls_back_to_consult invalid msg_type falls back to consult
test_openclaw_target_still_queues OpenClaw agents still use queue path
test_feature_flag_off_falls_back_to_consult tenant flag off → notify/task_delegate → consult
test_feature_flag_on_uses_notify tenant flag on → notify works normally
test_unknown_agent_returns_error unknown agent returns descriptive error

All 13 pass.


Deployment Safety

  1. Zero-downtime: Feature flag defaults to false — deploying this PR does not change any agent behavior
  2. Automatic migration: restart.sh now runs alembic upgrade head to add the new column
  3. Instant rollback: Disable the toggle in Company Settings, or UPDATE tenants SET a2a_async_enabled = false
  4. Per-tenant: Each company can independently enable/disable the feature

Known Limitations (Beta)

  • Agent replies may occasionally contain internal technical terms (mitigated by regex post-processing)
  • Token consumption increases when async is enabled (mitigated by 2-round cap on wake sessions)
  • Chain depth tracking is per-process (not distributed) — relevant only for multi-worker deployments
  • No caching on tenant feature flag query (PK lookup <1ms, acceptable at current scale)

39499740 added 23 commits April 9, 2026 00:35
…/task_delegate)

Implement async A2A communication patterns as proposed in upstream dataelement#310.

- notify: fire-and-forget, saves message and wakes target asynchronously,
  returns immediately
- task_delegate: async with callback, creates focus item + on_message
  trigger on source agent, wakes target asynchronously, source agent is
  notified when target completes
- consult: synchronous request-response (unchanged from original behaviour)

Add helper functions:
- _resolve_a2a_target: shared agent lookup logic
- _ensure_a2a_session: shared session creation logic
- _create_on_message_trigger: programmatic trigger creation
- _append_focus_item: write focus items to agent workspace
- _wake_agent_async: wake target agent via trigger invocation path
- trigger_daemon.wake_agent_with_context: public API for waking agents

Update tool descriptions in agent_tools.py and tool_seeder.py to
document the new msg_type behaviours.

Refs: dataelement#310
11 tests covering:
- notify: returns immediately, wakes target async
- task_delegate: creates focus item + on_message trigger, wakes target async
- consult: calls LLM synchronously, returns reply
- default msg_type falls back to notify
- error cases: missing agent_name, no relationship
- helper functions: _append_focus_item, _create_on_message_trigger,
  _wake_agent_async
- OpenClaw targets still use gateway queue regardless of msg_type
Two issues found during integration testing:

1. Wake storm: wake_agent_with_context had no chain depth or dedup
   protection, causing A→B→A→B infinite loops. Now tracks per-pair
   chain depth (_A2A_WAKE_CHAIN) with max 3 hops, and respects the
   existing 30s DEDUP_WINDOW before waking an agent.

2. Notify silent failure: if log_activity or _wake_agent_async threw
   an exception, the entire tool call failed silently — the LLM never
   received a tool result and produced no user-visible reply. Both
   notify and task_delegate paths now wrap auxiliary operations in
   try/except so the return string always reaches the LLM.

- wake_agent_with_context accepts from_agent_id for chain tracking
- _wake_agent_async passes from_agent_id through
- All auxiliary calls (log_activity, focus, trigger, wake) are
  individually wrapped with error handling
a2a_wake is an internal A2A mechanism. Its Reflection Session output
should not be pushed to the user's active chat session or WebSocket.
Only user-facing triggers (on_message from users, webhooks, cron, etc.)
should produce visible notifications.
A2A on_message triggers created by task_delegate now have:
- max_fires=1: auto-disable after receiving one reply
- expires_at: 24h TTL as safety net

Without max_fires, on_message triggers monitoring another agent's
assistant messages would fire indefinitely in a loop:
A wakes → replies (assistant msg) → B's trigger fires → B replies
→ A's trigger fires → ... ad infinitum.

This was an existing system issue exposed by the async A2A feature.
The two storm triggers (wait_manager_meeting_followup and
wait_xiaozhi_meeting_sync_r2) were manually disabled in the DB.
The a2a_wake trigger reason was too vague ('Process it and act
accordingly'), causing agents to reply back via consult when they
received a notify message. This created cascading A2A calls.

Updated the reason to explicitly state:
- This is a notification, not a request for response
- Do NOT reply unless you have a genuine question
- Focus on updating focus/memory and taking direct actions
The DEDUP_WINDOW (30s) was preventing legitimate message deliveries
from waking the target agent. This caused task_delegate callbacks to
be silently dropped:

1. A delegates task to B via task_delegate
2. B completes and sends notify reply to A via send_message_to_agent
3. A was recently woken by the trigger daemon → DEDUP skips A's wake
4. A never receives B's reply → user sees no result

Fix: All _wake_agent_async calls from send_message_to_agent now pass
skip_dedup=True. Chain depth protection (max 3 hops) is still active
to prevent storms, but individual message deliveries are never dropped.

wake_agent_with_context gains a skip_dedup parameter that bypasses
the _last_invoke dedup check while keeping chain depth protection.
…tion

Problem: When users said 'summarize tasks assigned to Manager' without
specifying msg_type, the LLM defaulted to notify (fire-and-forget),
so the user never got the results back.

Changes:
- msg_type is now a required parameter — LLM must choose explicitly
  every time it calls send_message_to_agent
- Tool description now includes concrete examples for each type:
  notify = one-way announcement, consult = quick question,
  task_delegate = delegate work and get results back
- Explicit guidance: 'When the user asks another agent to perform a
  task, use task_delegate, NOT notify'
- Updated tool_seeder.py to match

This ensures the LLM will auto-select task_delegate when the user
asks another agent to do work, without the user needing to specify
the msg_type manually.
The key insight: the LLM should decide msg_type based on ONE question:
'Does the target need to DO WORK and return results?'

- If yes → task_delegate (most common for user requests)
- If just FYI → notify
- If quick factual question → consult
- When in doubt → prefer task_delegate (safer, guarantees result)

Added concrete verb examples (analyze, research, summarize, write,
compare, plan, review, find out, confirm) so the LLM can match
ambiguous user phrases like 'check with X', 'look into Y', 'get back
to me on Z' to task_delegate without explicit keywords.
Before: ⚡ 触发器触发 wait_manager_api_key_reply
After:  ⚡ 监听经理关于API Key的回复

The trigger name is an internal identifier, meaningless to users.
Now uses the trigger's reason field (human-readable description)
as the notification headline, with a 80-char truncation limit.
Before: ⚡ 经理 is expected to reply after completing a delegated task...
After:  ⚡ 等待经理完成任务并回复

The trigger reason is an internal instruction for the agent (in English),
not suitable for user-facing notifications. Added a notification_summary
field in trigger config (_notification_summary) that stores a concise,
user-friendly headline. The notification builder checks this field first
before falling back to the reason field.
The a2a_wait trigger's Reflection Session output was being pushed
directly to the user's chat, but the agent didn't know its output
was user-facing. So it wrote internal monologue (trigger management,
focus state, reasoning process) that confused the user.

Updated the trigger reason to explicitly state:
- 'Your reply will be shown to the user'
- 'Do NOT mention triggers, focus items, internal state, or reasoning'
- 'Just give the user the actionable outcome'

Before:
  ⚡ 等待经理完成任务并回复
  处理完毕。经理的汇总确认:触发器 a2await经理 已取消...
  Focus item wait经理task 已为 [x] 完成状态...

After (expected):
  ⚡ 等待经理完成任务并回复
  经理已完成任务汇总,结果如下:...
Prompt improvement + regex post-processing to ensure user-facing
notifications read naturally.

1. Prompt: Explicitly list banned terms (trigger name, focus item,
   a2a_wait, task_delegate, etc.) and tell agent to write as if
   talking to a colleague.

2. Post-processing: Regex filter strips any remaining internal
   identifiers from the notification text before pushing to user.
   Patterns cover: a2a_wait_*, wait_*_task, focus_item, trigger
   status lines, bullet points with internal terms, etc.

Before:
  ✅ a2await经理 触发器已取消
  ✅ wait经理task focus项已为完成状态
  📋 核查结论:无新增任务

After (expected):
  经理确认:无新增任务或变化,唯一待办仍是API Key问题。
Added patterns to catch remaining leaks:
- resolve_* identifiers (e.g. resolve_smithery_api_key)
- 已静默清理触发器 / 已静默处理完毕
- 继续待命 / 待命
- Trailing punctuation cleanup

Expected before: 已静默清理触发器。唯一活跃待办仍为resolve_smithery_api_key。继续待命。
Expected after:  唯一活跃待办仍为API Key问题。
a2a_wake (notify) triggers don't need the full 50-round tool loop.
Limiting to 2 rounds saves significant tokens since the agent only
needs to: read the message → maybe update memory or take one action.

Token savings per notify:
- Before: up to 50 rounds of tool calls (each round = full context)
- After: max 2 rounds (read message + one action)

Also added max_tool_rounds_override parameter to call_llm() so
callers can cap the tool loop without modifying the agent's DB config.
New field on Agent model: a2a_async_enabled (boolean, default=False)

- When False (default): send_message_to_agent silently converts
  notify and task_delegate to consult. Behavior identical to before
  this PR — all A2A communication is synchronous.
- When True: full async A2A features (notify, task_delegate, consult)

This allows safe rollout:
1. Deploy the code — all agents work exactly as before (flag off)
2. Enable per-agent via API: PATCH /agents/{id} {"a2a_async_enabled": true}
3. If issues arise, flip the flag back to false — instant rollback

Changes:
- Agent model: new column a2a_async_enabled
- AgentUpdate schema: accepts a2a_async_enabled in PATCH
- AgentOut schema: returns a2a_async_enabled in GET
- _send_message_to_agent: checks flag before branching
- Alembic migration: add_a2a_async_enabled
- Tests: feature flag off/on scenarios
Reasons for company-level instead of agent-level:
1. A2A communication involves two agents — if Alice has it on but Bob
   doesn't, which mode should be used? Company-level = consistent behavior
2. Simpler admin UX: one toggle for the whole company
3. Matches existing pattern (min_heartbeat_interval_minutes is also
   tenant-level)

Changes:
- Removed a2a_async_enabled from Agent model and schemas
- Added a2a_async_enabled to Tenant model (default False)
- _send_message_to_agent now queries tenant.a2a_async_enabled
- Updated all 13 tests with tenant mock
- Migration: drop agents.a2a_async_enabled, add tenants.a2a_async_enabled
Added a toggle switch in EnterpriseSettings > Company Info tab:

- Title: 'Agent-to-Agent Async Communication' with BETA badge
- Description: explains the three modes and that disabling restores
  the previous synchronous behavior
- Toggle: on/off with confirmation dialog when enabling
- Confirmation dialog lists known issues:
  • Agent replies may contain internal terms
  • task_delegate callbacks may be delayed
  • Token consumption will increase
  • Agent loops may occur
  • Instructions to disable if issues arise

Backend changes:
- TenantOut schema: added a2a_async_enabled field
- TenantUpdate schema: added a2a_async_enabled field
- PUT /tenants/{id} now accepts a2a_async_enabled

Frontend changes:
- Toggle switch in Company Management section
- Reads from currentTenant.a2a_async_enabled
- Updates via PUT /tenants/{id}
- Confirmation dialog on enable (window.confirm)
Added enterprise.a2aAsync translations to both language files:

en.json:
- title: Agent-to-Agent Async Communication
- description: explains three modes and fallback behavior
- enableWarning: confirmation dialog with known issues

zh.json:
- title: 数字员工间异步通信
- description: 中文说明三种模式
- enableWarning: 中文确认弹框,包含已知问题列表

Fixed duplicate keys in zh.json enterprise.tabs section.
1. restart.sh: Added automatic 'alembic upgrade head' before backend
   start. New DB columns will now be applied on every restart, no more
   manual ALTER TABLE needed.

2. SQL wildcard injection fix (MEDIUM):
   - trigger_daemon.py: Sanitize from_agent_name and from_user_name
     before ilike interpolation (same pattern as agent_tools.py)

3. Deprecation fix (LOW):
   - asyncio.get_event_loop() → asyncio.get_running_loop()

4. Memory leak fix (LOW):
   - Added _cleanup_stale_invoke_cache() to periodically evict
     stale entries from _last_invoke dict (runs every ~60s)

5. Regex scope restriction (conflict prevention):
   - Internal term regex filter now ONLY applies to a2a_wait_*
     triggers, not to all trigger notifications. Prevents false
     positives on user on_message, heartbeat, cron, etc.

6. Conflicts analysis (all clear):
   - OpenClaw path: early return before msg_type branching ✅
   - max_tool_rounds_override: defaults None, only for a2a_wake ✅
   - msg_type required: code defaults to 'notify', flag forces 'consult' ✅
   - _notification_summary: safe .get() read, transparent to existing code ✅
   - Tenant DB query: PK lookup <1ms, acceptable at current scale ✅
Alembic requires explicit 'revision' and 'down_revision' variables
in each migration file. The previous version only had them in the
docstring, which alembic couldn't parse.

Also fixed: DB had stale alembic_version pointing to a deleted
migration (add_llm_concurrency_group). Updated to point to the
actual latest revision (d9cbd43b62e5).
@wisdomqin wisdomqin changed the base branch from main to release April 12, 2026 13:04
@wisdomqin wisdomqin merged commit e353abf into dataelement:release Apr 12, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

关于 send_message_to_agent 的一点建议

2 participants