diff --git a/package-lock.json b/package-lock.json index 0698fa2..17b376d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,7 +9,7 @@ "version": "0.18.1", "license": "MIT", "dependencies": { - "@agentage/core": "0.9.0", + "@agentage/core": "^0.10.0", "@agentage/platform": "^0.5.0", "@supabase/supabase-js": "2.103.3", "ajv": "8.18.0", @@ -46,9 +46,9 @@ } }, "node_modules/@agentage/core": { - "version": "0.9.0", - "resolved": "https://registry.npmjs.org/@agentage/core/-/core-0.9.0.tgz", - "integrity": "sha512-dV7nfltYlbJL6Fme4/pRQOQnC9jNpLHMAXSnlfer5PRksc8ST+mNSBew61rzlxoN87YNY4gl23arvGYBAWQY2g==", + "version": "0.10.0", + "resolved": "https://registry.npmjs.org/@agentage/core/-/core-0.10.0.tgz", + "integrity": "sha512-pC3hKljpfyviobpHjf3abMImcFgCWSf7q5PA0sHgYhSGjB3YRTlpjZei5VKPnLseCSc7NfPLLJGtGAL6Wh8ZJw==", "license": "MIT", "engines": { "node": ">=22.0.0", diff --git a/package.json b/package.json index 8a4bc7a..96c4dd8 100644 --- a/package.json +++ b/package.json @@ -29,7 +29,7 @@ "prepublishOnly": "npm run verify" }, "dependencies": { - "@agentage/core": "0.9.0", + "@agentage/core": "^0.10.0", "@agentage/platform": "^0.5.0", "@supabase/supabase-js": "2.103.3", "ajv": "8.18.0", diff --git a/src/daemon/actions.test.ts b/src/daemon/actions.test.ts new file mode 100644 index 0000000..640fa35 --- /dev/null +++ b/src/daemon/actions.test.ts @@ -0,0 +1,42 @@ +import { describe, expect, it, beforeEach, vi } from 'vitest'; + +vi.mock('../utils/version.js', () => ({ VERSION: '0.99.9' })); +import type * as CoreModule from '@agentage/core'; + +vi.mock('@agentage/core', async (importActual) => { + const actual = await importActual(); + return { + ...actual, + shell: vi.fn(), + }; +}); + +import { getActionRegistry, resetActionRegistry } from './actions.js'; + +describe('action registry bootstrap', () => { + beforeEach(() => { + resetActionRegistry(); + }); + + it('registers the three built-in actions with expected manifests', () => { + const names = getActionRegistry() + .list() + .map((m) => m.name) + .sort(); + expect(names).toEqual(['agent:install', 'cli:update', 'project:addFromOrigin']); + }); + + it('each action declares a distinct capability and machine scope', () => { + const manifests = getActionRegistry().list(); + const caps = new Set(manifests.map((m) => m.capability)); + expect(caps.size).toBe(manifests.length); + for (const m of manifests) { + expect(m.scope).toBe('machine'); + expect(m.capability).toMatch(/\.(read|write)$/); + } + }); + + it('singleton returns the same registry across calls', () => { + expect(getActionRegistry()).toBe(getActionRegistry()); + }); +}); diff --git a/src/daemon/actions.ts b/src/daemon/actions.ts new file mode 100644 index 0000000..134fef4 --- /dev/null +++ b/src/daemon/actions.ts @@ -0,0 +1,36 @@ +import { createRegistry, shell, type ActionRegistry } from '@agentage/core'; +import { VERSION } from '../utils/version.js'; +import { createAgentInstallAction } from './actions/agent-install.js'; +import { createCliUpdateAction } from './actions/cli-update.js'; +import { createProjectAddFromOriginAction } from './actions/project-add-from-origin.js'; +import type { ShellExec } from './actions/types.js'; + +const shellExec: ShellExec = (command, options) => shell(command, options); + +const readCliVersion = async (): Promise => VERSION; + +let registrySingleton: ActionRegistry | null = null; + +/** + * Build the daemon's action registry with built-in control-plane actions. + * Reference actions all declare scope='machine' + require explicit capability; + * the transport layer decides which capabilities to grant per caller. + */ +export const getActionRegistry = (): ActionRegistry => { + if (registrySingleton) return registrySingleton; + + const registry = createRegistry(); + registry.register( + createCliUpdateAction({ shell: shellExec, readCurrentVersion: readCliVersion }) + ); + registry.register(createProjectAddFromOriginAction({ shell: shellExec })); + registry.register(createAgentInstallAction({ shell: shellExec })); + + registrySingleton = registry; + return registry; +}; + +/** Test-only: reset between tests. */ +export const resetActionRegistry = (): void => { + registrySingleton = null; +}; diff --git a/src/daemon/actions/actions.test.ts b/src/daemon/actions/actions.test.ts new file mode 100644 index 0000000..796a5b4 --- /dev/null +++ b/src/daemon/actions/actions.test.ts @@ -0,0 +1,119 @@ +import { describe, expect, it, vi } from 'vitest'; +import { createRegistry, output, result, type InvokeEvent } from '@agentage/core'; +import { createAgentInstallAction } from './agent-install.js'; +import { createCliUpdateAction } from './cli-update.js'; +import { createProjectAddFromOriginAction } from './project-add-from-origin.js'; +import type { ShellExec } from './types.js'; + +const fakeShell = (success = true, observe?: (cmd: string) => void): ShellExec => + async function* (command) { + observe?.(command); + yield output(`running: ${command}`); + yield result(success, success ? 'ok' : 'fail'); + }; + +const collect = async (gen: AsyncGenerator): Promise => { + const events: InvokeEvent[] = []; + for await (const e of gen) events.push(e); + return events; +}; + +describe('cli-update', () => { + it('installs target version and returns bump envelope', async () => { + const observed: string[] = []; + const reg = createRegistry(); + reg.register( + createCliUpdateAction({ + shell: fakeShell(true, (c) => observed.push(c)), + readCurrentVersion: async () => '0.17.0', + }) + ); + const events = await collect( + reg.invoke({ + action: 'cli:update', + input: { target: '0.18.0' }, + callerId: 'test', + capabilities: ['cli.write'], + }) + ); + expect(observed).toEqual(['npm install -g @agentage/cli@0.18.0']); + expect(events.at(-1)).toMatchObject({ + type: 'result', + data: { installed: '0.18.0', from: '0.17.0' }, + }); + }); + + it('rejects non-semver target', async () => { + const reg = createRegistry(); + reg.register( + createCliUpdateAction({ shell: fakeShell(), readCurrentVersion: async () => '0.17.0' }) + ); + const events = await collect( + reg.invoke({ + action: 'cli:update', + input: { target: 'master' }, + callerId: 'test', + capabilities: ['cli.write'], + }) + ); + expect(events.at(-1)).toMatchObject({ type: 'error', code: 'INVALID_INPUT' }); + }); +}); + +describe('project-add-from-origin', () => { + it('derives project name from remote and passes branch flag', async () => { + const spy = vi.fn(); + const reg = createRegistry(); + reg.register(createProjectAddFromOriginAction({ shell: fakeShell(true, spy) })); + await collect( + reg.invoke({ + action: 'project:addFromOrigin', + input: { + remote: 'git@github.com:agentage/cli.git', + parentDir: '/tmp/projects', + branch: 'develop', + }, + callerId: 'test', + capabilities: ['project.write'], + }) + ); + expect(spy).toHaveBeenCalledWith( + 'git clone -b develop git@github.com:agentage/cli.git /tmp/projects/cli' + ); + }); +}); + +describe('agent-install', () => { + it('runs npm install in workspaceDir', async () => { + const spy = vi.fn(); + const reg = createRegistry(); + reg.register(createAgentInstallAction({ shell: fakeShell(true, spy) })); + const events = await collect( + reg.invoke({ + action: 'agent:install', + input: { spec: '@agentage/agent-pr@1.0.0', workspaceDir: '/home/me/agents' }, + callerId: 'test', + capabilities: ['agent.write'], + }) + ); + expect(spy).toHaveBeenCalledWith('npm install @agentage/agent-pr@1.0.0'); + expect(events.at(-1)).toMatchObject({ + type: 'result', + data: { spec: '@agentage/agent-pr@1.0.0' }, + }); + }); + + it('emits EXECUTION_FAILED when install fails', async () => { + const reg = createRegistry(); + reg.register(createAgentInstallAction({ shell: fakeShell(false) })); + const events = await collect( + reg.invoke({ + action: 'agent:install', + input: { spec: 'bad-pkg', workspaceDir: '/tmp' }, + callerId: 'test', + capabilities: ['agent.write'], + }) + ); + expect(events.at(-1)).toMatchObject({ type: 'error', code: 'EXECUTION_FAILED' }); + }); +}); diff --git a/src/daemon/actions/agent-install.ts b/src/daemon/actions/agent-install.ts new file mode 100644 index 0000000..0c50390 --- /dev/null +++ b/src/daemon/actions/agent-install.ts @@ -0,0 +1,56 @@ +import { action, ActionError, type ActionDefinition } from '@agentage/core'; +import type { ActionProgress, ShellExec } from './types.js'; + +export interface AgentInstallInput { + spec: string; + workspaceDir: string; +} + +export interface AgentInstallOutput { + spec: string; + workspaceDir: string; + command: string; +} + +const validate = (raw: unknown): AgentInstallInput => { + if (!raw || typeof raw !== 'object') throw new Error('input must be an object'); + const { spec, workspaceDir } = raw as Record; + if (typeof spec !== 'string' || spec.length === 0) + throw new Error('spec must be a non-empty string'); + if (typeof workspaceDir !== 'string' || !workspaceDir.startsWith('/')) { + throw new Error('workspaceDir must be an absolute path'); + } + return { spec, workspaceDir }; +}; + +export const createAgentInstallAction = (deps: { + shell: ShellExec; +}): ActionDefinition => + action({ + manifest: { + name: 'agent:install', + version: '1.0', + title: 'Install agent', + description: 'Install an agent package into the agents workspace', + scope: 'machine', + capability: 'agent.write', + idempotent: false, + }, + validateInput: validate, + async *execute(ctx, input): AsyncGenerator { + const command = `npm install ${input.spec}`; + yield { step: 'install', detail: command }; + + let failed = false; + for await (const event of deps.shell(command, { + signal: ctx.signal, + cwd: input.workspaceDir, + })) { + if (event.data.type === 'result' && !event.data.success) failed = true; + } + if (failed) + throw new ActionError('EXECUTION_FAILED', `npm install failed: ${input.spec}`, true); + + return { spec: input.spec, workspaceDir: input.workspaceDir, command }; + }, + }); diff --git a/src/daemon/actions/cli-update.ts b/src/daemon/actions/cli-update.ts new file mode 100644 index 0000000..825400c --- /dev/null +++ b/src/daemon/actions/cli-update.ts @@ -0,0 +1,63 @@ +import { action, ActionError, type ActionDefinition } from '@agentage/core'; +import type { ActionProgress, ShellExec } from './types.js'; + +export interface CliUpdateInput { + target: string; + via?: 'npm'; +} + +export interface CliUpdateOutput { + installed: string; + from: string; + command: string; +} + +const SEMVER_OR_LATEST = /^(?:latest|\d+\.\d+\.\d+(?:-[\w.]+)?)$/; + +const validate = (raw: unknown): CliUpdateInput => { + if (!raw || typeof raw !== 'object') throw new Error('input must be an object'); + const { target, via } = raw as { target?: unknown; via?: unknown }; + if (typeof target !== 'string' || !SEMVER_OR_LATEST.test(target)) { + throw new Error('target must be "latest" or a semver string like "1.2.3"'); + } + if (via !== undefined && via !== 'npm') throw new Error('via must be "npm" when set'); + return { target, via: 'npm' }; +}; + +export const createCliUpdateAction = (deps: { + shell: ShellExec; + readCurrentVersion: () => Promise; +}): ActionDefinition => + action({ + manifest: { + name: 'cli:update', + version: '1.0', + title: 'Update CLI', + description: 'Install a specific version of @agentage/cli globally via npm', + scope: 'machine', + capability: 'cli.write', + idempotent: false, + }, + validateInput: validate, + async *execute(ctx, input): AsyncGenerator { + const from = await deps.readCurrentVersion(); + yield { step: 'resolve', detail: `current=${from} target=${input.target}` }; + + const pkg = + input.target === 'latest' ? '@agentage/cli@latest' : `@agentage/cli@${input.target}`; + const command = `npm install -g ${pkg}`; + yield { step: 'install', detail: command }; + + let lastError: string | undefined; + for await (const event of deps.shell(command, { signal: ctx.signal })) { + if (event.data.type === 'error') { + lastError = `${event.data.code}: ${event.data.message}`; + } + if (event.data.type === 'result' && !event.data.success) { + throw new ActionError('EXECUTION_FAILED', lastError ?? 'npm install failed', true); + } + } + + return { installed: input.target, from, command }; + }, + }); diff --git a/src/daemon/actions/index.ts b/src/daemon/actions/index.ts new file mode 100644 index 0000000..d9a99b2 --- /dev/null +++ b/src/daemon/actions/index.ts @@ -0,0 +1,10 @@ +export { createCliUpdateAction } from './cli-update.js'; +export type { CliUpdateInput, CliUpdateOutput } from './cli-update.js'; + +export { createProjectAddFromOriginAction } from './project-add-from-origin.js'; +export type { ProjectAddInput, ProjectAddOutput } from './project-add-from-origin.js'; + +export { createAgentInstallAction } from './agent-install.js'; +export type { AgentInstallInput, AgentInstallOutput } from './agent-install.js'; + +export type { ActionProgress, ShellExec } from './types.js'; diff --git a/src/daemon/actions/project-add-from-origin.ts b/src/daemon/actions/project-add-from-origin.ts new file mode 100644 index 0000000..9477569 --- /dev/null +++ b/src/daemon/actions/project-add-from-origin.ts @@ -0,0 +1,70 @@ +import { action, ActionError, type ActionDefinition } from '@agentage/core'; +import type { ActionProgress, ShellExec } from './types.js'; + +export interface ProjectAddInput { + remote: string; + parentDir: string; + branch?: string; + name?: string; +} + +export interface ProjectAddOutput { + name: string; + path: string; + remote: string; + branch: string; +} + +const REMOTE = /^(?:git@|https?:\/\/)[\w.@:/\-~]+\.git$/; + +const deriveName = (remote: string): string => { + const match = /([^/]+?)(?:\.git)?$/.exec(remote); + if (!match?.[1]) throw new Error(`cannot derive name from remote: ${remote}`); + return match[1]; +}; + +const validate = (raw: unknown): ProjectAddInput => { + if (!raw || typeof raw !== 'object') throw new Error('input must be an object'); + const { remote, parentDir, branch, name } = raw as Record; + if (typeof remote !== 'string' || !REMOTE.test(remote)) { + throw new Error('remote must be a valid git URL (git@ or https://, ending in .git)'); + } + if (typeof parentDir !== 'string' || !parentDir.startsWith('/')) { + throw new Error('parentDir must be an absolute path'); + } + if (branch !== undefined && typeof branch !== 'string') throw new Error('branch must be string'); + if (name !== undefined && typeof name !== 'string') throw new Error('name must be string'); + return { remote, parentDir, branch, name }; +}; + +export const createProjectAddFromOriginAction = (deps: { + shell: ShellExec; +}): ActionDefinition => + action({ + manifest: { + name: 'project:addFromOrigin', + version: '1.0', + title: 'Add project from git remote', + description: 'Clone a git remote into parentDir and register as a project', + scope: 'machine', + capability: 'project.write', + idempotent: true, + }, + validateInput: validate, + async *execute(ctx, input): AsyncGenerator { + const name = input.name ?? deriveName(input.remote); + const path = `${input.parentDir.replace(/\/$/, '')}/${name}`; + const branchFlag = input.branch ? ` -b ${input.branch}` : ''; + const command = `git clone${branchFlag} ${input.remote} ${path}`; + yield { step: 'clone', detail: command }; + + let failed = false; + for await (const event of deps.shell(command, { signal: ctx.signal })) { + if (event.data.type === 'result' && !event.data.success) failed = true; + } + if (failed) throw new ActionError('EXECUTION_FAILED', `git clone failed: ${command}`, true); + + yield { step: 'register', detail: path }; + return { name, path, remote: input.remote, branch: input.branch ?? 'default' }; + }, + }); diff --git a/src/daemon/actions/types.ts b/src/daemon/actions/types.ts new file mode 100644 index 0000000..f6a7de5 --- /dev/null +++ b/src/daemon/actions/types.ts @@ -0,0 +1,11 @@ +import type { RunEvent } from '@agentage/core'; + +export type ShellExec = ( + command: string, + options?: { signal?: AbortSignal; timeoutMs?: number; cwd?: string } +) => AsyncIterable; + +export interface ActionProgress { + step: string; + detail?: string; +} diff --git a/src/daemon/routes.actions.test.ts b/src/daemon/routes.actions.test.ts new file mode 100644 index 0000000..ec8431b --- /dev/null +++ b/src/daemon/routes.actions.test.ts @@ -0,0 +1,153 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import express from 'express'; +import { type Server } from 'node:http'; +import { + action, + createRegistry, + type ActionDefinition, + type ActionRegistry, + type InvokeEvent, +} from '@agentage/core'; + +vi.mock('./config.js', () => ({ loadConfig: vi.fn() })); +vi.mock('./run-manager.js', () => ({ + startRun: vi.fn(), + getRun: vi.fn(), + getRuns: vi.fn().mockReturnValue([]), + cancelRun: vi.fn(), + sendInput: vi.fn(), +})); +vi.mock('../hub/hub-sync.js', () => ({ + getHubSync: () => ({ + isConnected: () => false, + isConnecting: () => false, + start: vi.fn(), + stop: vi.fn(), + triggerHeartbeat: vi.fn().mockResolvedValue(undefined), + }), +})); +vi.mock('../hub/auth.js', () => ({ readAuth: () => null })); +vi.mock('../hub/hub-client.js', () => ({ createHubClient: vi.fn() })); +vi.mock('../utils/version.js', () => ({ VERSION: '0.0.0-test' })); +vi.mock('../projects/projects.js', () => ({ loadProjects: () => [] })); +vi.mock('../discovery/scanner.js', () => ({ getLastScanWarnings: () => [] })); + +let testRegistry: ActionRegistry; +vi.mock('./actions.js', () => ({ + getActionRegistry: () => testRegistry, + resetActionRegistry: vi.fn(), +})); + +const echoAction: ActionDefinition<{ msg: string }, { echoed: string }, { step: number }> = action({ + manifest: { + name: 'test:echo', + version: '1.0', + title: 'Echo', + description: 'Echo a message', + scope: 'machine', + capability: 'test.read', + idempotent: true, + }, + async *execute(_ctx, input) { + yield { step: 1 }; + yield { step: 2 }; + return { echoed: input.msg }; + }, +}); + +const parseSse = (text: string): InvokeEvent[] => + text + .split('\n\n') + .filter((chunk) => chunk.startsWith('data: ')) + .map((chunk) => JSON.parse(chunk.slice(6)) as InvokeEvent); + +describe('action routes', () => { + let server: Server; + + beforeEach(async () => { + const { loadConfig } = await import('./config.js'); + vi.mocked(loadConfig).mockReturnValue({ + machine: { id: 'm1', name: 'test' }, + daemon: { port: 4243 }, + agents: { default: '/tmp/agents', additional: [] }, + projects: { default: '/tmp/projects', additional: [] }, + sync: { events: {} }, + } as unknown as ReturnType); + + testRegistry = createRegistry({ idGenerator: () => 'inv-test' }); + testRegistry.register(echoAction); + + const { createRoutes } = await import('./routes.js'); + const app = express(); + app.use(createRoutes()); + server = await new Promise((resolve) => { + const s = app.listen(0, () => resolve(s)); + }); + }); + + afterEach(async () => { + await new Promise((resolve) => server.close(() => resolve())); + }); + + const baseUrl = (): string => { + const addr = server.address(); + const port = typeof addr === 'object' && addr ? addr.port : 0; + return `http://localhost:${port}`; + }; + + it('GET /api/actions lists registered manifests', async () => { + const res = await fetch(`${baseUrl()}/api/actions`); + expect(res.status).toBe(200); + const body = (await res.json()) as { success: boolean; data: unknown[] }; + expect(body.success).toBe(true); + expect(body.data).toHaveLength(1); + expect(body.data[0]).toMatchObject({ name: 'test:echo', capability: 'test.read' }); + }); + + it('POST /api/actions/:name streams accepted → progress → result as SSE', async () => { + const res = await fetch(`${baseUrl()}/api/actions/test:echo`, { + method: 'POST', + headers: { 'Content-Type': 'application/json', 'x-capabilities': 'test.read' }, + body: JSON.stringify({ input: { msg: 'hello' } }), + }); + expect(res.status).toBe(200); + expect(res.headers.get('content-type')).toContain('text/event-stream'); + const events = parseSse(await res.text()); + expect(events).toEqual([ + { type: 'accepted', invocationId: 'inv-test' }, + { type: 'progress', data: { step: 1 } }, + { type: 'progress', data: { step: 2 } }, + { type: 'result', data: { echoed: 'hello' } }, + ]); + }); + + it('emits UNAUTHORIZED error event when x-capabilities omits the required cap', async () => { + const res = await fetch(`${baseUrl()}/api/actions/test:echo`, { + method: 'POST', + headers: { 'Content-Type': 'application/json', 'x-capabilities': 'other.read' }, + body: JSON.stringify({ input: { msg: 'hi' } }), + }); + const events = parseSse(await res.text()); + expect(events.at(-1)).toMatchObject({ type: 'error', code: 'UNAUTHORIZED' }); + }); + + it('defaults to wildcard capability when x-capabilities header is absent', async () => { + const res = await fetch(`${baseUrl()}/api/actions/test:echo`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ input: { msg: 'hi' } }), + }); + const events = parseSse(await res.text()); + expect(events.at(-1)).toMatchObject({ type: 'result' }); + }); + + it('returns UNKNOWN_ACTION error for unregistered names', async () => { + const res = await fetch(`${baseUrl()}/api/actions/nope:nope`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ input: {} }), + }); + const events = parseSse(await res.text()); + expect(events[0]).toMatchObject({ type: 'error', code: 'UNKNOWN_ACTION' }); + }); +}); diff --git a/src/daemon/routes.ts b/src/daemon/routes.ts index 0a1f81f..f4def6d 100644 --- a/src/daemon/routes.ts +++ b/src/daemon/routes.ts @@ -1,11 +1,12 @@ -import { type Router, Router as createRouter, json } from 'express'; -import { type Agent, type JsonSchema } from '@agentage/core'; +import { type Router, Router as createRouter, json, type Request, type Response } from 'express'; +import { type Agent, type ActionRegistry, type JsonSchema } from '@agentage/core'; import { loadConfig } from './config.js'; import { cancelRun, getRun, getRuns, sendInput, startRun } from './run-manager.js'; import { getHubSync } from '../hub/hub-sync.js'; import { readAuth } from '../hub/auth.js'; import { createHubClient } from '../hub/hub-client.js'; import { getLastScanWarnings } from '../discovery/scanner.js'; +import { getActionRegistry } from './actions.js'; import { VERSION } from '../utils/version.js'; import { loadProjects } from '../projects/projects.js'; @@ -311,5 +312,74 @@ export const createRoutes = (): Router => { await withHubClient(res, (client) => client.runScheduleNow(req.params.id)); }); + wireActionRoutes(router, getActionRegistry()); + return router; }; + +const parseCapabilities = (req: Request): string[] => { + const header = req.header('x-capabilities'); + if (!header) return ['*']; + return header + .split(',') + .map((c) => c.trim()) + .filter(Boolean); +}; + +const streamInvocation = async ( + registry: ActionRegistry, + actionName: string, + body: Record, + req: Request, + res: Response +): Promise => { + res.setHeader('Content-Type', 'text/event-stream'); + res.setHeader('Cache-Control', 'no-cache'); + res.setHeader('Connection', 'keep-alive'); + res.flushHeaders(); + + const ac = new AbortController(); + let invocationComplete = false; + // Detect client disconnect on the socket rather than req 'close', which can + // fire as soon as the request body is fully consumed — not only on real aborts. + res.on('close', () => { + if (!invocationComplete) ac.abort(); + }); + + const gen = registry.invoke( + { + action: actionName, + version: typeof body.version === 'string' ? body.version : undefined, + input: body.input, + idempotencyKey: typeof body.idempotencyKey === 'string' ? body.idempotencyKey : undefined, + callerId: req.header('x-caller-id') ?? 'local', + capabilities: parseCapabilities(req), + }, + ac.signal + ); + + for await (const event of gen) { + res.write(`data: ${JSON.stringify(event)}\n\n`); + if (event.type === 'result' || event.type === 'error') { + invocationComplete = true; + } + } + invocationComplete = true; + res.end(); +}; + +export const wireActionRoutes = (router: Router, registry: ActionRegistry): void => { + router.get('/api/actions', (_req, res) => { + res.json({ success: true, data: registry.list() }); + }); + + router.post('/api/actions/:name', async (req, res) => { + await streamInvocation( + registry, + req.params.name, + (req.body ?? {}) as Record, + req, + res + ); + }); +}; diff --git a/src/daemon/websocket.ts b/src/daemon/websocket.ts index f54b3bd..a67cb5e 100644 --- a/src/daemon/websocket.ts +++ b/src/daemon/websocket.ts @@ -1,7 +1,8 @@ import { type Server } from 'node:http'; import { WebSocketServer, type WebSocket } from 'ws'; -import { type Run, type RunEvent } from '@agentage/core'; +import { type InvokeEvent, type Run, type RunEvent } from '@agentage/core'; import { onRunEvent, onRunStateChange } from './run-manager.js'; +import { getActionRegistry } from './actions.js'; import { logDebug, logInfo } from './logger.js'; interface SubscribeMessage { @@ -14,7 +15,57 @@ interface UnsubscribeMessage { runId: string; } -type ClientMessage = SubscribeMessage | UnsubscribeMessage; +interface InvokeMessage { + type: 'invoke'; + /** Echoed back on every action_event so clients can multiplex concurrent invocations. */ + requestId: string; + action: string; + input?: unknown; + version?: string; + idempotencyKey?: string; + capabilities?: string[]; +} + +interface CancelInvokeMessage { + type: 'cancel_invoke'; + requestId: string; +} + +type ClientMessage = SubscribeMessage | UnsubscribeMessage | InvokeMessage | CancelInvokeMessage; + +const activeInvocations = new Map(); + +const runInvocation = async (ws: WebSocket, msg: InvokeMessage): Promise => { + const ac = new AbortController(); + activeInvocations.set(msg.requestId, ac); + try { + const gen = getActionRegistry().invoke( + { + action: msg.action, + version: msg.version, + input: msg.input, + idempotencyKey: msg.idempotencyKey, + callerId: 'ws', + capabilities: msg.capabilities ?? ['*'], + }, + ac.signal + ); + for await (const event of gen) { + sendToClient(ws, { type: 'action_event', requestId: msg.requestId, event }); + } + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + const fallback: InvokeEvent = { + type: 'error', + code: 'EXECUTION_FAILED', + message, + retryable: true, + }; + sendToClient(ws, { type: 'action_event', requestId: msg.requestId, event: fallback }); + } finally { + activeInvocations.delete(msg.requestId); + } +}; type BufferedMessage = | { type: 'run_event'; runId: string; event: RunEvent } @@ -124,6 +175,19 @@ export const setupWebSocket = (server: Server): WebSocketServer => { clientSubscriptions.get(ws)?.delete(msg.runId); logDebug(`Client unsubscribed from run ${msg.runId}`); } + + if (msg.type === 'invoke') { + logDebug(`Client invoked action ${msg.action} (${msg.requestId})`); + void runInvocation(ws, msg); + } + + if (msg.type === 'cancel_invoke') { + const ac = activeInvocations.get(msg.requestId); + if (ac) { + ac.abort(); + logDebug(`Canceled invocation ${msg.requestId}`); + } + } } catch { logDebug('Invalid WebSocket message received'); }