diff --git a/packages/runtime/src/clients/github.test.ts b/packages/runtime/src/clients/github.test.ts index 21d6b97..f125eda 100644 --- a/packages/runtime/src/clients/github.test.ts +++ b/packages/runtime/src/clients/github.test.ts @@ -12,7 +12,7 @@ async function tempMount(): Promise { test('github.comment writes a draft comment file under issues//comments/', async () => { const root = await tempMount(); try { - const client = createGithubClient({ relayfileMountRoot: root }); + const client = createGithubClient({ relayfileMountRoot: root, writebackTimeoutMs: 0 }); await client.comment({ owner: 'o', repo: 'r', number: 2 }, 'hello'); const dir = path.join(root, 'github/repos/o/r/issues/2/comments'); @@ -29,7 +29,7 @@ test('github.comment writes a draft comment file under issues//comments/', as test('github.createIssue writes a draft issue file under issues/', async () => { const root = await tempMount(); try { - const client = createGithubClient({ relayfileMountRoot: root }); + const client = createGithubClient({ relayfileMountRoot: root, writebackTimeoutMs: 0 }); await client.createIssue({ owner: 'o', repo: 'r', @@ -55,7 +55,7 @@ test('github.createIssue writes a draft issue file under issues/', async () => { test('github.createPullRequest writes a draft pull request file under pulls/', async () => { const root = await tempMount(); try { - const client = createGithubClient({ relayfileMountRoot: root }); + const client = createGithubClient({ relayfileMountRoot: root, writebackTimeoutMs: 0 }); await client.createPullRequest({ owner: 'o', repo: 'r', @@ -97,7 +97,7 @@ test('github.upsertIssue updates an existing flat issue match', async () => { }) ); - const client = createGithubClient({ relayfileMountRoot: root }); + const client = createGithubClient({ relayfileMountRoot: root, writebackTimeoutMs: 0 }); const result = await client.upsertIssue({ owner: 'o', repo: 'r', @@ -130,7 +130,7 @@ test('github.upsertIssue ignores a closed issue title match', async () => { }) ); - const client = createGithubClient({ relayfileMountRoot: root }); + const client = createGithubClient({ relayfileMountRoot: root, writebackTimeoutMs: 0 }); const result = await client.upsertIssue({ owner: 'o', repo: 'r', @@ -153,7 +153,7 @@ test('github.upsertIssue ignores a closed issue title match', async () => { test('github.upsertIssue creates a draft when no open match exists', async () => { const root = await tempMount(); try { - const client = createGithubClient({ relayfileMountRoot: root }); + const client = createGithubClient({ relayfileMountRoot: root, writebackTimeoutMs: 0 }); const result = await client.upsertIssue({ owner: 'o', repo: 'r', @@ -188,7 +188,7 @@ test('github.getPr reads meta + diff from canonical paths', async () => { ); await writeFile(path.join(pullRoot, 'diff.patch'), 'diff --git a/x b/x\n'); - const client = createGithubClient({ relayfileMountRoot: root }); + const client = createGithubClient({ relayfileMountRoot: root, writebackTimeoutMs: 0 }); const pr = await client.getPr({ owner: 'o', repo: 'r', number: 42 }); assert.equal(pr.title, 'Add deploy v1'); assert.equal(pr.head, 'feature'); @@ -217,7 +217,7 @@ test('github.getPr reads a flat canonical pull request file', async () => { }) ); - const client = createGithubClient({ relayfileMountRoot: root }); + const client = createGithubClient({ relayfileMountRoot: root, writebackTimeoutMs: 0 }); const pr = await client.getPr({ owner: 'o', repo: 'r', number: 42 }); assert.equal(pr.title, 'Add deploy v1'); assert.equal(pr.head, 'feature'); @@ -232,7 +232,7 @@ test('github.getPr reads a flat canonical pull request file', async () => { test('github.postReview writes a review draft under pulls//reviews/', async () => { const root = await tempMount(); try { - const client = createGithubClient({ relayfileMountRoot: root }); + const client = createGithubClient({ relayfileMountRoot: root, writebackTimeoutMs: 0 }); await client.postReview( { owner: 'o', repo: 'r', number: 42 }, { @@ -256,7 +256,7 @@ test('github.postReview writes a review draft under pulls//reviews/', async ( test('github.postReview accepts COMMENT/APPROVE/REQUEST_CHANGES events', async () => { const root = await tempMount(); try { - const client = createGithubClient({ relayfileMountRoot: root }); + const client = createGithubClient({ relayfileMountRoot: root, writebackTimeoutMs: 0 }); for (const event of ['COMMENT', 'APPROVE', 'REQUEST_CHANGES'] as const) { await client.postReview({ owner: 'o', repo: 'r', number: event === 'COMMENT' ? 1 : event === 'APPROVE' ? 2 : 3 }, { body: event.toLowerCase(), diff --git a/packages/runtime/src/clients/jira.test.ts b/packages/runtime/src/clients/jira.test.ts index 7286d17..dd174a6 100644 --- a/packages/runtime/src/clients/jira.test.ts +++ b/packages/runtime/src/clients/jira.test.ts @@ -12,7 +12,7 @@ async function tempMount(): Promise { test('jira createIssue writes a Jira issue draft', async () => { const root = await tempMount(); try { - const client = createJiraClient({ relayfileMountRoot: root }); + const client = createJiraClient({ relayfileMountRoot: root, writebackTimeoutMs: 0 }); await client.createIssue({ cloudId: 'cloud_1', fields: { project: { key: 'ENG' }, summary: 'Ship it', issuetype: { name: 'Task' } } @@ -33,7 +33,7 @@ test('jira createIssue writes a Jira issue draft', async () => { test('jira transition writes an issue transition draft', async () => { const root = await tempMount(); try { - const client = createJiraClient({ relayfileMountRoot: root }); + const client = createJiraClient({ relayfileMountRoot: root, writebackTimeoutMs: 0 }); await client.transition({ cloudId: 'cloud_1', issueIdOrKey: 'ENG-1' }, '31'); const dir = path.join(root, 'jira/issues/ENG-1/transitions'); diff --git a/packages/runtime/src/clients/linear.test.ts b/packages/runtime/src/clients/linear.test.ts index 9f2dd51..6933670 100644 --- a/packages/runtime/src/clients/linear.test.ts +++ b/packages/runtime/src/clients/linear.test.ts @@ -12,7 +12,7 @@ async function tempMount(): Promise { test('linear createIssue writes an issue draft', async () => { const root = await tempMount(); try { - const client = createLinearClient({ relayfileMountRoot: root }); + const client = createLinearClient({ relayfileMountRoot: root, writebackTimeoutMs: 0 }); await client.createIssue({ teamId: 'team_1', title: 'Ship it', description: 'Soon' }); const dir = path.join(root, 'linear/issues'); @@ -38,7 +38,7 @@ test('linear getIssue reads a canonical issue file', async () => { JSON.stringify({ id: 'i1', identifier: 'ENG-1', title: 'Ship it', description: null, url: 'https://linear.app/i1', state: null }) ); - const client = createLinearClient({ relayfileMountRoot: root }); + const client = createLinearClient({ relayfileMountRoot: root, writebackTimeoutMs: 0 }); assert.equal((await client.getIssue('ENG-1')).identifier, 'ENG-1'); } finally { await rm(root, { recursive: true, force: true }); diff --git a/packages/runtime/src/clients/notion.test.ts b/packages/runtime/src/clients/notion.test.ts index 8266c57..98b457d 100644 --- a/packages/runtime/src/clients/notion.test.ts +++ b/packages/runtime/src/clients/notion.test.ts @@ -12,7 +12,7 @@ async function tempMount(): Promise { test('notion createPage writes a database page draft', async () => { const root = await tempMount(); try { - const client = createNotionClient({ relayfileMountRoot: root }); + const client = createNotionClient({ relayfileMountRoot: root, writebackTimeoutMs: 0 }); await client.createPage( { database_id: 'db_1' }, { Name: { title: [{ text: { content: 'Digest' } }] } }, @@ -32,7 +32,7 @@ test('notion createPage writes a database page draft', async () => { }); test('notion createPage requires a database parent for file writeback', async () => { - const client = createNotionClient({ relayfileMountRoot: '/tmp/unused' }); + const client = createNotionClient({ relayfileMountRoot: '/tmp/unused', writebackTimeoutMs: 0 }); await assert.rejects( () => client.createPage({}, {}, []), /parent\.database_id/ diff --git a/packages/runtime/src/clients/request.ts b/packages/runtime/src/clients/request.ts index 7d16794..45acaa5 100644 --- a/packages/runtime/src/clients/request.ts +++ b/packages/runtime/src/clients/request.ts @@ -16,8 +16,8 @@ import { WorkforceIntegrationError } from '../errors.js'; * * The handler-side ergonomics stay identical to the direct-REST shape * — `await ctx.github.comment(target, body)` returns when the write - * lands. Whether the receipt is awaited synchronously, polled, or - * fired-and-forgotten depends on `writebackTimeoutMs`. + * lands and, by default, waits briefly for the provider receipt. + * Setting `writebackTimeoutMs` to `0` keeps fire-and-forget behavior. */ export interface IntegrationClientOptions { /** Absolute path to the Relayfile mount the handler is running in. */ @@ -30,9 +30,8 @@ export interface IntegrationClientOptions { workspaceCwd?: string; /** * Max wait, in ms, for the Relayfile writeback worker to emit a - * receipt onto the just-written draft. `0` (default) means - * fire-and-forget — the client returns immediately and the receipt - * is whatever was readable at write time. + * receipt onto the just-written draft. Defaults to 3000ms. `0` means + * fire-and-forget — the client returns immediately without a receipt. */ writebackTimeoutMs?: number; /** Poll interval while waiting for a receipt. Default 250ms. */ @@ -71,6 +70,8 @@ export interface WritebackResult { receipt?: WritebackReceipt; } +const DEFAULT_WRITEBACK_TIMEOUT_MS = 3_000; + function isRecord(value: unknown): value is Record { return typeof value === 'object' && value !== null && !Array.isArray(value); } @@ -196,7 +197,7 @@ function isNoEntryError(error: unknown): boolean { /** * Write a draft JSON payload atomically (write-then-rename) so the * writeback worker never sees a partial file. Waits for a receipt - * when `writebackTimeoutMs > 0`; otherwise returns immediately. + * by default; pass `writebackTimeoutMs: 0` to return immediately. */ export async function writeJsonFile( client: IntegrationClientOptions, @@ -222,7 +223,7 @@ async function waitForReceipt( absolutePath: string, client: IntegrationClientOptions ): Promise { - const timeoutMs = client.writebackTimeoutMs ?? 0; + const timeoutMs = client.writebackTimeoutMs ?? DEFAULT_WRITEBACK_TIMEOUT_MS; // Fire-and-forget: never reinterpret the just-written draft as a // receipt. The draft payload may legitimately carry top-level `id` / // `path` / `created` fields (e.g. an upsert update writing back the diff --git a/packages/runtime/src/clients/slack.test.ts b/packages/runtime/src/clients/slack.test.ts index 3a153de..4151f55 100644 --- a/packages/runtime/src/clients/slack.test.ts +++ b/packages/runtime/src/clients/slack.test.ts @@ -1,6 +1,6 @@ import test from 'node:test'; import assert from 'node:assert/strict'; -import { mkdtemp, readFile, readdir, rm } from 'node:fs/promises'; +import { mkdtemp, readFile, readdir, rm, writeFile } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import path from 'node:path'; import { WorkforceIntegrationError } from '../errors.js'; @@ -13,7 +13,7 @@ async function tempMount(): Promise { test('slack post writes a channel message draft', async () => { const root = await tempMount(); try { - const client = createSlackClient({ relayfileMountRoot: root }); + const client = createSlackClient({ relayfileMountRoot: root, writebackTimeoutMs: 0 }); await client.post('C123', 'hello'); const dir = path.join(root, 'slack/channels/C123/messages'); @@ -27,10 +27,39 @@ test('slack post writes a channel message draft', async () => { } }); +test('slack post waits for the writeback receipt by default', async () => { + const root = await tempMount(); + const dir = path.join(root, 'slack/channels/C123/messages'); + let timer: NodeJS.Timeout | undefined; + try { + const client = createSlackClient({ relayfileMountRoot: root, writebackPollMs: 10 }); + timer = setInterval(() => { + void readdir(dir) + .then(async (files) => { + const file = files.find((name) => name.endsWith('.json')); + if (!file) return; + await writeFile( + path.join(dir, file), + JSON.stringify({ created: '1716490000.123456', url: 'https://slack.example/C123/p1716490000123456' }), + 'utf8' + ); + if (timer) clearInterval(timer); + }) + .catch(() => undefined); + }, 10); + + const result = await client.post('C123', 'hello'); + assert.equal(result.ts, '1716490000.123456'); + } finally { + if (timer) clearInterval(timer); + await rm(root, { recursive: true, force: true }); + } +}); + test('slack dm writes a user direct-message draft', async () => { const root = await tempMount(); try { - const client = createSlackClient({ relayfileMountRoot: root }); + const client = createSlackClient({ relayfileMountRoot: root, writebackTimeoutMs: 0 }); await client.dm('U123', 'ping'); const dir = path.join(root, 'slack/users/U123/messages'); @@ -45,7 +74,7 @@ test('slack dm writes a user direct-message draft', async () => { }); test('slack reply rejects malformed string thread refs', async () => { - const client = createSlackClient({ relayfileMountRoot: '/tmp/unused' }); + const client = createSlackClient({ relayfileMountRoot: '/tmp/unused', writebackTimeoutMs: 0 }); await assert.rejects( () => client.reply('missing-ts', 'hello'), (error) => error instanceof WorkforceIntegrationError && error.provider === 'slack' @@ -53,7 +82,7 @@ test('slack reply rejects malformed string thread refs', async () => { }); test('slack reply rejects malformed object thread refs', async () => { - const client = createSlackClient({ relayfileMountRoot: '/tmp/unused' }); + const client = createSlackClient({ relayfileMountRoot: '/tmp/unused', writebackTimeoutMs: 0 }); await assert.rejects( () => client.reply({ channel: '', ts: '123.456' }, 'hello'), (error) => error instanceof WorkforceIntegrationError && error.provider === 'slack' diff --git a/packages/runtime/src/cloud-defaults.ts b/packages/runtime/src/cloud-defaults.ts index 6d941c9..0f9b9e0 100644 --- a/packages/runtime/src/cloud-defaults.ts +++ b/packages/runtime/src/cloud-defaults.ts @@ -12,6 +12,10 @@ import { type PersonaSpec } from '@agentworkforce/persona-kit'; import { createGithubClient } from './clients/github.js'; +import { createJiraClient } from './clients/jira.js'; +import { createLinearClient } from './clients/linear.js'; +import { createNotionClient } from './clients/notion.js'; +import { createSlackClient } from './clients/slack.js'; import type { FilesContext, HarnessRunArgs, @@ -20,17 +24,30 @@ import type { SandboxContext, WorkforceAgentContext, WorkforceCtx, - WorkforceDeploymentContext + WorkforceDeploymentContext, + WorkflowContext } from './types.js'; type AgentInputValue = string | number | boolean | null | undefined; const USAGE_REPORT_TIMEOUT_MS = 5_000; +const WORKFLOW_COMPLETION_POLL_MS = 1_000; +const WORKFLOW_COMPLETION_TIMEOUT_MS = 90 * 60_000; +const WORKFLOW_FETCH_TIMEOUT_MS = 15_000; +const WORKFLOW_COMPLETION_MAX_TRANSIENT_ERRORS = 3; +const WORKFLOW_INVOCATION_HEADER = 'x-agentworkforce-workspace-workflow-invocation'; interface AgentRowContext extends WorkforceAgentContext { input_values?: Record; inputValues?: Record; } +class WorkflowRequestError extends Error { + constructor(message: string, readonly retryable: boolean) { + super(message); + this.name = 'WorkflowRequestError'; + } +} + export interface CloudDefaultOptions { persona: PersonaSpec; agent: AgentRowContext; @@ -44,6 +61,7 @@ export interface CloudRuntimeDefaults { sandbox: SandboxContext; files: FilesContext; integrations?: Record; + workflow?: WorkflowContext; harnessRunner: (args: HarnessRunArgs) => Promise; } @@ -58,10 +76,15 @@ export function createCloudRuntimeDefaults(options: CloudDefaultOptions): CloudR workspaceRoot: root, env }); + const workflow = createDefaultWorkflow({ + workspaceRoot: root, + env + }); return { sandbox, files, ...(integrations ? { integrations } : {}), + ...(workflow ? { workflow } : {}), harnessRunner: createProcessHarnessRunner({ ...options, workspaceRoot: root, @@ -139,22 +162,284 @@ function createDefaultIntegrations(args: { env: NodeJS.ProcessEnv; }): Record | undefined { const integrations: Record = {}; + const common = { + relayfileMountRoot: firstNonEmpty(args.env.RELAYFILE_MOUNT_ROOT, args.env.RELAYFILE_ROOT) ?? args.workspaceRoot, + workspaceCwd: args.workspaceRoot, + workspaceId: args.workspaceId, + writebackTimeoutMs: numberFromEnv(args.env.WORKFORCE_RELAYFILE_WRITEBACK_TIMEOUT_MS), + writebackPollMs: numberFromEnv(args.env.WORKFORCE_RELAYFILE_WRITEBACK_POLL_MS), + relayfileBaseUrl: args.env.RELAYFILE_BASE_URL, + relayfileApiToken: args.env.RELAYFILE_TOKEN + }; + const workspaceCloudApiToken = firstNonEmpty(args.env.WORKFORCE_WORKSPACE_TOKEN); + const cloudApiToken = firstNonEmpty(workspaceCloudApiToken, args.env.WORKFORCE_AGENT_TOKEN); if (args.persona.integrations?.github) { integrations.github = createGithubClient({ - relayfileMountRoot: firstNonEmpty(args.env.RELAYFILE_MOUNT_ROOT, args.env.RELAYFILE_ROOT) ?? args.workspaceRoot, - workspaceCwd: args.workspaceRoot, - workspaceId: args.workspaceId, - writebackTimeoutMs: numberFromEnv(args.env.WORKFORCE_RELAYFILE_WRITEBACK_TIMEOUT_MS), - writebackPollMs: numberFromEnv(args.env.WORKFORCE_RELAYFILE_WRITEBACK_POLL_MS), + ...common, connectionId: args.env.WORKFORCE_INTEGRATION_GITHUB_CONNECTION_ID, - relayfileBaseUrl: args.env.RELAYFILE_BASE_URL, - relayfileApiToken: args.env.RELAYFILE_TOKEN, - cloudApiToken: firstNonEmpty(args.env.WORKFORCE_AGENT_TOKEN, args.env.WORKFORCE_WORKSPACE_TOKEN) + cloudApiToken + }); + } + if (args.persona.integrations?.slack && workspaceCloudApiToken) { + integrations.slack = createSlackClient({ + ...common, + connectionId: args.env.WORKFORCE_INTEGRATION_SLACK_CONNECTION_ID, + cloudApiToken: workspaceCloudApiToken, + slackTeamId: args.env.WORKFORCE_INTEGRATION_SLACK_TEAM_ID + }); + } + if (args.persona.integrations?.linear) { + integrations.linear = createLinearClient({ + ...common, + connectionId: args.env.WORKFORCE_INTEGRATION_LINEAR_CONNECTION_ID + }); + } + if (args.persona.integrations?.notion) { + integrations.notion = createNotionClient({ + ...common, + connectionId: args.env.WORKFORCE_INTEGRATION_NOTION_CONNECTION_ID + }); + } + if (args.persona.integrations?.jira && workspaceCloudApiToken) { + integrations.jira = createJiraClient({ + ...common, + connectionId: args.env.WORKFORCE_INTEGRATION_JIRA_CONNECTION_ID, + cloudApiToken: workspaceCloudApiToken }); } return Object.keys(integrations).length > 0 ? integrations : undefined; } +function createDefaultWorkflow(args: { + workspaceRoot: string; + env: NodeJS.ProcessEnv; +}): WorkflowContext | undefined { + const token = firstNonEmpty(args.env.WORKFORCE_WORKSPACE_TOKEN); + const baseUrl = firstNonEmpty(args.env.WORKFORCE_CLOUD_BASE_URL); + if (!token || !baseUrl) return undefined; + const base = normalizeBaseUrl(baseUrl); + return { + async run(name, runArgs) { + const workflowSource = await readBundledWorkflowSource(args.workspaceRoot, name); + const response = await fetchWorkflow(`${base}/api/v1/workflows/run`, { + method: 'POST', + headers: workflowHeaders(token, true), + body: JSON.stringify({ + workflow: workflowSource, + fileType: 'ts', + sourceFileType: 'workflow', + runtime: { id: 'daytona' }, + metadata: { + invocationSlug: name, + invocationArgs: JSON.stringify(runArgs ?? {}) + } + }) + }); + if (!response.ok) { + throw await workflowError(response, `ctx.workflow.run("${name}")`); + } + const payload = await response.json().catch(() => ({})) as { runId?: unknown }; + if (typeof payload.runId !== 'string' || payload.runId.trim().length === 0) { + throw new Error(`ctx.workflow.run("${name}"): cloud response missing runId`); + } + const runId = payload.runId; + return { + runId, + completion: () => pollWorkflowCompletion({ base, token, runId }) + }; + }, + status(runId) { + return fetchWorkflowStatus({ base, token, runId }); + } + }; +} + +async function readBundledWorkflowSource(workspaceRoot: string, name: string): Promise { + const workflowFile = normalizeWorkflowName(name); + const roots = uniqueStrings([process.cwd(), workspaceRoot]); + const candidates = roots.flatMap((root) => [ + path.resolve(root, 'workflows', workflowFile), + path.resolve(root, workflowFile) + ]); + for (const candidate of candidates) { + try { + return await readFile(candidate, 'utf8'); + } catch (err) { + if (!isNoEntry(err)) throw err; + } + } + throw new Error( + `ctx.workflow.run("${name}") could not find bundled workflow source; expected ${candidates.join(' or ')}` + ); +} + +function normalizeWorkflowName(name: string): string { + const trimmed = name.trim(); + if (!trimmed) { + throw new Error('ctx.workflow.run() requires a non-empty workflow name'); + } + if (path.isAbsolute(trimmed) || trimmed.split(/[\\/]/).some((segment) => segment === '..' || segment === '')) { + throw new Error(`ctx.workflow.run("${name}") workflow name must be a safe relative path`); + } + return trimmed.endsWith('.ts') ? trimmed : `${trimmed}.ts`; +} + +async function pollWorkflowCompletion(args: { + base: string; + token: string; + runId: string; +}): Promise<{ output: unknown; status: 'success' | 'failure' }> { + const deadline = Date.now() + WORKFLOW_COMPLETION_TIMEOUT_MS; + let transientErrors = 0; + let lastTransientError: unknown; + while (Date.now() < deadline) { + let status: Awaited>; + try { + status = await fetchWorkflowStatus(args); + transientErrors = 0; + lastTransientError = undefined; + } catch (err) { + if (err instanceof WorkflowRequestError && err.retryable && transientErrors < WORKFLOW_COMPLETION_MAX_TRANSIENT_ERRORS) { + transientErrors += 1; + lastTransientError = err; + await delay(WORKFLOW_COMPLETION_POLL_MS); + continue; + } + throw err; + } + if (status.status === 'success') { + return { status: 'success', output: status.output }; + } + if (status.status === 'failure') { + return { status: 'failure', output: status.output ?? status.error }; + } + await delay(WORKFLOW_COMPLETION_POLL_MS); + } + if (lastTransientError instanceof Error) { + throw new Error( + `ctx.workflow.run("${args.runId}").completion(): timed out after ${WORKFLOW_COMPLETION_TIMEOUT_MS}ms; last status poll error: ${lastTransientError.message}` + ); + } + throw new Error(`ctx.workflow.run("${args.runId}").completion(): timed out after ${WORKFLOW_COMPLETION_TIMEOUT_MS}ms`); +} + +async function fetchWorkflowStatus(args: { + base: string; + token: string; + runId: string; +}): Promise<{ status: 'pending' | 'running' | 'success' | 'failure'; output?: unknown; error?: string; patches?: unknown }> { + const runId = args.runId.trim(); + if (!runId) { + throw new Error('ctx.workflow.status() requires a non-empty runId'); + } + const response = await fetchWorkflow(`${args.base}/api/v1/workflows/runs/${encodeURIComponent(runId)}`, { + method: 'GET', + headers: workflowHeaders(args.token, false) + }); + if (!response.ok) { + throw await workflowError(response, `ctx.workflow.status("${runId}")`); + } + const body = await response.json().catch(() => ({})) as Record; + const status = normalizeWorkflowStatus(body.status); + const output = normalizeWorkflowOutput(body); + return { + status, + ...(output !== undefined ? { output } : {}), + ...(body.patches !== undefined ? { patches: body.patches } : {}), + ...(typeof body.error === 'string' ? { error: body.error } : {}) + }; +} + +function normalizeWorkflowStatus(value: unknown): 'pending' | 'running' | 'success' | 'failure' { + switch (value) { + case 'pending': + case 'queued': + case 'starting': + case 'created': + case 'submitted': + case 'dispatching': + return 'pending'; + case 'running': + case 'in_progress': + case 'in-progress': + return 'running'; + case 'success': + case 'succeeded': + case 'completed': + case 'complete': + return 'success'; + case 'failure': + case 'failed': + case 'cancelled': + case 'canceled': + case 'timed_out': + case 'timeout': + case 'timed-out': + return 'failure'; + default: + return 'pending'; + } +} + +function normalizeWorkflowOutput(body: Record): unknown { + if (body.output !== undefined) return body.output; + if (body.result !== undefined && body.patches !== undefined) { + return { result: body.result, patches: body.patches }; + } + if (body.result !== undefined) return body.result; + if (body.patches !== undefined) return { patches: body.patches }; + return undefined; +} + +function workflowHeaders(token: string, delegated: boolean): Record { + return { + accept: 'application/json', + 'content-type': 'application/json', + authorization: `Bearer ${token}`, + ...(delegated ? { [WORKFLOW_INVOCATION_HEADER]: 'true' } : {}) + }; +} + +async function workflowError(response: Response, label: string): Promise { + const body = await response.text().catch(() => ''); + const excerpt = body.length > 400 ? `${body.slice(0, 400)}...` : body; + return new WorkflowRequestError( + `${label}: ${response.status} ${response.statusText}${excerpt ? ` - ${excerpt}` : ''}`, + // Cloud workflow routes use 401 for missing/invalid/not-yet-propagated + // tokens, and 403 only for valid tokens that lack scope or workspace + // access. Retry 401 briefly for post-mint propagation; fail 403 fast so + // scope/tenant misconfiguration is not hidden behind retry noise. + response.status === 401 || response.status === 429 || response.status >= 500 + ); +} + +async function fetchWorkflow(input: string, init: RequestInit): Promise { + const controller = new AbortController(); + const timer = setTimeout(() => controller.abort(), WORKFLOW_FETCH_TIMEOUT_MS); + try { + return await fetch(input, { ...init, signal: controller.signal }); + } catch (err) { + const message = err instanceof Error && err.name === 'AbortError' + ? `timeout after ${WORKFLOW_FETCH_TIMEOUT_MS}ms` + : err instanceof Error ? err.message : String(err); + throw new WorkflowRequestError(`workflow request failed: ${message}`, true); + } finally { + clearTimeout(timer); + } +} + +function normalizeBaseUrl(value: string): string { + return value.replace(/\/+$/, ''); +} + +function uniqueStrings(values: string[]): string[] { + return [...new Set(values)]; +} + +function delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + function createProcessHarnessRunner(args: CloudDefaultOptions & { workspaceRoot: string; env: NodeJS.ProcessEnv; diff --git a/packages/runtime/src/runner.test.ts b/packages/runtime/src/runner.test.ts index 509e8f2..0cda576 100644 --- a/packages/runtime/src/runner.test.ts +++ b/packages/runtime/src/runner.test.ts @@ -7,6 +7,8 @@ import os from 'node:os'; import type { AddressInfo } from 'node:net'; import type { PersonaSpec } from '@agentworkforce/persona-kit'; import { startRunner } from './runner.js'; +import { createCloudRuntimeDefaults } from './cloud-defaults.js'; +import { buildCtx } from './ctx.js'; import { handler } from './handler.js'; import type { RawGatewayEnvelope } from './shim.js'; import type { @@ -171,7 +173,8 @@ test('startRunner supplies cloud github and harness defaults when generated runn 'WORKFORCE_SANDBOX_ROOT', 'RELAYFILE_MOUNT_ROOT', 'WORKFORCE_USAGE_URL', - 'WORKFORCE_DEPLOYMENT_TOKEN' + 'WORKFORCE_DEPLOYMENT_TOKEN', + 'WORKFORCE_RELAYFILE_WRITEBACK_TIMEOUT_MS' ]); try { await writeFakeHarness(binDir, 'claude', [ @@ -183,6 +186,7 @@ test('startRunner supplies cloud github and harness defaults when generated runn process.env.RELAYFILE_MOUNT_ROOT = workspaceRoot; process.env.WORKFORCE_USAGE_URL = usageUrl; process.env.WORKFORCE_DEPLOYMENT_TOKEN = 'deployment-token'; + process.env.WORKFORCE_RELAYFILE_WRITEBACK_TIMEOUT_MS = '0'; let harnessOutput = ''; let pullRequestUrl = ''; @@ -273,6 +277,191 @@ test('startRunner supplies cloud github and harness defaults when generated runn } }); +test('createCloudRuntimeDefaults builds slack integrations and workflow when workspace cloud env is present', async () => { + const dir = await mkdtemp(path.join(os.tmpdir(), 'wf-runtime-cloud-workflow-')); + await mkdir(path.join(dir, 'workflows'), { recursive: true }); + await writeFile( + path.join(dir, 'workflows/cloud-small-issue-codex.ts'), + 'console.log("workflow body");\n', + 'utf8' + ); + const requests: Array<{ method?: string; url?: string; headers: http.IncomingHttpHeaders; body?: unknown }> = []; + const server = http.createServer((req, res) => { + let body = ''; + req.setEncoding('utf8'); + req.on('data', (chunk) => { + body += chunk; + }); + req.on('end', () => { + requests.push({ + method: req.method, + url: req.url, + headers: req.headers, + body: body ? JSON.parse(body) : undefined + }); + if (req.method === 'POST' && req.url === '/api/v1/workflows/run') { + res.writeHead(200, { 'content-type': 'application/json' }).end(JSON.stringify({ runId: 'run-123', status: 'pending' })); + return; + } + if (req.method === 'GET' && req.url === '/api/v1/workflows/runs/run-123') { + res.writeHead(200, { 'content-type': 'application/json' }).end(JSON.stringify({ + status: 'completed', + result: { summary: 'done' }, + patches: { + main: { + pushedTo: { + prUrl: 'https://example.test/pr/1' + } + } + } + })); + return; + } + res.writeHead(404).end(); + }); + }); + await new Promise((resolve) => server.listen(0, '127.0.0.1', resolve)); + const address = server.address(); + assert.ok(address && typeof address === 'object'); + + try { + const defaults = createCloudRuntimeDefaults({ + persona: { + ...persona, + integrations: { + slack: { triggers: [{ on: 'message' }] }, + linear: { triggers: [{ on: 'Issue' }] }, + notion: { triggers: [{ on: 'page.created' }] }, + jira: { triggers: [{ on: 'issue.created' }] } + } + }, + agent: runtimeAgent, + deployment: runtimeDeployment, + workspaceId: 'ws-test', + log: () => { + /* keep test output quiet */ + }, + env: { + WORKFORCE_SANDBOX_ROOT: dir, + RELAYFILE_MOUNT_ROOT: dir, + WORKFORCE_WORKSPACE_TOKEN: 'workspace-token', + WORKFORCE_CLOUD_BASE_URL: `http://127.0.0.1:${address.port}`, + WORKFORCE_RELAYFILE_WRITEBACK_TIMEOUT_MS: '0' + } + }); + + assert.ok(defaults.integrations?.slack, 'slack client should be attached'); + assert.ok(defaults.integrations?.linear, 'linear client should be attached'); + assert.ok(defaults.integrations?.notion, 'notion client should be attached'); + assert.ok(defaults.integrations?.jira, 'jira client should be attached'); + assert.ok(defaults.workflow, 'workflow context should be attached'); + + const handle = await defaults.workflow.run('cloud-small-issue-codex', { issue: 1028 }); + assert.equal(handle.runId, 'run-123'); + assert.deepEqual(await handle.completion(), { + status: 'success', + output: { + result: { summary: 'done' }, + patches: { + main: { + pushedTo: { + prUrl: 'https://example.test/pr/1' + } + } + } + } + }); + const status = await defaults.workflow.status(' run-123 '); + assert.deepEqual(status, { + status: 'success', + output: { + result: { summary: 'done' }, + patches: { + main: { + pushedTo: { + prUrl: 'https://example.test/pr/1' + } + } + } + }, + patches: { + main: { + pushedTo: { + prUrl: 'https://example.test/pr/1' + } + } + } + }); + + const post = requests.find((request) => request.method === 'POST'); + assert.equal(post?.headers.authorization, 'Bearer workspace-token'); + assert.equal(post?.headers['x-agentworkforce-workspace-workflow-invocation'], 'true'); + assert.deepEqual(post?.body, { + workflow: 'console.log("workflow body");\n', + fileType: 'ts', + sourceFileType: 'workflow', + runtime: { id: 'daytona' }, + metadata: { + invocationSlug: 'cloud-small-issue-codex', + invocationArgs: JSON.stringify({ issue: 1028 }) + } + }); + const get = requests.find((request) => request.method === 'GET'); + assert.equal(get?.headers.authorization, 'Bearer workspace-token'); + } finally { + await new Promise((resolve) => server.close(() => resolve())); + await rm(dir, { recursive: true, force: true }); + } +}); + +test('createCloudRuntimeDefaults omits token-backed slack and workflow when workspace cloud env is absent', async () => { + const dir = await mkdtemp(path.join(os.tmpdir(), 'wf-runtime-cloud-missing-token-')); + try { + const defaults = createCloudRuntimeDefaults({ + persona: { + ...persona, + integrations: { + slack: { triggers: [{ on: 'message' }] }, + jira: { triggers: [{ on: 'issue.created' }] } + } + }, + agent: runtimeAgent, + deployment: runtimeDeployment, + workspaceId: 'ws-test', + log: () => { + /* keep test output quiet */ + }, + env: { + WORKFORCE_SANDBOX_ROOT: dir, + RELAYFILE_MOUNT_ROOT: dir, + WORKFORCE_RELAYFILE_WRITEBACK_TIMEOUT_MS: '0' + } + }); + + assert.equal(defaults.integrations, undefined); + assert.equal(defaults.workflow, undefined); + + const ctx = buildCtx({ + persona, + agent: runtimeAgent, + deployment: runtimeDeployment, + workspaceId: 'ws-test', + sandbox: defaults.sandbox, + files: defaults.files, + harnessRunner: async () => ({ output: '', exitCode: 0, durationMs: 0 }), + log: () => { + /* keep test output quiet */ + } + }); + await assert.rejects( + () => ctx.workflow.status('run-123'), + /ctx.workflow is unavailable: the runner is not connected to the workforce workflows API/ + ); + } finally { + await rm(dir, { recursive: true, force: true }); + } +}); + test('buildCtx rejects integrations that collide with core fields', async () => { const { buildCtx } = await import('./ctx.js'); assert.throws( diff --git a/packages/runtime/src/runner.ts b/packages/runtime/src/runner.ts index df43cc8..365509d 100644 --- a/packages/runtime/src/runner.ts +++ b/packages/runtime/src/runner.ts @@ -113,7 +113,9 @@ export async function startRunner(options: StartRunnerOptions): Promise { harnessRunner: options.harnessRunner ?? cloudDefaults.harnessRunner, ...(options.subsystems?.llm ? { llm: options.subsystems.llm } : {}), ...(options.subsystems?.memory ? { memory: options.subsystems.memory } : {}), - ...(options.subsystems?.workflow ? { workflow: options.subsystems.workflow } : {}), + ...(options.subsystems?.workflow ?? cloudDefaults.workflow + ? { workflow: options.subsystems?.workflow ?? cloudDefaults.workflow } + : {}), ...(options.subsystems?.schedule ? { schedule: options.subsystems.schedule } : {}), ...(options.subsystems?.log ? { log: options.subsystems.log } : {}), ...(Object.keys(integrations).length > 0 ? { integrations } : {}) diff --git a/packages/runtime/src/types.ts b/packages/runtime/src/types.ts index 9929744..e656826 100644 --- a/packages/runtime/src/types.ts +++ b/packages/runtime/src/types.ts @@ -161,7 +161,7 @@ export interface WorkflowRunHandle { export interface WorkflowContext { run(name: string, args?: Record): Promise; - status(runId: string): Promise<{ status: 'pending' | 'success' | 'failure'; output?: unknown; error?: string }>; + status(runId: string): Promise<{ status: 'pending' | 'running' | 'success' | 'failure'; output?: unknown; error?: string; patches?: unknown }>; } export interface ScheduleContext {