Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions .trajectories/completed/2026-05/traj_mz5m5ysjj31e.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
{
"id": "traj_mz5m5ysjj31e",
"version": 1,
"task": {
"title": "Fix Relay SDK broker stdout drain"
},
"status": "completed",
"startedAt": "2026-05-10T20:24:43.831Z",
"completedAt": "2026-05-10T20:35:47.359Z",
"agents": [
{
"name": "default",
"role": "lead",
"joinedAt": "2026-05-10T20:35:41.197Z"
}
],
"chapters": [
{
"id": "chap_a1rg3jdl8b1w",
"title": "Work",
"agentName": "default",
"startedAt": "2026-05-10T20:35:41.197Z",
"endedAt": "2026-05-10T20:35:47.359Z",
"events": [
{
"ts": 1778445341198,
"type": "decision",
"content": "Drain spawned broker stdout in SDK clients: Drain spawned broker stdout in SDK clients",
"raw": {
"question": "Drain spawned broker stdout in SDK clients",
"chosen": "Drain spawned broker stdout in SDK clients",
"alternatives": [],
"reasoning": "agent-relay run and direct SDK users both reach AgentRelayClient.spawn; after startup URL parsing stdout was left paused/undrained, so the root fix belongs in the TypeScript and Python SDK clients rather than only in Ricky's loader workaround."
},
"significance": "high"
}
]
}
],
"retrospective": {
"summary": "Fixed Relay SDK broker stdout drain at the source. TypeScript AgentRelayClient.spawn now resumes and drains broker stdout after startup URL parsing; Python SDK spawn now starts a stdout drain task and cancels it during shutdown. Added a TypeScript regression that floods fake broker stdout after startup and a Python unit test for stdout draining. Verified SDK typecheck/build, focused Vitest, and focused pytest suites.",
"approach": "Standard approach",
"confidence": 0.9
},
"commits": [],
"filesChanged": [],
"projectId": "/Users/khaliqgant/Projects/AgentWorkforce/relay-broker-stdout-drain",
"tags": [],
"_trace": {
"startRef": "bffd6b21275090f2648701d2005e875f2ca882c6",
"endRef": "bffd6b21275090f2648701d2005e875f2ca882c6"
}
}
31 changes: 31 additions & 0 deletions .trajectories/completed/2026-05/traj_mz5m5ysjj31e.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Trajectory: Fix Relay SDK broker stdout drain

> **Status:** ✅ Completed
> **Confidence:** 90%
> **Started:** May 10, 2026 at 10:24 PM
> **Completed:** May 10, 2026 at 10:35 PM

---

## Summary

Fixed Relay SDK broker stdout drain at the source. TypeScript AgentRelayClient.spawn now resumes and drains broker stdout after startup URL parsing; Python SDK spawn now starts a stdout drain task and cancels it during shutdown. Added a TypeScript regression that floods fake broker stdout after startup and a Python unit test for stdout draining. Verified SDK typecheck/build, focused Vitest, and focused pytest suites.

**Approach:** Standard approach

---

## Key Decisions

### Drain spawned broker stdout in SDK clients
- **Chose:** Drain spawned broker stdout in SDK clients
- **Reasoning:** agent-relay run and direct SDK users both reach AgentRelayClient.spawn; after startup URL parsing stdout was left paused/undrained, so the root fix belongs in the TypeScript and Python SDK clients rather than only in Ricky's loader workaround.

---

## Chapters

### 1. Work
*Agent: default*

- Drain spawned broker stdout in SDK clients: Drain spawned broker stdout in SDK clients
9 changes: 8 additions & 1 deletion .trajectories/index.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"version": 1,
"lastUpdated": "2026-05-10T15:29:41.965Z",
"lastUpdated": "2026-05-10T20:35:47.482Z",
"trajectories": {
"traj_1775914133873_35667beb": {
"title": "fix-sdk-build-resolution-workflow",
Expand Down Expand Up @@ -353,6 +353,13 @@
"startedAt": "2026-05-10T15:18:12.326Z",
"completedAt": "2026-05-10T15:29:41.840Z",
"path": "/Users/khaliqgant/Projects/AgentWorkforce/relay-pr831-ci-fix/.trajectories/completed/2026-05/traj_iole5zdt9orr.json"
},
"traj_mz5m5ysjj31e": {
"title": "Fix Relay SDK broker stdout drain",
"status": "completed",
"startedAt": "2026-05-10T20:24:43.831Z",
"completedAt": "2026-05-10T20:35:47.359Z",
"path": ".trajectories/completed/2026-05/traj_mz5m5ysjj31e.json"
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
}
14 changes: 14 additions & 0 deletions packages/sdk-py/src/agent_relay/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ def __init__(
self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
self._ws_task: Optional[asyncio.Task[None]] = None
self._lease_task: Optional[asyncio.Task[None]] = None
self._stdout_task: Optional[asyncio.Task[None]] = None
self._stderr_task: Optional[asyncio.Task[None]] = None
self._process: Optional[asyncio.subprocess.Process] = None
self._event_listeners: list[Callable[[BrokerEvent], None]] = []
Expand Down Expand Up @@ -188,8 +189,10 @@ async def _read_stderr() -> None:

# Parse API URL from stdout
base_url = await _wait_for_api_url(process, startup_timeout_ms)
stdout_task = asyncio.create_task(_drain_stdout(process))

client = cls(base_url=base_url, api_key=api_key)
client._stdout_task = stdout_task
client._stderr_task = stderr_task
client._process = process

Expand Down Expand Up @@ -503,6 +506,9 @@ async def shutdown(self) -> None:
if self._stderr_task and not self._stderr_task.done():
self._stderr_task.cancel()

if self._stdout_task and not self._stdout_task.done():
self._stdout_task.cancel()

if self._session and not self._session.closed:
await self._session.close()

Expand Down Expand Up @@ -558,3 +564,11 @@ async def _read() -> str:
raise AgentRelayProcessError(
f"Broker did not report API URL within {timeout_ms}ms"
) from None


async def _drain_stdout(process: asyncio.subprocess.Process) -> None:
"""Drain broker stdout after startup so the broker cannot block on a full pipe."""
if not process.stdout:
return
while await process.stdout.read(65536):
pass
20 changes: 19 additions & 1 deletion packages/sdk-py/tests/test_wait_for_api_url.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import pytest

from agent_relay.client import AgentRelayProcessError, _wait_for_api_url
from agent_relay.client import AgentRelayProcessError, _drain_stdout, _wait_for_api_url


def _make_process(lines: list[str], returncode: int | None = 0):
Expand Down Expand Up @@ -90,3 +90,21 @@ async def test_ignores_lines_with_colon_but_no_url(self):
])
url = await _wait_for_api_url(process, 5000)
assert url == "http://127.0.0.1:9999"


class TestDrainStdout:
    """Unit tests for the ``_drain_stdout`` helper."""

    async def test_reads_until_eof(self):
        """The helper keeps issuing 65536-byte reads until an empty read (EOF)."""
        pending = [b"x" * 1024, b"y" * 1024, b""]
        requested_sizes: list[int] = []

        async def read(size: int) -> bytes:
            # Record the requested size, then hand back the next canned chunk.
            requested_sizes.append(size)
            return pending.pop(0)

        stdout = AsyncMock()
        stdout.read = read
        process = AsyncMock()
        process.stdout = stdout

        await _drain_stdout(process)

        # Two data chunks plus the terminating empty read.
        assert requested_sizes == [65536, 65536, 65536]
85 changes: 85 additions & 0 deletions packages/sdk/src/__tests__/client-stdout-drain.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import assert from 'node:assert/strict';
import { mkdir, mkdtemp, rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { setTimeout as sleep } from 'node:timers/promises';
import { test } from 'vitest';

import { AgentRelayClient } from '../client.js';

/** Upper bound on how long the fake broker may take to flush its output and exit. */
const BROKER_STDOUT_DRAIN_TIMEOUT_MS = 5_000;

/**
 * Regression test: after `AgentRelayClient.spawn` has parsed the startup URL,
 * the SDK must keep consuming broker stdout.
 *
 * A fake broker (a Node script named `init` in a scratch directory) serves the
 * minimal HTTP endpoints, prints the startup line, then writes 20000 lines of
 * roughly 1 KiB each to stdout while honouring backpressure, and finally exits.
 * If the parent stops reading, the fake broker never receives `drain` and never
 * exits, which the test reports as `blocked`.
 */
test('spawn drains broker stdout after startup so event floods cannot wedge the broker', async () => {
  const cwd = await mkdtemp(join(tmpdir(), 'agent-relay-sdk-stdout-drain-'));
  // Cancels the watchdog timer once the outcome is known so that no live
  // 5-second timer outlives the test.
  const watchdog = new AbortController();

  try {
    await mkdir(cwd, { recursive: true });
    await writeFile(
      join(cwd, 'init'),
      [
        "const http = require('node:http');",
        'const server = http.createServer((req, res) => {',
        " if (req.url === '/api/session') {",
        " res.writeHead(200, { 'content-type': 'application/json' });",
        " res.end(JSON.stringify({ workspace_key: 'wk_fake', mode: 'test', uptime_secs: 0 }));",
        ' return;',
        ' }',
        " if (req.url === '/api/session/renew' || req.url === '/api/shutdown') {",
        " res.writeHead(200, { 'content-type': 'application/json' });",
        ' res.end(JSON.stringify({ ok: true }));',
        ' return;',
        ' }',
        " res.writeHead(404, { 'content-type': 'application/json' });",
        " res.end(JSON.stringify({ error: 'not found' }));",
        '});',
        "server.listen(0, '127.0.0.1', () => {",
        ' const address = server.address();',
        ' console.log(`[agent-relay] API listening on http://127.0.0.1:${address.port}`);',
        ' let index = 0;',
        " const chunk = 'x'.repeat(1024);",
        ' const writeMore = () => {',
        ' let ok = true;',
        ' while (index < 20000 && ok) {',
        ' ok = process.stdout.write(`event-${index}:${chunk}\\n`);',
        ' index += 1;',
        ' }',
        ' if (index >= 20000) {',
        ' setTimeout(() => process.exit(0), 25);',
        ' return;',
        ' }',
        " process.stdout.once('drain', writeMore);",
        ' };',
        ' writeMore();',
        '});',
        '',
      ].join('\n'),
      'utf8'
    );

    const client = await AgentRelayClient.spawn({
      binaryPath: process.execPath,
      cwd,
      startupTimeoutMs: 3_000,
      requestTimeoutMs: 3_000,
    });

    const child = client.child;
    assert.ok(child, 'spawned client should retain broker child process');

    // `exitCode` stays null when a process is terminated by a signal, so also
    // consult `signalCode`; otherwise we would wait for an `exit` event that
    // has already been emitted.
    const alreadyExited = child.exitCode !== null || child.signalCode !== null;
    const outcome = alreadyExited
      ? 'exited'
      : await Promise.race([
          new Promise<'exited'>((resolve) => child.once('exit', () => resolve('exited'))),
          // Abortable watchdog: resolves to 'blocked' if the broker has not
          // exited in time. When aborted later, its rejection is absorbed by
          // the already-settled race.
          sleep(BROKER_STDOUT_DRAIN_TIMEOUT_MS, 'blocked' as const, { signal: watchdog.signal }),
        ]);

    client.disconnect();
    if (outcome !== 'exited') {
      // Do not leave a wedged fake broker behind when the assertion fails.
      child.kill('SIGKILL');
    }

    assert.equal(outcome, 'exited');
  } finally {
    watchdog.abort();
    await rm(cwd, { recursive: true, force: true });
  }
});
11 changes: 11 additions & 0 deletions packages/sdk/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ export class AgentRelayClient {
stdoutLines,
stderrLines,
});
drainBrokerStdoutAfterStartup(child);

const client = new AgentRelayClient({
baseUrl,
Expand Down Expand Up @@ -680,6 +681,16 @@ async function waitForApiUrl(
});
}

/**
 * Keeps the broker's stdout flowing once startup has finished.
 *
 * High-volume broker diagnostics/events would otherwise fill the pipe and
 * block the broker process. A no-op `data` listener is attached and the
 * stream is explicitly resumed; every chunk is discarded.
 *
 * @param child - The spawned broker process. Nothing happens if it has no
 *   stdout stream.
 */
function drainBrokerStdoutAfterStartup(child: ChildProcess): void {
  const brokerStdout = child.stdout;
  if (!brokerStdout) {
    return;
  }

  // Chunks are intentionally dropped; the listener exists only to consume them.
  const discardChunk = (): void => {};

  brokerStdout.on('data', discardChunk);
  brokerStdout.resume();
}

function pushBufferedLine(lines: string[], line: string): void {
lines.push(line);
if (lines.length > 40) {
Expand Down
Loading