Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions ax_cli/commands/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,27 +456,39 @@ def _sse_loop(bridge: ChannelBridge) -> None:
for event_type, data in _iter_sse(response):
if bridge.shutdown.is_set():
return
# Reconnect before JWT expires (15-min TTL, reconnect at 10 min)
if time.monotonic() - connect_time > _SSE_RECONNECT_INTERVAL:
bridge.log("SSE reconnecting to refresh JWT")
break
# Reconnect before JWT expires (15-min TTL, reconnect at
# 10 min), but never drop the event that woke an idle
# stream. Process the current event first, then reconnect.
reconnect_after_event = time.monotonic() - connect_time > _SSE_RECONNECT_INTERVAL
if event_type in {"bootstrap", "heartbeat", "ping", "connected", "identity_bootstrap"}:
bridge.log(f"skip {event_type}")
if reconnect_after_event:
bridge.log("SSE reconnecting to refresh JWT")
break
continue
if event_type not in {"message", "mention"} or not isinstance(data, dict):
bridge.log(f"skip non-msg: {event_type}")
if reconnect_after_event:
bridge.log("SSE reconnecting to refresh JWT")
break
continue

message_id = data.get("id") or ""
content_preview = (data.get("content") or "")[:60]
bridge.log(f"event {event_type} id={message_id[:12]} content={content_preview!r}")
if not message_id or message_id in seen_ids:
bridge.log(" -> skip: dup or no id")
if reconnect_after_event:
bridge.log("SSE reconnecting to refresh JWT")
break
continue
if _is_self_authored(data, bridge.agent_name, bridge.agent_id):
_remember_reply_anchor(bridge._reply_anchor_ids, message_id)
seen_ids.add(message_id)
bridge.log(" -> skip self-authored, remembered as reply anchor")
if reconnect_after_event:
bridge.log("SSE reconnecting to refresh JWT")
break
continue
if not _should_respond(
data,
Expand All @@ -485,11 +497,17 @@ def _sse_loop(bridge: ChannelBridge) -> None:
reply_anchor_ids=bridge._reply_anchor_ids,
):
bridge.log(f" -> skip: not for @{bridge.agent_name}")
if reconnect_after_event:
bridge.log("SSE reconnecting to refresh JWT")
break
continue
bridge.log(" -> MATCH! delivering")

prompt = _strip_mention(data.get("content", ""), bridge.agent_name)
if not prompt:
if reconnect_after_event:
bridge.log("SSE reconnecting to refresh JWT")
break
continue

seen_ids.add(message_id)
Expand Down Expand Up @@ -549,6 +567,9 @@ def _sse_loop(bridge: ChannelBridge) -> None:
attachments=attachments,
)
)
if reconnect_after_event:
bridge.log("SSE reconnecting to refresh JWT")
break
except (httpx.ConnectError, httpx.ReadTimeout, ConnectionError) as exc:
bridge.log(f"SSE reconnect in {backoff}s after: {exc}")
time.sleep(backoff)
Expand Down
6 changes: 5 additions & 1 deletion channel/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,18 @@ axctl profile verify your-agent
Preferred for a fixed Claude Code session: launch the channel through the
verified profile so no raw token is stored in `.mcp.json`:

Use the same working directory that `axctl profile verify <profile>` expects.
Profile verification is intentionally cwd-bound; running the channel from a
different directory can fail closed before the listener starts.

```json
{
"mcpServers": {
"ax-channel": {
"command": "bash",
"args": [
"-lc",
"eval \"$(axctl profile env your-agent)\" && exec axctl channel --agent your_agent --space-id your_space_uuid"
"cd /path/to/profile-bound-workdir && eval \"$(axctl profile env your-agent)\" && exec axctl channel --agent your_agent --space-id your_space_uuid"
]
}
}
Expand Down
12 changes: 12 additions & 0 deletions specs/LISTENER-001/spec.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ explicit `@agent` mention.
`ax channel` must make channel liveness visible in the same Activity Stream
surface as other agent runtimes:

- When the channel bridge authenticates and connects to the space SSE stream, it
should publish or expose `channel_connected` for `(agent_id, space_id)`.
- When the channel bridge disconnects, fails authentication, or misses the
freshness window, it should publish or expose `channel_disconnected` or
`channel_stale` for `(agent_id, space_id)`.
- Listener presence is not message delivery. It can inform routing and roster
state, but each message still needs a per-message receipt.
- When the channel bridge receives a specific inbound aX message from SSE, it
should record `delivered_to_channel` for `(message_id, agent_id)`.
- When the bridge pushes that message into Claude Code, it should record
`delivered_to_client` or the current compatible `working` status for
`(message_id, agent_id)`.
- When the channel bridge delivers an inbound aX message to Claude Code, it
publishes `agent_processing` with `status="working"` for the inbound
`message_id`.
Expand Down
62 changes: 62 additions & 0 deletions tests/test_channel.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
"""Tests for the Claude Code channel bridge identity boundary."""

import asyncio
import json
import os

from ax_cli.commands import channel as channel_mod
from ax_cli.commands.channel import ChannelBridge, MentionEvent, _load_channel_env
from ax_cli.commands.listen import _is_self_authored, _remember_reply_anchor, _should_respond

Expand Down Expand Up @@ -48,6 +50,24 @@ async def write_message(self, payload):
self.writes.append(payload)


class FakeSseResponse:
status_code = 200

def __init__(self, payload):
self.payload = payload

def __enter__(self):
return self

def __exit__(self, exc_type, exc, tb):
return False

def iter_lines(self):
yield "event: message"
yield f"data: {json.dumps(self.payload)}"
yield ""


def test_channel_rejects_user_pat_for_agent_reply():
client = FakeClient("axp_u_UserKey.Secret")
bridge = CaptureBridge(client)
Expand Down Expand Up @@ -108,6 +128,48 @@ def test_channel_can_publish_working_status_on_delivery():
]


def test_channel_processes_idle_event_before_jwt_reconnect(monkeypatch):
"""The event that wakes an idle stream must not be dropped for reconnect."""

class FakeSseClient(FakeClient):
def __init__(self):
super().__init__()
self.connect_calls = 0

def connect_sse(self, *, space_id):
self.connect_calls += 1
assert space_id == "space-123"
return FakeSseResponse(
{
"id": "incoming-123",
"content": "@anvil please check this",
"author": {"id": "user-123", "name": "madtank", "type": "user"},
"mentions": ["anvil"],
}
)

def get_message(self, message_id):
assert message_id == "incoming-123"
return {"message": {"metadata": {}}}

client = FakeSseClient()
bridge = CaptureBridge(client)
delivered: list[MentionEvent] = []

def capture_delivery(event):
delivered.append(event)
bridge.shutdown.set()

bridge.enqueue_from_thread = capture_delivery
ticks = iter([0, channel_mod._SSE_RECONNECT_INTERVAL + 1])
monkeypatch.setattr(channel_mod.time, "monotonic", lambda: next(ticks, channel_mod._SSE_RECONNECT_INTERVAL + 2))

channel_mod._sse_loop(bridge)

assert [event.message_id for event in delivered] == ["incoming-123"]
assert delivered[0].prompt == "please check this"


def test_channel_processing_status_can_be_disabled():
client = FakeClient("axp_a_AgentKey.Secret")
bridge = CaptureBridge(client, processing_status=False)
Expand Down
Loading