Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions src/local/entrypoint.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3118,6 +3118,126 @@ describe('runLocal', () => {
}
});

it('drains broker stdout after SDK startup so event floods cannot wedge the workflow node', async () => {
const { chmod, copyFile, mkdir, mkdtemp, rm, writeFile } = await import('node:fs/promises');
const { tmpdir } = await import('node:os');
const { join } = await import('node:path');
const repo = await mkdtemp(join(tmpdir(), 'ricky-sdk-broker-drain-repo-'));
const stateHome = await mkdtemp(join(tmpdir(), 'ricky-sdk-broker-drain-state-'));
const brokerDir = await mkdtemp(join(tmpdir(), 'ricky-sdk-broker-drain-bin-'));
const brokerPath = join(brokerDir, process.platform === 'win32' ? 'agent-relay-broker.exe' : 'agent-relay-broker');
const artifactPath = 'workflows/generated/broker-drain.workflow.ts';
const previousStateHome = process.env.RICKY_STATE_HOME;
const previousFakeBrokerPath = process.env.FAKE_BROKER_PATH;

try {
process.env.RICKY_STATE_HOME = stateHome;
process.env.FAKE_BROKER_PATH = brokerPath;
await mkdir(join(repo, 'workflows/generated'), { recursive: true });
// Give the SDK a real executable named like the broker while keeping the
// broker body in the `init` script that Node receives as argv[1].
await copyFile(process.execPath, brokerPath);
if (process.platform !== 'win32') {
await chmod(brokerPath, 0o755);
}
await writeFile(
join(repo, 'init'),
[
"const http = require('node:http');",
'const server = http.createServer((req, res) => {',
" if (req.url === '/api/session') {",
" res.writeHead(200, { 'content-type': 'application/json' });",
" res.end(JSON.stringify({ workspace_key: 'wk_fake' }));",
' return;',
' }',
" if (req.url === '/health' || req.url === '/api/session/renew' || req.url === '/api/shutdown') {",
" res.writeHead(200, { 'content-type': 'application/json' });",
' res.end(JSON.stringify({ ok: true }));',
' return;',
' }',
" res.writeHead(404, { 'content-type': 'application/json' });",
" res.end(JSON.stringify({ error: 'not found' }));",
'});',
"server.listen(0, '127.0.0.1', () => {",
' const address = server.address();',
' console.log(`[agent-relay] API listening on http://127.0.0.1:${address.port}`);',
' let index = 0;',
" const chunk = 'x'.repeat(1024);",
' const writeMore = () => {',
' let ok = true;',
' while (index < 20000 && ok) {',
' ok = process.stdout.write(`event-${index}:${chunk}\\n`);',
' index += 1;',
' }',
' if (index >= 20000) {',
" console.error('FAKE_BROKER_FLOOD_DONE');",
' setTimeout(() => process.exit(0), 25);',
' return;',
' }',
" process.stdout.once('drain', writeMore);",
' };',
' writeMore();',
'});',
'',
].join('\n'),
'utf8',
);
await writeFile(
join(repo, artifactPath),
[
'import { AgentRelayClient } from "@agent-relay/sdk";',
'const client = await AgentRelayClient.spawn({',
' binaryPath: process.env.FAKE_BROKER_PATH,',
' cwd: process.cwd(),',
' startupTimeoutMs: 3000,',
' requestTimeoutMs: 3000,',
'});',
'const outcome = await Promise.race([',
' new Promise((resolve) => client.child?.once("exit", () => resolve("exited"))),',
' new Promise((resolve) => setTimeout(() => resolve("blocked"), 1500)),',
']);',
'client.disconnect();',
'if (outcome !== "exited") {',
' client.child?.kill("SIGKILL");',
' throw new Error(`broker stdout flood ${outcome}`);',
'}',
'console.log("BROKER_STDOUT_DRAINED");',
'',
].join('\n'),
'utf8',
);

const result = await runLocal(
{ source: 'workflow-artifact', artifactPath, stageMode: 'run' },
{
artifactReader: mockArtifactReader('import { AgentRelayClient } from "@agent-relay/sdk";'),
localExecutor: {
cwd: repo,
timeoutMs: 10_000,
},
},
);

expect(result.ok).toBe(true);
expect(result.execution?.status).toBe('success');
expect(result.logs).toContain('[stdout] BROKER_STDOUT_DRAINED');
} finally {
if (previousStateHome === undefined) {
delete process.env.RICKY_STATE_HOME;
} else {
process.env.RICKY_STATE_HOME = previousStateHome;
}
if (previousFakeBrokerPath === undefined) {
delete process.env.FAKE_BROKER_PATH;
} else {
process.env.FAKE_BROKER_PATH = previousFakeBrokerPath;
}
await rm(repo, { recursive: true, force: true });
await rm(stateHome, { recursive: true, force: true });
await rm(brokerDir, { recursive: true, force: true });
}
});

it('kills the SDK workflow process tree when the local timeout fires', async () => {
const { mkdir, mkdtemp, readFile, rm, writeFile } = await import('node:fs/promises');
const { tmpdir } = await import('node:os');
Expand Down
28 changes: 27 additions & 1 deletion src/local/entrypoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -978,9 +978,35 @@ async function workflowSdkLoaderNodeOption(cwd: string): Promise<string | undefi
].join('\n');
await mkdir(dirname(loaderPath), { recursive: true });
await writeFile(loaderPath, loaderSource, 'utf8');
// The SDK reads broker stdout just long enough to parse the startup URL.
// Once readline closes, the stream can pause and a chatty broker can block
// writing events into the full pipe. Patch spawn in the workflow process so
// only managed agent-relay-broker init children are resumed after that pause.
const registerSource = [
'import{register}from"node:module";',
'import{createRequire,register,syncBuiltinESMExports}from"node:module";',
'import{pathToFileURL}from"node:url";',
'const require=createRequire(pathToFileURL("./"));',
'const childProcess=require("node:child_process");',
'const brokerStdoutDrainPatchKey=Symbol.for("ricky.sdkRuntimeLoader.brokerStdoutDrainPatch");',
'if(!childProcess[brokerStdoutDrainPatchKey]){',
'const originalSpawn=childProcess.spawn;',
'childProcess.spawn=function rickySpawnWithBrokerStdoutDrain(command,args,options){',
'const child=originalSpawn.apply(this,arguments);',
'const argv=Array.isArray(args)?args.map(String):[];',
'const executable=String(command??"");',
'if(argv[0]==="init"&&/(?:^|[/\\\\])agent-relay-broker(?:\\.exe)?$/u.test(executable)&&child.stdout){',
'const drainBrokerStdout=()=>{',
'child.stdout?.off("pause",drainBrokerStdout);',
'child.stdout?.on("data",()=>{});',
'child.stdout?.resume();',
'};',
'child.stdout.on("pause",drainBrokerStdout);',
'}',
'return child;',
'};',
'childProcess[brokerStdoutDrainPatchKey]=true;',
'syncBuiltinESMExports();',
'}',
`register(${JSON.stringify(pathToFileURL(loaderPath).href)},pathToFileURL("./"));`,
].join('');
return `--import=data:text/javascript,${encodeURIComponent(registerSource)}`;
Expand Down
Loading