Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/broker-sdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
"typescript": "^5.7.3"
},
"dependencies": {
"@relaycast/sdk": "^0.3.0",
"@relaycast/sdk": "^0.3.1",
"yaml": "^2.7.0"
}
}
21 changes: 17 additions & 4 deletions packages/broker-sdk/src/relaycast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,20 +177,33 @@ export class RelaycastApi {

/** Register an external agent in the workspace (e.g., a spawned workflow agent).
* Uses the workspace API key to register, not an agent token.
* No-op if the agent already exists. */
async registerExternalAgent(name: string, persona?: string): Promise<void> {
* No-op if the agent already exists (returns null).
* Returns an AgentClient that can send heartbeats. */
async registerExternalAgent(name: string, persona?: string): Promise<AgentClient | null> {
const apiKey = await this.resolveApiKey();
const relay = new RelayCast({ apiKey, baseUrl: this.baseUrl });
try {
await relay.agents.register({ name, type: "agent", ...(persona ? { persona } : {}) });
const reg = await relay.agents.register({ name, type: "agent", ...(persona ? { persona } : {}) });
return relay.as(reg.token);
} catch (err) {
if (err instanceof RelayError && err.code === "agent_already_exists") {
return;
return null;
}
throw err;
}
}

/** Start a heartbeat loop for an external agent. Returns a cleanup function. */
startHeartbeat(agentClient: AgentClient, intervalMs = 30_000): () => void {
const timer = setInterval(() => {
agentClient.heartbeat().catch(() => {});
}, intervalMs);
timer.unref();
// Send first heartbeat immediately
agentClient.heartbeat().catch(() => {});
return () => clearInterval(timer);
}

/** Fetch message history from a channel. */
async getMessages(
channel: string,
Expand Down
92 changes: 83 additions & 9 deletions packages/broker-sdk/src/workflows/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,6 @@ export class WorkflowRunner {
if (allCompleted) {
await this.updateRunStatus(runId, 'completed');
this.emit({ type: 'run:completed', runId });
this.postToChannel(`Workflow **${workflow.name}** completed — all steps passed`);

// Complete trajectory with summary
const outcomes = this.collectOutcomes(stepStates, workflow.steps);
Expand All @@ -542,12 +541,17 @@ export class WorkflowRunner {
learnings: this.trajectory.extractLearnings(outcomes),
challenges: this.trajectory.extractChallenges(outcomes),
});

// Post rich completion report to channel
this.postCompletionReport(workflow.name, outcomes, summary, confidence);
} else {
const failedStep = [...stepStates.values()].find((s) => s.row.status === 'failed');
const errorMsg = failedStep?.row.error ?? 'One or more steps failed';
await this.updateRunStatus(runId, 'failed', errorMsg);
this.emit({ type: 'run:failed', runId, error: errorMsg });
this.postToChannel(`Workflow **${workflow.name}** failed: ${errorMsg}`);

const outcomes = this.collectOutcomes(stepStates, workflow.steps);
this.postFailureReport(workflow.name, outcomes, errorMsg);

// Abandon trajectory on failure
await this.trajectory.abandon(errorMsg);
Expand All @@ -559,7 +563,7 @@ export class WorkflowRunner {

if (status === 'cancelled') {
this.emit({ type: 'run:cancelled', runId });
this.postToChannel(`Workflow cancelled`);
this.postToChannel(`Workflow **${workflow.name}** cancelled`);
await this.trajectory.abandon('Cancelled by user');
} else {
this.emit({ type: 'run:failed', runId, error: errorMsg });
Expand Down Expand Up @@ -666,7 +670,6 @@ export class WorkflowRunner {
if (allCompleted) {
await this.updateRunStatus(runId, 'completed');
this.emit({ type: 'run:completed', runId });
this.postToChannel(`Workflow **${workflow.name}** completed — all steps passed`);

const outcomes = this.collectOutcomes(stepStates, workflow.steps);
const summary = this.trajectory.buildRunSummary(outcomes);
Expand All @@ -675,12 +678,16 @@ export class WorkflowRunner {
learnings: this.trajectory.extractLearnings(outcomes),
challenges: this.trajectory.extractChallenges(outcomes),
});

this.postCompletionReport(workflow.name, outcomes, summary, confidence);
} else {
const failedStep = [...stepStates.values()].find((s) => s.row.status === 'failed');
const errorMsg = failedStep?.row.error ?? 'One or more steps failed';
await this.updateRunStatus(runId, 'failed', errorMsg);
this.emit({ type: 'run:failed', runId, error: errorMsg });
this.postToChannel(`Workflow **${workflow.name}** failed: ${errorMsg}`);

const outcomes = this.collectOutcomes(stepStates, workflow.steps);
this.postFailureReport(workflow.name, outcomes, errorMsg);
await this.trajectory.abandon(errorMsg);
}
} catch (err) {
Expand Down Expand Up @@ -947,7 +954,7 @@ export class WorkflowRunner {
const agentName = `${step.name}-${this.generateShortId()}`;
const taskWithExit = step.task + '\n\n---\n' +
'IMPORTANT: When you have fully completed this task, you MUST self-terminate by calling ' +
`the MCP tool: relay_release(name="${agentName}", reason="Task completed"). ` +
`the MCP tool: remove_agent(name="${agentName}", reason="Task completed"). ` +
'Do not wait for further input — release yourself immediately after finishing.';

const agentChannels = this.channel ? [this.channel] : agentDef.channels;
Expand All @@ -961,12 +968,18 @@ export class WorkflowRunner {
idleThresholdSecs: agentDef.constraints?.idleThresholdSecs,
});

// Register the spawned agent in Relaycast for observability
// Register the spawned agent in Relaycast for observability + start heartbeat
let stopHeartbeat: (() => void) | undefined;
if (this.relaycastApi) {
await this.relaycastApi.registerExternalAgent(
const agentClient = await this.relaycastApi.registerExternalAgent(
agent.name,
`Workflow agent for step "${step.name}" (${agentDef.cli})`,
).catch(() => {});
).catch(() => null);

// Keep the agent online in the dashboard while it's working
if (agentClient) {
stopHeartbeat = this.relaycastApi.startHeartbeat(agentClient);
}
}

// Invite the spawned agent to the workflow channel
Expand All @@ -983,6 +996,9 @@ export class WorkflowRunner {
// Wait for agent to exit (self-termination via /exit)
const exitResult = await agent.waitForExit(timeoutMs);

// Stop heartbeat now that agent has exited
stopHeartbeat?.();

if (exitResult === 'timeout') {
// Safety net: check if the verification file exists before giving up.
// The agent may have completed work but failed to /exit.
Expand Down Expand Up @@ -1145,6 +1161,64 @@ export class WorkflowRunner {
});
}

/** Post a rich completion report to the channel. */
private postCompletionReport(
workflowName: string,
outcomes: StepOutcome[],
summary: string,
confidence: number,
): void {
const completed = outcomes.filter((o) => o.status === 'completed');
const skipped = outcomes.filter((o) => o.status === 'skipped');
const retried = outcomes.filter((o) => o.attempts > 1);

const lines: string[] = [
`## Workflow **${workflowName}** — Complete`,
'',
summary,
`Confidence: ${Math.round(confidence * 100)}%`,
'',
'### Steps',
...completed.map((o) =>
`- **${o.name}** (${o.agent}) — passed${o.verificationPassed ? ' (verified)' : ''}${o.attempts > 1 ? ` after ${o.attempts} attempts` : ''}`,
),
...skipped.map((o) => `- **${o.name}** — skipped`),
];

if (retried.length > 0) {
lines.push('', '### Retries');
for (const o of retried) {
lines.push(`- ${o.name}: ${o.attempts} attempts`);
}
}

this.postToChannel(lines.join('\n'));
}

/** Post a failure report to the channel. */
private postFailureReport(
workflowName: string,
outcomes: StepOutcome[],
errorMsg: string,
): void {
const completed = outcomes.filter((o) => o.status === 'completed');
const failed = outcomes.filter((o) => o.status === 'failed');
const skipped = outcomes.filter((o) => o.status === 'skipped');

const lines: string[] = [
`## Workflow **${workflowName}** — Failed`,
'',
`${completed.length}/${outcomes.length} steps passed. Error: ${errorMsg}`,
'',
'### Steps',
...completed.map((o) => `- **${o.name}** (${o.agent}) — passed`),
...failed.map((o) => `- **${o.name}** (${o.agent}) — FAILED: ${o.error ?? 'unknown'}`),
...skipped.map((o) => `- **${o.name}** — skipped`),
];

this.postToChannel(lines.join('\n'));
}

// ── Trajectory helpers ────────────────────────────────────────────────

/** Analyze DAG structure for trajectory context. */
Expand Down
Loading