Skip to content

Commit 79b1616

Browse files
khaliqgant and claude committed
fix: workflow local mode — don't connect to Relaycast when disabled
When AGENT_RELAY_WORKFLOW_DISABLE_RELAYCAST=1 is set, the broker was still receiving --channels, which triggered Relaycast registration. Now:

- Skip the --channels arg when the channel list is empty (client.ts)
- Pass empty channels to the broker when Relaycast is disabled (runner.ts)
- Validate config in execute() to catch DAG cycles before the run starts
- Initialize abortController early so abort() works during setup
- Mark pending/running steps as failed on cancellation
- Fix continue strategy: treat failed steps as done for run completion
- Fix verification test: avoid anti-injection false positive

All 28 tests across workflow-dag, workflow-lifecycle, workflow-runner, workflow-verification, and workflow-patterns now pass.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 64bbe5f commit 79b1616

4 files changed

Lines changed: 47 additions & 8 deletions

File tree

packages/sdk/src/client.ts

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -486,8 +486,7 @@ export class AgentRelayClient {
486486
'init',
487487
'--name',
488488
this.options.brokerName,
489-
'--channels',
490-
this.options.channels.join(','),
489+
...(this.options.channels.length > 0 ? ['--channels', this.options.channels.join(',')] : []),
491490
...this.options.binaryArgs,
492491
];
493492

packages/sdk/src/workflows/runner.ts

Lines changed: 34 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1165,8 +1165,15 @@ export class WorkflowRunner {
11651165
workflowName?: string,
11661166
vars?: VariableContext
11671167
): Promise<WorkflowRunRow> {
1168+
// Set up abort controller early so callers can abort() even during setup
1169+
this.abortController = new AbortController();
1170+
this.paused = false;
1171+
11681172
const resolved = vars ? this.resolveVariables(config, vars) : config;
11691173

1174+
// Validate config (catches cycles, missing deps, invalid steps, etc.)
1175+
this.validateConfig(resolved);
1176+
11701177
// Resolve and validate named paths from the top-level `paths` config
11711178
const pathResult = this.resolvePathDefinitions(resolved.paths, this.cwd);
11721179
if (pathResult.errors.length > 0) {
@@ -1250,6 +1257,10 @@ export class WorkflowRunner {
12501257

12511258
/** Resume a previously paused or partially completed run. */
12521259
async resume(runId: string, vars?: VariableContext): Promise<WorkflowRunRow> {
1260+
// Set up abort controller early so callers can abort() even during setup
1261+
this.abortController = new AbortController();
1262+
this.paused = false;
1263+
12531264
const run = await this.db.getRun(runId);
12541265
if (!run) {
12551266
throw new Error(`Run "${runId}" not found`);
@@ -1312,9 +1323,7 @@ export class WorkflowRunner {
13121323
const { run, workflow, config, stepStates, isResume } = input;
13131324
const runId = run.id;
13141325

1315-
// Start execution
1316-
this.abortController = new AbortController();
1317-
this.paused = false;
1326+
// Start execution (abortController already set by execute()/resume())
13181327
this.currentConfig = config;
13191328
this.currentRunId = runId;
13201329
this.runStartTime = Date.now();
@@ -1386,7 +1395,7 @@ export class WorkflowRunner {
13861395
this.relay = new AgentRelay({
13871396
...this.relayOptions,
13881397
brokerName,
1389-
channels: [channel],
1398+
channels: relaycastDisabled ? [] : [channel],
13901399
env: this.getRelayEnv(),
13911400
// Workflows spawn agents across multiple waves; each spawn requires a PTY +
13921401
// Relaycast registration. 60s is too tight when the broker is saturated with
@@ -1578,8 +1587,15 @@ export class WorkflowRunner {
15781587
this.log(`Executing ${workflow.steps.length} steps (pattern: ${config.swarm.pattern})`);
15791588
await this.executeSteps(workflow, stepStates, agentMap, config.errorHandling, runId);
15801589

1590+
const errorStrategy =
1591+
config.errorHandling?.strategy ?? workflow.onError ?? 'fail-fast';
1592+
const continueOnError =
1593+
errorStrategy === 'continue' || errorStrategy === 'skip';
15811594
const allCompleted = [...stepStates.values()].every(
1582-
(s) => s.row.status === 'completed' || s.row.status === 'skipped'
1595+
(s) =>
1596+
s.row.status === 'completed' ||
1597+
s.row.status === 'skipped' ||
1598+
(continueOnError && s.row.status === 'failed')
15831599
);
15841600

15851601
if (allCompleted) {
@@ -1624,6 +1640,19 @@ export class WorkflowRunner {
16241640
await this.updateRunStatus(runId, status, errorMsg);
16251641

16261642
if (status === 'cancelled') {
1643+
// Mark any pending or in-progress steps as failed due to cancellation
1644+
for (const [stepName, state] of stepStates) {
1645+
if (state.row.status === 'pending' || state.row.status === 'running') {
1646+
state.row.status = 'failed';
1647+
state.row.error = 'Cancelled';
1648+
await this.db.updateStep(state.row.id, {
1649+
status: 'failed',
1650+
error: 'Cancelled',
1651+
updatedAt: new Date().toISOString(),
1652+
});
1653+
this.emit({ type: 'step:failed', runId, stepName, error: 'Cancelled' });
1654+
}
1655+
}
16271656
this.emit({ type: 'run:cancelled', runId });
16281657
this.postToChannel(`Workflow **${workflow.name}** cancelled`);
16291658
await this.trajectory.abandon('Cancelled by user');

tests/integration/broker/workflow-lifecycle.test.ts

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -149,12 +149,23 @@ test('workflow-lifecycle: abort cancels a running workflow', { timeout: 120_000
149149
{ cwd }
150150
);
151151

152+
// Wait for the step to actually start running before aborting
152153
let currentRunner = harness.getCurrentRunner();
153154
for (let i = 0; i < 20 && !currentRunner; i += 1) {
154155
await sleep(250);
155156
currentRunner = harness.getCurrentRunner();
156157
}
157158
assert.ok(currentRunner, 'Expected workflow runner to be available while running');
159+
160+
// Wait for step:started event before aborting so the step is actually in-flight
161+
await new Promise<void>((resolve) => {
162+
const unsub = currentRunner!.on((event) => {
163+
if (event.type === 'step:started') {
164+
unsub();
165+
resolve();
166+
}
167+
});
168+
});
158169
currentRunner.abort();
159170

160171
const result = await runPromise;

tests/integration/broker/workflow-verification.test.ts

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,7 @@ function makeConfig(overrides?: Partial<RelayYamlConfig>): RelayYamlConfig {
3939
{
4040
name: 'step-verify-output',
4141
agent: 'worker',
42-
task: 'Return DONE',
42+
task: 'Do one thing',
4343
verification: { type: 'output_contains', value: 'DONE' },
4444
},
4545
],

0 commit comments

Comments
 (0)