feat: Communicate Mode SDK (on_relay)#565
Conversation
willwashburn
left a comment
Review: Communicate Mode SDK (on_relay)
Substantial and well-architected addition. The core Relay and transport layers are solid. However, there are two blocking issues plus several high-priority items to address before merge.
What's Good
- Clean layered architecture: types → transport → core → adapters
- Thorough `MockRelayServer` in Python conftest
- Good test coverage (96 Python + 16 TS tests)
- Lazy connection pattern — no network calls until first use
- WebSocket reconnect with exponential backoff in both languages
- Frozen `Message` dataclass is a solid immutability choice
- Docs sync between .mdx and .md is generally correct

Medium & Low Priority Items

Medium
8. HTTP retry doesn't retry on …
9. …
10. Google ADK adapter appends raw string to …
11. …
12. …

Low / Suggestions
13. …
14. Tool definitions (…)
15. Spec (…)
16. Docs Sync Gap
- Fix claude_sdk adapter argument order: on_relay(options, relay, *, name) to match other adapters' (agent, relay) pattern
- Fix return type annotation: callable -> Callable[[], None] in core.py
- URL-encode WebSocket token in transport.py to prevent connection failures
- Add retry for aiohttp.ClientError in HTTP transport (was only retrying 5xx)
- Add Relay.peek() method and use it in OpenAI Agents/Agno instructions wrappers to avoid draining inbox and starving the relay_inbox tool
- Remove transport re-export from TS communicate/index.ts (internal detail)
- Improve error message for TS onRelay() auto-detection failures
- Add SIGTERM/SIGINT handlers for autoCleanup in TS core.ts
- Use thread-based fallback in _run_sync for async context compatibility
- Fix @agent-relay/config version: 3.1.23 -> 3.2.1 in SDK package.json
- Update spec: withRelay references -> onRelay
- Add per-framework guide links to docs/markdown/communicate.md (docs sync)
- Update all tests for new signatures

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Devin Review Response

Reviewed all findings. Status:
- Already fixed in prior commits
- Acknowledged (low priority, will address in follow-up)
- New in this push: A2A protocol transport layer (Python 89 tests ✅ + TypeScript)
barryonthecape
left a comment
Thanks for the huge push here — there is a lot of good work, but I'm still seeing a few blocking issues that need to be fixed before merge:
1. TS A2A parser bug (runtime crash): in `packages/sdk/src/communicate/a2a-types.ts`, `a2aMessageFromDict()` currently uses:

   `const parts = (d.parts as Record<string, unknown>[] | undefined ?? []).map(a2aPartFromDict);`

   Because of the expression form, this is not safely defaulting in the intended way and can leave `parts` undefined at runtime before `.map(...)`, causing a crash. Please change to explicit grouping:

   `const parts = ((d.parts as Record<string, unknown>[] | undefined) ?? []).map(a2aPartFromDict);`

2. TS connect retry dead-end: in `packages/sdk/src/communicate/core.ts`, `ensureConnected()` caches `connectPromise`, but if `transport.connect()` rejects once, the rejected promise is retained and every future call rethrows forever. `connectPromise` needs to be cleared/reset on failure so a subsequent call can retry.

3. Python connect future can deadlock: in `packages/sdk-py/src/agent_relay/communicate/core.py`, `_ensure_connected()` sets `_connect_future` before the connect attempt. If `transport.connect()` fails and the fallback `register_agent()` also fails, `_connect_future` is never resolved/rejected, so later calls await that stale pending future forever. Please make sure `_connect_future` is always completed (`set_exception`) and reset on failure paths.

4. Version skew in this PR: `packages/sdk-py/pyproject.toml` still has `version = "3.1.23"` while this PR bumps the JS workspace packages to `3.2.1`. Please align the Python package version with the release version strategy used in this PR.
Also: PR currently shows merge conflicts with main, so please rebase/merge main before final review pass.
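A minimal sketch of a fix for the Python connect-future deadlock described in item 3. Class shape and names here are assumptions for illustration, not the SDK's actual code:

```python
import asyncio


class Relay:
    """Sketch of connect dedup that always completes and resets its cached future."""

    def __init__(self, transport) -> None:
        self._transport = transport
        self._connect_future: asyncio.Future | None = None

    async def _ensure_connected(self) -> None:
        if self._connect_future is not None:
            # Another caller is connecting (or already connected): share its outcome.
            # shield() keeps this caller's cancellation from killing the shared future.
            await asyncio.shield(self._connect_future)
            return
        self._connect_future = asyncio.get_running_loop().create_future()
        try:
            await self._transport.connect()
        except Exception as exc:
            # Never leave the future pending, and reset it so a later call retries.
            fut, self._connect_future = self._connect_future, None
            fut.set_exception(exc)
            raise
        else:
            self._connect_future.set_result(None)
```

With this shape, a failed `connect()` propagates to the caller and to any concurrent waiters, and the next `_ensure_connected()` call starts a fresh attempt instead of awaiting a stale pending future.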
…DoS regex
1. Remove onRelay() auto-detect that threw on empty config {}; only
export onPiRelay/onClaudeRelay (Will's suggestion).
2. core.ts handleTransportMessage already buffers AND dispatches — verified.
3. transport.ts sendHttp already has retry with exponential backoff — verified.
4. Replace polynomial regex /\/+$/ with iterative trimTrailingSlashes()
to eliminate ReDoS risk.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
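Item 4's iterative trim is simple to express; the commit's actual change is in TypeScript, so this Python rendering of the same idea is illustrative only:

```python
def trim_trailing_slashes(url: str) -> str:
    # Linear scan from the end: no regex engine, no backtracking, no ReDoS surface.
    end = len(url)
    while end > 0 and url[end - 1] == "/":
        end -= 1
    return url[:end]
```

The scan is O(n) in the number of trailing slashes, regardless of input shape.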
```python
"""Framework-specific adapters for on_relay()."""

from .pi import on_relay as on_pi_relay
```
🟡 Python adapters __init__.py imports Pi adapter at module load, breaking without subprocess dependency
The adapters __init__.py (packages/sdk-py/src/agent_relay/communicate/adapters/__init__.py:3) unconditionally imports from .pi import on_relay as on_pi_relay. The Pi adapter (pi.py) imports subprocess and threading (which are stdlib so that's fine), but it also defines PiRpcSession that expects a real subprocess.Popen process. More importantly, this top-level import means any from agent_relay.communicate.adapters import ... will execute the Pi adapter module. This is inconsistent with all other adapters which use lazy imports — the other adapters (google_adk, openai_agents, etc.) are NOT imported in __init__.py and are only loaded when specifically requested. If the Pi adapter is the only one exported, it should match the pattern of being lazily imported, or all adapters should be exported consistently.
Implement the Connect SDK spec — a new "Communicate Mode" that lets any framework agent join Relaycast with a single on_relay() call.

Python SDK:
- Relay core (lazy WebSocket, send/post/reply/inbox/agents/close)
- RelayTransport (HTTP + WS, auto-reconnect, exponential backoff)
- 6 framework adapters: OpenAI Agents, Claude SDK, Google ADK, Agno, Swarms, CrewAI
- Tier 1 (Push) adapters inject messages mid-execution via hooks/callbacks
- Tier 2 (Poll) adapters surface messages at tool-call boundaries
- 96 tests (unit + integration + cross-framework)

TypeScript SDK:
- Relay core, transport, types mirroring Python API
- 2 framework adapters: Pi (session.steer), Claude SDK (PostToolUse hook)
- 16 tests (unit + integration + cross-framework)

Docs:
- communicate.mdx overview + 7 per-framework guides
- Plain markdown mirrors (docs-sync rule)
- Updated introduction.mdx with Communicate Mode section

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Resolves npm ci failure due to missing @sinclair/typebox@0.34.48 in lock file. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Endpoints changed:
- POST /v1/agents/register → POST /v1/agents (returns {ok, data: {id, token}})
- DELETE /v1/agents/{id} → POST /v1/agents/disconnect (agent token auth)
- POST /v1/messages/dm → POST /v1/dm (agent token auth)
- POST /v1/messages/channel → POST /v1/channels/{name}/messages (agent token auth)
- POST /v1/messages/reply → POST /v1/messages/{id}/replies (agent token auth)
- GET /v1/inbox/{agent_id} → GET /v1/inbox (agent token auth, returns unread_dms/mentions)
- GET /v1/ws/{agent_id}?token=... → GET /v1/ws?token=...
Key fixes:
- Added _send_http_as_agent() for per-agent token auth (vs workspace key)
- Inbox now fetches DM conversation messages from /v1/dm/{conv_id}/messages
- WS dispatch handles Relaycast event types (message.created, dm.received, etc.)
- Error message extraction handles {ok, error: {message}} envelope
- Updated mock server + unit tests + e2e tests
All 64 unit tests + 2 real e2e tests against live Relaycast pass.
…ypebox dep for Pi adapter
…CrewAI + review fixes
- openai-agents.ts: tool injection via Agent tools array, message routing via instructions
- langgraph.ts: relay tools as LangGraph tool nodes, message routing via graph.invoke
- google-adk.ts: FunctionTool-compatible relay tools, message routing via runner.runAsync
- crewai.ts: CrewAI tool interface, step_callback routing, onCrewRelay for full crews
- Review fixes: Pi cleanup exposed, CrewAI dead code removed, onCrewRelay exported, LangGraph naming
- All 39 adapter tests pass
… Relaycast
- TS e2e (5): Pi, OpenAI Agents, LangGraph, Google ADK, CrewAI
- Python e2e (8): Pi, OpenAI Agents, CrewAI, LangGraph, Google ADK, Swarms, Agno, Claude SDK

All tests exercise real agent registration, relay tool execution (send/inbox/post/agents), message routing, and cleanup against live api.relaycast.dev.
Adds Agent2Agent (A2A) protocol support as an alternative transport:
- a2a_types: AgentCard, Message, Task, Part data model
- a2a_transport: JSON-RPC 2.0 client for sending to A2A agents
- a2a_server: HTTP server exposing Relay agents as A2A endpoints
- a2a_bridge: Bidirectional bridge between A2A and Relay workspaces

Python: 4 source files (1,040 lines) + 4 test files (1,612 lines), 89/89 passing
TypeScript: 4 source files + 4 test files (parity with Python)
Spec: A2A_TRANSPORT_SPEC.md with 3-phase implementation plan
- Claude SDK adapter: fix argument order to on_relay(name, options, relay), add hooks null check
- Google ADK adapter: return before_model_callback result instead of discarding
- core.py: fix return type callable → Callable[[], None], connect-dedup in _ensure_connected
- Updated test call sites to match new claude_sdk signature
- Issues 1, 4, 5, 6, 8, 9, 10, 11, 12 already fixed in prior commit (1e2d591)
- Fix A2A parser operator precedence bug in a2aMessageFromDict (TS)
- Clear connectPromise on rejection so retries work (TS)
- Ensure _connect_future is always resolved/rejected to prevent deadlock (Python)
- Align Python SDK version with workspace (3.2.3)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Force-pushed from b795a50 to 58649a1.
```ts
JSON.stringify({
  jsonrpc: '2.0',
  error: { code: -32602, message: String(err) },
  id: rpcId,
}),
```
Code scanning / CodeQL warning: Information exposure through a stack trace (Medium)

Copilot Autofix (AI, 7 days ago):
In general, the fix is to stop sending the raw error string derived from the caught exception back to the client and instead send a generic, non-sensitive message while logging the detailed error on the server. This preserves debuggability for developers but prevents attackers from learning about internal implementation details via error text or stack traces.
The best targeted fix here is to modify the catch (err) block in _handleJsonRpc (lines 231–240). Specifically:
- Replace `message: String(err)` with a generic message such as `"Internal error"` or a stable, non-sensitive description of the failure.
- Optionally, log the error (including its stack) to the server console or another logging system so developers can still diagnose issues.
- Keep the existing JSON-RPC structure, error code (`-32602`), `id`, and HTTP status code behavior unchanged to avoid breaking existing clients' expectations, unless the project has a specific error contract that suggests a different generic message.

No new external dependencies are strictly necessary; Node's built-in console.error can be used for logging. All changes are confined to the catch block in _handleJsonRpc within packages/sdk/src/communicate/a2a-server.ts.
```diff
@@ -229,11 +229,13 @@
     res.writeHead(200, { 'Content-Type': 'application/json' });
     res.end(JSON.stringify({ jsonrpc: '2.0', result, id: rpcId }));
   } catch (err) {
+    // Log detailed error information on the server, but do not expose it to the client.
+    console.error('Error handling JSON-RPC request:', err);
     res.writeHead(404, { 'Content-Type': 'application/json' });
     res.end(
       JSON.stringify({
         jsonrpc: '2.0',
-        error: { code: -32602, message: String(err) },
+        error: { code: -32602, message: 'Internal error' },
         id: rpcId,
       }),
     );
```
- Fix A2AServer constructor arg order and void return in bridge test
- Fix openai-agents test type annotations for callable instructions
- Exclude communicate test files from build tsconfig
- Guard Pi adapter import in adapters __init__.py with try/except
- Clear _connect_future on successful connect to avoid stale reference
- Guard send_dm/post_message/reply against None payload on 204 responses
- Switch OpenAI/Agno instructions wrapper from peek() to inbox() to prevent unbounded buffer
- Fix CrewAI TS adapter step_callback composition to preserve routing
- Fix CrewAI Python _resolve_sync to poll inbox even when event loop is running

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replace individual test file exclusions with a single glob pattern to prevent vitest type resolution errors in CI. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The tsconfig.json (used by check/build:full) overrides the exclude from tsconfig.build.json. Apply the same src/__tests__/** glob exclusion to both configs. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Add 2s timeout to TS WebSocket disconnect to prevent hanging
- Remove zod dependency from Google ADK TS adapter, use plain JSON schemas
- Fix Claude SDK e2e test call signatures to match on_relay(name, options, relay)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
```python
text = m.get("text", "")
channel = m.get("channel") or m.get("channel_name") or m.get("channelName") or payload.get("channel") or payload.get("channel_name")
thread_id = m.get("thread_id") or m.get("threadId") or m.get("conversation_id") or m.get("conversationId") or payload.get("thread_id")
timestamp = m.get("timestamp") or m.get("created_at") or m.get("createdAt") or payload.get("timestamp")
```
🔴 Python _message_from_payload uses or instead of None-checks, storing string timestamps in float | None field
The _message_from_payload method at packages/sdk-py/src/agent_relay/communicate/transport.py:476 uses Python or chains to resolve the timestamp field. This has two problems:
- If `m.get("timestamp")` returns `0` (a valid Unix epoch), the `or` treats it as falsy and falls through to `created_at`, which is an ISO date string — silently losing the numeric timestamp.
- If `timestamp` is absent but `created_at` is present (the common API case), a string like `"2024-01-01T00:00:00Z"` is stored in `Message.timestamp`, which is typed `float | None`. This violates the type contract and will break any downstream code doing arithmetic or comparisons on the timestamp.
The equivalent TypeScript version at packages/sdk/src/communicate/transport.ts:468-469 correctly uses nullish coalescing (??) and filters with typeof rawTs === 'number' ? rawTs : undefined.
Suggested change:

```diff
-timestamp = m.get("timestamp") or m.get("created_at") or m.get("createdAt") or payload.get("timestamp")
+raw_ts = m.get("timestamp") if m.get("timestamp") is not None else m.get("created_at") if m.get("created_at") is not None else m.get("createdAt") if m.get("createdAt") is not None else payload.get("timestamp")
+timestamp = raw_ts if isinstance(raw_ts, (int, float)) else None
```
```python
text=item.get("text", ""),
channel=None,
thread_id=conv_id,
timestamp=item.get("created_at"),
```
🔴 Python check_inbox stores string created_at values in Message.timestamp (typed float | None)
The check_inbox method constructs Message objects with timestamp=item.get("created_at") at lines 260, 272, and 283. The Relaycast API returns created_at as an ISO 8601 string (e.g., "2024-01-01T00:00:00Z"), but Message.timestamp is typed float | None. This stores a string in a float field, violating the type contract.
The equivalent TypeScript code at packages/sdk/src/communicate/transport.ts:177 correctly handles this: typeof last.created_at === 'string' ? undefined : (last.created_at as number | undefined).
All three occurrences in check_inbox have this issue — the created_at string value should either be converted to a float (epoch seconds) or set to None.
Prompt for agents
In packages/sdk-py/src/agent_relay/communicate/transport.py, the check_inbox method at lines 260, 272, and 283 passes raw created_at string values into Message.timestamp which is typed float | None. Fix all three occurrences by either:
1. Setting timestamp=None when the value is a string (matching the TS behavior), or
2. Parsing the ISO 8601 string to a float epoch timestamp.
Specifically:
- Line 260: Change timestamp=item.get("created_at") to filter out non-numeric values
- Line 272: Change timestamp=last.get("created_at") to filter out non-numeric values
- Line 283: Change timestamp=mention.get("created_at") to filter out non-numeric values
A helper like `_numeric_timestamp(val)` that returns `val if isinstance(val, (int, float)) else None` would reduce duplication.
- Align mock routes with transport.ts endpoints (POST /v1/agents,
POST /v1/dm, POST /v1/channels/{ch}/messages, etc.)
- Switch agent-authenticated routes to validate per-agent token
- Update WebSocket upgrade path to /v1/ws?token={token}
- Wrap responses in {ok: true, data: {...}} format
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…encies @google/adk, @langchain/langgraph, @openai/agents, and crewai are only needed when using their respective adapters. Move them to peerDependencies with optional: true to avoid bloating the global install. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Fix Message.timestamp type to accept str | float | None (API returns ISO strings)
- Fix TS errorMessage double-consuming response body — read text first, then parse
- Use local onMessage buffer in OpenAI/Agno instructions wrapper instead of inbox()/peek() to avoid starving the relay_inbox tool
- Store CrewAI on_message unsubscribe handle on agent to prevent leak

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
```python
except Exception:
    # WebSocket failed — register agent via HTTP and fall back to polling
    await self.transport.register_agent()
    self._ws_connected = False
    self._start_poll_loop()
```
🔴 Python _ensure_connected catches config/auth errors in WebSocket fallback path
The except Exception on line 163 is intended to catch WebSocket connection failures and fall back to HTTP polling, but it catches all exceptions from transport.connect(), including RelayConfigError (missing API key/workspace) and RelayAuthError (401). When a config or auth error occurs, transport.connect() fails, the code falls through to transport.register_agent() on line 165 which also calls _require_config() (or hits the same 401), causing the error to be raised from the fallback path instead of the original call. This doubles HTTP requests for auth failures and produces a confusing error origin.
Trace of the wrong-path execution:
1. `transport.connect()` calls `_require_config()`, which raises `RelayConfigError` (or `register_agent()` raises `RelayAuthError`)
2. The `except Exception` on line 163 catches it
3. `transport.register_agent()` on line 165 also fails with the same error
4. The exception propagates to the outer `except` at line 177
5. The user sees the error from the fallback path, not the original `connect()` call
The fix should re-raise RelayConfigError and RelayAuthError (or at minimum non-transient errors) before falling back to polling.
Suggested change:

```diff
+except (RelayConfigError, RelayAuthError):
+    raise
 except Exception:
     # WebSocket failed — register agent via HTTP and fall back to polling
     await self.transport.register_agent()
     self._ws_connected = False
     self._start_poll_loop()
```
```python
def _resolve_sync(self) -> str:
    messages = self._drain_buffer()
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None

    if loop is None:
        messages.extend(self._relay.inbox_sync())
    else:
        # Running inside an event loop — use a thread to avoid blocking
        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            polled = pool.submit(asyncio.run, self._relay.inbox()).result()
            messages.extend(polled)
    return _format_backstory(self._dedupe(messages), self._base_backstory)
```
🔴 CrewAI _RelayBackstory drains relay inbox and triggers network I/O on every string operation
Every `__str__`, `__repr__`, `__contains__`, `__eq__`, or `__getattr__` call on `_RelayBackstory` invokes `_resolve_sync()`, which calls `self._relay.inbox_sync()` (packages/sdk-py/src/agent_relay/communicate/adapters/crewai.py:55). This has two problems:
- Side effect: `inbox_sync()` drains the relay's `_pending` buffer (core.py:59-60), so any other consumer (other adapters, user code calling `relay.inbox()`) will see empty results after the backstory is accessed as a string.
- Performance: simple operations like `"text" in agent.backstory` or `str(agent.backstory)` trigger a synchronous HTTP round-trip to the relay server (via `inbox_sync()`), which is surprising for what looks like a property access.
CrewAI may access backstory multiple times during a single task execution (for prompt construction, logging, etc.), each time draining the inbox and making network calls.
Prompt for agents
In packages/sdk-py/src/agent_relay/communicate/adapters/crewai.py, the _RelayBackstory class's _resolve_sync() method (lines 47-62) calls self._relay.inbox_sync() on every string operation (__str__, __repr__, __contains__, __eq__, __getattr__). This drains the relay's pending message buffer as a side effect and triggers network I/O on simple property accesses.
The fix should:
1. Remove the inbox_sync()/inbox() call from _resolve_sync() and _resolve_async(). The on_message callback buffer (self._buffer) already receives all messages via the subscription at line 139.
2. Only use self._drain_buffer() to get messages, since the on_message subscription at line 139 already populates the buffer with all incoming messages.
3. This eliminates the side effect of draining the relay's _pending buffer and removes the network I/O from string operations.
The key insight is that the _buffer_message callback (line 136-137) already captures all messages via relay.on_message(), so there's no need to also poll inbox(). The _dedupe call can also be removed since messages only come from one source.
Co-authored-by: devin-ai-integration[bot] <158243242+devin-ai-integration[bot]@users.noreply.github.com>
```python
def _run_sync(awaitable: Any) -> Any:
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(awaitable)
    # Running inside an active event loop — execute in a separate thread
    import concurrent.futures

    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, awaitable).result()
```
🔴 Python _run_sync / CrewAI _resolve_sync reuses aiohttp session across incompatible event loops
Both Relay._run_sync() (core.py:246-255) and _RelayBackstory._resolve_sync() (crewai.py:47-62) use pool.submit(asyncio.run, coroutine).result() to run async code from a sync context. asyncio.run() creates a new event loop each time. If the Relay was previously connected (creating an aiohttp.ClientSession bound to event loop A), and then a sync method is called that needs to make HTTP requests, the existing session bound to the now-closed loop A is reused in loop B.
aiohttp sessions are bound to the event loop they were created in — using one in a different loop raises RuntimeError. The _ensure_session() check at transport.py:362-364 only checks self._session.closed, which is False for a session whose loop was closed (since session.close() was never called).
This manifests in the CrewAI adapter when: (1) the Relay connects asynchronously, then (2) CrewAI accesses agent.backstory synchronously, triggering _resolve_sync() which calls relay.inbox() in a new event loop. If the inbox path needs an HTTP call (e.g., WebSocket is disconnected and _ws_connected is False at core.py:54), it would use the stale session and crash.
Prompt for agents
In packages/sdk-py/src/agent_relay/communicate/core.py, the _run_sync static method (lines 246-255) and the CrewAI adapter's _resolve_sync (packages/sdk-py/src/agent_relay/communicate/adapters/crewai.py lines 47-62) both call asyncio.run() in a ThreadPoolExecutor, which creates a new event loop. If the RelayTransport already has an aiohttp.ClientSession bound to a different (now-closed) event loop, reusing it will fail.
To fix this, either:
1. In RelayTransport._ensure_session() (transport.py lines 362-364), also check whether the session's loop is still running (not just session.closed). If the loop is closed, recreate the session.
2. Or, in _run_sync, close the existing transport session before calling asyncio.run so it gets recreated in the new loop.
3. Or, track which event loop the session was created in and force recreation if it differs from the current running loop.
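Option 3 (track the session's creation loop and recreate on mismatch) could be sketched like this. The holder class and `ensure_session` are assumed names, with `object()` standing in for `aiohttp.ClientSession`, which is bound to the event loop it was created under:

```python
import asyncio


class LoopAwareSessionHolder:
    """Sketch: recreate the HTTP session when its event loop changes or closes."""

    def __init__(self) -> None:
        self._session = None
        self._session_loop = None

    def ensure_session(self):
        loop = asyncio.get_running_loop()
        stale = (
            self._session is None
            or self._session_loop is not loop   # created under a different loop
            or self._session_loop.is_closed()   # e.g. a previous asyncio.run() finished
        )
        if stale:
            # In the SDK this would be aiohttp.ClientSession(); the old session
            # should also be closed here if its loop is still usable.
            self._session = object()
            self._session_loop = loop
        return self._session
```

This extends the `session.closed`-only check so a session orphaned by a finished `asyncio.run()` loop is replaced instead of reused.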
Summary

on_relay() puts any framework agent on Relaycast with a single call — no broker needed.

Architecture

Two tiers of framework integration:

Brokerless transport: HTTP + WebSocket directly to Relaycast API with auto-reconnect and exponential backoff.

Test plan

tsc -p tsconfig.build.json passes clean

Spec: specs/connect-sdk.md

🤖 Generated with Claude Code