diff --git a/packages/workflow-executor/CLAUDE.md b/packages/workflow-executor/CLAUDE.md index f3822a22d0..8eed02ca00 100644 --- a/packages/workflow-executor/CLAUDE.md +++ b/packages/workflow-executor/CLAUDE.md @@ -43,7 +43,7 @@ Front ◀──▶ Orchestrator ◀──pull/push──▶ Executor ── ``` src/ ├── errors.ts # WorkflowExecutorError, MissingToolCallError, MalformedToolCallError, NoRecordsError, NoReadableFieldsError, NoWritableFieldsError, NoActionsError, StepPersistenceError, NoRelationshipFieldsError, RelatedRecordNotFoundError -├── runner.ts # Runner class — main entry point (start/stop/triggerPoll, HTTP server wiring) +├── runner.ts # Runner class — main entry point (start/stop/triggerPoll, HTTP server wiring, graceful drain) ├── types/ # Core type definitions (@draft) │ ├── step-definition.ts # StepType enum + step definition interfaces │ ├── step-outcome.ts # Step outcome tracking types (StepOutcome, sent to orchestrator) @@ -54,6 +54,10 @@ src/ │ ├── agent-port.ts # Interface to the Forest Admin agent (datasource) │ ├── workflow-port.ts # Interface to the orchestrator │ └── run-store.ts # Interface for persisting run state (scoped to a run) +├── stores/ # RunStore implementations +│ ├── in-memory-store.ts # InMemoryStore — Map-based, for tests +│ ├── database-store.ts # DatabaseStore — Sequelize + umzug migrations +│ └── build-run-store.ts # Factory functions: buildDatabaseRunStore, buildInMemoryRunStore ├── adapters/ # Port implementations │ ├── agent-client-agent-port.ts # AgentPort via @forestadmin/agent-client │ └── forest-server-workflow-port.ts # WorkflowPort via HTTP (forestadmin-client ServerUtils) @@ -83,6 +87,7 @@ src/ - **displayName in AI tools** — All `DynamicStructuredTool` schemas and system message prompts must use `displayName`, never `fieldName`. `displayName` is a Forest Admin frontend feature that replaces the technical field/relation/action name with a product-oriented label configured by the Forest Admin admin. End users write their workflow prompts using these display names, not the underlying technical names. After an AI tool call returns display names, map them back to `fieldName`/`name` before using them in datasource operations (e.g. filtering record values, calling `getRecord`). - **No recovery/retry** — Once the executor returns a step result to the orchestrator, the step is considered executed. There is no mechanism to re-dispatch a step, so executors must NOT include recovery checks (e.g. checking the RunStore for cached results before executing). Each step executes exactly once. - **Fetched steps must be executed** — Any step retrieved from the orchestrator via `getPendingStepExecutions()` must be executed. Silently discarding a fetched step (e.g. filtering it out by `runId` after fetching) violates the executor contract: the orchestrator assumes execution is guaranteed once the step is dispatched. The only valid filter before executing is deduplication via `inFlightSteps` (to avoid running the same step twice concurrently). +- **Graceful shutdown** — `stop()` drains in-flight steps before closing resources. The `state` getter exposes the lifecycle: `idle → running → draining → stopped`. `stopTimeoutMs` (default 30s) prevents `stop()` from hanging forever if a step is stuck. The HTTP server stays up during drain so the frontend can still query run status. Signal handling (`SIGTERM`/`SIGINT`) is the consumer's responsibility — the Runner is a library class. ## Commands diff --git a/packages/workflow-executor/src/adapters/console-logger.ts b/packages/workflow-executor/src/adapters/console-logger.ts index cbe989ab33..cb9ca735a1 100644 --- a/packages/workflow-executor/src/adapters/console-logger.ts +++ b/packages/workflow-executor/src/adapters/console-logger.ts @@ -4,4 +4,8 @@ export default class ConsoleLogger implements Logger { error(message: string, context: Record): void { console.error(JSON.stringify({ message, timestamp: new Date().toISOString(), ...context })); } + + info(message: string, context: Record): void { + console.info(JSON.stringify({ message, timestamp: new Date().toISOString(), ...context })); + } } diff --git a/packages/workflow-executor/src/build-workflow-executor.ts b/packages/workflow-executor/src/build-workflow-executor.ts index e789d54499..f09c01b9d8 100644 --- a/packages/workflow-executor/src/build-workflow-executor.ts +++ b/packages/workflow-executor/src/build-workflow-executor.ts @@ -1,4 +1,5 @@ import type { Logger } from './ports/logger-port'; +import type { RunnerState } from './runner'; import type { AiConfiguration } from '@forestadmin/ai-proxy'; import type { Options as SequelizeOptions } from 'sequelize'; @@ -6,7 +7,9 @@ import { AiClient } from '@forestadmin/ai-proxy'; import { Sequelize } from 'sequelize'; import AgentClientAgentPort from './adapters/agent-client-agent-port'; +import ConsoleLogger from './adapters/console-logger'; import ForestServerWorkflowPort from './adapters/forest-server-workflow-port'; +import ExecutorHttpServer from './http/executor-http-server'; import Runner from './runner'; import SchemaCache from './schema-cache'; import DatabaseStore from './stores/database-store'; @@ -14,21 +17,24 @@ import InMemoryStore from './stores/in-memory-store'; const DEFAULT_FOREST_SERVER_URL = 'https://api.forestadmin.com'; const DEFAULT_POLLING_INTERVAL_MS = 5000; +const FORCE_EXIT_DELAY_MS = 5000; export interface WorkflowExecutor { start(): Promise; stop(): Promise; + readonly state: RunnerState; } export interface ExecutorOptions { envSecret: string; authSecret: string; agentUrl: string; + httpPort: number; forestServerUrl?: string; aiConfigurations: AiConfiguration[]; pollingIntervalMs?: number; - httpPort?: number; logger?: Logger; + stopTimeoutMs?: number; } export type DatabaseExecutorOptions = ExecutorOptions & @@ -36,6 +42,7 @@ export type DatabaseExecutorOptions = ExecutorOptions & function buildCommonDependencies(options: ExecutorOptions) { const forestServerUrl = options.forestServerUrl ?? DEFAULT_FOREST_SERVER_URL; + const logger = options.logger ?? new ConsoleLogger(); const workflowPort = new ForestServerWorkflowPort({ envSecret: options.envSecret, @@ -59,27 +66,114 @@ function buildCommonDependencies(options: ExecutorOptions) { schemaCache, workflowPort, aiClient, + logger, pollingIntervalMs: options.pollingIntervalMs ?? DEFAULT_POLLING_INTERVAL_MS, envSecret: options.envSecret, authSecret: options.authSecret, - httpPort: options.httpPort, - logger: options.logger, + stopTimeoutMs: options.stopTimeoutMs, + }; +} + +function createWorkflowExecutor( + runner: Runner, + server: ExecutorHttpServer, + logger: Logger, +): WorkflowExecutor { + let shutdownPromise: Promise | null = null; + + const shutdown = async () => { + try { + await server.stop(); + } catch (err) { + logger.error('HTTP server close failed during shutdown', { + error: err instanceof Error ? err.message : String(err), + }); + } + + await runner.stop(); + }; + + const onSignal = async () => { + logger.info?.('Received shutdown signal, stopping gracefully...', {}); + + try { + if (!shutdownPromise) shutdownPromise = shutdown(); + await shutdownPromise; + process.exitCode = 0; + } catch (error) { + logger.error('Graceful shutdown failed', { + error: error instanceof Error ? error.message : String(error), + }); + process.exitCode = 1; + } + + // Safety net: force exit if the event loop doesn't drain + // eslint-disable-next-line no-console + setTimeout(() => { + logger.error('Process did not exit after shutdown — forcing exit', {}); + process.exit(process.exitCode ?? 1); + }, FORCE_EXIT_DELAY_MS).unref(); + }; + + return { + get state() { + return runner.state; + }, + + async start() { + await runner.start(); + await server.start(); + + process.on('SIGTERM', onSignal); + process.on('SIGINT', onSignal); + }, + + async stop() { + process.removeListener('SIGTERM', onSignal); + process.removeListener('SIGINT', onSignal); + + if (!shutdownPromise) shutdownPromise = shutdown(); + await shutdownPromise; + }, }; } export function buildInMemoryExecutor(options: ExecutorOptions): WorkflowExecutor { - return new Runner({ - ...buildCommonDependencies(options), + const deps = buildCommonDependencies(options); + + const runner = new Runner({ + ...deps, runStore: new InMemoryStore(), }); + + const server = new ExecutorHttpServer({ + port: options.httpPort, + runner, + authSecret: options.authSecret, + workflowPort: deps.workflowPort, + logger: deps.logger, + }); + + return createWorkflowExecutor(runner, server, deps.logger); } export function buildDatabaseExecutor(options: DatabaseExecutorOptions): WorkflowExecutor { + const deps = buildCommonDependencies(options); const { uri, ...sequelizeOptions } = options.database as SequelizeOptions & { uri?: string }; const sequelize = uri ? new Sequelize(uri, sequelizeOptions) : new Sequelize(sequelizeOptions); - return new Runner({ - ...buildCommonDependencies(options), + const runner = new Runner({ + ...deps, runStore: new DatabaseStore({ sequelize }), }); + + const server = new ExecutorHttpServer({ + port: options.httpPort, + runner, + authSecret: options.authSecret, + workflowPort: deps.workflowPort, + logger: deps.logger, + }); + + return createWorkflowExecutor(runner, server, deps.logger); } diff --git a/packages/workflow-executor/src/http/executor-http-server.ts b/packages/workflow-executor/src/http/executor-http-server.ts index 1cc287060b..e9f3cec9b6 100644 --- a/packages/workflow-executor/src/http/executor-http-server.ts +++ b/packages/workflow-executor/src/http/executor-http-server.ts @@ -62,6 +62,19 @@ export default class ExecutorHttpServer { } }); + // Health endpoint — before JWT so it's publicly accessible (infra probes don't send tokens) + this.app.use(async (ctx, next) => { + if (ctx.method === 'GET' && ctx.path === '/health') { + const { state } = this.options.runner; + ctx.status = state === 'running' || state === 'draining' ? 200 : 503; + ctx.body = { state }; + + return; + } + + await next(); + }); + this.app.use(bodyParser()); // JWT middleware — validates Bearer token using authSecret diff --git a/packages/workflow-executor/src/index.ts b/packages/workflow-executor/src/index.ts index 189f0a3b47..65f9f4a175 100644 --- a/packages/workflow-executor/src/index.ts +++ b/packages/workflow-executor/src/index.ts @@ -101,7 +101,7 @@ export { default as ForestServerWorkflowPort } from './adapters/forest-server-wo export { default as ExecutorHttpServer } from './http/executor-http-server'; export type { ExecutorHttpServerOptions } from './http/executor-http-server'; export { default as Runner } from './runner'; -export type { RunnerConfig } from './runner'; +export type { RunnerConfig, RunnerState } from './runner'; export { default as validateSecrets } from './validate-secrets'; export { default as SchemaCache } from './schema-cache'; export { default as InMemoryStore } from './stores/in-memory-store'; diff --git a/packages/workflow-executor/src/ports/logger-port.ts b/packages/workflow-executor/src/ports/logger-port.ts index 017f8742ab..ed2acd3930 100644 --- a/packages/workflow-executor/src/ports/logger-port.ts +++ b/packages/workflow-executor/src/ports/logger-port.ts @@ -1,3 +1,4 @@ export interface Logger { error(message: string, context: Record): void; + info?(message: string, context: Record): void; } diff --git a/packages/workflow-executor/src/runner.ts b/packages/workflow-executor/src/runner.ts index 2b6bb42320..470c55a3a5 100644 --- a/packages/workflow-executor/src/runner.ts +++ b/packages/workflow-executor/src/runner.ts @@ -17,10 +17,11 @@ import { causeMessage, } from './errors'; import StepExecutorFactory from './executors/step-executor-factory'; -import ExecutorHttpServer from './http/executor-http-server'; import patchBodySchemas from './pending-data-validators'; import validateSecrets from './validate-secrets'; +export type RunnerState = 'idle' | 'running' | 'draining' | 'stopped'; + export interface RunnerConfig { agentPort: AgentPort; workflowPort: WorkflowPort; @@ -31,16 +32,18 @@ export interface RunnerConfig { envSecret: string; authSecret: string; logger?: Logger; - httpPort?: number; + stopTimeoutMs?: number; } +const DEFAULT_STOP_TIMEOUT_MS = 30_000; + export default class Runner { private readonly config: RunnerConfig; - private httpServer: ExecutorHttpServer | null = null; private pollingTimer: NodeJS.Timeout | null = null; - private readonly inFlightSteps = new Set(); + private readonly inFlightSteps = new Map>(); private isRunning = false; private readonly logger: Logger; + private _state: RunnerState = 'idle'; private static stepKey(step: PendingStepExecution): string { return `${step.runId}:${step.stepId}`; @@ -51,29 +54,27 @@ export default class Runner { this.logger = config.logger ?? new ConsoleLogger(); } + get state(): RunnerState { + return this._state; + } + async start(): Promise { + if (this._state === 'stopped' || this._state === 'draining') { + throw new Error('Runner has been stopped and cannot be restarted'); + } + if (this.isRunning) return; validateSecrets({ envSecret: this.config.envSecret, authSecret: this.config.authSecret }); this.isRunning = true; + this._state = 'running'; try { await this.config.runStore.init(this.logger); - - if (this.config.httpPort !== undefined && !this.httpServer) { - const server = new ExecutorHttpServer({ - port: this.config.httpPort, - runner: this, - authSecret: this.config.authSecret, - workflowPort: this.config.workflowPort, - logger: this.logger, - }); - await server.start(); - this.httpServer = server; - } } catch (error) { this.isRunning = false; + this._state = 'idle'; throw error; } @@ -81,6 +82,9 @@ export default class Runner { } async stop(): Promise { + if (this._state === 'idle' || this._state === 'stopped' || this._state === 'draining') return; + + this._state = 'draining'; this.isRunning = false; if (this.pollingTimer !== null) { @@ -88,17 +92,53 @@ export default class Runner { this.pollingTimer = null; } - if (this.httpServer) { - await this.httpServer.stop(); - this.httpServer = null; - } + try { + // Drain in-flight steps + if (this.inFlightSteps.size > 0) { + this.logger.info?.('Draining in-flight steps', { + count: this.inFlightSteps.size, + steps: [...this.inFlightSteps.keys()], + }); - await Promise.allSettled([ - this.config.aiClient.closeConnections(), - this.config.runStore.close(this.logger), - ]); + const timeout = this.config.stopTimeoutMs ?? DEFAULT_STOP_TIMEOUT_MS; + let drainTimer: NodeJS.Timeout | undefined; + const drainResult = await Promise.race([ + Promise.allSettled(this.inFlightSteps.values()).then(() => { + if (drainTimer) clearTimeout(drainTimer); + + return 'drained' as const; + }), + new Promise<'timeout'>(resolve => { + drainTimer = setTimeout(() => resolve('timeout'), timeout); + }), + ]); + + if (drainResult === 'timeout') { + this.logger.error('Drain timeout — steps still in flight', { + remainingSteps: [...this.inFlightSteps.keys()], + timeoutMs: timeout, + }); + } else { + this.logger.info?.('All in-flight steps drained', {}); + } + } - // TODO: graceful drain of in-flight steps (out of scope PRD-223) + // Close resources — log failures instead of silently swallowing + const results = await Promise.allSettled([ + this.config.aiClient.closeConnections(), + this.config.runStore.close(this.logger), + ]); + + for (const result of results) { + if (result.status === 'rejected') { + this.logger.error('Resource cleanup failed during shutdown', { + error: result.reason instanceof Error ? result.reason.message : String(result.reason), + }); + } + } + } finally { + this._state = 'stopped'; + } } async getRunStepExecutions(runId: string): Promise { @@ -110,8 +150,6 @@ export default class Runner { const execution = stepExecutions.find(e => e.stepIndex === stepIndex); const schema = execution ? patchBodySchemas[execution.type] : undefined; - // pendingData is typed as T | undefined; null is not expected (RunStore never persists null) - // but `== null` guards against both for safety. if (!execution || !schema || !('pendingData' in execution) || execution.pendingData == null) { throw new PendingDataNotFoundError(runId, stepIndex); } @@ -128,8 +166,6 @@ export default class Runner { ); } - // Cast is safe: the type guard above ensures `execution` is the correct union branch, - // and patchBodySchemas[execution.type] only accepts keys valid for that branch. await this.config.runStore.saveStepExecution(runId, { ...execution, pendingData: { ...(execution.pendingData as object), ...(parsed.data as object) }, @@ -189,10 +225,15 @@ export default class Runner { return this.config.aiClient.loadRemoteTools(mergedConfig); } - private async executeStep(step: PendingStepExecution): Promise { + private executeStep(step: PendingStepExecution): Promise { const key = Runner.stepKey(step); - this.inFlightSteps.add(key); + const promise = this.doExecuteStep(step, key); + this.inFlightSteps.set(key, promise); + + return promise; + } + private async doExecuteStep(step: PendingStepExecution, key: string): Promise { let result: StepExecutionResult; try { @@ -201,15 +242,13 @@ export default class Runner { ); result = await executor.execute(); } catch (error) { - // This block should never execute: the factory and executor contracts guarantee no rejection. - // It guards against future regressions. this.logger.error('FATAL: executor contract violated — step outcome not reported', { runId: step.runId, stepId: step.stepId, error: error instanceof Error ? error.message : String(error), }); - return; // Cannot report an outcome: the orchestrator will timeout on this step + return; } finally { this.inFlightSteps.delete(key); } diff --git a/packages/workflow-executor/test/adapters/console-logger.test.ts b/packages/workflow-executor/test/adapters/console-logger.test.ts new file mode 100644 index 0000000000..ba354fdf2f --- /dev/null +++ b/packages/workflow-executor/test/adapters/console-logger.test.ts @@ -0,0 +1,22 @@ +import ConsoleLogger from '../../src/adapters/console-logger'; + +describe('ConsoleLogger', () => { + let logger: ConsoleLogger; + + beforeEach(() => { + logger = new ConsoleLogger(); + }); + + it('info() writes to console.info as JSON', () => { + const spy = jest.spyOn(console, 'info').mockImplementation(); + + logger.info('test message', { key: 'value' }); + + expect(spy).toHaveBeenCalledTimes(1); + const output = JSON.parse(spy.mock.calls[0][0]); + expect(output).toMatchObject({ message: 'test message', key: 'value' }); + expect(output.timestamp).toBeDefined(); + + spy.mockRestore(); + }); +}); diff --git a/packages/workflow-executor/test/build-workflow-executor.test.ts b/packages/workflow-executor/test/build-workflow-executor.test.ts index 6ed51b9325..e89f9b0ef0 100644 --- a/packages/workflow-executor/test/build-workflow-executor.test.ts +++ b/packages/workflow-executor/test/build-workflow-executor.test.ts @@ -10,6 +10,7 @@ jest.mock('../src/stores/in-memory-store'); jest.mock('../src/stores/database-store'); jest.mock('../src/adapters/agent-client-agent-port'); jest.mock('../src/adapters/forest-server-workflow-port'); +jest.mock('../src/http/executor-http-server'); jest.mock('@forestadmin/ai-proxy', () => ({ AiClient: jest.fn(), })); @@ -23,6 +24,7 @@ const BASE_OPTIONS = { envSecret: 'a'.repeat(64), authSecret: 'test-secret', agentUrl: 'http://localhost:3310', + httpPort: 3100, aiConfigurations: [ { name: 'default', provider: 'openai' as const, model: 'gpt-4o', apiKey: 'sk-test' }, ], @@ -33,10 +35,12 @@ beforeEach(() => { }); describe('buildInMemoryExecutor', () => { - it('returns a WorkflowExecutor backed by a Runner', () => { + it('returns a WorkflowExecutor with start, stop, and state', () => { const executor = buildInMemoryExecutor(BASE_OPTIONS); - expect(executor).toBeInstanceOf(Runner); + expect(executor).toHaveProperty('start'); + expect(executor).toHaveProperty('stop'); + expect(executor).toHaveProperty('state'); }); it('creates an InMemoryStore as runStore', () => { @@ -121,10 +125,10 @@ describe('buildInMemoryExecutor', () => { ); }); - it('passes optional httpPort', () => { + it('does not pass httpPort to Runner (HTTP is composed externally)', () => { buildInMemoryExecutor({ ...BASE_OPTIONS, httpPort: 3000 }); - expect(MockedRunner).toHaveBeenCalledWith(expect.objectContaining({ httpPort: 3000 })); + expect(MockedRunner).toHaveBeenCalledWith(expect.not.objectContaining({ httpPort: 3000 })); }); }); @@ -137,10 +141,12 @@ describe('buildDatabaseExecutor', () => { database: { uri: 'postgres://localhost/mydb', dialect: 'postgres' as const }, }; - it('returns a WorkflowExecutor backed by a Runner', () => { + it('returns a WorkflowExecutor with start, stop, and state', () => { const executor = buildDatabaseExecutor(DB_OPTIONS); - expect(executor).toBeInstanceOf(Runner); + expect(executor).toHaveProperty('start'); + expect(executor).toHaveProperty('stop'); + expect(executor).toHaveProperty('state'); }); it('creates a DatabaseStore as runStore', () => { @@ -204,3 +210,90 @@ describe('buildDatabaseExecutor', () => { ); }); }); + +// --------------------------------------------------------------------------- +// WorkflowExecutor composeur (start, stop, signal handlers, shutdown) +// --------------------------------------------------------------------------- + +describe('WorkflowExecutor lifecycle', () => { + let executor: ReturnType; + let onSpy: jest.SpyInstance; + let removeSpy: jest.SpyInstance; + + beforeEach(() => { + // Setup Runner mock to have start/stop/state + MockedRunner.prototype.start = jest.fn().mockResolvedValue(undefined); + MockedRunner.prototype.stop = jest.fn().mockResolvedValue(undefined); + Object.defineProperty(MockedRunner.prototype, 'state', { + get: () => 'running', + configurable: true, + }); + + onSpy = jest.spyOn(process, 'on'); + removeSpy = jest.spyOn(process, 'removeListener'); + + executor = buildInMemoryExecutor(BASE_OPTIONS); + }); + + afterEach(() => { + onSpy.mockRestore(); + removeSpy.mockRestore(); + }); + + it('start() calls runner.start and server.start', async () => { + await executor.start(); + + expect(MockedRunner.prototype.start).toHaveBeenCalled(); + }); + + it('start() registers SIGTERM and SIGINT handlers', async () => { + await executor.start(); + + expect(onSpy).toHaveBeenCalledWith('SIGTERM', expect.any(Function)); + expect(onSpy).toHaveBeenCalledWith('SIGINT', expect.any(Function)); + }); + + it('stop() removes signal handlers', async () => { + await executor.start(); + await executor.stop(); + + expect(removeSpy).toHaveBeenCalledWith('SIGTERM', expect.any(Function)); + expect(removeSpy).toHaveBeenCalledWith('SIGINT', expect.any(Function)); + }); + + it('stop() calls runner.stop', async () => { + await executor.start(); + await executor.stop(); + + expect(MockedRunner.prototype.stop).toHaveBeenCalled(); + }); + + it('concurrent stop() calls share the same promise', async () => { + await executor.start(); + + const [r1, r2] = await Promise.all([executor.stop(), executor.stop()]); + + expect(r1).toBeUndefined(); + expect(r2).toBeUndefined(); + // runner.stop is only called once (shared promise) + expect(MockedRunner.prototype.stop).toHaveBeenCalledTimes(1); + }); + + it('stop() continues if HTTP server close fails', async () => { + // eslint-disable-next-line @typescript-eslint/no-var-requires, global-require + const { default: MockedHttpServer } = require('../src/http/executor-http-server'); + MockedHttpServer.prototype.stop = jest.fn().mockRejectedValue(new Error('server error')); + MockedHttpServer.prototype.start = jest.fn().mockResolvedValue(undefined); + + const exec = buildInMemoryExecutor(BASE_OPTIONS); + await exec.start(); + await exec.stop(); + + // runner.stop still called despite server.stop failure + expect(MockedRunner.prototype.stop).toHaveBeenCalled(); + }); + + it('state getter returns runner state', () => { + expect(executor.state).toBe('running'); + }); +}); diff --git a/packages/workflow-executor/test/http/executor-http-server.test.ts b/packages/workflow-executor/test/http/executor-http-server.test.ts index 407c11ae54..6fb5067c04 100644 --- a/packages/workflow-executor/test/http/executor-http-server.test.ts +++ b/packages/workflow-executor/test/http/executor-http-server.test.ts @@ -20,6 +20,7 @@ function signToken(payload: object, secret = AUTH_SECRET, options?: jsonwebtoken function createMockRunner(overrides: Partial = {}): Runner { return { + state: 'running', start: jest.fn().mockResolvedValue(undefined), stop: jest.fn().mockResolvedValue(undefined), triggerPoll: jest.fn().mockResolvedValue(undefined), @@ -156,6 +157,52 @@ describe('ExecutorHttpServer', () => { }); }); + describe('GET /health', () => { + it('returns 200 with state when runner is running', async () => { + const server = createServer({ runner: createMockRunner({ state: 'running' } as never) }); + + const response = await request(server.callback).get('/health'); + + expect(response.status).toBe(200); + expect(response.body).toEqual({ state: 'running' }); + }); + + it('returns 200 with state when runner is draining', async () => { + const server = createServer({ runner: createMockRunner({ state: 'draining' } as never) }); + + const response = await request(server.callback).get('/health'); + + expect(response.status).toBe(200); + expect(response.body).toEqual({ state: 'draining' }); + }); + + it('returns 503 when runner is stopped', async () => { + const server = createServer({ runner: createMockRunner({ state: 'stopped' } as never) }); + + const response = await request(server.callback).get('/health'); + + expect(response.status).toBe(503); + expect(response.body).toEqual({ state: 'stopped' }); + }); + + it('returns 503 when runner is idle', async () => { + const server = createServer({ runner: createMockRunner({ state: 'idle' } as never) }); + + const response = await request(server.callback).get('/health'); + + expect(response.status).toBe(503); + expect(response.body).toEqual({ state: 'idle' }); + }); + + it('does not require JWT authentication', async () => { + const server = createServer(); + + const response = await request(server.callback).get('/health'); + + expect(response.status).toBe(200); + }); + }); + describe('run access authorization', () => { it('returns 403 when hasRunAccess returns false on GET /runs/:runId', async () => { const workflowPort = createMockWorkflowPort({ diff --git a/packages/workflow-executor/test/runner.test.ts b/packages/workflow-executor/test/runner.test.ts index e6d4cee424..fadb7c6efa 100644 --- a/packages/workflow-executor/test/runner.test.ts +++ b/packages/workflow-executor/test/runner.test.ts @@ -22,15 +22,10 @@ import ReadRecordStepExecutor from '../src/executors/read-record-step-executor'; import StepExecutorFactory from '../src/executors/step-executor-factory'; import TriggerRecordActionStepExecutor from '../src/executors/trigger-record-action-step-executor'; import UpdateRecordStepExecutor from '../src/executors/update-record-step-executor'; -import ExecutorHttpServer from '../src/http/executor-http-server'; import Runner from '../src/runner'; import SchemaCache from '../src/schema-cache'; import { StepType } from '../src/types/step-definition'; -jest.mock('../src/http/executor-http-server'); - -const MockedExecutorHttpServer = ExecutorHttpServer as jest.MockedClass; - // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- @@ -62,8 +57,8 @@ function createMockAiClient() { }; } -function createMockLogger(): jest.Mocked { - return { error: jest.fn() }; +function createMockLogger(): jest.Mocked> { + return { error: jest.fn(), info: jest.fn() }; } const VALID_ENV_SECRET = 'a'.repeat(64); @@ -85,10 +80,10 @@ function createRunnerConfig( runStore: RunStore; aiClient: AiClient; logger: Logger; - httpPort: number; envSecret: string; authSecret: string; schemaCache: SchemaCache; + stopTimeoutMs: number; }> = {}, ) { return { @@ -168,9 +163,6 @@ beforeEach(() => { jest.clearAllMocks(); jest.clearAllTimers(); - MockedExecutorHttpServer.prototype.start = jest.fn().mockResolvedValue(undefined); - MockedExecutorHttpServer.prototype.stop = jest.fn().mockResolvedValue(undefined); - executeSpy = jest.spyOn(BaseStepExecutor.prototype, 'execute').mockResolvedValue({ stepOutcome: { type: 'record-task', stepId: 'step-1', stepIndex: 0, status: 'success' }, }); @@ -178,7 +170,7 @@ beforeEach(() => { afterEach(async () => { // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - if (runner) { + if (runner && runner.state !== 'stopped') { await runner.stop(); (runner as Runner | undefined) = undefined; } @@ -187,47 +179,24 @@ afterEach(async () => { }); // --------------------------------------------------------------------------- -// HTTP server (existing tests, kept passing) +// start // --------------------------------------------------------------------------- describe('start', () => { - it('should start the HTTP server when httpPort is configured', async () => { - const config = createRunnerConfig({ httpPort: 3100 }); + it('should call runStore.init() on start', async () => { + const config = createRunnerConfig(); runner = new Runner(config); await runner.start(); - expect(MockedExecutorHttpServer).toHaveBeenCalledWith({ - port: 3100, - runner, - authSecret: VALID_AUTH_SECRET, - workflowPort: config.workflowPort, - logger: config.logger, - }); - expect(MockedExecutorHttpServer.prototype.start).toHaveBeenCalled(); - }); - - it('should not start the HTTP server when httpPort is not configured', async () => { - runner = new Runner(createRunnerConfig()); - - await runner.start(); - - expect(MockedExecutorHttpServer).not.toHaveBeenCalled(); - }); - - it('should not create a second HTTP server if already started', async () => { - runner = new Runner(createRunnerConfig({ httpPort: 3100 })); - - await runner.start(); - await runner.start(); - - expect(MockedExecutorHttpServer).toHaveBeenCalledTimes(1); + expect(config.runStore.init).toHaveBeenCalledTimes(1); }); - it('should call runStore.init() on start', async () => { + it('should not call runStore.init() twice if already started', async () => { const config = createRunnerConfig(); runner = new Runner(config); + await runner.start(); await runner.start(); expect(config.runStore.init).toHaveBeenCalledTimes(1); @@ -249,39 +218,200 @@ describe('start', () => { }); describe('stop', () => { - it('should stop the HTTP server when running', async () => { - runner = new Runner(createRunnerConfig({ httpPort: 3100 })); + it('should call runStore.close() on stop', async () => { + const config = createRunnerConfig(); + runner = new Runner(config); await runner.start(); await runner.stop(); - expect(MockedExecutorHttpServer.prototype.stop).toHaveBeenCalled(); + expect(config.runStore.close).toHaveBeenCalledTimes(1); }); - it('should call runStore.close() on stop', async () => { + it('should no-op when called on an idle runner', async () => { + runner = new Runner(createRunnerConfig()); + + await expect(runner.stop()).resolves.toBeUndefined(); + expect(runner.state).toBe('idle'); + }); + + it('should no-op when called twice on a running runner', async () => { const config = createRunnerConfig(); runner = new Runner(config); await runner.start(); await runner.stop(); + await runner.stop(); expect(config.runStore.close).toHaveBeenCalledTimes(1); }); +}); + +// --------------------------------------------------------------------------- +// Graceful shutdown +// --------------------------------------------------------------------------- - it('should handle stop when no HTTP server is running', async () => { +describe('graceful shutdown', () => { + it('state transitions: idle → running → draining → stopped', async () => { runner = new Runner(createRunnerConfig()); - await expect(runner.stop()).resolves.toBeUndefined(); + expect(runner.state).toBe('idle'); + + await runner.start(); + expect(runner.state).toBe('running'); + + const stopPromise = runner.stop(); + expect(runner.state).toBe('draining'); + + await stopPromise; + expect(runner.state).toBe('stopped'); + }); + + it('throws when start() is called after stop()', async () => { + runner = new Runner(createRunnerConfig()); + await runner.start(); + await runner.stop(); + + await expect(runner.start()).rejects.toThrow('Runner has been stopped and cannot be restarted'); + }); + + it('logs resource cleanup failure during stop', async () => { + const logger = createMockLogger(); + const aiClient = createMockAiClient(); + aiClient.closeConnections.mockRejectedValueOnce(new Error('connection leak')); + + runner = new Runner(createRunnerConfig({ logger, aiClient: aiClient as unknown as AiClient })); + await runner.start(); + await runner.stop(); + + expect(logger.error).toHaveBeenCalledWith( + 'Resource cleanup failed during shutdown', + expect.objectContaining({ error: 'connection leak' }), + ); + }); + + it('state resets to idle on start failure', async () => { + const config = createRunnerConfig(); + (config.runStore.init as jest.Mock).mockRejectedValueOnce(new Error('init failed')); + runner = new Runner(config); + + await expect(runner.start()).rejects.toThrow('init failed'); + expect(runner.state).toBe('idle'); + }); + + it('stop() waits for in-flight steps before resolving', async () => { + let resolveStep!: () => void; + const stepPromise = new Promise(resolve => { + resolveStep = resolve; + }); + + const workflowPort = createMockWorkflowPort(); + workflowPort.getPendingStepExecutions.mockResolvedValueOnce([ + makePendingStep({ runId: 'run-1', stepId: 'step-1' }), + ]); + + jest.spyOn(StepExecutorFactory, 'create').mockResolvedValueOnce({ + execute: () => + stepPromise.then(() => ({ + stepOutcome: { type: 'condition', stepId: 'step-1', stepIndex: 0, status: 'success' }, + })), + } as never); + + runner = new Runner(createRunnerConfig({ workflowPort })); + await runner.start(); + + jest.advanceTimersByTime(POLLING_INTERVAL_MS); + await flushPromises(); + + let stopResolved = false; + const stopPromise = runner.stop().then(() => { + stopResolved = true; + }); + + // stop() should not resolve while step is in flight + await flushPromises(); + expect(stopResolved).toBe(false); + + // Resolve the step + resolveStep(); + await stopPromise; + expect(stopResolved).toBe(true); }); - it('should allow restarting after stop', async () => { - runner = new Runner(createRunnerConfig({ httpPort: 3100 })); + it('stop() resolves after timeout when step is stuck', async () => { + const workflowPort = createMockWorkflowPort(); + const logger = createMockLogger(); + workflowPort.getPendingStepExecutions.mockResolvedValueOnce([ + makePendingStep({ runId: 'run-1', stepId: 'stuck-step' }), + ]); + jest.spyOn(StepExecutorFactory, 'create').mockResolvedValueOnce({ + execute: () => new Promise(() => {}), // never resolves + } as never); + + runner = new Runner(createRunnerConfig({ workflowPort, logger, stopTimeoutMs: 50 })); await runner.start(); + + jest.advanceTimersByTime(POLLING_INTERVAL_MS); + await flushPromises(); + + jest.useRealTimers(); await runner.stop(); + jest.useFakeTimers(); + + expect(logger.error).toHaveBeenCalledWith( + 'Drain timeout — steps still in flight', + expect.objectContaining({ + remainingSteps: ['run-1:stuck-step'], + timeoutMs: 50, + }), + ); + expect(runner.state).toBe('stopped'); + }); + + it('stop() resolves immediately when no steps are in flight', async () => { + const logger = createMockLogger(); + runner = new Runner(createRunnerConfig({ logger })); await runner.start(); + await runner.stop(); - expect(MockedExecutorHttpServer).toHaveBeenCalledTimes(2); + expect(logger.info).not.toHaveBeenCalledWith('Draining in-flight steps', expect.anything()); + expect(runner.state).toBe('stopped'); + }); + + it('logs drain info when steps are in flight', async () => { + let resolveStep!: () => void; + const stepPromise = new Promise(resolve => { + resolveStep = resolve; + }); + + const workflowPort = createMockWorkflowPort(); + const logger = createMockLogger(); + workflowPort.getPendingStepExecutions.mockResolvedValueOnce([ + makePendingStep({ runId: 'run-1', stepId: 'step-1' }), + ]); + + jest.spyOn(StepExecutorFactory, 'create').mockResolvedValueOnce({ + execute: () => + stepPromise.then(() => ({ + stepOutcome: { type: 'condition', stepId: 'step-1', stepIndex: 0, status: 'success' }, + })), + } as never); + + runner = new Runner(createRunnerConfig({ workflowPort, logger })); + await runner.start(); + + jest.advanceTimersByTime(POLLING_INTERVAL_MS); + await flushPromises(); + + resolveStep(); + await runner.stop(); + + expect(logger.info).toHaveBeenCalledWith('Draining in-flight steps', { + count: 1, + steps: ['run-1:step-1'], + }); + expect(logger.info).toHaveBeenCalledWith('All in-flight steps drained', {}); }); }); diff --git a/packages/workflow-executor/test/stores/build-run-store.test.ts b/packages/workflow-executor/test/stores/build-run-store.test.ts new file mode 100644 index 0000000000..7c0f821679 --- /dev/null +++ b/packages/workflow-executor/test/stores/build-run-store.test.ts @@ -0,0 +1,42 @@ +import { buildDatabaseRunStore, buildInMemoryRunStore } from '../../src/stores/build-run-store'; +import InMemoryStore from '../../src/stores/in-memory-store'; + +describe('buildInMemoryRunStore', () => { + it('returns an initialized InMemoryStore', async () => { + const store = await buildInMemoryRunStore(); + + expect(store).toBeInstanceOf(InMemoryStore); + // Verify it works + await store.saveStepExecution('run-1', { + type: 'condition', + stepIndex: 0, + executionParams: { answer: 'yes' }, + } as never); + const result = await store.getStepExecutions('run-1'); + expect(result).toHaveLength(1); + }); +}); + +describe('buildDatabaseRunStore', () => { + it('returns an initialized DatabaseStore (SQLite)', async () => { + const store = await buildDatabaseRunStore({ dialect: 'sqlite', storage: ':memory:' }); + + // Verify it works (migration ran) + await store.saveStepExecution('run-1', { + type: 'condition', + stepIndex: 0, + executionParams: { answer: 'yes' }, + } as never); + const result = await store.getStepExecutions('run-1'); + expect(result).toHaveLength(1); + + await store.close(); + }); + + it('closes sequelize connection if init fails', async () => { + // Invalid storage path to force a failure + await expect( + buildDatabaseRunStore({ dialect: 'sqlite', storage: '/nonexistent/path/db.sqlite' }), + ).rejects.toThrow(); + }); +}); diff --git a/packages/workflow-executor/test/stores/database-store.test.ts b/packages/workflow-executor/test/stores/database-store.test.ts index 21050e9125..4f60ea60ee 100644 --- a/packages/workflow-executor/test/stores/database-store.test.ts +++ b/packages/workflow-executor/test/stores/database-store.test.ts @@ -108,4 +108,34 @@ describe('DatabaseStore (SQLite)', () => { // Running init a second time should not fail await expect(store.init()).resolves.toBeUndefined(); }); + + it('logs and rethrows when migration fails', async () => { + const badSequelize = new Sequelize({ dialect: 'sqlite', storage: ':memory:', logging: false }); + const badStore = new DatabaseStore({ sequelize: badSequelize }); + + // Break the query interface so createTable fails + jest + .spyOn(badSequelize.getQueryInterface(), 'createTable') + .mockRejectedValueOnce(new Error('disk full')); + + const logger = { error: jest.fn() }; + await expect(badStore.init(logger)).rejects.toThrow('disk full'); + expect(logger.error).toHaveBeenCalledWith( + 'Database migration failed', + expect.objectContaining({ error: 'disk full' }), + ); + + await badSequelize.close(); + }); + + it('close() catches and logs errors instead of throwing', async () => { + const logger = { error: jest.fn() }; + jest.spyOn(sequelize, 'close').mockRejectedValueOnce(new Error('close failed')); + + await expect(store.close(logger)).resolves.toBeUndefined(); + expect(logger.error).toHaveBeenCalledWith( + 'Failed to close database connection', + expect.objectContaining({ error: 'close failed' }), + ); + }); });