From 70e18ec0c9ea0b4afa09dec0c6c8bb885cc8f330 Mon Sep 17 00:00:00 2001 From: Khaliq Date: Sun, 10 May 2026 22:18:46 +0200 Subject: [PATCH 1/2] Fix broker stdout drain in SDK workflow runner --- src/local/entrypoint.test.ts | 116 +++++++++++++++++++++++++++++++++++ src/local/entrypoint.ts | 28 ++++++++- 2 files changed, 143 insertions(+), 1 deletion(-) diff --git a/src/local/entrypoint.test.ts b/src/local/entrypoint.test.ts index 8486371..ec5ac45 100644 --- a/src/local/entrypoint.test.ts +++ b/src/local/entrypoint.test.ts @@ -3118,6 +3118,122 @@ describe('runLocal', () => { } }); + it('drains broker stdout after SDK startup so event floods cannot wedge the workflow node', async () => { + const { chmod, mkdir, mkdtemp, rm, writeFile } = await import('node:fs/promises'); + const { tmpdir } = await import('node:os'); + const { join } = await import('node:path'); + const repo = await mkdtemp(join(tmpdir(), 'ricky-sdk-broker-drain-repo-')); + const stateHome = await mkdtemp(join(tmpdir(), 'ricky-sdk-broker-drain-state-')); + const brokerDir = await mkdtemp(join(tmpdir(), 'ricky-sdk-broker-drain-bin-')); + const brokerPath = join(brokerDir, 'agent-relay-broker'); + const artifactPath = 'workflows/generated/broker-drain.workflow.ts'; + const previousStateHome = process.env.RICKY_STATE_HOME; + const previousFakeBrokerPath = process.env.FAKE_BROKER_PATH; + + try { + process.env.RICKY_STATE_HOME = stateHome; + process.env.FAKE_BROKER_PATH = brokerPath; + await mkdir(join(repo, 'workflows/generated'), { recursive: true }); + await writeFile( + brokerPath, + [ + '#!/usr/bin/env node', + "import http from 'node:http';", + 'const server = http.createServer((req, res) => {', + " if (req.url === '/api/session') {", + " res.writeHead(200, { 'content-type': 'application/json' });", + " res.end(JSON.stringify({ workspace_key: 'wk_fake' }));", + ' return;', + ' }', + " if (req.url === '/health' || req.url === '/api/session/renew' || req.url === '/api/shutdown') {", + " res.writeHead(200, { 'content-type': 'application/json' });", + ' res.end(JSON.stringify({ ok: true }));', + ' return;', + ' }', + " res.writeHead(404, { 'content-type': 'application/json' });", + " res.end(JSON.stringify({ error: 'not found' }));", + '});', + "server.listen(0, '127.0.0.1', () => {", + ' const address = server.address();', + ' console.log(`[agent-relay] API listening on http://127.0.0.1:${address.port}`);', + ' let index = 0;', + " const chunk = 'x'.repeat(1024);", + ' const writeMore = () => {', + ' let ok = true;', + ' while (index < 20000 && ok) {', + ' ok = process.stdout.write(`event-${index}:${chunk}\\n`);', + ' index += 1;', + ' }', + ' if (index >= 20000) {', + " console.error('FAKE_BROKER_FLOOD_DONE');", + ' setTimeout(() => process.exit(0), 25);', + ' return;', + ' }', + " process.stdout.once('drain', writeMore);", + ' };', + ' writeMore();', + '});', + '', + ].join('\n'), + 'utf8', + ); + await chmod(brokerPath, 0o755); + await writeFile( + join(repo, artifactPath), + [ + 'import { AgentRelayClient } from "@agent-relay/sdk";', + 'const client = await AgentRelayClient.spawn({', + ' binaryPath: process.env.FAKE_BROKER_PATH,', + ' cwd: process.cwd(),', + ' startupTimeoutMs: 3000,', + ' requestTimeoutMs: 3000,', + '});', + 'const outcome = await Promise.race([', + ' new Promise((resolve) => client.child?.once("exit", () => resolve("exited"))),', + ' new Promise((resolve) => setTimeout(() => resolve("blocked"), 1500)),', + ']);', + 'client.disconnect();', + 'if (outcome !== "exited") {', + ' client.child?.kill("SIGKILL");', + ' throw new Error(`broker stdout flood ${outcome}`);', + '}', + 'console.log("BROKER_STDOUT_DRAINED");', + '', + ].join('\n'), + 'utf8', + ); + + const result = await runLocal( + { source: 'workflow-artifact', artifactPath, stageMode: 'run' }, + { + artifactReader: mockArtifactReader('import { AgentRelayClient } from "@agent-relay/sdk";'), + localExecutor: { + cwd: repo, + timeoutMs: 10_000, + }, + }, + ); + + expect(result.ok).toBe(true); + expect(result.execution?.status).toBe('success'); + expect(result.logs).toContain('[stdout] BROKER_STDOUT_DRAINED'); + } finally { + if (previousStateHome === undefined) { + delete process.env.RICKY_STATE_HOME; + } else { + process.env.RICKY_STATE_HOME = previousStateHome; + } + if (previousFakeBrokerPath === undefined) { + delete process.env.FAKE_BROKER_PATH; + } else { + process.env.FAKE_BROKER_PATH = previousFakeBrokerPath; + } + await rm(repo, { recursive: true, force: true }); + await rm(stateHome, { recursive: true, force: true }); + await rm(brokerDir, { recursive: true, force: true }); + } + }); + it('kills the SDK workflow process tree when the local timeout fires', async () => { const { mkdir, mkdtemp, readFile, rm, writeFile } = await import('node:fs/promises'); const { tmpdir } = await import('node:os'); diff --git a/src/local/entrypoint.ts b/src/local/entrypoint.ts index ee2c22c..d7a82f8 100644 --- a/src/local/entrypoint.ts +++ b/src/local/entrypoint.ts @@ -978,9 +978,35 @@ async function workflowSdkLoaderNodeOption(cwd: string): Promise{', + 'child.stdout?.off("pause",drainBrokerStdout);', + 'child.stdout?.on("data",()=>{});', + 'child.stdout?.resume();', + '};', + 'child.stdout.on("pause",drainBrokerStdout);', + '}', + 'return child;', + '};', + 'childProcess[brokerStdoutDrainPatchKey]=true;', + 'syncBuiltinESMExports();', + '}', `register(${JSON.stringify(pathToFileURL(loaderPath).href)},pathToFileURL("./"));`, ].join(''); return `--import=data:text/javascript,${encodeURIComponent(registerSource)}`; From 53ae63b4488e389a50fdca3ab7bdd30067383e25 Mon Sep 17 00:00:00 2001 From: Khaliq Date: Sun, 10 May 2026 22:29:25 +0200 Subject: [PATCH 2/2] Make broker drain regression portable --- src/local/entrypoint.test.ts | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/local/entrypoint.test.ts b/src/local/entrypoint.test.ts index ec5ac45..cb75d09 100644 --- a/src/local/entrypoint.test.ts +++ b/src/local/entrypoint.test.ts @@ -3119,13 +3119,13 @@ describe('runLocal', () => { }); it('drains broker stdout after SDK startup so event floods cannot wedge the workflow node', async () => { - const { chmod, mkdir, mkdtemp, rm, writeFile } = await import('node:fs/promises'); + const { chmod, copyFile, mkdir, mkdtemp, rm, writeFile } = await import('node:fs/promises'); const { tmpdir } = await import('node:os'); const { join } = await import('node:path'); const repo = await mkdtemp(join(tmpdir(), 'ricky-sdk-broker-drain-repo-')); const stateHome = await mkdtemp(join(tmpdir(), 'ricky-sdk-broker-drain-state-')); const brokerDir = await mkdtemp(join(tmpdir(), 'ricky-sdk-broker-drain-bin-')); - const brokerPath = join(brokerDir, 'agent-relay-broker'); + const brokerPath = join(brokerDir, process.platform === 'win32' ? 'agent-relay-broker.exe' : 'agent-relay-broker'); const artifactPath = 'workflows/generated/broker-drain.workflow.ts'; const previousStateHome = process.env.RICKY_STATE_HOME; const previousFakeBrokerPath = process.env.FAKE_BROKER_PATH; @@ -3134,11 +3134,16 @@ describe('runLocal', () => { process.env.RICKY_STATE_HOME = stateHome; process.env.FAKE_BROKER_PATH = brokerPath; await mkdir(join(repo, 'workflows/generated'), { recursive: true }); + // Give the SDK a real executable named like the broker while keeping the + // broker body in the `init` script that Node receives as argv[1]. + await copyFile(process.execPath, brokerPath); + if (process.platform !== 'win32') { + await chmod(brokerPath, 0o755); + } await writeFile( - brokerPath, + join(repo, 'init'), [ - '#!/usr/bin/env node', - "import http from 'node:http';", + "const http = require('node:http');", 'const server = http.createServer((req, res) => {', " if (req.url === '/api/session') {", " res.writeHead(200, { 'content-type': 'application/json' });", @@ -3177,7 +3182,6 @@ describe('runLocal', () => { ].join('\n'), 'utf8', ); - await chmod(brokerPath, 0o755); await writeFile( join(repo, artifactPath), [