v2.0.0
Major Changes
- a248675: Major release: unified vocabulary, child flows, blocking handle.result(), AbortSignal in steps, paginated listRuns, retention auto-pruning, payload caps, metrics, and a richer schema. No backwards-compatible aliases; see migration below.
Breaking changes
Schema
A drizzle-kit generate && drizzle-kit migrate is required.
- Column rename: step_key / hook_key → cursor_key across steps, timers, events, signals.
- Table rename: workflow.hooks → workflow.signals.
- New columns on runs: parent_run_id, parent_cursor_key, tags text[] (GIN-indexed).
- Run statuses: waiting → awaiting_signal; new retrying status (split out from sleeping).
- Event types: hook_armed / hook_resolved / hook_timeout → signal_armed / signal_delivered / signal_timeout.
- Error codes: WORKFLOW_HOOK_TIMEOUT → SIGNAL_TIMEOUT; HOOK_PAYLOAD_INVALID → SIGNAL_PAYLOAD_INVALID; WORKFLOW_SUSPEND_IN_STEP → STEP_INVALID_AWAIT; UNKNOWN_WORKFLOW → FLOW_UNKNOWN; CANCELED → RUN_CANCELED; NON_DETERMINISTIC → REPLAY_NON_DETERMINISTIC; INCOMPATIBLE_VERSION → REPLAY_INCOMPATIBLE_VERSION. New: INVOKE_DEPTH_EXCEEDED, INVOKE_FANOUT_EXCEEDED, SCHEMA_MISMATCH.
The Postgres schema name workflow is unchanged.
API
- ctx.hook(name) → ctx.signal(name) (and builder .hook() → .signal()).
- engine.start() → engine.listen().
- engine.defineWorkflow({ run }) → engine.register({ ..., body }) (or use the builder; both go through engine.register).
- Step functions now receive a structured argument:

      // before
      await ctx.step("fetch", () => httpGet(url));
      // after
      await ctx.step("fetch", ({ input, signal, attempt }) => httpGet(url, { signal }));

- engine.signal(runId, name, payload) now returns SignalDeliveryResult instead of void:

      const result = await engine.signal(runId, "approve", { ok: true });
      switch (result.kind) {
        case "delivered": break; // the run was awaiting; now resumes
        case "buffered": break;  // signal arrived first; consumed on arm
        case "duplicate": break; // already accepted; idempotent
        case "expired": break;   // timeout already fired; reject the webhook
      }

- Type renames: WorkflowContext → FlowContext, WorkflowHandle → FlowHandle, WorkflowError → FlowError, WorkflowErrorCode → FlowErrorCode, WORKFLOW_ERROR_CODES → FLOW_ERROR_CODES, WorkflowRuntimeError → FlowRuntimeError, workflowError → flowError, toWorkflowError → toFlowError, workflowSchema → flowSchema, applyWorkflowSchema → applyFlowSchema, dropWorkflowSchema → dropFlowSchema, HookOpts → SignalOpts, HookNode → SignalNode, WorkflowSuspend → FlowSuspend, RuntimeWorkflowContext → RuntimeFlowContext, DefineWorkflowOpts (run field) → DefineFlowOpts (body field), SignalResult → SignalDeliveryResult.
- Source layout: runtime/graphile.ts → adapters/graphile/; tracing.ts → util/tracing.ts. Internal task identifier workflow:run → flow:run.
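For webhook handlers that call engine.signal, the four result kinds listed above can be handled with an exhaustive switch. The following is a minimal sketch, not the library's code: the SignalDeliveryResult type is a local stand-in that models only the kind field, and both statusFor and the HTTP status choices are hypothetical.

```typescript
// Local stand-in for the library's result type; only `kind` is modelled here.
type SignalDeliveryResult =
  | { kind: "delivered" }
  | { kind: "buffered" }
  | { kind: "duplicate" }
  | { kind: "expired" };

// Hypothetical mapping from a delivery outcome to a webhook HTTP status.
function statusFor(result: SignalDeliveryResult): number {
  switch (result.kind) {
    case "delivered":
    case "buffered":
      return 202; // accepted; the run resumes now or once the signal is armed
    case "duplicate":
      return 200; // idempotent replay of an already accepted signal
    case "expired":
      return 410; // the timeout already fired; tell the sender to stop
    default: {
      // Compile-time exhaustiveness check: stops type-checking if a kind is added.
      const unhandled: never = result;
      throw new Error(`unhandled kind: ${JSON.stringify(unhandled)}`);
    }
  }
}
```

The never assignment in the default branch is what turns a newly added result kind into a compile error instead of a silently ignored case.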
New features
Child flows: ctx.invoke

    const order = engine.register(flow("order").step(...).build());
    const ship = engine.register(flow("ship").step(...).build());
    const fulfill = flow("fulfill")
      .step("validate", ({ input, signal }) => validate(input, { signal }))
      .step("place", async ({ input, ctx }) => {
        const placedOrder = await ctx.invoke(order, input);
        return ctx.invoke(ship, placedOrder);
      })
      .build();

Child flows have their own runId, attempts, and snapshot. The parent suspends until the child terminates. Cursor-keyed so resumes don't re-spawn the child.
Blocking handle.result()

    const { runId } = await handle.start({ userId: "u_1" });
    const output = await handle.result(runId, { timeoutMs: 60_000 });

Backed by Postgres LISTEN flow_terminal with a row-poll fallback. No more polling handle.output() in your code.
AbortSignal in step functions
Wires the configured timeoutMs AND engine.cancel(runId) to a single AbortSignal. Pass it to fetch, pg, undici, OpenAI SDKs.

    .step("call-llm", async ({ input, signal }) => {
      // A request body needs a non-GET method and a serialized payload.
      const res = await fetch(url, { method: "POST", signal, body: JSON.stringify(input) });
      return res.json();
    }, { timeoutMs: 30_000 })

engine.cancel(runId) now aborts the in-flight controller AND guards markCompleted / markFailed from overwriting the canceled tombstone.
Run listing: engine.listRuns

    const page = await engine.listRuns({
      name: "onboard",
      status: ["failed", "awaiting_signal"],
      tag: "tenant:acme",
      since: new Date(Date.now() - 24 * 60 * 60_000),
      limit: 50,
    });

Keyset pagination on (createdAt, id). Composes with the new tags column (GIN-indexed).

    await handle.start(input, { tags: [`tenant:${tenantId}`, "priority:high"] });
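
Keyset pagination on (createdAt, id) resumes from the last row seen rather than from an offset. The sketch below illustrates the idea over an in-memory array. RunRow, Cursor, listPage, the newest-first ordering, and the next field are all assumptions for illustration; the shape of the real listRuns result is not shown in these notes.

```typescript
// Hypothetical row and cursor shapes; the real runs table has more columns.
interface RunRow {
  id: string;
  createdAt: number; // epoch milliseconds, for brevity
}

interface Cursor {
  createdAt: number;
  id: string;
}

// Newest first, with id as the tie-breaker, similar in spirit to
// ORDER BY created_at DESC, id DESC (the direction is an assumption).
function compareDesc(a: Cursor, b: Cursor): number {
  if (a.createdAt !== b.createdAt) return b.createdAt - a.createdAt;
  return a.id < b.id ? 1 : a.id > b.id ? -1 : 0;
}

function listPage(
  rows: RunRow[],
  limit: number,
  after?: Cursor,
): { items: RunRow[]; next: Cursor | undefined } {
  const sorted = [...rows].sort(compareDesc);
  // Keep only rows that sort strictly after the cursor.
  const rest = after ? sorted.filter((r) => compareDesc(after, r) < 0) : sorted;
  const items = rest.slice(0, limit);
  const last = items[items.length - 1];
  const next =
    rest.length > limit && last ? { createdAt: last.createdAt, id: last.id } : undefined;
  return { items, next };
}
```

Because the cursor is the full (createdAt, id) pair, rows that share a timestamp are neither skipped nor repeated across pages, which an offset or a timestamp-only cursor cannot guarantee.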
Retention auto-pruning

    createEngine({
      db,
      pool,
      retention: {
        eventsOlderThan: "30d",
        runsOlderThan: "90d",
        schedule: "0 * * * *", // default hourly
      },
    });

Payload size caps

    createEngine({
      db,
      pool,
      limits: {
        maxInputBytes: 256 * 1024,
        maxStepResultBytes: 256 * 1024,
        maxSignalPayloadBytes: 64 * 1024,
      },
    });

Oversized values throw before they hit the database.
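
A byte cap is measured on the encoded payload, not on the character count. As a rough sketch of such a pre-write check (assertWithinLimit is a hypothetical helper, and measuring the UTF-8 size of the JSON encoding is an assumption about how the engine counts bytes):

```typescript
// Hypothetical helper: measures the UTF-8 size of the JSON encoding and throws
// when it exceeds the cap. The engine's real check and error type may differ.
function assertWithinLimit(label: string, value: unknown, maxBytes: number): number {
  // JSON.stringify returns undefined for values such as functions.
  const json: string | undefined = JSON.stringify(value);
  const bytes = new TextEncoder().encode(json ?? "").length;
  if (bytes > maxBytes) {
    throw new Error(`${label} is ${bytes} bytes; limit is ${maxBytes} bytes`);
  }
  return bytes;
}
```

A string of ten "é" characters serializes to 12 characters but 22 bytes, so a length-based check would undercount it.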
Metrics

    createEngine({
      db,
      pool,
      metrics: {
        runStarted: ({ name }) => counters.runs_started.inc({ name }),
        stepFinished: ({ status, durationMs }) => histograms.step.observe({ status }, durationMs),
        signalDelivered: ({ kind }) => counters.signals.inc({ kind }),
      },
    });

All methods are optional; methods you don't supply are no-ops. Available: runStarted, runCompleted, runFailed, runSuspended, stepFinished, signalDelivered, reconcilerSweep.
Operational helpers

    const engine = createEngine({ db, pool, logger: consoleLogger() });
    engine.attachShutdownSignals(); // SIGTERM/SIGINT → engine.stop()
    await engine.listen();
    const health = await engine.health(); // { ok, db, worker, startedAt }

logger is now optional (defaults to a noop logger).
Cron: timezone, overlap, jitter

    engine.defineCron({
      name: "nightly-report",
      schedule: "0 2 * * *",
      timezone: "America/Los_Angeles",
      overlap: "skip", // default; prevents concurrent runs via PG advisory lock
      jitterMs: 60_000,
      run: async () => generateReport(),
    });

Hard ceilings

    createEngine({
      db,
      pool,
      maxRunAttempts: 100, // hard ceiling; stops poison-pill loops
      defaultStepTimeoutMs: 30 * 60_000, // fallback when StepOpts.timeoutMs is not set
    });

Exhausted runs fail with RUN_ATTEMPTS_EXHAUSTED.
Schema fingerprint at boot
The engine reads information_schema for marker columns on the first listen() or the first handle.start() and throws SCHEMA_MISMATCH if the schema is at the wrong version. The error message names the migration to run.

    // If the schema is at v1 (or not applied):
    // Error: SCHEMA_MISMATCH: schema is at v1, engine expects v2 — run `drizzle-kit generate && drizzle-kit migrate`

This eliminates the rolling-deploy class of failure in which engine code expects the v2 schema, the DB is still at v1, and runs silently fail.
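
The fingerprint idea can be sketched as a pure comparison between the columns that exist and the columns the engine expects. This is an illustration only: V2_MARKERS is picked from the v2 schema changes listed in these notes, the engine's actual marker set is not documented here, and checkFingerprint is a hypothetical name. The input would come from a query against information_schema.columns.

```typescript
// Marker columns chosen for illustration from the v2 changes in these notes;
// the engine's real marker set is an unknown here.
const V2_MARKERS: string[] = ["runs.parent_run_id", "runs.tags", "steps.cursor_key"];

type Fingerprint = { ok: true } | { ok: false; missing: string[] };

// existingColumns holds "table.column" names read from information_schema.columns.
function checkFingerprint(existingColumns: string[]): Fingerprint {
  const present = new Set<string>(existingColumns);
  const missing = V2_MARKERS.filter((column) => !present.has(column));
  return missing.length === 0 ? { ok: true } : { ok: false, missing };
}
```

Checking a handful of marker columns keeps the boot-time query cheap while still catching a database that was never migrated.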
Hard caps on ctx.invoke
limits.maxInvokeDepth (default 10) and limits.maxChildrenPerRun (default 1000) stop accidental infinite recursion and runaway fan-out:

    createEngine({
      db,
      pool,
      limits: {
        maxInvokeDepth: 10, // root = 1; throws INVOKE_DEPTH_EXCEEDED if exceeded
        maxChildrenPerRun: 1000, // throws INVOKE_FANOUT_EXCEEDED if exceeded
      },
    });
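
One way to read these two caps is as a guard evaluated before a child run is spawned. The sketch below assumes the root run has depth 1 (as the comment above states) and that the child's depth and the parent's child count are compared against the limits; ParentRun and checkInvoke are hypothetical, and the engine may count differently.

```typescript
interface InvokeLimits {
  maxInvokeDepth: number;
  maxChildrenPerRun: number;
}

// Hypothetical view of the invoking run: root depth is 1.
interface ParentRun {
  depth: number;
  childCount: number;
}

// Throws the matching error code if spawning one more child would exceed a cap.
function checkInvoke(parent: ParentRun, limits: InvokeLimits): void {
  if (parent.depth + 1 > limits.maxInvokeDepth) {
    throw new Error("INVOKE_DEPTH_EXCEEDED");
  }
  if (parent.childCount + 1 > limits.maxChildrenPerRun) {
    throw new Error("INVOKE_FANOUT_EXCEEDED");
  }
}
```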
Boot-time validators
createEngine now fails fast on operator misconfiguration:
- logger: missing debug / info / warn / error throws on construction.
- retention.runsOlderThan / eventsOlderThan: invalid durations throw on construction instead of failing at the first cron tick.
- pool.options.max vs concurrency: when concurrency > pool.max, the engine emits logger.warn("flow.config.pool_too_small", { concurrency, poolMax }).
- defineCron({ schedule }): invalid cron patterns throw at registration time, not at listen().
Bundle size budget
npm run size:check sums the gzipped sizes of dist/*.js and fails CI if the total exceeds the configured budget (default 320 kB, override via SIZE_BUDGET_KB). Current footprint is ~22 kB gzipped, so the budget is roomy on purpose: it's a regression guard, not a limit.
Resilient LISTEN reconnect
The Postgres LISTEN subscription that powers handle.result() / handle.wait() now reconnects on its own. Previously a single connection error would permanently degrade handle.result() to a row-poll fallback until the engine was restarted.
- State machine: idle → connecting → listening → reconnecting → stopped.
- Multi-channel: subscribes to flow_terminal AND flow_progress over a single connection.
- Exponential backoff 1s → 30s (capped), with jitter.
- Single in-flight loop guarded by an AbortController; cancelled cleanly on engine.stop().
- engine.health() reports listen: boolean so probes can distinguish "engine up, LISTEN down" from "engine fully healthy".
- Verified by an integration test that calls pg_terminate_backend() on the LISTEN backend and checks that a fresh pg_notify round-trip still wakes its waiter.
- Multi-instance coverage: a dedicated test suite spins up two engines against the same Postgres and verifies cross-instance handle.result(), handle.wait(), engine.signal(), and engine.cancel() all fan out correctly via LISTEN / pg_notify.

    const h = await engine.health(); // { ok, db, worker, listen, startedAt }
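
The reconnect schedule described above (1s growing to a 30s cap, with jitter) can be approximated as follows. Only the 1s base and 30s cap come from these notes; the doubling factor, the "equal jitter" strategy, and the reconnectDelayMs name are assumptions.

```typescript
// Delay before reconnect attempt `attempt` (0-based).
// Assumed: doubling growth and equal jitter, i.e. uniform in [exp / 2, exp].
function reconnectDelayMs(attempt: number, random: () => number = Math.random): number {
  const baseMs = 1_000;
  const capMs = 30_000;
  const exp = Math.min(capMs, baseMs * 2 ** attempt);
  return Math.round(exp / 2 + random() * (exp / 2));
}
```

Injecting the random source keeps the function deterministic under test, and the jitter spreads reconnects from several engine instances so they do not hit Postgres at the same instant.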
Defensive callback wrappers
logger and metrics methods you pass into createEngine are now wrapped at construction so a throwing method can never crash the engine.
- A throwing logger.<method> is suppressed for the rest of the engine's lifetime and surfaced once on process.stderr.
- A throwing metrics.<method> is suppressed for the rest of the engine's lifetime and surfaced once via logger.warn("flow.metrics.threw", { method, message }).
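
The suppress-after-first-throw behaviour can be sketched with a small wrapper. This is not the engine's implementation: wrapDefensively and its onFirstThrow reporter are hypothetical names, and the reporter stands in for process.stderr or logger.warn.

```typescript
// `any[]` lets callbacks with arbitrary parameter lists be wrapped.
type Callbacks = Record<string, (...args: any[]) => void>;

function wrapDefensively<T extends Callbacks>(
  target: T,
  onFirstThrow: (method: string, message: string) => void,
): T {
  const suppressed = new Set<string>();
  const wrapped: Callbacks = {};
  for (const [method, fn] of Object.entries(target)) {
    wrapped[method] = (...args: any[]) => {
      if (suppressed.has(method)) return; // already failed once; stay silent
      try {
        fn(...args);
      } catch (err) {
        suppressed.add(method);
        onFirstThrow(method, err instanceof Error ? err.message : String(err));
      }
    };
  }
  return wrapped as T;
}
```

Suppression is tracked per method, so one broken callback does not silence the others.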
Delivery-time signal payload validation
When you declare a schema on a builder-level .signal(name, { schema }), the engine now validates incoming payloads at engine.signal(runId, name, payload) time, before they hit the database. Returns a new { kind: "invalid_payload", issues } variant in SignalDeliveryResult:

    const result = await engine.signal(runId, "approve", payload);
    switch (result.kind) {
      case "delivered":
      case "buffered":
      case "duplicate":
      case "expired":
        break;
      case "invalid_payload":
        return res.status(400).json({ issues: result.issues });
    }

Builders reject the same signal name declared with two different schemas at .build() time.
The existing replay-time validation via ctx.signal(name, { schema }) still applies and stays the source of truth for inline defineFlow({ body }) users (who have no static node tree to scrape).
Pre-built SQL migrations
The published package now ships a vetted, reviewable migration file at node_modules/iterativeflow/migrations/0000_init.sql. Three apply paths:

    # 1. psql, no drizzle-kit required
    psql "$DATABASE_URL" -f node_modules/iterativeflow/migrations/0000_init.sql

    # 2. drizzle-kit, when you want migration tracking
    npx drizzle-kit generate && npx drizzle-kit migrate

    # 3. Programmatic, no SQL file or migration tooling
    await applyFlowSchema(db);

Statements are post-processed with IF NOT EXISTS (and a DO $$ … END $$ guard for foreign keys), so re-applying is a no-op.
handle.wait: block until a specific in-flow event
Generic blocking wait on a step finishing or a signal being delivered, distinct from handle.result() (which only fires on terminal). Backed by a new LISTEN flow_progress Postgres channel, with a loadStep / loadSignal subscribe-then-check race-free pattern.

    const { runId } = await handle.start({ orderId });
    // Wait for the "validate" step to memoize (ok or failed_terminal)
    await handle.wait(runId, { until: { step: "validate" }, timeoutMs: 30_000 });
    // Wait for the "approve" signal to be delivered
    await handle.wait(runId, { until: { signal: "approve" }, timeoutMs: 60_000 });

{ step: name } matches the first-occurrence cursor key (exact name). { signal: name } matches the canonical signal cursor key signal:<name>. timeoutMs rejects with a handle.wait timed out error. Does NOT auto-reject on terminal; callers who want either-or compose via Promise.race(handle.result(...), handle.wait(...)).
Public API contract via api-extractor
The package now tracks its published .d.ts surface in etc/iterativeflow.api.md. npm run api:check fails CI on any unintended addition / removal / rename. Update the baseline intentionally with npm run api:update when shipping breaking changes.
Improvements
- Builder is fully immutable: every chain call returns a new FlowBuilder; branches don't share state.
- .version(N) rejects non-positive integers and regressions.
- Replay compatibility covers loop bodies: rename / kind-change inside a loop body now fires REPLAY_INCOMPATIBLE_VERSION. Previously loops silently bypassed the compat check.
- engine.cancel(runId) aborts the in-flight AbortSignal (was tombstone-only).
- Atomic claimRun closes the prior race between markRunning and loadSnapshot.
- engine.signal after hook timeout returns { kind: "expired" } instead of silently delivering.
- Idempotency scoped to (name, version, idempotencyKey): multi-version flows no longer cross-dedupe.
- Cron handlers rethrow on failure so graphile-worker retries (was silent-swallow).
- toMs / toFireAt reject negative durations. Pass a past Date for "fire immediately" semantics.
- recordEvent no longer silent-swallows DB errors.
- Reconciler scans with ORDER BY updated_at, id for deterministic progress; partial indexes cover the new statuses.
- engine.stop() is graceful + idempotent. Drains in-flight tasks via graphile-worker's stop semantics.
- Promise.race timeout retained alongside the new AbortController so step functions that ignore the signal still get a hard timeout error.
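
The last item combines a cooperative abort with a hard timeout. A minimal sketch of that combination, assuming a hypothetical runWithTimeout helper and error message (the engine's internal names and error type are not shown in these notes):

```typescript
type StepFn<T> = (args: { signal: AbortSignal }) => Promise<T>;

async function runWithTimeout<T>(fn: StepFn<T>, timeoutMs: number): Promise<T> {
  const controller = new AbortController();
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_resolve, reject) => {
    timer = setTimeout(() => {
      controller.abort(); // cooperative: steps that honour the signal stop here
      reject(new Error("step timed out")); // hard: the race settles regardless
    }, timeoutMs);
  });
  try {
    // Whichever settles first wins; a step that ignores the signal still loses the race.
    return await Promise.race([fn({ signal: controller.signal }), timeout]);
  } finally {
    clearTimeout(timer);
  }
}
```

Note that losing the race does not stop a step that ignores the signal; its work keeps running in the background, and only the caller is released.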
Bug fixes
- Numeric signal/hook names (hook("42")) no longer mis-classify replay drift as count-change.
- Cross-kind drift (e.g. a step named "sleep" switched to a ctx.sleep()) is detected as REPLAY_INCOMPATIBLE_VERSION; previously could silently lose the step result on replay.
- baseOf correctly rejects :0 and leading-zero suffixes the cursor never emits.
- Builder .step() after a fork no longer mutates the parent builder.
- Loop bodies' dynamic occurrence count is no longer mis-flagged as drift.
- Tracer identifier (was @aws-vod/workflow from a predecessor project) corrected to iterativeflow.
Setup

    // drizzle.config.ts
    import { defineConfig } from "drizzle-kit";
    import { createRequire } from "node:module";

    const require = createRequire(import.meta.url);

    export default defineConfig({
      dialect: "postgresql",
      schema: [require.resolve("iterativeflow/schema")],
      out: "./drizzle",
      dbCredentials: { url: process.env.DATABASE_URL! },
    });

Then generate and apply the migration:

    npx drizzle-kit generate && npx drizzle-kit migrate

Compatibility
- Node >=20. CI tests Node 20 + 22.
- Peers: drizzle-orm >=0.45, graphile-worker >=0.16, pg >=8.10.