grafikui/saga_engine

Saga Engine

Crash-resilient saga executor for Node.js. Postgres-backed. Best-effort compensation. Hard 15-minute limit. No magic.


What Saga Engine Is

A library for multi-step workflows with automatic rollback. When step 3 fails, steps 2 and 1 are compensated in reverse order. State survives process crashes via Postgres.

Use cases:

  • Order fulfillment: inventory → payment → shipping (rollback on failure)
  • Multi-system updates: CRM → billing → email (compensate on partial failure)
  • Legacy integrations: SOAP APIs without idempotency (track what succeeded)

What Saga Engine Is NOT

  • A workflow orchestration platform (use Temporal)
  • A job queue (use BullMQ)
  • A scheduler (use node-cron)
  • A distributed transaction coordinator
  • An exactly-once delivery system

If you need workflows longer than 15 minutes, use Temporal. We explicitly refuse to support them.


Installation

npm install saga-engine

Quick Start

import { Transaction, PostgresStorage, PostgresLock } from 'saga-engine';
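import { Pool } from 'pg';

// inventory, stripe, and shipping below stand in for your own service clients;
// items, amount, and address come from your order data.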

const pool = new Pool({ connectionString: process.env.DATABASE_URL });
const storage = new PostgresStorage(pool);
const lock = new PostgresLock(pool);

const tx = new Transaction('order-123', storage, {
  idempotencyKey: 'order-abc-fulfillment',  // REQUIRED
  lock,
  input: { orderId: 'abc', amount: 9999 },
});

await tx.run(async (t) => {
  // Step 1: Reserve inventory
  const reservation = await t.step('reserve-inventory', {
    idempotencyKey: 'order-abc-reserve',  // REQUIRED
    execute: () => inventory.reserve(items),
    compensate: (res) => inventory.release(res.id),
  });

  // Step 2: Charge payment
  const charge = await t.step('charge-payment', {
    idempotencyKey: 'order-abc-charge',
    execute: () => stripe.charge(amount, {
      idempotency_key: 'order-abc-charge'  // Pass to external API too!
    }),
    compensate: (ch) => stripe.refund(ch.id, {
      idempotency_key: 'order-abc-refund'
    }),
  });

  // Step 3: Create shipment
  await t.step('create-shipment', {
    idempotencyKey: 'order-abc-ship',
    execute: () => shipping.create(address),
    compensate: (shipment) => shipping.cancel(shipment.id),
  });

  return { reservation, charge };
});

If create-shipment fails:

  1. charge-payment compensation runs (refund)
  2. reserve-inventory compensation runs (release)
  3. Original error is thrown
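
The rollback order described above can be illustrated with a minimal, self-contained sketch. It is not Saga Engine's internal code: `runSaga`, the step shape, and the `log` array are hypothetical names used only to show completed steps being undone most-recent-first before the original error is rethrown. Persistence, locking, and retries are deliberately left out.

```typescript
// Illustrative sketch only; all names here are hypothetical.
type SketchStep = {
  name: string;
  execute: () => Promise<void>;
  compensate: () => Promise<void>;
};

async function runSaga(steps: SketchStep[], log: string[]): Promise<void> {
  const completed: SketchStep[] = [];
  try {
    for (const step of steps) {
      await step.execute();
      completed.push(step); // only completed steps are eligible for rollback
      log.push(`done:${step.name}`);
    }
  } catch (err) {
    // Undo completed steps, most recent first.
    for (const step of completed.reverse()) {
      await step.compensate();
      log.push(`undo:${step.name}`);
    }
    throw err; // surface the original failure to the caller
  }
}
```

Note that the failing step itself is never compensated in this sketch, because it never completed.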

Warning: External Idempotency Is Your Responsibility

Saga Engine persists step results after execution. If your process crashes after an external API call but before Saga Engine writes to Postgres, the step will re-execute on resumption.

Your execute functions must pass idempotency keys to external providers:

// WRONG: May charge twice on crash recovery
execute: () => stripe.charge(amount),

// RIGHT: External provider deduplicates the call
execute: () => stripe.charge(amount, {
  idempotency_key: 'order-abc-charge'
}),

Saga Engine enforces that you provide idempotency keys. It cannot enforce that your external calls use them.
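
To see why the key matters, here is a small sketch of how a provider might deduplicate on an idempotency key. `FakePaymentProvider` is a hypothetical in-memory stand-in, not Stripe's API or anything shipped with Saga Engine; it only shows that replaying a call with the same key returns the first result instead of creating a second charge.

```typescript
// Hypothetical in-memory stand-in for an external payment provider.
type SketchCharge = { id: string; amount: number };

class FakePaymentProvider {
  private readonly seen = new Map<string, SketchCharge>();
  chargesCreated = 0;

  charge(amount: number, opts: { idempotencyKey: string }): SketchCharge {
    const previous = this.seen.get(opts.idempotencyKey);
    if (previous) return previous; // replayed call: return the original result
    this.chargesCreated += 1;
    const charge = { id: `ch_${this.chargesCreated}`, amount };
    this.seen.set(opts.idempotencyKey, charge);
    return charge;
  }
}

const provider = new FakePaymentProvider();
const first = provider.charge(9999, { idempotencyKey: 'order-abc-charge' });
// Simulated crash recovery: the step re-executes with the same key.
const replay = provider.charge(9999, { idempotencyKey: 'order-abc-charge' });
```

Without the key, the second call would be indistinguishable from a new charge, which is the double-charge case the warning describes.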


Hard Guarantees

| Guarantee | Enforcement |
| --- | --- |
| Step persistence before proceeding | Storage interface |
| Compensation triggered on failure | Transaction.run() |
| Resumption skips completed steps | Step execution logic |
| Idempotency required at Transaction AND Step level | Runtime validation (throws IdempotencyRequiredError) |
| Locking prevents concurrent execution | Postgres advisory locks |
| Maximum execution time: 15 minutes | Wall-clock check before each step (throws ExecutionTimeoutError) |
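
The wall-clock check in the last row could look roughly like the sketch below. This is an assumption about the shape of such a check, not the library's implementation: `assertWithinDeadline` and `TimeoutSketchError` are hypothetical names (the real error type is ExecutionTimeoutError), and the start time is passed in explicitly so the logic is easy to test.

```typescript
// Illustrative sketch only; names are hypothetical.
const MAX_EXECUTION_MS = 15 * 60 * 1000; // the 15-minute hard limit

class TimeoutSketchError extends Error {
  readonly elapsedMs: number;
  constructor(elapsedMs: number) {
    super(`Workflow exceeded ${MAX_EXECUTION_MS}ms (elapsed: ${elapsedMs}ms)`);
    this.name = 'TimeoutSketchError';
    this.elapsedMs = elapsedMs;
  }
}

// Intended to be called before each step starts.
function assertWithinDeadline(startedAtMs: number, nowMs: number = Date.now()): void {
  const elapsedMs = nowMs - startedAtMs;
  if (elapsedMs > MAX_EXECUTION_MS) throw new TimeoutSketchError(elapsedMs);
}
```

A check of this kind runs between steps, so it does not interrupt a step that is already in flight; bounding an individual step needs a per-step timeout.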

Explicit Refusals

| What We Don't Do | Why |
| --- | --- |
| Guarantee compensation success | compensate() is best-effort. Failures → dead_letter state |
| External consistency | If you call Stripe and crash before persisting, Stripe was charged. Use their idempotency keys. |
| Distributed transactions | Single-process, single-database only |
| Long-running workflows | 15-minute hard limit. Use Temporal for hours/days. |
| Auto-recovery from dead_letter | Terminal state. Manual CLI intervention required. |

Infrastructure Notes

Postgres Lock Safety

Saga Engine uses session-level Postgres advisory locks to prevent concurrent execution of the same workflow.

| Connection Setup | Compatible |
| --- | --- |
| Direct pg.Pool connection | Yes |
| PgBouncer in session mode | Yes |
| PgBouncer in transaction mode | No: lock ownership is lost between queries |
| Supabase Pooler (transaction mode) | No: use the direct connection string |

If your Node process is killed (SIGKILL), the operating system closes its TCP connection and Postgres releases the session's advisory locks when that session ends. A crashed process leaves no zombie locks.
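
The sketch below shows why session affinity matters for this kind of lock. It is an assumption-laden illustration, not Saga Engine's PostgresLock: `SessionClient`, `lockKey`, and `withWorkflowLock` are hypothetical names, and the FNV-1a key derivation is just one way to map a workflow ID to an integer. The SQL functions `pg_try_advisory_lock` and `pg_advisory_unlock` are standard Postgres. In real use, `client` would be a single dedicated connection, since acquire and release must happen on the same session.

```typescript
// Minimal shape of a dedicated connection, declared locally so the
// sketch stays self-contained.
interface SessionClient {
  query(sql: string, params: unknown[]): Promise<{ rows: Array<Record<string, unknown>> }>;
}

// FNV-1a hash folded to a signed 32-bit integer. Illustrative only;
// the library may derive its lock keys differently.
function lockKey(workflowId: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < workflowId.length; i++) {
    hash ^= workflowId.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193);
  }
  return hash | 0;
}

async function withWorkflowLock<T>(
  client: SessionClient,
  workflowId: string,
  fn: () => Promise<T>,
): Promise<T> {
  const key = lockKey(workflowId);
  const res = await client.query('SELECT pg_try_advisory_lock($1::bigint) AS locked', [key]);
  if (res.rows[0]?.locked !== true) {
    throw new Error(`Workflow ${workflowId} is already running elsewhere`);
  }
  try {
    return await fn();
  } finally {
    // Must run on the same session that acquired the lock.
    await client.query('SELECT pg_advisory_unlock($1::bigint)', [key]);
  }
}
```

With a transaction-mode pooler, the two queries can land on different server connections, so the unlock would not match the session that holds the lock.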


Database Setup

Run this schema in your Postgres database:

CREATE TABLE IF NOT EXISTS transactions (
  id VARCHAR(255) PRIMARY KEY,
  status VARCHAR(50) NOT NULL DEFAULT 'pending',
  step_stack JSONB NOT NULL DEFAULT '[]',
  input JSONB NOT NULL DEFAULT '{}',
  retry_count INT NOT NULL DEFAULT 0,
  error JSONB,
  created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
  updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_transactions_status ON transactions(status);
CREATE INDEX IF NOT EXISTS idx_transactions_created_at ON transactions(created_at);
CREATE INDEX IF NOT EXISTS idx_transactions_error ON transactions USING GIN (error);

Workflow States

                     ┌───────────┐
                     │  pending  │
                     └─────┬─────┘
                           │
             ┌─────────────┼─────────────┐
             │             │             │
             ▼             ▼             ▼
      ┌───────────┐ ┌───────────┐ ┌─────────────┐
      │ completed │ │  failed   │ │ dead_letter │
      └───────────┘ └───────────┘ └──────┬──────┘
                                         │
                                         │ saga-admin retry
                                         ▼
                                  ┌───────────┐
                                  │  pending  │
                                  └───────────┘

| Status | Meaning | Action |
| --- | --- | --- |
| pending | In progress or resumable | Will continue on next run |
| completed | All steps succeeded | Terminal |
| failed | Step failed, all compensations succeeded | Safe to retry with new workflow |
| dead_letter | Compensation failed or 15-min timeout | Manual intervention required via saga-admin retry |

How dead_letter is reached:

  • A compensate() function fails after exhausting its retry policy
  • The 15-minute wall-clock limit is exceeded (checked before each step and on resumption)
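
The states and transitions above can be written down as a small lookup table. This is a reading of the diagram and status table, not code from the library; `WorkflowStatus`, `transitions`, and `canTransition` are hypothetical names.

```typescript
// Illustrative sketch only; names are hypothetical.
type WorkflowStatus = 'pending' | 'completed' | 'failed' | 'dead_letter';

const transitions: Record<WorkflowStatus, readonly WorkflowStatus[]> = {
  pending: ['completed', 'failed', 'dead_letter'],
  completed: [], // terminal
  failed: [], // terminal for this workflow; retry with a new workflow
  dead_letter: ['pending'], // only via `saga-admin retry`
};

function canTransition(from: WorkflowStatus, to: WorkflowStatus): boolean {
  return transitions[from].indexOf(to) !== -1;
}
```

Encoding the table this way makes it easy to reject an unexpected status change, such as moving a completed workflow back to pending.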

CLI Administration

# Retry a dead_letter workflow (atomic, race-safe)
DATABASE_URL=postgres://... npx saga-admin retry order-123

# Override 10-retry limit
DATABASE_URL=postgres://... npx saga-admin retry --force order-123

# List workflows by status
DATABASE_URL=postgres://... npx saga-admin list dead_letter

# Get workflow statistics
DATABASE_URL=postgres://... npx saga-admin stats

Retry Policies

await t.step('flaky-api', {
  idempotencyKey: 'order-abc-api',
  execute: () => legacyApi.call(),
  compensate: () => legacyApi.rollback(),
  retry: {
    attempts: 3,
    backoffMs: 1000,
  },
});
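
For intuition, a retry policy of this shape can be sketched as a small wrapper. The semantics here are assumptions, since the README does not spell them out: `attempts` is treated as the total number of tries (not retries after the first), and `backoffMs` as a fixed delay between tries. `withRetry` and `sleep` are hypothetical helpers, not part of saga-engine.

```typescript
// Illustrative sketch only; semantics of attempts/backoffMs are assumed.
interface RetrySketchOptions {
  attempts: number; // assumed: total tries, including the first
  backoffMs: number; // assumed: fixed delay between tries
}

const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

async function withRetry<T>(
  fn: () => Promise<T>,
  { attempts, backoffMs }: RetrySketchOptions,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      // Wait before the next try, but not after the final one.
      if (attempt < attempts) await sleep(backoffMs);
    }
  }
  throw lastError;
}
```

Because every retry re-runs execute, the external call inside it still needs its own idempotency key.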

Step Timeouts

await t.step('slow-operation', {
  idempotencyKey: 'order-abc-slow',
  execute: () => slowService.process(),
  compensate: () => slowService.cancel(),
  timeout: 30000,  // 30 second timeout per attempt
});
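
A per-attempt timeout is commonly built by racing the work against a timer. The sketch below shows that general technique under the assumption that the library does something similar; `withTimeout` is a hypothetical helper, not a saga-engine export.

```typescript
// Illustrative sketch only; withTimeout is a hypothetical helper.
function withTimeout<T>(work: Promise<T>, timeoutMs: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(
      () => reject(new Error(`Timed out after ${timeoutMs}ms`)),
      timeoutMs,
    );
    work.then(
      (value) => {
        clearTimeout(timer); // settle first, then stop the timer
        resolve(value);
      },
      (err) => {
        clearTimeout(timer);
        reject(err);
      },
    );
  });
}
```

A timeout built this way stops waiting but does not cancel the underlying operation, so the external call may still complete after the step has been marked as timed out. That is another reason to pass idempotency keys to external providers.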

Compensation Policies

await t.step('critical-step', {
  idempotencyKey: 'order-abc-critical',
  execute: () => criticalService.do(),
  compensate: () => criticalService.undo(),
  compensationPolicy: {
    retry: { attempts: 5, backoffMs: 2000 },
    timeout: 60000,
  },
});

Observability Events

import { Transaction, TransactionEvents } from 'saga-engine';

const events: TransactionEvents = {
  onTransactionStart: (id, input) => logger.info(`Started: ${id}`),
  onTransactionComplete: (id) => logger.info(`Completed: ${id}`),
  onTransactionFailed: (id, error) => logger.error(`Failed: ${id}`, error),
  onStepComplete: (name, result, durationMs) => metrics.record(name, durationMs),
  onCompensationStart: (name) => logger.warn(`Compensating: ${name}`),
  onCompensationFailed: (name, error) => alerting.page(`Compensation failed: ${name}`),
  onDeadLetter: (id, error) => alerting.critical(`Dead letter: ${id}`),
};

const tx = new Transaction('order-123', storage, {
  idempotencyKey: 'order-abc',
  lock,
  events,
});

Available Events

| Event | When |
| --- | --- |
| onTransactionStart | Transaction begins |
| onTransactionComplete | All steps succeeded |
| onTransactionFailed | Failed (with or without compensation) |
| onStepStart | Step execution begins |
| onStepComplete | Step succeeded |
| onStepFailed | Step failed (before retry) |
| onStepRetry | Step retrying |
| onStepSkipped | Step skipped (already completed) |
| onStepTimeout | Step exceeded timeout |
| onCompensationStart | Compensation begins |
| onCompensationComplete | Compensation succeeded |
| onCompensationFailed | Compensation failed |
| onDeadLetter | Workflow entered dead_letter state |

Error Types

import {
  ExecutionTimeoutError,
  IdempotencyRequiredError,
  CompensationFailedError,
  DeadLetterError,
} from 'saga-engine';

try {
  await tx.run(workflow);  // workflow: your async (t) => { ... } callback
} catch (error) {
  if (error instanceof ExecutionTimeoutError) {
    // Workflow exceeded 15-minute limit
    console.log(`Timed out after ${error.elapsedMs}ms`);
  }
  if (error instanceof IdempotencyRequiredError) {
    // Missing idempotency key
    console.log(`Missing key for ${error.level}: ${error.identifier}`);
  }
  if (error instanceof CompensationFailedError) {
    // Compensation threw during rollback
    console.log(`Step ${error.failedStep} compensation failed`);
    console.log(`Original error: ${error.originalError.message}`);
    console.log(`Compensation error: ${error.compensationError.message}`);
  }
}

Testing

Test utilities are available via a separate import:

import { Transaction } from 'saga-engine';
import { MemoryStorage, MockLock, createEventSpy } from 'saga-engine/testing';

describe('Order Workflow', () => {
  it('compensates on failure', async () => {
    const storage = new MemoryStorage();
    const lock = new MockLock();
    const eventSpy = createEventSpy();

    const tx = new Transaction('test-order', storage, {
      idempotencyKey: 'test-order-key',
      lock,
      events: eventSpy.events,
    });

    await expect(tx.run(async (t) => {
      await t.step('step-1', {
        idempotencyKey: 's1',
        execute: () => 'result',
        compensate: () => { /* called on failure */ },
      });
      throw new Error('Trigger compensation');
    })).rejects.toThrow('Trigger compensation');

    expect(eventSpy.wasCalled('onCompensationComplete')).toBe(true);
  });
});

Querying Workflows

// Read-only queries for observability
const deadLetters = await storage.query({
  status: 'dead_letter',
  limit: 100,
});

const recent = await storage.query({
  status: 'failed',
  createdAfter: new Date(Date.now() - 24 * 60 * 60 * 1000),
});

// Get specific workflow
const workflow = await storage.getWorkflow('order-123');
console.log(workflow?.status);  // 'completed' | 'failed' | 'dead_letter' | 'pending'
console.log(workflow?.error);   // { stepName, error, compensationError?, timestamp }

Failure Modes

See docs/FAILURE_MODES.md for detailed documentation of:

  • What we guarantee
  • What we explicitly refuse to handle
  • Recovery procedures for each failure mode

When to Use Something Else

| Scenario | Use Instead |
| --- | --- |
| Workflows > 15 minutes | Temporal |
| Need job queues | BullMQ |
| Need cron scheduling | node-cron |
| Need distributed coordination | Temporal |
| Need managed platform | Inngest |
| Need exactly-once external delivery | Outbox pattern + Debezium |

License

MIT
