diff --git a/ax_cli/commands/channel.py b/ax_cli/commands/channel.py index 102ee82..5c97bcf 100644 --- a/ax_cli/commands/channel.py +++ b/ax_cli/commands/channel.py @@ -456,15 +456,21 @@ def _sse_loop(bridge: ChannelBridge) -> None: for event_type, data in _iter_sse(response): if bridge.shutdown.is_set(): return - # Reconnect before JWT expires (15-min TTL, reconnect at 10 min) - if time.monotonic() - connect_time > _SSE_RECONNECT_INTERVAL: - bridge.log("SSE reconnecting to refresh JWT") - break + # Reconnect before JWT expires (15-min TTL, reconnect at + # 10 min), but never drop the event that woke an idle + # stream. Process the current event first, then reconnect. + reconnect_after_event = time.monotonic() - connect_time > _SSE_RECONNECT_INTERVAL if event_type in {"bootstrap", "heartbeat", "ping", "connected", "identity_bootstrap"}: bridge.log(f"skip {event_type}") + if reconnect_after_event: + bridge.log("SSE reconnecting to refresh JWT") + break continue if event_type not in {"message", "mention"} or not isinstance(data, dict): bridge.log(f"skip non-msg: {event_type}") + if reconnect_after_event: + bridge.log("SSE reconnecting to refresh JWT") + break continue message_id = data.get("id") or "" @@ -472,11 +478,17 @@ def _sse_loop(bridge: ChannelBridge) -> None: bridge.log(f"event {event_type} id={message_id[:12]} content={content_preview!r}") if not message_id or message_id in seen_ids: bridge.log(" -> skip: dup or no id") + if reconnect_after_event: + bridge.log("SSE reconnecting to refresh JWT") + break continue if _is_self_authored(data, bridge.agent_name, bridge.agent_id): _remember_reply_anchor(bridge._reply_anchor_ids, message_id) seen_ids.add(message_id) bridge.log(" -> skip self-authored, remembered as reply anchor") + if reconnect_after_event: + bridge.log("SSE reconnecting to refresh JWT") + break continue if not _should_respond( data, @@ -485,11 +497,17 @@ def _sse_loop(bridge: ChannelBridge) -> None: reply_anchor_ids=bridge._reply_anchor_ids, ): bridge.log(f" -> skip: not for @{bridge.agent_name}") + if reconnect_after_event: + bridge.log("SSE reconnecting to refresh JWT") + break continue bridge.log(" -> MATCH! delivering") prompt = _strip_mention(data.get("content", ""), bridge.agent_name) if not prompt: + if reconnect_after_event: + bridge.log("SSE reconnecting to refresh JWT") + break continue seen_ids.add(message_id) @@ -549,6 +567,9 @@ def _sse_loop(bridge: ChannelBridge) -> None: attachments=attachments, ) ) + if reconnect_after_event: + bridge.log("SSE reconnecting to refresh JWT") + break except (httpx.ConnectError, httpx.ReadTimeout, ConnectionError) as exc: bridge.log(f"SSE reconnect in {backoff}s after: {exc}") time.sleep(backoff) diff --git a/channel/README.md b/channel/README.md index 3bce53c..78cac1f 100644 --- a/channel/README.md +++ b/channel/README.md @@ -95,6 +95,10 @@ axctl profile verify your-agent Preferred for a fixed Claude Code session: launch the channel through the verified profile so no raw token is stored in `.mcp.json`: +Use the same working directory that `axctl profile verify ` expects. +Profile verification is intentionally cwd-bound; running the channel from a +different directory can fail closed before the listener starts. + ```json { "mcpServers": { @@ -102,7 +106,7 @@ verified profile so no raw token is stored in `.mcp.json`: "command": "bash", "args": [ "-lc", - "eval \"$(axctl profile env your-agent)\" && exec axctl channel --agent your_agent --space-id your_space_uuid" + "cd /path/to/profile-bound-workdir && eval \"$(axctl profile env your-agent)\" && exec axctl channel --agent your_agent --space-id your_space_uuid" ] } } diff --git a/specs/LISTENER-001/spec.md b/specs/LISTENER-001/spec.md index 42cdef5..3f581e8 100644 --- a/specs/LISTENER-001/spec.md +++ b/specs/LISTENER-001/spec.md @@ -30,6 +30,18 @@ explicit `@agent` mention. `ax channel` must make channel liveness visible in the same Activity Stream surface as other agent runtimes: +- When the channel bridge authenticates and connects to the space SSE stream, it + should publish or expose `channel_connected` for `(agent_id, space_id)`. +- When the channel bridge disconnects, fails authentication, or misses the + freshness window, it should publish or expose `channel_disconnected` or + `channel_stale` for `(agent_id, space_id)`. +- Listener presence is not message delivery. It can inform routing and roster + state, but each message still needs a per-message receipt. +- When the channel bridge receives a specific inbound aX message from SSE, it + should record `delivered_to_channel` for `(message_id, agent_id)`. +- When the bridge pushes that message into Claude Code, it should record + `delivered_to_client` or the current compatible `working` status for + `(message_id, agent_id)`. - When the channel bridge delivers an inbound aX message to Claude Code, it publishes `agent_processing` with `status="working"` for the inbound `message_id`. diff --git a/tests/test_channel.py b/tests/test_channel.py index f86add9..edf5a37 100644 --- a/tests/test_channel.py +++ b/tests/test_channel.py @@ -1,8 +1,10 @@ """Tests for the Claude Code channel bridge identity boundary.""" import asyncio +import json import os +from ax_cli.commands import channel as channel_mod from ax_cli.commands.channel import ChannelBridge, MentionEvent, _load_channel_env from ax_cli.commands.listen import _is_self_authored, _remember_reply_anchor, _should_respond @@ -48,6 +50,24 @@ async def write_message(self, payload): self.writes.append(payload) +class FakeSseResponse: + status_code = 200 + + def __init__(self, payload): + self.payload = payload + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def iter_lines(self): + yield "event: message" + yield f"data: {json.dumps(self.payload)}" + yield "" + + def test_channel_rejects_user_pat_for_agent_reply(): client = FakeClient("axp_u_UserKey.Secret") bridge = CaptureBridge(client) @@ -108,6 +128,48 @@ def test_channel_can_publish_working_status_on_delivery(): ] +def test_channel_processes_idle_event_before_jwt_reconnect(monkeypatch): + """The event that wakes an idle stream must not be dropped for reconnect.""" + + class FakeSseClient(FakeClient): + def __init__(self): + super().__init__() + self.connect_calls = 0 + + def connect_sse(self, *, space_id): + self.connect_calls += 1 + assert space_id == "space-123" + return FakeSseResponse( + { + "id": "incoming-123", + "content": "@anvil please check this", + "author": {"id": "user-123", "name": "madtank", "type": "user"}, + "mentions": ["anvil"], + } + ) + + def get_message(self, message_id): + assert message_id == "incoming-123" + return {"message": {"metadata": {}}} + + client = FakeSseClient() + bridge = CaptureBridge(client) + delivered: list[MentionEvent] = [] + + def capture_delivery(event): + delivered.append(event) + bridge.shutdown.set() + + bridge.enqueue_from_thread = capture_delivery + ticks = iter([0, channel_mod._SSE_RECONNECT_INTERVAL + 1]) + monkeypatch.setattr(channel_mod.time, "monotonic", lambda: next(ticks, channel_mod._SSE_RECONNECT_INTERVAL + 2)) + + channel_mod._sse_loop(bridge) + + assert [event.message_id for event in delivered] == ["incoming-123"] + assert delivered[0].prompt == "please check this" + + def test_channel_processing_status_can_be_disabled(): client = FakeClient("axp_a_AgentKey.Secret") bridge = CaptureBridge(client, processing_status=False)