Skip to content
7 changes: 6 additions & 1 deletion packages/workflow-executor/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ Front ◀──▶ Orchestrator ◀──pull/push──▶ Executor ──
```
src/
├── errors.ts # WorkflowExecutorError, MissingToolCallError, MalformedToolCallError, NoRecordsError, NoReadableFieldsError, NoWritableFieldsError, NoActionsError, StepPersistenceError, NoRelationshipFieldsError, RelatedRecordNotFoundError
├── runner.ts # Runner class — main entry point (start/stop/triggerPoll, HTTP server wiring)
├── runner.ts # Runner class — main entry point (start/stop/triggerPoll, HTTP server wiring, graceful drain)
├── types/ # Core type definitions (@draft)
│ ├── step-definition.ts # StepType enum + step definition interfaces
│ ├── step-outcome.ts # Step outcome tracking types (StepOutcome, sent to orchestrator)
Expand All @@ -54,6 +54,10 @@ src/
│ ├── agent-port.ts # Interface to the Forest Admin agent (datasource)
│ ├── workflow-port.ts # Interface to the orchestrator
│ └── run-store.ts # Interface for persisting run state (scoped to a run)
├── stores/ # RunStore implementations
│ ├── in-memory-store.ts # InMemoryStore — Map-based, for tests
│ ├── database-store.ts # DatabaseStore — Sequelize + umzug migrations
│ └── build-run-store.ts # Factory functions: buildDatabaseRunStore, buildInMemoryRunStore
├── adapters/ # Port implementations
│ ├── agent-client-agent-port.ts # AgentPort via @forestadmin/agent-client
│ └── forest-server-workflow-port.ts # WorkflowPort via HTTP (forestadmin-client ServerUtils)
Expand Down Expand Up @@ -83,6 +87,7 @@ src/
- **displayName in AI tools** — All `DynamicStructuredTool` schemas and system message prompts must use `displayName`, never `fieldName`. `displayName` is a Forest Admin frontend feature that replaces the technical field/relation/action name with a product-oriented label configured by the Forest Admin admin. End users write their workflow prompts using these display names, not the underlying technical names. After an AI tool call returns display names, map them back to `fieldName`/`name` before using them in datasource operations (e.g. filtering record values, calling `getRecord`).
- **No recovery/retry** — Once the executor returns a step result to the orchestrator, the step is considered executed. There is no mechanism to re-dispatch a step, so executors must NOT include recovery checks (e.g. checking the RunStore for cached results before executing). Each step executes exactly once.
- **Fetched steps must be executed** — Any step retrieved from the orchestrator via `getPendingStepExecutions()` must be executed. Silently discarding a fetched step (e.g. filtering it out by `runId` after fetching) violates the executor contract: the orchestrator assumes execution is guaranteed once the step is dispatched. The only valid filter before executing is deduplication via `inFlightSteps` (to avoid running the same step twice concurrently).
- **Graceful shutdown** — `stop()` drains in-flight steps before closing resources. The `state` getter exposes the lifecycle: `idle → running → draining → stopped`. `stopTimeoutMs` (default 30s) prevents `stop()` from hanging forever if a step is stuck. The HTTP server stays up during drain so the frontend can still query run status. Signal handling (`SIGTERM`/`SIGINT`) is the consumer's responsibility — the Runner is a library class.

## Commands

Expand Down
4 changes: 4 additions & 0 deletions packages/workflow-executor/src/adapters/console-logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,8 @@ export default class ConsoleLogger implements Logger {
error(message: string, context: Record<string, unknown>): void {
console.error(JSON.stringify({ message, timestamp: new Date().toISOString(), ...context }));
}

info(message: string, context: Record<string, unknown>): void {
console.info(JSON.stringify({ message, timestamp: new Date().toISOString(), ...context }));
}
}
108 changes: 101 additions & 7 deletions packages/workflow-executor/src/build-workflow-executor.ts
Original file line number Diff line number Diff line change
@@ -1,41 +1,48 @@
import type { Logger } from './ports/logger-port';
import type { RunnerState } from './runner';
import type { AiConfiguration } from '@forestadmin/ai-proxy';
import type { Options as SequelizeOptions } from 'sequelize';

import { AiClient } from '@forestadmin/ai-proxy';
import { Sequelize } from 'sequelize';

import AgentClientAgentPort from './adapters/agent-client-agent-port';
import ConsoleLogger from './adapters/console-logger';
import ForestServerWorkflowPort from './adapters/forest-server-workflow-port';
import ExecutorHttpServer from './http/executor-http-server';
import Runner from './runner';
import SchemaCache from './schema-cache';
import DatabaseStore from './stores/database-store';
import InMemoryStore from './stores/in-memory-store';

const DEFAULT_FOREST_SERVER_URL = 'https://api.forestadmin.com';
const DEFAULT_POLLING_INTERVAL_MS = 5000;
const FORCE_EXIT_DELAY_MS = 5000;

export interface WorkflowExecutor {
start(): Promise<void>;
stop(): Promise<void>;
readonly state: RunnerState;
}

export interface ExecutorOptions {
envSecret: string;
authSecret: string;
agentUrl: string;
httpPort: number;
forestServerUrl?: string;
aiConfigurations: AiConfiguration[];
pollingIntervalMs?: number;
httpPort?: number;
logger?: Logger;
stopTimeoutMs?: number;
}

export type DatabaseExecutorOptions = ExecutorOptions &
({ database: SequelizeOptions & { uri: string } } | { database: SequelizeOptions });

function buildCommonDependencies(options: ExecutorOptions) {
const forestServerUrl = options.forestServerUrl ?? DEFAULT_FOREST_SERVER_URL;
const logger = options.logger ?? new ConsoleLogger();

const workflowPort = new ForestServerWorkflowPort({
envSecret: options.envSecret,
Expand All @@ -59,27 +66,114 @@ function buildCommonDependencies(options: ExecutorOptions) {
schemaCache,
workflowPort,
aiClient,
logger,
pollingIntervalMs: options.pollingIntervalMs ?? DEFAULT_POLLING_INTERVAL_MS,
envSecret: options.envSecret,
authSecret: options.authSecret,
httpPort: options.httpPort,
logger: options.logger,
stopTimeoutMs: options.stopTimeoutMs,
};
}

function createWorkflowExecutor(
runner: Runner,
server: ExecutorHttpServer,
logger: Logger,
): WorkflowExecutor {
let shutdownPromise: Promise<void> | null = null;

const shutdown = async () => {
try {
await server.stop();
} catch (err) {
logger.error('HTTP server close failed during shutdown', {
error: err instanceof Error ? err.message : String(err),
});
}

await runner.stop();
};

const onSignal = async () => {
logger.info?.('Received shutdown signal, stopping gracefully...', {});

try {
if (!shutdownPromise) shutdownPromise = shutdown();
await shutdownPromise;
process.exitCode = 0;
} catch (error) {
logger.error('Graceful shutdown failed', {
error: error instanceof Error ? error.message : String(error),
});
process.exitCode = 1;
}

// Safety net: force exit if the event loop doesn't drain
// eslint-disable-next-line no-console
setTimeout(() => {
logger.error('Process did not exit after shutdown — forcing exit', {});
process.exit(process.exitCode ?? 1);
}, FORCE_EXIT_DELAY_MS).unref();
};

return {
get state() {
return runner.state;
},

async start() {
await runner.start();
await server.start();

process.on('SIGTERM', onSignal);
process.on('SIGINT', onSignal);
},

async stop() {
process.removeListener('SIGTERM', onSignal);
process.removeListener('SIGINT', onSignal);

if (!shutdownPromise) shutdownPromise = shutdown();
await shutdownPromise;
},
};
}

export function buildInMemoryExecutor(options: ExecutorOptions): WorkflowExecutor {
return new Runner({
...buildCommonDependencies(options),
const deps = buildCommonDependencies(options);

const runner = new Runner({
...deps,
runStore: new InMemoryStore(),
});

const server = new ExecutorHttpServer({
port: options.httpPort,
runner,
authSecret: options.authSecret,
workflowPort: deps.workflowPort,
logger: deps.logger,
});

return createWorkflowExecutor(runner, server, deps.logger);
}

export function buildDatabaseExecutor(options: DatabaseExecutorOptions): WorkflowExecutor {
const deps = buildCommonDependencies(options);
const { uri, ...sequelizeOptions } = options.database as SequelizeOptions & { uri?: string };
const sequelize = uri ? new Sequelize(uri, sequelizeOptions) : new Sequelize(sequelizeOptions);

return new Runner({
...buildCommonDependencies(options),
const runner = new Runner({
...deps,
runStore: new DatabaseStore({ sequelize }),
});

const server = new ExecutorHttpServer({
port: options.httpPort,
runner,
authSecret: options.authSecret,
workflowPort: deps.workflowPort,
logger: deps.logger,
});

return createWorkflowExecutor(runner, server, deps.logger);
}
13 changes: 13 additions & 0 deletions packages/workflow-executor/src/http/executor-http-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,19 @@ export default class ExecutorHttpServer {
}
});

// Health endpoint — before JWT so it's publicly accessible (infra probes don't send tokens)
this.app.use(async (ctx, next) => {
if (ctx.method === 'GET' && ctx.path === '/health') {
const { state } = this.options.runner;
ctx.status = state === 'running' || state === 'draining' ? 200 : 503;
ctx.body = { state };

return;
}

await next();
});

this.app.use(bodyParser());

// JWT middleware — validates Bearer token using authSecret
Expand Down
2 changes: 1 addition & 1 deletion packages/workflow-executor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ export { default as ForestServerWorkflowPort } from './adapters/forest-server-wo
export { default as ExecutorHttpServer } from './http/executor-http-server';
export type { ExecutorHttpServerOptions } from './http/executor-http-server';
export { default as Runner } from './runner';
export type { RunnerConfig } from './runner';
export type { RunnerConfig, RunnerState } from './runner';
export { default as validateSecrets } from './validate-secrets';
export { default as SchemaCache } from './schema-cache';
export { default as InMemoryStore } from './stores/in-memory-store';
Expand Down
1 change: 1 addition & 0 deletions packages/workflow-executor/src/ports/logger-port.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export interface Logger {
error(message: string, context: Record<string, unknown>): void;
info?(message: string, context: Record<string, unknown>): void;
}
Loading
Loading