From bd7ac009a84fd1b3ff174314879fe3880f40423e Mon Sep 17 00:00:00 2001 From: Matt Date: Wed, 25 Mar 2026 16:23:43 +0100 Subject: [PATCH 1/8] feat(workflow-executor): add graceful shutdown with in-flight step drain MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - stop() now drains in-flight steps before closing resources - Add Runner.state getter: idle → running → draining → stopped - Add stopTimeoutMs config (default 30s) to prevent hanging on stuck steps - Convert inFlightSteps from Set to Map to track step promises - HTTP server stays up during drain for frontend access - Add Logger.info optional method for drain status messages - 7 new tests: drain, timeout, state transitions, log messages fixes PRD-241 Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/workflow-executor/CLAUDE.md | 7 +- .../src/adapters/console-logger.ts | 4 + packages/workflow-executor/src/index.ts | 2 +- .../src/ports/logger-port.ts | 1 + packages/workflow-executor/src/runner.ts | 55 ++++- .../workflow-executor/test/runner.test.ts | 189 +++++++++++++++++- 6 files changed, 249 insertions(+), 9 deletions(-) diff --git a/packages/workflow-executor/CLAUDE.md b/packages/workflow-executor/CLAUDE.md index f3822a22d0..8eed02ca00 100644 --- a/packages/workflow-executor/CLAUDE.md +++ b/packages/workflow-executor/CLAUDE.md @@ -43,7 +43,7 @@ Front ◀──▶ Orchestrator ◀──pull/push──▶ Executor ── ``` src/ ├── errors.ts # WorkflowExecutorError, MissingToolCallError, MalformedToolCallError, NoRecordsError, NoReadableFieldsError, NoWritableFieldsError, NoActionsError, StepPersistenceError, NoRelationshipFieldsError, RelatedRecordNotFoundError -├── runner.ts # Runner class — main entry point (start/stop/triggerPoll, HTTP server wiring) +├── runner.ts # Runner class — main entry point (start/stop/triggerPoll, HTTP server wiring, graceful drain) ├── types/ # Core type definitions (@draft) │ ├── step-definition.ts # StepType enum + step definition interfaces │ ├── step-outcome.ts # Step outcome tracking types (StepOutcome, sent to orchestrator) @@ -54,6 +54,10 @@ src/ │ ├── agent-port.ts # Interface to the Forest Admin agent (datasource) │ ├── workflow-port.ts # Interface to the orchestrator │ └── run-store.ts # Interface for persisting run state (scoped to a run) +├── stores/ # RunStore implementations +│ ├── in-memory-store.ts # InMemoryStore — Map-based, for tests +│ ├── database-store.ts # DatabaseStore — Sequelize + umzug migrations +│ └── build-run-store.ts # Factory functions: buildDatabaseRunStore, buildInMemoryRunStore ├── adapters/ # Port implementations │ ├── agent-client-agent-port.ts # AgentPort via @forestadmin/agent-client │ └── forest-server-workflow-port.ts # WorkflowPort via HTTP (forestadmin-client ServerUtils) @@ -83,6 +87,7 @@ src/ - **displayName in AI tools** — All `DynamicStructuredTool` schemas and system message prompts must use `displayName`, never `fieldName`. `displayName` is a Forest Admin frontend feature that replaces the technical field/relation/action name with a product-oriented label configured by the Forest Admin admin. End users write their workflow prompts using these display names, not the underlying technical names. After an AI tool call returns display names, map them back to `fieldName`/`name` before using them in datasource operations (e.g. filtering record values, calling `getRecord`). - **No recovery/retry** — Once the executor returns a step result to the orchestrator, the step is considered executed. There is no mechanism to re-dispatch a step, so executors must NOT include recovery checks (e.g. checking the RunStore for cached results before executing). Each step executes exactly once. - **Fetched steps must be executed** — Any step retrieved from the orchestrator via `getPendingStepExecutions()` must be executed. Silently discarding a fetched step (e.g. filtering it out by `runId` after fetching) violates the executor contract: the orchestrator assumes execution is guaranteed once the step is dispatched. The only valid filter before executing is deduplication via `inFlightSteps` (to avoid running the same step twice concurrently). +- **Graceful shutdown** — `stop()` drains in-flight steps before closing resources. The `state` getter exposes the lifecycle: `idle → running → draining → stopped`. `stopTimeoutMs` (default 30s) prevents `stop()` from hanging forever if a step is stuck. The HTTP server stays up during drain so the frontend can still query run status. Signal handling (`SIGTERM`/`SIGINT`) is the consumer's responsibility — the Runner is a library class. ## Commands diff --git a/packages/workflow-executor/src/adapters/console-logger.ts b/packages/workflow-executor/src/adapters/console-logger.ts index cbe989ab33..cb9ca735a1 100644 --- a/packages/workflow-executor/src/adapters/console-logger.ts +++ b/packages/workflow-executor/src/adapters/console-logger.ts @@ -4,4 +4,8 @@ export default class ConsoleLogger implements Logger { error(message: string, context: Record): void { console.error(JSON.stringify({ message, timestamp: new Date().toISOString(), ...context })); } + + info(message: string, context: Record): void { + console.info(JSON.stringify({ message, timestamp: new Date().toISOString(), ...context })); + } } diff --git a/packages/workflow-executor/src/index.ts b/packages/workflow-executor/src/index.ts index 189f0a3b47..65f9f4a175 100644 --- a/packages/workflow-executor/src/index.ts +++ b/packages/workflow-executor/src/index.ts @@ -101,7 +101,7 @@ export { default as ForestServerWorkflowPort } from './adapters/forest-server-wo export { default as ExecutorHttpServer } from './http/executor-http-server'; export type { ExecutorHttpServerOptions } from './http/executor-http-server'; export { default as Runner } from './runner'; -export type { RunnerConfig } from './runner'; +export type { RunnerConfig, RunnerState } from './runner'; export { default as validateSecrets } from './validate-secrets'; export { default as SchemaCache } from './schema-cache'; export { default as InMemoryStore } from './stores/in-memory-store'; diff --git a/packages/workflow-executor/src/ports/logger-port.ts b/packages/workflow-executor/src/ports/logger-port.ts index 017f8742ab..ed2acd3930 100644 --- a/packages/workflow-executor/src/ports/logger-port.ts +++ b/packages/workflow-executor/src/ports/logger-port.ts @@ -1,3 +1,4 @@ export interface Logger { error(message: string, context: Record): void; + info?(message: string, context: Record): void; } diff --git a/packages/workflow-executor/src/runner.ts b/packages/workflow-executor/src/runner.ts index 2b6bb42320..e25a7cb41b 100644 --- a/packages/workflow-executor/src/runner.ts +++ b/packages/workflow-executor/src/runner.ts @@ -21,6 +21,8 @@ import ExecutorHttpServer from './http/executor-http-server'; import patchBodySchemas from './pending-data-validators'; import validateSecrets from './validate-secrets'; +export type RunnerState = 'idle' | 'running' | 'draining' | 'stopped'; + export interface RunnerConfig { agentPort: AgentPort; workflowPort: WorkflowPort; @@ -32,15 +34,19 @@ export interface RunnerConfig { authSecret: string; logger?: Logger; httpPort?: number; + stopTimeoutMs?: number; } +const DEFAULT_STOP_TIMEOUT_MS = 30_000; + export default class Runner { private readonly config: RunnerConfig; private httpServer: ExecutorHttpServer | null = null; private pollingTimer: NodeJS.Timeout | null = null; - private readonly inFlightSteps = new Set(); + private readonly inFlightSteps = new Map>(); private isRunning = false; private readonly logger: Logger; + private _state: RunnerState = 'idle'; private static stepKey(step: PendingStepExecution): string { return `${step.runId}:${step.stepId}`; @@ -51,12 +57,17 @@ export default class Runner { this.logger = config.logger ?? new ConsoleLogger(); } + get state(): RunnerState { + return this._state; + } + async start(): Promise { if (this.isRunning) return; validateSecrets({ envSecret: this.config.envSecret, authSecret: this.config.authSecret }); this.isRunning = true; + this._state = 'running'; try { await this.config.runStore.init(this.logger); @@ -74,6 +85,7 @@ export default class Runner { } } catch (error) { this.isRunning = false; + this._state = 'idle'; throw error; } @@ -81,6 +93,7 @@ export default class Runner { } async stop(): Promise { + this._state = 'draining'; this.isRunning = false; if (this.pollingTimer !== null) { @@ -88,6 +101,32 @@ export default class Runner { this.pollingTimer = null; } + // Drain in-flight steps + if (this.inFlightSteps.size > 0) { + this.logger.info?.('Draining in-flight steps', { + count: this.inFlightSteps.size, + steps: [...this.inFlightSteps.keys()], + }); + + const timeout = this.config.stopTimeoutMs ?? DEFAULT_STOP_TIMEOUT_MS; + const drainResult = await Promise.race([ + Promise.allSettled(this.inFlightSteps.values()).then(() => 'drained' as const), + new Promise<'timeout'>(resolve => { + setTimeout(() => resolve('timeout'), timeout); + }), + ]); + + if (drainResult === 'timeout') { + this.logger.error('Drain timeout — steps still in flight', { + remainingSteps: [...this.inFlightSteps.keys()], + timeoutMs: timeout, + }); + } else { + this.logger.info?.('All in-flight steps drained', {}); + } + } + + // Close resources after drain if (this.httpServer) { await this.httpServer.stop(); this.httpServer = null; @@ -98,7 +137,7 @@ export default class Runner { this.config.runStore.close(this.logger), ]); - // TODO: graceful drain of in-flight steps (out of scope PRD-223) + this._state = 'stopped'; } async getRunStepExecutions(runId: string): Promise { @@ -189,10 +228,18 @@ export default class Runner { return this.config.aiClient.loadRemoteTools(mergedConfig); } - private async executeStep(step: PendingStepExecution): Promise { + private executeStep(step: PendingStepExecution): Promise { const key = Runner.stepKey(step); - this.inFlightSteps.add(key); + const promise = this.doExecuteStep(step, key); + this.inFlightSteps.set(key, promise); + return promise; + } + + private async doExecuteStep( + step: PendingStepExecution, + key: string, + ): Promise { let result: StepExecutionResult; try { diff --git a/packages/workflow-executor/test/runner.test.ts b/packages/workflow-executor/test/runner.test.ts index e6d4cee424..f79bd3c5d5 100644 --- a/packages/workflow-executor/test/runner.test.ts +++ b/packages/workflow-executor/test/runner.test.ts @@ -62,8 +62,8 @@ function createMockAiClient() { }; } -function createMockLogger(): jest.Mocked { - return { error: jest.fn() }; +function createMockLogger(): jest.Mocked> { + return { error: jest.fn(), info: jest.fn() }; } const VALID_ENV_SECRET = 'a'.repeat(64); @@ -89,6 +89,7 @@ function createRunnerConfig( envSecret: string; authSecret: string; schemaCache: SchemaCache; + stopTimeoutMs: number; }> = {}, ) { return { @@ -178,7 +179,7 @@ beforeEach(() => { afterEach(async () => { // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - if (runner) { + if (runner && runner.state !== 'stopped') { await runner.stop(); (runner as Runner | undefined) = undefined; } @@ -285,6 +286,188 @@ describe('stop', () => { }); }); +// --------------------------------------------------------------------------- +// Graceful shutdown +// --------------------------------------------------------------------------- + +describe('graceful shutdown', () => { + it('state transitions: idle → running → draining → stopped', async () => { + runner = new Runner(createRunnerConfig()); + + expect(runner.state).toBe('idle'); + + await runner.start(); + expect(runner.state).toBe('running'); + + const stopPromise = runner.stop(); + expect(runner.state).toBe('draining'); + + await stopPromise; + expect(runner.state).toBe('stopped'); + }); + + it('state resets to idle on start failure', async () => { + const config = createRunnerConfig(); + (config.runStore.init as jest.Mock).mockRejectedValueOnce(new Error('init failed')); + runner = new Runner(config); + + await expect(runner.start()).rejects.toThrow('init failed'); + expect(runner.state).toBe('idle'); + }); + + it('stop() waits for in-flight steps before resolving', async () => { + let resolveStep!: () => void; + const stepPromise = new Promise(resolve => { + resolveStep = resolve; + }); + + const workflowPort = createMockWorkflowPort(); + workflowPort.getPendingStepExecutions.mockResolvedValueOnce([ + makePendingStep({ runId: 'run-1', stepId: 'step-1' }), + ]); + + jest.spyOn(StepExecutorFactory, 'create').mockResolvedValueOnce({ + execute: () => + stepPromise.then(() => ({ + stepOutcome: { type: 'condition', stepId: 'step-1', stepIndex: 0, status: 'success' }, + })), + } as never); + + runner = new Runner(createRunnerConfig({ workflowPort })); + await runner.start(); + + jest.advanceTimersByTime(POLLING_INTERVAL_MS); + await flushPromises(); + + let stopResolved = false; + const stopPromise = runner.stop().then(() => { + stopResolved = true; + }); + + // stop() should not resolve while step is in flight + await flushPromises(); + expect(stopResolved).toBe(false); + + // Resolve the step + resolveStep(); + await stopPromise; + expect(stopResolved).toBe(true); + }); + + it('stop() resolves after timeout when step is stuck', async () => { + const workflowPort = createMockWorkflowPort(); + const logger = createMockLogger(); + workflowPort.getPendingStepExecutions.mockResolvedValueOnce([ + makePendingStep({ runId: 'run-1', stepId: 'stuck-step' }), + ]); + + jest.spyOn(StepExecutorFactory, 'create').mockResolvedValueOnce({ + execute: () => new Promise(() => {}), // never resolves + } as never); + + runner = new Runner(createRunnerConfig({ workflowPort, logger, stopTimeoutMs: 50 })); + await runner.start(); + + jest.advanceTimersByTime(POLLING_INTERVAL_MS); + await flushPromises(); + + jest.useRealTimers(); + await runner.stop(); + jest.useFakeTimers(); + + expect(logger.error).toHaveBeenCalledWith( + 'Drain timeout — steps still in flight', + expect.objectContaining({ + remainingSteps: ['run-1:stuck-step'], + timeoutMs: 50, + }), + ); + expect(runner.state).toBe('stopped'); + }); + + it('stop() resolves immediately when no steps are in flight', async () => { + const logger = createMockLogger(); + runner = new Runner(createRunnerConfig({ logger })); + await runner.start(); + await runner.stop(); + + expect(logger.info).not.toHaveBeenCalledWith('Draining in-flight steps', expect.anything()); + expect(runner.state).toBe('stopped'); + }); + + it('HTTP server is closed after drain completes', async () => { + let resolveStep!: () => void; + const stepPromise = new Promise(resolve => { + resolveStep = resolve; + }); + + const workflowPort = createMockWorkflowPort(); + workflowPort.getPendingStepExecutions.mockResolvedValueOnce([ + makePendingStep({ runId: 'run-1', stepId: 'step-1' }), + ]); + + jest.spyOn(StepExecutorFactory, 'create').mockResolvedValueOnce({ + execute: () => + stepPromise.then(() => ({ + stepOutcome: { type: 'condition', stepId: 'step-1', stepIndex: 0, status: 'success' }, + })), + } as never); + + runner = new Runner(createRunnerConfig({ workflowPort, httpPort: 3100 })); + await runner.start(); + + jest.advanceTimersByTime(POLLING_INTERVAL_MS); + await flushPromises(); + + const stopPromise = runner.stop(); + await flushPromises(); + + // HTTP server should NOT have been stopped yet (drain in progress) + expect(MockedExecutorHttpServer.prototype.stop).not.toHaveBeenCalled(); + + resolveStep(); + await stopPromise; + + // Now HTTP server should be stopped + expect(MockedExecutorHttpServer.prototype.stop).toHaveBeenCalled(); + }); + + it('logs drain info when steps are in flight', async () => { + let resolveStep!: () => void; + const stepPromise = new Promise(resolve => { + resolveStep = resolve; + }); + + const workflowPort = createMockWorkflowPort(); + const logger = createMockLogger(); + workflowPort.getPendingStepExecutions.mockResolvedValueOnce([ + makePendingStep({ runId: 'run-1', stepId: 'step-1' }), + ]); + + jest.spyOn(StepExecutorFactory, 'create').mockResolvedValueOnce({ + execute: () => + stepPromise.then(() => ({ + stepOutcome: { type: 'condition', stepId: 'step-1', stepIndex: 0, status: 'success' }, + })), + } as never); + + runner = new Runner(createRunnerConfig({ workflowPort, logger })); + await runner.start(); + + jest.advanceTimersByTime(POLLING_INTERVAL_MS); + await flushPromises(); + + resolveStep(); + await runner.stop(); + + expect(logger.info).toHaveBeenCalledWith('Draining in-flight steps', { + count: 1, + steps: ['run-1:step-1'], + }); + expect(logger.info).toHaveBeenCalledWith('All in-flight steps drained', {}); + }); +}); + // --------------------------------------------------------------------------- // Polling loop // --------------------------------------------------------------------------- From 371befcd51cbc4ba68eaf8f2cbbec4e6bc16b445 Mon Sep 17 00:00:00 2001 From: Matt Date: Wed, 25 Mar 2026 17:06:55 +0100 Subject: [PATCH 2/8] feat(workflow-executor): add GET /health endpoint for ops monitoring Public endpoint (no JWT required) that returns the Runner state: - 200 { state: 'running' } or { state: 'draining' } - 503 { state: 'stopped' } or { state: 'idle' } Usable as k8s readiness probe, ECS health check, or manual curl. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/http/executor-http-server.ts | 13 +++++ .../test/http/executor-http-server.test.ts | 47 +++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/packages/workflow-executor/src/http/executor-http-server.ts b/packages/workflow-executor/src/http/executor-http-server.ts index 1cc287060b..e9f3cec9b6 100644 --- a/packages/workflow-executor/src/http/executor-http-server.ts +++ b/packages/workflow-executor/src/http/executor-http-server.ts @@ -62,6 +62,19 @@ export default class ExecutorHttpServer { } }); + // Health endpoint — before JWT so it's publicly accessible (infra probes don't send tokens) + this.app.use(async (ctx, next) => { + if (ctx.method === 'GET' && ctx.path === '/health') { + const { state } = this.options.runner; + ctx.status = state === 'running' || state === 'draining' ? 200 : 503; + ctx.body = { state }; + + return; + } + + await next(); + }); + this.app.use(bodyParser()); // JWT middleware — validates Bearer token using authSecret diff --git a/packages/workflow-executor/test/http/executor-http-server.test.ts b/packages/workflow-executor/test/http/executor-http-server.test.ts index 407c11ae54..6fb5067c04 100644 --- a/packages/workflow-executor/test/http/executor-http-server.test.ts +++ b/packages/workflow-executor/test/http/executor-http-server.test.ts @@ -20,6 +20,7 @@ function signToken(payload: object, secret = AUTH_SECRET, options?: jsonwebtoken function createMockRunner(overrides: Partial = {}): Runner { return { + state: 'running', start: jest.fn().mockResolvedValue(undefined), stop: jest.fn().mockResolvedValue(undefined), triggerPoll: jest.fn().mockResolvedValue(undefined), @@ -156,6 +157,52 @@ describe('ExecutorHttpServer', () => { }); }); + describe('GET /health', () => { + it('returns 200 with state when runner is running', async () => { + const server = createServer({ runner: createMockRunner({ state: 'running' } as never) }); + + const response = await request(server.callback).get('/health'); + + expect(response.status).toBe(200); + expect(response.body).toEqual({ state: 'running' }); + }); + + it('returns 200 with state when runner is draining', async () => { + const server = createServer({ runner: createMockRunner({ state: 'draining' } as never) }); + + const response = await request(server.callback).get('/health'); + + expect(response.status).toBe(200); + expect(response.body).toEqual({ state: 'draining' }); + }); + + it('returns 503 when runner is stopped', async () => { + const server = createServer({ runner: createMockRunner({ state: 'stopped' } as never) }); + + const response = await request(server.callback).get('/health'); + + expect(response.status).toBe(503); + expect(response.body).toEqual({ state: 'stopped' }); + }); + + it('returns 503 when runner is idle', async () => { + const server = createServer({ runner: createMockRunner({ state: 'idle' } as never) }); + + const response = await request(server.callback).get('/health'); + + expect(response.status).toBe(503); + expect(response.body).toEqual({ state: 'idle' }); + }); + + it('does not require JWT authentication', async () => { + const server = createServer(); + + const response = await request(server.callback).get('/health'); + + expect(response.status).toBe(200); + }); + }); + describe('run access authorization', () => { it('returns 403 when hasRunAccess returns false on GET /runs/:runId', async () => { const workflowPort = createMockWorkflowPort({ From 3dd82febb0dfba5d96036246ea71b7d4fbdf967d Mon Sep 17 00:00:00 2001 From: alban bertolini Date: Fri, 27 Mar 2026 09:20:00 +0100 Subject: [PATCH 3/8] feat(workflow-executor): handle SIGTERM/SIGINT for graceful shutdown Register process signal handlers in start(), remove them in stop(). On SIGTERM or SIGINT, the runner drains in-flight steps then exits. If stop() fails, exits with code 1. Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/workflow-executor/src/runner.ts | 25 ++++++++++++--- .../workflow-executor/test/runner.test.ts | 31 +++++++++++++++++++ 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/packages/workflow-executor/src/runner.ts b/packages/workflow-executor/src/runner.ts index e25a7cb41b..c00ae5edf1 100644 --- a/packages/workflow-executor/src/runner.ts +++ b/packages/workflow-executor/src/runner.ts @@ -48,6 +48,21 @@ export default class Runner { private readonly logger: Logger; private _state: RunnerState = 'idle'; + private readonly shutdownHandler = async () => { + this.logger.info?.('Received shutdown signal, stopping gracefully...', {}); + + try { + await this.stop(); + } catch (error) { + this.logger.error('Graceful shutdown failed', { + error: error instanceof Error ? error.message : String(error), + }); + process.exit(1); + } + + process.exit(0); + }; + private static stepKey(step: PendingStepExecution): string { return `${step.runId}:${step.stepId}`; } @@ -89,10 +104,15 @@ export default class Runner { throw error; } + process.on('SIGTERM', this.shutdownHandler); + process.on('SIGINT', this.shutdownHandler); + this.schedulePoll(); } async stop(): Promise { + process.removeListener('SIGTERM', this.shutdownHandler); + process.removeListener('SIGINT', this.shutdownHandler); this._state = 'draining'; this.isRunning = false; @@ -236,10 +256,7 @@ export default class Runner { return promise; } - private async doExecuteStep( - step: PendingStepExecution, - key: string, - ): Promise { + private async doExecuteStep(step: PendingStepExecution, key: string): Promise { let result: StepExecutionResult; try { diff --git a/packages/workflow-executor/test/runner.test.ts b/packages/workflow-executor/test/runner.test.ts index f79bd3c5d5..47d36cdfa7 100644 --- a/packages/workflow-executor/test/runner.test.ts +++ b/packages/workflow-executor/test/runner.test.ts @@ -468,6 +468,37 @@ describe('graceful shutdown', () => { }); }); +// --------------------------------------------------------------------------- +// Signal handlers (SIGTERM / SIGINT) +// --------------------------------------------------------------------------- + +describe('signal handlers', () => { + it('registers SIGTERM and SIGINT handlers on start', async () => { + const onSpy = jest.spyOn(process, 'on'); + runner = new Runner(createRunnerConfig()); + + await runner.start(); + + expect(onSpy).toHaveBeenCalledWith('SIGTERM', expect.any(Function)); + expect(onSpy).toHaveBeenCalledWith('SIGINT', expect.any(Function)); + + onSpy.mockRestore(); + }); + + it('removes signal handlers on stop', async () => { + const removeSpy = jest.spyOn(process, 'removeListener'); + runner = new Runner(createRunnerConfig()); + + await runner.start(); + await runner.stop(); + + expect(removeSpy).toHaveBeenCalledWith('SIGTERM', expect.any(Function)); + expect(removeSpy).toHaveBeenCalledWith('SIGINT', expect.any(Function)); + + removeSpy.mockRestore(); + }); +}); + // --------------------------------------------------------------------------- // Polling loop // --------------------------------------------------------------------------- From 1ecd8107ad741bfa3963cc5e769b8e4e1013ea9d Mon Sep 17 00:00:00 2001 From: alban bertolini Date: Fri, 27 Mar 2026 09:59:33 +0100 Subject: [PATCH 4/8] refactor(workflow-executor): decouple Runner from HTTP and signal handlers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Runner is now a pure library: no HTTP server, no process signals - Factory functions (buildInMemoryExecutor/buildDatabaseExecutor) compose Runner + ExecutorHttpServer + SIGTERM/SIGINT handlers - Fix review issues: - Re-entrancy guard in stop() (idle/stopped/draining → return) - Clear drain timer when drain succeeds - Inspect Promise.allSettled results and log failures - process.exitCode instead of process.exit() - HTTP server stop wrapped in try-catch - WorkflowExecutor interface now exposes state getter Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/build-workflow-executor.ts | 110 ++++++++++++++- packages/workflow-executor/src/runner.ts | 69 +++------ .../test/build-workflow-executor.test.ts | 16 ++- .../workflow-executor/test/runner.test.ts | 133 ++---------------- 4 files changed, 149 insertions(+), 179 deletions(-) diff --git a/packages/workflow-executor/src/build-workflow-executor.ts b/packages/workflow-executor/src/build-workflow-executor.ts index e789d54499..823d23329e 100644 --- a/packages/workflow-executor/src/build-workflow-executor.ts +++ b/packages/workflow-executor/src/build-workflow-executor.ts @@ -1,4 +1,5 @@ import type { Logger } from './ports/logger-port'; +import type { RunnerState } from './runner'; import type { AiConfiguration } from '@forestadmin/ai-proxy'; import type { Options as SequelizeOptions } from 'sequelize'; @@ -6,7 +7,9 @@ import { AiClient } from '@forestadmin/ai-proxy'; import { Sequelize } from 'sequelize'; import AgentClientAgentPort from './adapters/agent-client-agent-port'; +import ConsoleLogger from './adapters/console-logger'; import ForestServerWorkflowPort from './adapters/forest-server-workflow-port'; +import ExecutorHttpServer from './http/executor-http-server'; import Runner from './runner'; import SchemaCache from './schema-cache'; import DatabaseStore from './stores/database-store'; @@ -18,6 +21,7 @@ const DEFAULT_POLLING_INTERVAL_MS = 5000; export interface WorkflowExecutor { start(): Promise; stop(): Promise; + readonly state: RunnerState; } export interface ExecutorOptions { @@ -29,6 +33,7 @@ export interface ExecutorOptions { pollingIntervalMs?: number; httpPort?: number; logger?: Logger; + stopTimeoutMs?: number; } export type DatabaseExecutorOptions = ExecutorOptions & @@ -36,6 +41,7 @@ export type DatabaseExecutorOptions = ExecutorOptions & function buildCommonDependencies(options: ExecutorOptions) { const forestServerUrl = options.forestServerUrl ?? DEFAULT_FOREST_SERVER_URL; + const logger = options.logger ?? new ConsoleLogger(); const workflowPort = new ForestServerWorkflowPort({ envSecret: options.envSecret, @@ -59,27 +65,119 @@ function buildCommonDependencies(options: ExecutorOptions) { schemaCache, workflowPort, aiClient, + logger, pollingIntervalMs: options.pollingIntervalMs ?? DEFAULT_POLLING_INTERVAL_MS, envSecret: options.envSecret, authSecret: options.authSecret, - httpPort: options.httpPort, - logger: options.logger, + stopTimeoutMs: options.stopTimeoutMs, + }; +} + +function createWorkflowExecutor( + runner: Runner, + server: ExecutorHttpServer | null, + logger: Logger, +): WorkflowExecutor { + let stopping = false; + + const shutdown = async () => { + if (stopping) return; + stopping = true; + + if (server) { + try { + await server.stop(); + } catch (err) { + logger.error('HTTP server close failed during shutdown', { + error: err instanceof Error ? err.message : String(err), + }); + } + } + + await runner.stop(); + }; + + const onSignal = async () => { + logger.info('Received shutdown signal, stopping gracefully...', {}); + + try { + await shutdown(); + } catch (error) { + logger.error('Graceful shutdown failed', { + error: error instanceof Error ? error.message : String(error), + }); + process.exitCode = 1; + + return; + } + + process.exitCode = 0; + }; + + return { + get state() { + return runner.state; + }, + + async start() { + await runner.start(); + + if (server) { + await server.start(); + } + + process.on('SIGTERM', onSignal); + process.on('SIGINT', onSignal); + }, + + async stop() { + process.removeListener('SIGTERM', onSignal); + process.removeListener('SIGINT', onSignal); + await shutdown(); + }, }; } export function buildInMemoryExecutor(options: ExecutorOptions): WorkflowExecutor { - return new Runner({ - ...buildCommonDependencies(options), + const deps = buildCommonDependencies(options); + + const runner = new Runner({ + ...deps, runStore: new InMemoryStore(), }); + + const server = options.httpPort + ? new ExecutorHttpServer({ + port: options.httpPort, + runner, + authSecret: options.authSecret, + workflowPort: deps.workflowPort, + logger: deps.logger, + }) + : null; + + return createWorkflowExecutor(runner, server, deps.logger); } export function buildDatabaseExecutor(options: DatabaseExecutorOptions): WorkflowExecutor { + const deps = buildCommonDependencies(options); const { uri, ...sequelizeOptions } = options.database as SequelizeOptions & { uri?: string }; const sequelize = uri ? new Sequelize(uri, sequelizeOptions) : new Sequelize(sequelizeOptions); - return new Runner({ - ...buildCommonDependencies(options), + const runner = new Runner({ + ...deps, runStore: new DatabaseStore({ sequelize }), }); + + const server = options.httpPort + ? new ExecutorHttpServer({ + port: options.httpPort, + runner, + authSecret: options.authSecret, + workflowPort: deps.workflowPort, + logger: deps.logger, + }) + : null; + + return createWorkflowExecutor(runner, server, deps.logger); } diff --git a/packages/workflow-executor/src/runner.ts b/packages/workflow-executor/src/runner.ts index c00ae5edf1..59425b3106 100644 --- a/packages/workflow-executor/src/runner.ts +++ b/packages/workflow-executor/src/runner.ts @@ -17,7 +17,6 @@ import { causeMessage, } from './errors'; import StepExecutorFactory from './executors/step-executor-factory'; -import ExecutorHttpServer from './http/executor-http-server'; import patchBodySchemas from './pending-data-validators'; import validateSecrets from './validate-secrets'; @@ -33,7 +32,6 @@ export interface RunnerConfig { envSecret: string; authSecret: string; logger?: Logger; - httpPort?: number; stopTimeoutMs?: number; } @@ -41,28 +39,12 @@ const DEFAULT_STOP_TIMEOUT_MS = 30_000; export default class Runner { private readonly config: RunnerConfig; - private httpServer: ExecutorHttpServer | null = null; private pollingTimer: NodeJS.Timeout | null = null; private readonly inFlightSteps = new Map>(); private isRunning = false; - private readonly logger: Logger; + readonly logger: Logger; private _state: RunnerState = 'idle'; - private readonly shutdownHandler = async () => { - this.logger.info?.('Received shutdown signal, stopping gracefully...', {}); - - try { - await this.stop(); - } catch (error) { - this.logger.error('Graceful shutdown failed', { - error: error instanceof Error ? error.message : String(error), - }); - process.exit(1); - } - - process.exit(0); - }; - private static stepKey(step: PendingStepExecution): string { return `${step.runId}:${step.stepId}`; } @@ -86,33 +68,18 @@ export default class Runner { try { await this.config.runStore.init(this.logger); - - if (this.config.httpPort !== undefined && !this.httpServer) { - const server = new ExecutorHttpServer({ - port: this.config.httpPort, - runner: this, - authSecret: this.config.authSecret, - workflowPort: this.config.workflowPort, - logger: this.logger, - }); - await server.start(); - this.httpServer = server; - } } catch (error) { this.isRunning = false; this._state = 'idle'; throw error; } - process.on('SIGTERM', this.shutdownHandler); - process.on('SIGINT', this.shutdownHandler); - this.schedulePoll(); } async stop(): Promise { - process.removeListener('SIGTERM', this.shutdownHandler); - process.removeListener('SIGINT', this.shutdownHandler); + if (this._state === 'idle' || this._state === 'stopped' || this._state === 'draining') return; + this._state = 'draining'; this.isRunning = false; @@ -123,16 +90,21 @@ export default class Runner { // Drain in-flight steps if (this.inFlightSteps.size > 0) { - this.logger.info?.('Draining in-flight steps', { + this.logger.info('Draining in-flight steps', { count: this.inFlightSteps.size, steps: [...this.inFlightSteps.keys()], }); const timeout = this.config.stopTimeoutMs ?? DEFAULT_STOP_TIMEOUT_MS; + let drainTimer: NodeJS.Timeout; const drainResult = await Promise.race([ - Promise.allSettled(this.inFlightSteps.values()).then(() => 'drained' as const), + Promise.allSettled(this.inFlightSteps.values()).then(() => { + clearTimeout(drainTimer); + + return 'drained' as const; + }), new Promise<'timeout'>(resolve => { - setTimeout(() => resolve('timeout'), timeout); + drainTimer = setTimeout(() => resolve('timeout'), timeout); }), ]); @@ -142,21 +114,24 @@ export default class Runner { timeoutMs: timeout, }); } else { - this.logger.info?.('All in-flight steps drained', {}); + this.logger.info('All in-flight steps drained', {}); } } - // Close resources after drain - if (this.httpServer) { - await this.httpServer.stop(); - this.httpServer = null; - } - - await Promise.allSettled([ + // Close resources — log failures instead of silently swallowing + const results = await Promise.allSettled([ this.config.aiClient.closeConnections(), this.config.runStore.close(this.logger), ]); + for (const result of results) { + if (result.status === 'rejected') { + this.logger.error('Resource cleanup failed during shutdown', { + error: result.reason instanceof Error ? result.reason.message : String(result.reason), + }); + } + } + this._state = 'stopped'; } diff --git a/packages/workflow-executor/test/build-workflow-executor.test.ts b/packages/workflow-executor/test/build-workflow-executor.test.ts index 6ed51b9325..0bced8a9d1 100644 --- a/packages/workflow-executor/test/build-workflow-executor.test.ts +++ b/packages/workflow-executor/test/build-workflow-executor.test.ts @@ -33,10 +33,12 @@ beforeEach(() => { }); describe('buildInMemoryExecutor', () => { - it('returns a WorkflowExecutor backed by a Runner', () => { + it('returns a WorkflowExecutor with start, stop, and state', () => { const executor = buildInMemoryExecutor(BASE_OPTIONS); - expect(executor).toBeInstanceOf(Runner); + expect(executor).toHaveProperty('start'); + expect(executor).toHaveProperty('stop'); + expect(executor).toHaveProperty('state'); }); it('creates an InMemoryStore as runStore', () => { @@ -121,10 +123,10 @@ describe('buildInMemoryExecutor', () => { ); }); - it('passes optional httpPort', () => { + it('does not pass httpPort to Runner (HTTP is composed externally)', () => { buildInMemoryExecutor({ ...BASE_OPTIONS, httpPort: 3000 }); - expect(MockedRunner).toHaveBeenCalledWith(expect.objectContaining({ httpPort: 3000 })); + expect(MockedRunner).toHaveBeenCalledWith(expect.not.objectContaining({ httpPort: 3000 })); }); }); @@ -137,10 +139,12 @@ describe('buildDatabaseExecutor', () => { database: { uri: 'postgres://localhost/mydb', dialect: 'postgres' as const }, }; - it('returns a WorkflowExecutor backed by a Runner', () => { + it('returns a WorkflowExecutor with start, stop, and state', () => { const executor = buildDatabaseExecutor(DB_OPTIONS); - expect(executor).toBeInstanceOf(Runner); + expect(executor).toHaveProperty('start'); + expect(executor).toHaveProperty('stop'); + expect(executor).toHaveProperty('state'); }); it('creates a DatabaseStore as runStore', () => { diff --git a/packages/workflow-executor/test/runner.test.ts b/packages/workflow-executor/test/runner.test.ts index 47d36cdfa7..963f2890b0 100644 --- a/packages/workflow-executor/test/runner.test.ts +++ b/packages/workflow-executor/test/runner.test.ts @@ -22,15 +22,10 @@ import ReadRecordStepExecutor from '../src/executors/read-record-step-executor'; import StepExecutorFactory from '../src/executors/step-executor-factory'; import TriggerRecordActionStepExecutor from '../src/executors/trigger-record-action-step-executor'; import UpdateRecordStepExecutor from '../src/executors/update-record-step-executor'; -import ExecutorHttpServer from '../src/http/executor-http-server'; import Runner from '../src/runner'; import SchemaCache from '../src/schema-cache'; import { StepType } from '../src/types/step-definition'; -jest.mock('../src/http/executor-http-server'); - -const MockedExecutorHttpServer = ExecutorHttpServer as jest.MockedClass; - // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- @@ -85,7 +80,6 @@ function createRunnerConfig( runStore: RunStore; aiClient: AiClient; logger: Logger; - httpPort: number; envSecret: string; authSecret: string; schemaCache: SchemaCache; @@ -169,9 +163,6 @@ beforeEach(() => { jest.clearAllMocks(); jest.clearAllTimers(); - MockedExecutorHttpServer.prototype.start = jest.fn().mockResolvedValue(undefined); - MockedExecutorHttpServer.prototype.stop = jest.fn().mockResolvedValue(undefined); - executeSpy = jest.spyOn(BaseStepExecutor.prototype, 'execute').mockResolvedValue({ stepOutcome: { type: 'record-task', stepId: 'step-1', stepIndex: 0, status: 'success' }, }); @@ -188,47 +179,24 @@ afterEach(async () => { }); // --------------------------------------------------------------------------- -// HTTP server (existing tests, kept passing) +// start // --------------------------------------------------------------------------- describe('start', () => { - it('should start the HTTP server when httpPort is configured', async () => { - const config = createRunnerConfig({ httpPort: 3100 }); + it('should call runStore.init() on start', async () => { + const config = createRunnerConfig(); runner = new Runner(config); await runner.start(); - expect(MockedExecutorHttpServer).toHaveBeenCalledWith({ - port: 3100, - runner, - authSecret: VALID_AUTH_SECRET, - workflowPort: config.workflowPort, - logger: config.logger, - }); - expect(MockedExecutorHttpServer.prototype.start).toHaveBeenCalled(); - }); - - it('should not start the HTTP server when httpPort is not configured', async () => { - runner = new Runner(createRunnerConfig()); - - await runner.start(); - - expect(MockedExecutorHttpServer).not.toHaveBeenCalled(); - }); - - it('should not create a second HTTP server if already started', async () => { - runner = new Runner(createRunnerConfig({ httpPort: 3100 })); - - await runner.start(); - await runner.start(); - - expect(MockedExecutorHttpServer).toHaveBeenCalledTimes(1); + expect(config.runStore.init).toHaveBeenCalledTimes(1); }); - it('should call runStore.init() on start', async () => { + it('should not call runStore.init() twice if already started', async () => { const config = createRunnerConfig(); runner = new Runner(config); + await runner.start(); await runner.start(); expect(config.runStore.init).toHaveBeenCalledTimes(1); @@ -250,15 +218,6 @@ describe('start', () => { }); describe('stop', () => { - it('should stop the HTTP server when running', async () => { - runner = new Runner(createRunnerConfig({ httpPort: 3100 })); - - await runner.start(); - await runner.stop(); - - expect(MockedExecutorHttpServer.prototype.stop).toHaveBeenCalled(); - }); - it('should call runStore.close() on stop', async () => { const config = createRunnerConfig(); runner = new Runner(config); @@ -269,20 +228,22 @@ describe('stop', () => { expect(config.runStore.close).toHaveBeenCalledTimes(1); }); - it('should handle stop when no HTTP server is running', async () => { + it('should no-op when called on an idle runner', async () => { runner = new Runner(createRunnerConfig()); await expect(runner.stop()).resolves.toBeUndefined(); + expect(runner.state).toBe('idle'); }); - it('should allow restarting after stop', async () => { - runner = new Runner(createRunnerConfig({ httpPort: 3100 })); + it('should no-op when called twice on a running runner', async () => { + const config = createRunnerConfig(); + runner = new Runner(config); await runner.start(); await runner.stop(); - await runner.start(); + await runner.stop(); - expect(MockedExecutorHttpServer).toHaveBeenCalledTimes(2); + expect(config.runStore.close).toHaveBeenCalledTimes(1); }); }); @@ -395,43 +356,6 @@ describe('graceful shutdown', () => { expect(runner.state).toBe('stopped'); }); - it('HTTP server is closed after drain completes', async () => { - let resolveStep!: () => void; - const stepPromise = new Promise(resolve => { - resolveStep = resolve; - }); - - const workflowPort = createMockWorkflowPort(); - workflowPort.getPendingStepExecutions.mockResolvedValueOnce([ - makePendingStep({ runId: 'run-1', stepId: 'step-1' }), - ]); - - jest.spyOn(StepExecutorFactory, 'create').mockResolvedValueOnce({ - execute: () => - stepPromise.then(() => ({ - stepOutcome: { type: 'condition', stepId: 'step-1', stepIndex: 0, status: 'success' }, - })), - } as never); - - runner = new Runner(createRunnerConfig({ workflowPort, httpPort: 3100 })); - await runner.start(); - - jest.advanceTimersByTime(POLLING_INTERVAL_MS); - await flushPromises(); - - const stopPromise = runner.stop(); - await flushPromises(); - - // HTTP server should NOT have been stopped yet (drain in progress) - expect(MockedExecutorHttpServer.prototype.stop).not.toHaveBeenCalled(); - - resolveStep(); - await stopPromise; - - // Now HTTP server should be stopped - expect(MockedExecutorHttpServer.prototype.stop).toHaveBeenCalled(); - }); - it('logs drain info when steps are in flight', async () => { let resolveStep!: () => void; const stepPromise = new Promise(resolve => { @@ -468,37 +392,6 @@ describe('graceful shutdown', () => { }); }); -// --------------------------------------------------------------------------- -// Signal handlers (SIGTERM / SIGINT) -// --------------------------------------------------------------------------- - -describe('signal handlers', () => { - it('registers SIGTERM and SIGINT handlers on start', async () => { - const onSpy = jest.spyOn(process, 'on'); - runner = new Runner(createRunnerConfig()); - - await runner.start(); - - expect(onSpy).toHaveBeenCalledWith('SIGTERM', expect.any(Function)); - expect(onSpy).toHaveBeenCalledWith('SIGINT', expect.any(Function)); - - onSpy.mockRestore(); - }); - - it('removes signal handlers on stop', async () => { - const removeSpy = jest.spyOn(process, 'removeListener'); - runner = new Runner(createRunnerConfig()); - - await runner.start(); - await runner.stop(); - - expect(removeSpy).toHaveBeenCalledWith('SIGTERM', expect.any(Function)); - expect(removeSpy).toHaveBeenCalledWith('SIGINT', expect.any(Function)); - - removeSpy.mockRestore(); - }); -}); - // --------------------------------------------------------------------------- // Polling loop // --------------------------------------------------------------------------- From fb68843938f61544ba5bfe621551e58786c93c1d Mon Sep 17 00:00:00 2001 From: alban bertolini Date: Fri, 27 Mar 2026 10:12:20 +0100 Subject: [PATCH 5/8] fix(workflow-executor): address all review issues on graceful shutdown - httpPort now required in ExecutorOptions (executor always needs HTTP) - start() rejects on stopped Runner (cannot restart after stop) - stop() uses finally block to guarantee state reaches 'stopped' - logger.info called with ?. (info is optional on Logger interface) - drainTimer initialized as undefined, cleared on success - Shared shutdown promise (concurrent callers await the same shutdown) - Safety net: force exit after 5s if event loop doesn't drain (.unref()) - HTTP server stop wrapped in try-catch (failure doesn't block drain) Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/build-workflow-executor.ts | 80 ++++++++-------- packages/workflow-executor/src/runner.ts | 94 +++++++++---------- .../test/build-workflow-executor.test.ts | 1 + 3 files changed, 86 insertions(+), 89 deletions(-) diff --git a/packages/workflow-executor/src/build-workflow-executor.ts b/packages/workflow-executor/src/build-workflow-executor.ts index 823d23329e..f09c01b9d8 100644 --- a/packages/workflow-executor/src/build-workflow-executor.ts +++ b/packages/workflow-executor/src/build-workflow-executor.ts @@ -17,6 +17,7 @@ import InMemoryStore from './stores/in-memory-store'; const DEFAULT_FOREST_SERVER_URL = 'https://api.forestadmin.com'; const DEFAULT_POLLING_INTERVAL_MS = 5000; +const FORCE_EXIT_DELAY_MS = 5000; export interface WorkflowExecutor { start(): Promise; @@ -28,10 +29,10 @@ export interface ExecutorOptions { envSecret: string; authSecret: string; agentUrl: string; + httpPort: number; forestServerUrl?: string; aiConfigurations: AiConfiguration[]; pollingIntervalMs?: number; - httpPort?: number; logger?: Logger; stopTimeoutMs?: number; } @@ -75,43 +76,43 @@ function buildCommonDependencies(options: ExecutorOptions) { function createWorkflowExecutor( runner: Runner, - server: ExecutorHttpServer | null, + server: ExecutorHttpServer, logger: Logger, ): WorkflowExecutor { - let stopping = false; + let shutdownPromise: Promise | null = null; const shutdown = async () => { - if (stopping) return; - stopping = true; - - if (server) { - try { - await server.stop(); - } catch (err) { - logger.error('HTTP server close failed during shutdown', { - error: err instanceof Error ? err.message : String(err), - }); - } + try { + await server.stop(); + } catch (err) { + logger.error('HTTP server close failed during shutdown', { + error: err instanceof Error ? err.message : String(err), + }); } await runner.stop(); }; const onSignal = async () => { - logger.info('Received shutdown signal, stopping gracefully...', {}); + logger.info?.('Received shutdown signal, stopping gracefully...', {}); try { - await shutdown(); + if (!shutdownPromise) shutdownPromise = shutdown(); + await shutdownPromise; + process.exitCode = 0; } catch (error) { logger.error('Graceful shutdown failed', { error: error instanceof Error ? error.message : String(error), }); process.exitCode = 1; - - return; } - process.exitCode = 0; + // Safety net: force exit if the event loop doesn't drain + // eslint-disable-next-line no-console + setTimeout(() => { + logger.error('Process did not exit after shutdown — forcing exit', {}); + process.exit(process.exitCode ?? 1); + }, FORCE_EXIT_DELAY_MS).unref(); }; return { @@ -121,10 +122,7 @@ function createWorkflowExecutor( async start() { await runner.start(); - - if (server) { - await server.start(); - } + await server.start(); process.on('SIGTERM', onSignal); process.on('SIGINT', onSignal); @@ -133,7 +131,9 @@ function createWorkflowExecutor( async stop() { process.removeListener('SIGTERM', onSignal); process.removeListener('SIGINT', onSignal); - await shutdown(); + + if (!shutdownPromise) shutdownPromise = shutdown(); + await shutdownPromise; }, }; } @@ -146,15 +146,13 @@ export function buildInMemoryExecutor(options: ExecutorOptions): WorkflowExecuto runStore: new InMemoryStore(), }); - const server = options.httpPort - ? new ExecutorHttpServer({ - port: options.httpPort, - runner, - authSecret: options.authSecret, - workflowPort: deps.workflowPort, - logger: deps.logger, - }) - : null; + const server = new ExecutorHttpServer({ + port: options.httpPort, + runner, + authSecret: options.authSecret, + workflowPort: deps.workflowPort, + logger: deps.logger, + }); return createWorkflowExecutor(runner, server, deps.logger); } @@ -169,15 +167,13 @@ export function buildDatabaseExecutor(options: DatabaseExecutorOptions): Workflo runStore: new DatabaseStore({ sequelize }), }); - const server = options.httpPort - ? new ExecutorHttpServer({ - port: options.httpPort, - runner, - authSecret: options.authSecret, - workflowPort: deps.workflowPort, - logger: deps.logger, - }) - : null; + const server = new ExecutorHttpServer({ + port: options.httpPort, + runner, + authSecret: options.authSecret, + workflowPort: deps.workflowPort, + logger: deps.logger, + }); return createWorkflowExecutor(runner, server, deps.logger); } diff --git a/packages/workflow-executor/src/runner.ts b/packages/workflow-executor/src/runner.ts index 59425b3106..9d6096b130 100644 --- a/packages/workflow-executor/src/runner.ts +++ b/packages/workflow-executor/src/runner.ts @@ -42,7 +42,7 @@ export default class Runner { private pollingTimer: NodeJS.Timeout | null = null; private readonly inFlightSteps = new Map>(); private isRunning = false; - readonly logger: Logger; + private readonly logger: Logger; private _state: RunnerState = 'idle'; private static stepKey(step: PendingStepExecution): string { @@ -59,6 +59,10 @@ export default class Runner { } async start(): Promise { + if (this._state === 'stopped') { + throw new Error('Runner has been stopped and cannot be restarted'); + } + if (this.isRunning) return; validateSecrets({ envSecret: this.config.envSecret, authSecret: this.config.authSecret }); @@ -88,51 +92,53 @@ export default class Runner { this.pollingTimer = null; } - // Drain in-flight steps - if (this.inFlightSteps.size > 0) { - this.logger.info('Draining in-flight steps', { - count: this.inFlightSteps.size, - steps: [...this.inFlightSteps.keys()], - }); - - const timeout = this.config.stopTimeoutMs ?? DEFAULT_STOP_TIMEOUT_MS; - let drainTimer: NodeJS.Timeout; - const drainResult = await Promise.race([ - Promise.allSettled(this.inFlightSteps.values()).then(() => { - clearTimeout(drainTimer); - - return 'drained' as const; - }), - new Promise<'timeout'>(resolve => { - drainTimer = setTimeout(() => resolve('timeout'), timeout); - }), - ]); - - if (drainResult === 'timeout') { - this.logger.error('Drain timeout — steps still in flight', { - remainingSteps: [...this.inFlightSteps.keys()], - timeoutMs: timeout, + try { + // Drain in-flight steps + if (this.inFlightSteps.size > 0) { + this.logger.info?.('Draining in-flight steps', { + count: this.inFlightSteps.size, + steps: [...this.inFlightSteps.keys()], }); - } else { - this.logger.info('All in-flight steps drained', {}); + + const timeout = this.config.stopTimeoutMs ?? DEFAULT_STOP_TIMEOUT_MS; + let drainTimer: NodeJS.Timeout | undefined; + const drainResult = await Promise.race([ + Promise.allSettled(this.inFlightSteps.values()).then(() => { + if (drainTimer) clearTimeout(drainTimer); + + return 'drained' as const; + }), + new Promise<'timeout'>(resolve => { + drainTimer = setTimeout(() => resolve('timeout'), timeout); + }), + ]); + + if (drainResult === 'timeout') { + this.logger.error('Drain timeout — steps still in flight', { + remainingSteps: [...this.inFlightSteps.keys()], + timeoutMs: timeout, + }); + } else { + this.logger.info?.('All in-flight steps drained', {}); + } } - } - // Close resources — log failures instead of silently swallowing - const results = await Promise.allSettled([ - this.config.aiClient.closeConnections(), - this.config.runStore.close(this.logger), - ]); + // Close resources — log failures instead of silently swallowing + const results = await Promise.allSettled([ + this.config.aiClient.closeConnections(), + this.config.runStore.close(this.logger), + ]); - for (const result of results) { - if (result.status === 'rejected') { - this.logger.error('Resource cleanup failed during shutdown', { - error: result.reason instanceof Error ? result.reason.message : String(result.reason), - }); + for (const result of results) { + if (result.status === 'rejected') { + this.logger.error('Resource cleanup failed during shutdown', { + error: result.reason instanceof Error ? result.reason.message : String(result.reason), + }); + } } + } finally { + this._state = 'stopped'; } - - this._state = 'stopped'; } async getRunStepExecutions(runId: string): Promise { @@ -144,8 +150,6 @@ export default class Runner { const execution = stepExecutions.find(e => e.stepIndex === stepIndex); const schema = execution ? patchBodySchemas[execution.type] : undefined; - // pendingData is typed as T | undefined; null is not expected (RunStore never persists null) - // but `== null` guards against both for safety. if (!execution || !schema || !('pendingData' in execution) || execution.pendingData == null) { throw new PendingDataNotFoundError(runId, stepIndex); } @@ -162,8 +166,6 @@ export default class Runner { ); } - // Cast is safe: the type guard above ensures `execution` is the correct union branch, - // and patchBodySchemas[execution.type] only accepts keys valid for that branch. await this.config.runStore.saveStepExecution(runId, { ...execution, pendingData: { ...(execution.pendingData as object), ...(parsed.data as object) }, @@ -240,15 +242,13 @@ export default class Runner { ); result = await executor.execute(); } catch (error) { - // This block should never execute: the factory and executor contracts guarantee no rejection. - // It guards against future regressions. this.logger.error('FATAL: executor contract violated — step outcome not reported', { runId: step.runId, stepId: step.stepId, error: error instanceof Error ? error.message : String(error), }); - return; // Cannot report an outcome: the orchestrator will timeout on this step + return; } finally { this.inFlightSteps.delete(key); } diff --git a/packages/workflow-executor/test/build-workflow-executor.test.ts b/packages/workflow-executor/test/build-workflow-executor.test.ts index 0bced8a9d1..3ab197677b 100644 --- a/packages/workflow-executor/test/build-workflow-executor.test.ts +++ b/packages/workflow-executor/test/build-workflow-executor.test.ts @@ -23,6 +23,7 @@ const BASE_OPTIONS = { envSecret: 'a'.repeat(64), authSecret: 'test-secret', agentUrl: 'http://localhost:3310', + httpPort: 3100, aiConfigurations: [ { name: 'default', provider: 'openai' as const, model: 'gpt-4o', apiKey: 'sk-test' }, ], From d5585406a306923bcec9a3ba377ea3675ce4b91a Mon Sep 17 00:00:00 2001 From: alban bertolini Date: Fri, 27 Mar 2026 10:21:58 +0100 Subject: [PATCH 6/8] fix(workflow-executor): guard start() against draining state Prevents calling start() while stop() is draining, which would reinitialize the store and resume polling against closing resources. Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/workflow-executor/src/runner.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/workflow-executor/src/runner.ts b/packages/workflow-executor/src/runner.ts index 9d6096b130..470c55a3a5 100644 --- a/packages/workflow-executor/src/runner.ts +++ b/packages/workflow-executor/src/runner.ts @@ -59,7 +59,7 @@ export default class Runner { } async start(): Promise { - if (this._state === 'stopped') { + if (this._state === 'stopped' || this._state === 'draining') { throw new Error('Runner has been stopped and cannot be restarted'); } From 4d38d15599aba4dea01160e6423ea8da0cf9881c Mon Sep 17 00:00:00 2001 From: alban bertolini Date: Fri, 27 Mar 2026 10:41:08 +0100 Subject: [PATCH 7/8] test(workflow-executor): add coverage tests for WorkflowExecutor lifecycle Test the composeur created by buildInMemoryExecutor: - start() calls runner.start + registers SIGTERM/SIGINT - stop() removes signals + calls runner.stop - Concurrent stop() calls share the same promise - HTTP server close failure doesn't prevent runner.stop - state getter returns runner state Co-Authored-By: Claude Opus 4.6 (1M context) --- .../test/build-workflow-executor.test.ts | 88 +++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/packages/workflow-executor/test/build-workflow-executor.test.ts b/packages/workflow-executor/test/build-workflow-executor.test.ts index 3ab197677b..e89f9b0ef0 100644 --- a/packages/workflow-executor/test/build-workflow-executor.test.ts +++ b/packages/workflow-executor/test/build-workflow-executor.test.ts @@ -10,6 +10,7 @@ jest.mock('../src/stores/in-memory-store'); jest.mock('../src/stores/database-store'); jest.mock('../src/adapters/agent-client-agent-port'); jest.mock('../src/adapters/forest-server-workflow-port'); +jest.mock('../src/http/executor-http-server'); jest.mock('@forestadmin/ai-proxy', () => ({ AiClient: jest.fn(), })); @@ -209,3 +210,90 @@ describe('buildDatabaseExecutor', () => { ); }); }); + +// --------------------------------------------------------------------------- +// WorkflowExecutor composeur (start, stop, signal handlers, shutdown) +// --------------------------------------------------------------------------- + +describe('WorkflowExecutor lifecycle', () => { + let executor: ReturnType; + let onSpy: jest.SpyInstance; + let removeSpy: jest.SpyInstance; + + beforeEach(() => { + // Setup Runner mock to have start/stop/state + MockedRunner.prototype.start = jest.fn().mockResolvedValue(undefined); + MockedRunner.prototype.stop = jest.fn().mockResolvedValue(undefined); + Object.defineProperty(MockedRunner.prototype, 'state', { + get: () => 'running', + configurable: true, + }); + + onSpy = jest.spyOn(process, 'on'); + removeSpy = jest.spyOn(process, 'removeListener'); + + executor = buildInMemoryExecutor(BASE_OPTIONS); + }); + + afterEach(() => { + onSpy.mockRestore(); + removeSpy.mockRestore(); + }); + + it('start() calls runner.start and server.start', async () => { + await executor.start(); + + expect(MockedRunner.prototype.start).toHaveBeenCalled(); + }); + + it('start() registers SIGTERM and SIGINT handlers', async () => { + await executor.start(); + + expect(onSpy).toHaveBeenCalledWith('SIGTERM', expect.any(Function)); + expect(onSpy).toHaveBeenCalledWith('SIGINT', expect.any(Function)); + }); + + it('stop() removes signal handlers', async () => { + await executor.start(); + await executor.stop(); + + expect(removeSpy).toHaveBeenCalledWith('SIGTERM', expect.any(Function)); + expect(removeSpy).toHaveBeenCalledWith('SIGINT', expect.any(Function)); + }); + + it('stop() calls runner.stop', async () => { + await executor.start(); + await executor.stop(); + + expect(MockedRunner.prototype.stop).toHaveBeenCalled(); + }); + + it('concurrent stop() calls share the same promise', async () => { + await executor.start(); + + const [r1, r2] = await Promise.all([executor.stop(), executor.stop()]); + + expect(r1).toBeUndefined(); + expect(r2).toBeUndefined(); + // runner.stop is only called once (shared promise) + expect(MockedRunner.prototype.stop).toHaveBeenCalledTimes(1); + }); + + it('stop() continues if HTTP server close fails', async () => { + // eslint-disable-next-line @typescript-eslint/no-var-requires, global-require + const { default: MockedHttpServer } = require('../src/http/executor-http-server'); + MockedHttpServer.prototype.stop = jest.fn().mockRejectedValue(new Error('server error')); + MockedHttpServer.prototype.start = jest.fn().mockResolvedValue(undefined); + + const exec = buildInMemoryExecutor(BASE_OPTIONS); + await exec.start(); + await exec.stop(); + + // runner.stop still called despite server.stop failure + expect(MockedRunner.prototype.stop).toHaveBeenCalled(); + }); + + it('state getter returns runner state', () => { + expect(executor.state).toBe('running'); + }); +}); From ba45db975da348d421da6265d7f11b0b17b22626 Mon Sep 17 00:00:00 2001 From: Matt Date: Fri, 27 Mar 2026 11:15:53 +0100 Subject: [PATCH 8/8] test(workflow-executor): improve coverage for shutdown, stores, and logger MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Runner: test start() after stop() throws, resource cleanup failure log - DatabaseStore: test migration failure logging, close() error handling - build-run-store: test both factory functions (SQLite + in-memory) - ConsoleLogger: test info() method Coverage improvements: console-logger 67% → 100% build-run-store 33% → 100% database-store 85% → 96% runner 97% → 99% Co-Authored-By: Claude Opus 4.6 (1M context) --- .../test/adapters/console-logger.test.ts | 22 ++++++++++ .../workflow-executor/test/runner.test.ts | 23 ++++++++++ .../test/stores/build-run-store.test.ts | 42 +++++++++++++++++++ .../test/stores/database-store.test.ts | 30 +++++++++++++ 4 files changed, 117 insertions(+) create mode 100644 packages/workflow-executor/test/adapters/console-logger.test.ts create mode 100644 packages/workflow-executor/test/stores/build-run-store.test.ts diff --git a/packages/workflow-executor/test/adapters/console-logger.test.ts b/packages/workflow-executor/test/adapters/console-logger.test.ts new file mode 100644 index 0000000000..ba354fdf2f --- /dev/null +++ b/packages/workflow-executor/test/adapters/console-logger.test.ts @@ -0,0 +1,22 @@ +import ConsoleLogger from '../../src/adapters/console-logger'; + +describe('ConsoleLogger', () => { + let logger: ConsoleLogger; + + beforeEach(() => { + logger = new ConsoleLogger(); + }); + + it('info() writes to console.info as JSON', () => { + const spy = jest.spyOn(console, 'info').mockImplementation(); + + logger.info('test message', { key: 'value' }); + + expect(spy).toHaveBeenCalledTimes(1); + const output = JSON.parse(spy.mock.calls[0][0]); + expect(output).toMatchObject({ message: 'test message', key: 'value' }); + expect(output.timestamp).toBeDefined(); + + spy.mockRestore(); + }); +}); diff --git a/packages/workflow-executor/test/runner.test.ts b/packages/workflow-executor/test/runner.test.ts index 963f2890b0..fadb7c6efa 100644 --- a/packages/workflow-executor/test/runner.test.ts +++ b/packages/workflow-executor/test/runner.test.ts @@ -267,6 +267,29 @@ describe('graceful shutdown', () => { expect(runner.state).toBe('stopped'); }); + it('throws when start() is called after stop()', async () => { + runner = new Runner(createRunnerConfig()); + await runner.start(); + await runner.stop(); + + await expect(runner.start()).rejects.toThrow('Runner has been stopped and cannot be restarted'); + }); + + it('logs resource cleanup failure during stop', async () => { + const logger = createMockLogger(); + const aiClient = createMockAiClient(); + aiClient.closeConnections.mockRejectedValueOnce(new Error('connection leak')); + + runner = new Runner(createRunnerConfig({ logger, aiClient: aiClient as unknown as AiClient })); + await runner.start(); + await runner.stop(); + + expect(logger.error).toHaveBeenCalledWith( + 'Resource cleanup failed during shutdown', + expect.objectContaining({ error: 'connection leak' }), + ); + }); + it('state resets to idle on start failure', async () => { const config = createRunnerConfig(); (config.runStore.init as jest.Mock).mockRejectedValueOnce(new Error('init failed')); diff --git a/packages/workflow-executor/test/stores/build-run-store.test.ts b/packages/workflow-executor/test/stores/build-run-store.test.ts new file mode 100644 index 0000000000..7c0f821679 --- /dev/null +++ b/packages/workflow-executor/test/stores/build-run-store.test.ts @@ -0,0 +1,42 @@ +import { buildDatabaseRunStore, buildInMemoryRunStore } from '../../src/stores/build-run-store'; +import InMemoryStore from '../../src/stores/in-memory-store'; + +describe('buildInMemoryRunStore', () => { + it('returns an initialized InMemoryStore', async () => { + const store = await buildInMemoryRunStore(); + + expect(store).toBeInstanceOf(InMemoryStore); + // Verify it works + await store.saveStepExecution('run-1', { + type: 'condition', + stepIndex: 0, + executionParams: { answer: 'yes' }, + } as never); + const result = await store.getStepExecutions('run-1'); + expect(result).toHaveLength(1); + }); +}); + +describe('buildDatabaseRunStore', () => { + it('returns an initialized DatabaseStore (SQLite)', async () => { + const store = await buildDatabaseRunStore({ dialect: 'sqlite', storage: ':memory:' }); + + // Verify it works (migration ran) + await store.saveStepExecution('run-1', { + type: 'condition', + stepIndex: 0, + executionParams: { answer: 'yes' }, + } as never); + const result = await store.getStepExecutions('run-1'); + expect(result).toHaveLength(1); + + await store.close(); + }); + + it('closes sequelize connection if init fails', async () => { + // Invalid storage path to force a failure + await expect( + buildDatabaseRunStore({ dialect: 'sqlite', storage: '/nonexistent/path/db.sqlite' }), + ).rejects.toThrow(); + }); +}); diff --git a/packages/workflow-executor/test/stores/database-store.test.ts b/packages/workflow-executor/test/stores/database-store.test.ts index 21050e9125..4f60ea60ee 100644 --- a/packages/workflow-executor/test/stores/database-store.test.ts +++ b/packages/workflow-executor/test/stores/database-store.test.ts @@ -108,4 +108,34 @@ describe('DatabaseStore (SQLite)', () => { // Running init a second time should not fail await expect(store.init()).resolves.toBeUndefined(); }); + + it('logs and rethrows when migration fails', async () => { + const badSequelize = new Sequelize({ dialect: 'sqlite', storage: ':memory:', logging: false }); + const badStore = new DatabaseStore({ sequelize: badSequelize }); + + // Break the query interface so createTable fails + jest + .spyOn(badSequelize.getQueryInterface(), 'createTable') + .mockRejectedValueOnce(new Error('disk full')); + + const logger = { error: jest.fn() }; + await expect(badStore.init(logger)).rejects.toThrow('disk full'); + expect(logger.error).toHaveBeenCalledWith( + 'Database migration failed', + expect.objectContaining({ error: 'disk full' }), + ); + + await badSequelize.close(); + }); + + it('close() catches and logs errors instead of throwing', async () => { + const logger = { error: jest.fn() }; + jest.spyOn(sequelize, 'close').mockRejectedValueOnce(new Error('close failed')); + + await expect(store.close(logger)).resolves.toBeUndefined(); + expect(logger.error).toHaveBeenCalledWith( + 'Failed to close database connection', + expect.objectContaining({ error: 'close failed' }), + ); + }); });