Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
3f22c13
feat: differentiate send_message_to_agent by msg_type (notify/consult…
39499740 Apr 8, 2026
1a47120
test: add unit tests for async A2A msg_type differentiation
39499740 Apr 8, 2026
74b8285
chore: remove uv.lock generated by test dependency install
39499740 Apr 8, 2026
ba60f37
fix: prevent a2a_wake storm and notify silent failure
39499740 Apr 8, 2026
ca34f1d
fix: suppress a2a_wake results from user chat
39499740 Apr 8, 2026
09abd64
fix: set max_fires=1 on A2A on_message triggers to prevent loops
39499740 Apr 8, 2026
4512589
fix: improve a2a_wake prompt to prevent unwanted consult-back
39499740 Apr 9, 2026
c24af29
fix: skip DEDUP for send_message_to_agent wake calls
39499740 Apr 9, 2026
e71d745
feat: make msg_type required, improve tool description for auto-selec…
39499740 Apr 9, 2026
3662def
feat: improve msg_type decision guide for ambiguous user intent
39499740 Apr 9, 2026
616d6b7
fix: use trigger reason instead of internal name in user notification
39499740 Apr 9, 2026
a93b3ba
fix: user-friendly notification headline for task_delegate triggers
39499740 Apr 9, 2026
596bb92
fix: tell agent its a2a_wait reply is user-visible
39499740 Apr 9, 2026
647ec25
fix: dual protection against internal terms in user notifications
39499740 Apr 9, 2026
45347c7
fix: strengthen regex filter for internal terms in user notifications
39499740 Apr 9, 2026
7229219
perf: limit a2a_wake Reflection Sessions to 2 tool rounds
39499740 Apr 9, 2026
bf9b7f6
feat: add a2a_async_enabled feature flag (per-agent toggle)
39499740 Apr 9, 2026
d1a8a51
refactor: move a2a_async_enabled from Agent to Tenant (company-level)
39499740 Apr 9, 2026
952cb81
feat: add A2A async toggle to company settings page
39499740 Apr 9, 2026
44d9da2
feat: add i18n translations for A2A async toggle (en + zh)
39499740 Apr 9, 2026
7b8e6fa
fix: security hardening and conflict prevention
39499740 Apr 9, 2026
e4809cd
fix: add proper revision/down_revision to alembic migration
39499740 Apr 9, 2026
cc3e75e
Merge branch 'main' into feat/async-a2a-msg-type
39499740 Apr 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions backend/alembic/versions/add_a2a_async_enabled.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Add a2a_async_enabled column to tenants table.

Revision ID: f1a2b3c4d5e6
Revises: d9cbd43b62e5
Create Date: 2026-04-10 02:50:00.000000
"""
from alembic import op


# Alembic revision identifiers: `revision` is this migration's ID and
# `down_revision` is the ID of the migration it revises.
revision = "f1a2b3c4d5e6"
down_revision = "d9cbd43b62e5"


def upgrade() -> None:
    """Drop ``a2a_async_enabled`` from ``agents`` and add it to ``tenants``.

    The statements run in order as raw SQL. Each carries an
    ``IF EXISTS`` / ``IF NOT EXISTS`` guard, so a statement is skipped when
    the column is already in the target state. The new ``tenants`` column is
    a BOOLEAN with a default of FALSE. No data is copied between the tables.
    """
    statements = (
        "ALTER TABLE agents DROP COLUMN IF EXISTS a2a_async_enabled",
        "ALTER TABLE tenants ADD COLUMN IF NOT EXISTS a2a_async_enabled BOOLEAN DEFAULT FALSE",
    )
    for statement in statements:
        op.execute(statement)


def downgrade() -> None:
    """Remove ``a2a_async_enabled`` from ``tenants`` if the column exists.

    Only the ``tenants`` column is touched; the column that ``upgrade``
    drops from ``agents`` is not re-created here.
    """
    drop_tenant_flag = "ALTER TABLE tenants DROP COLUMN IF EXISTS a2a_async_enabled"
    op.execute(drop_tenant_flag)
2 changes: 2 additions & 0 deletions backend/app/api/tenants.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class TenantOut(BaseModel):
is_active: bool
sso_enabled: bool = False
sso_domain: str | None = None
a2a_async_enabled: bool = False
created_at: datetime | None = None

model_config = {"from_attributes": True}
Expand All @@ -49,6 +50,7 @@ class TenantUpdate(BaseModel):
is_active: bool | None = None
sso_enabled: bool | None = None
sso_domain: str | None = None
a2a_async_enabled: bool | None = None


# ─── Helpers ────────────────────────────────────────────
Expand Down
6 changes: 6 additions & 0 deletions backend/app/api/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ async def call_llm(
on_tool_call=None,
on_thinking=None,
supports_vision=False,
max_tool_rounds_override: int | None = None,
) -> str:
"""Call LLM via unified client with function-calling tool loop.

Expand All @@ -142,13 +143,18 @@ async def call_llm(
_agent = _ar.scalar_one_or_none()
if _agent:
_max_tool_rounds = _agent.max_tool_rounds or 50
if max_tool_rounds_override and max_tool_rounds_override < _max_tool_rounds:
_max_tool_rounds = max_tool_rounds_override
if _agent.max_tokens_per_day and _agent.tokens_used_today >= _agent.max_tokens_per_day:
return f"⚠️ Daily token usage has reached the limit ({_agent.tokens_used_today:,}/{_agent.max_tokens_per_day:,}). Please try again tomorrow or ask admin to increase the limit."
if _agent.max_tokens_per_month and _agent.tokens_used_month >= _agent.max_tokens_per_month:
return f"⚠️ Monthly token usage has reached the limit ({_agent.tokens_used_month:,}/{_agent.max_tokens_per_month:,}). Please ask admin to increase the limit."
except Exception:
pass

if max_tool_rounds_override and max_tool_rounds_override < _max_tool_rounds:
_max_tool_rounds = max_tool_rounds_override

# Build rich prompt with soul, memory, skills, relationships
from app.services.agent_context import build_agent_context
# Look up current user's display name so the agent knows who it's talking to
Expand Down
4 changes: 4 additions & 0 deletions backend/app/models/tenant.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,7 @@ class Tenant(Base):
min_poll_interval_floor: Mapped[int] = mapped_column(Integer, default=5)
max_webhook_rate_ceiling: Mapped[int] = mapped_column(Integer, default=5)

# A2A async communication (notify / task_delegate)
# When False, all agent-to-agent messages use synchronous consult mode
a2a_async_enabled: Mapped[bool] = mapped_column(Boolean, default=False)

324 changes: 302 additions & 22 deletions backend/app/services/agent_tools.py

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions backend/app/services/tool_seeder.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@
{
"name": "send_message_to_agent",
"display_name": "Agent Message",
"description": "Send a message to a digital employee colleague and receive a reply. Suitable for questions, delegation, or collaboration.",
"description": "Send a message to a digital employee colleague. Decision guide: target needs to DO WORK and return results? → task_delegate. Just FYI? → notify. Quick factual question? → consult. When unsure, prefer task_delegate.",
"category": "communication",
"icon": "🤖",
"is_default": True,
Expand All @@ -281,9 +281,9 @@
"properties": {
"agent_name": {"type": "string", "description": "Target agent name"},
"message": {"type": "string", "description": "Message content"},
"msg_type": {"type": "string", "enum": ["notify", "consult", "task_delegate"], "description": "Message type: notify (notification), consult (ask a question), task_delegate (delegate a task)"},
"msg_type": {"type": "string", "enum": ["notify", "consult", "task_delegate"], "description": "(1) Target needs to DO WORK and return results? → task_delegate. (2) Just FYI? → notify. (3) Quick factual question? → consult. When unsure, prefer task_delegate."},
},
"required": ["agent_name", "message"],
"required": ["agent_name", "message", "msg_type"],
},
"config": {},
"config_schema": {},
Expand Down
139 changes: 132 additions & 7 deletions backend/app/services/trigger_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,19 @@
MAX_AGENT_CHAIN_DEPTH = 5 # A→B→A→B→A max depth before stopping
MIN_POLL_INTERVAL_MINUTES = 5 # minimum poll interval to prevent abuse

# Track last invocation time per agent to enforce dedup window.
# Keys are agent IDs; values are timezone-aware UTC datetimes.
_last_invoke: dict[uuid.UUID, datetime] = {}

# Agent-to-agent wake counter, keyed by "<from_agent_id>-><agent_id>".
# The value is the number of wakes counted so far for that sender/target pair.
_A2A_WAKE_CHAIN: dict[str, int] = {}
# Delay in seconds (passed to loop.call_later) after which a counted wake
# removes its pair's entry from _A2A_WAKE_CHAIN.
_A2A_WAKE_CHAIN_TTL = 300
# A wake is refused once the pair's counter has reached this value.
_A2A_MAX_WAKE_DEPTH = 3


def _cleanup_stale_invoke_cache():
    """Evict ``_last_invoke`` entries older than twice the dedup window.

    Expired keys are collected first and deleted afterwards, so the dict is
    never mutated while it is being iterated.
    """
    current_time = datetime.now(timezone.utc)
    max_age_seconds = DEDUP_WINDOW * 2
    expired_agent_ids = [
        invoked_agent_id
        for invoked_agent_id, invoked_at in _last_invoke.items()
        if (current_time - invoked_at).total_seconds() > max_age_seconds
    ]
    for invoked_agent_id in expired_agent_ids:
        del _last_invoke[invoked_agent_id]

# Webhook rate limiter: token -> list of timestamps
_webhook_hits: dict[str, list[float]] = {}
WEBHOOK_RATE_LIMIT = 5 # max hits per minute per token
Expand Down Expand Up @@ -265,8 +275,9 @@ async def _check_new_agent_messages(trigger: AgentTrigger) -> bool:
# --- Agent-to-agent message check (existing logic) ---
from app.models.participant import Participant
from app.models.agent import Agent as AgentModel
safe_agent_name = from_agent_name.replace("%", "").replace("_", r"\_")
agent_r = await db.execute(
select(AgentModel).where(AgentModel.name.ilike(f"%{from_agent_name}%"))
select(AgentModel).where(AgentModel.name.ilike(f"%{safe_agent_name}%"))
)
source_agent = agent_r.scalars().first()
if not source_agent:
Expand Down Expand Up @@ -313,13 +324,14 @@ async def _check_new_agent_messages(trigger: AgentTrigger) -> bool:
# Look up user by display name or username within tenant
from sqlalchemy import or_
from app.models.user import User, Identity
safe_user_name = from_user_name.replace("%", "").replace("_", r"\_")
query = (
select(User)
.join(User.identity)
.where(
or_(
User.display_name.ilike(f"%{from_user_name}%"),
Identity.username.ilike(f"%{from_user_name}%"),
User.display_name.ilike(f"%{safe_user_name}%"),
Identity.username.ilike(f"%{safe_user_name}%"),
)
)
)
Expand Down Expand Up @@ -500,6 +512,8 @@ async def on_tool_call(data):
except Exception as e:
logger.warning(f"Failed to persist tool call for trigger session: {e}")

_is_a2a_wake = all(t.name == "a2a_wake" for t in triggers)

reply = await call_llm(
model=model,
messages=messages,
Expand All @@ -509,6 +523,7 @@ async def on_tool_call(data):
user_id=agent.creator_id,
on_chunk=on_chunk,
on_tool_call=on_tool_call,
max_tool_rounds_override=2 if _is_a2a_wake else None,
)

# Save assistant reply to Reflection session
Expand All @@ -535,14 +550,57 @@ async def on_tool_call(data):

# Push trigger result to user's active WebSocket connections
final_reply = reply or "".join(collected_content)
if final_reply:

is_a2a_internal = all(t.name == "a2a_wake" for t in triggers)

if final_reply and not is_a2a_internal:
try:
from app.api.websocket import manager as ws_manager
agent_id_str = str(agent_id)

# Build notification message with trigger badge
trigger_badge = ", ".join(trigger_names)
notification = f"⚡ **触发器触发** `{trigger_badge}`\n\n{final_reply}"
trigger_reasons = []
for t in triggers:
ns = (t.config or {}).get("_notification_summary", "").strip()
if ns:
trigger_reasons.append(ns)
else:
r = (t.reason or "").strip()
if r and len(r) <= 80:
trigger_reasons.append(r)
elif r:
trigger_reasons.append(r[:77] + "...")
summary = trigger_reasons[0] if trigger_reasons else "有新的事件需要处理"

_is_a2a_wait = any(t.name.startswith("a2a_wait_") for t in triggers)
if _is_a2a_wait:
import re as _re
cleaned = final_reply
_internal_patterns = [
r'\b(a2a_wait_\w+|a2a_wake)\b',
r'\bwait_?\w+_?(task|reply|followup|meeting|sync|api_key)\w*\b',
r'\bresolve_\w+\b',
r'\bfocus[_ ]?item\b',
r'\btask_delegate\b',
r'\bfocus_ref\b',
r'✅\s*(a2a\w+|wait\w+|触发器\w*|focus\w*).*(?:已取消|已为|保持|活跃|完成状态)[^\n]*',
r'[\-•]\s*(?:触发器|trigger|focus|wait_\w+|a2a\w+).*[^\n]*',
r'(?:触发器|trigger)\s+\S+\s*(?:已取消|保持活跃|已为完成状态|fired)',
r'已静默清理触发器',
r'已静默处理完毕',
r'继续待命[。,]?\s*',
r',?\s*(?:继续)?待命。',
]
for _pat in _internal_patterns:
cleaned = _re.sub(_pat, '', cleaned, flags=_re.IGNORECASE)
cleaned = _re.sub(r'\n{3,}', '\n\n', cleaned).strip()
cleaned = _re.sub(r'[。,]\s*$', '', cleaned).strip()
if not cleaned:
cleaned = final_reply
else:
cleaned = final_reply

notification = f"⚡ {summary}\n\n{cleaned}"

# Save to user's active chat session(s) for persistence
async with async_session() as db:
Expand Down Expand Up @@ -693,6 +751,72 @@ async def _tick():
asyncio.create_task(_invoke_agent_for_triggers(agent_id, agent_triggers))


# Strong references to in-flight wake tasks. The event loop only keeps weak
# references to tasks, so a task whose handle is discarded may be garbage
# collected before it finishes; entries remove themselves on completion.
_A2A_WAKE_TASKS: set[asyncio.Task] = set()


async def wake_agent_with_context(agent_id: uuid.UUID, message_context: str, *, from_agent_id: uuid.UUID | None = None, skip_dedup: bool = False) -> None:
    """Public API: wake an agent asynchronously with a message context.

    Builds a synthetic ``a2a_wake`` trigger (type ``on_message``) carrying the
    message and schedules ``_invoke_agent_for_triggers`` for it as a
    background task. The call returns as soon as the task is scheduled; it
    does not wait for the agent to finish processing.

    Args:
        agent_id: The agent to wake.
        message_context: The message to deliver. Only the first 2000
            characters are placed in the trigger config.
        from_agent_id: The agent that initiated this wake. When given, wakes
            are counted per ``from_agent_id->agent_id`` pair; once the count
            has reached ``_A2A_MAX_WAKE_DEPTH`` the wake is dropped with a
            warning. Each counted wake schedules removal of the pair's entry
            after ``_A2A_WAKE_CHAIN_TTL`` seconds.
        skip_dedup: If True, bypass the dedup window check. Use this for
            genuine message deliveries (e.g. a task_delegate callback)
            where skipping the wake would lose a real message.

    Returns:
        None. The wake is silently skipped (with a log line) when the chain
        depth limit is hit or when the agent was invoked within
        ``DEDUP_WINDOW`` seconds and ``skip_dedup`` is False.
    """
    now = datetime.now(timezone.utc)

    if from_agent_id:
        chain_key = f"{from_agent_id}->{agent_id}"
        current_depth = _A2A_WAKE_CHAIN.get(chain_key, 0)
        if current_depth >= _A2A_MAX_WAKE_DEPTH:
            logger.warning(
                f"[A2A] Wake chain depth {current_depth} reached for {chain_key}, "
                f"stopping to prevent wake storm"
            )
            return

        # NOTE(review): the counter is bumped before the dedup check below,
        # so a wake that is then skipped by dedup still counts towards the
        # depth limit. Kept as-is; confirm this is intended.
        _A2A_WAKE_CHAIN[chain_key] = current_depth + 1

        def _decay_chain():
            # Drops the whole entry for this pair, not just one increment.
            _A2A_WAKE_CHAIN.pop(chain_key, None)
        asyncio.get_running_loop().call_later(_A2A_WAKE_CHAIN_TTL, _decay_chain)

    if not skip_dedup and agent_id in _last_invoke:
        elapsed = (now - _last_invoke[agent_id]).total_seconds()
        if elapsed < DEDUP_WINDOW:
            logger.info(
                f"[A2A] Skipping wake for agent {agent_id} — "
                f"invoked {elapsed:.0f}s ago (dedup window {DEDUP_WINDOW}s)"
            )
            return

    # Recorded even when skip_dedup is True, so later non-skipping callers
    # still see this invocation inside their dedup window.
    _last_invoke[agent_id] = now

    dummy_trigger = AgentTrigger(
        id=uuid.uuid4(),
        agent_id=agent_id,
        name="a2a_wake",
        type="on_message",
        config={"from_agent_name": "", "_matched_message": message_context[:2000], "_matched_from": "agent"},
        reason=(
            "You received a notification from another agent. "
            "Read the message content above, update your focus and memory if needed, "
            "and take any action you deem necessary. "
            "Do NOT reply back to the sender unless you have a genuine question — "
            "this was a notification, not a request for response."
        ),
        is_enabled=True,
        last_fired_at=now,
        fire_count=0,
    )

    # Hold a strong reference until the task completes so it cannot be
    # garbage collected mid-execution.
    wake_task = asyncio.create_task(_invoke_agent_for_triggers(agent_id, [dummy_trigger]))
    _A2A_WAKE_TASKS.add(wake_task)
    wake_task.add_done_callback(_A2A_WAKE_TASKS.discard)


async def start_trigger_daemon():
"""Start the background trigger daemon loop. Called from FastAPI startup."""
logger.info("⚡ Trigger Daemon started (15s tick, heartbeat every ~60s)")
Expand All @@ -709,6 +833,7 @@ async def start_trigger_daemon():
_heartbeat_counter += 1
if _heartbeat_counter >= 4:
_heartbeat_counter = 0
_cleanup_stale_invoke_cache()
try:
from app.services.heartbeat import _heartbeat_tick
await _heartbeat_tick()
Expand Down
Loading