
Postgres Adapter

SametGoktepe edited this page Jun 17, 2026 · 1 revision

PostgreSQL 12+ store, migration SQL, and three relay flavors (polling, notify-waker, streaming). This page covers configuration, the schema, the claim query semantics, and the trade-offs between the relay variants.


Install

npm i @eventferry/postgres pg
# Optional, only for PostgresStreamingRelay:
npm i pg-logical-replication

pg and pg-logical-replication are optional peer dependencies — you install what you actually use.


Migration

import { createMigrationSql } from "@eventferry/postgres";

await pool.query(createMigrationSql("outbox"));

createMigrationSql(table) returns a single SQL string that creates the table, its indexes, and the NOTIFY trigger used by PostgresNotifyWaker. Run it from your migration tool (Prisma, Drizzle, Kysely, plain pg, or anything else that can execute raw SQL).

What the table looks like

CREATE TABLE outbox (
  id              BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
  message_id      TEXT NOT NULL,                  -- producer-side idempotency key
  topic           TEXT NOT NULL,
  aggregate_type  TEXT NOT NULL,
  aggregate_id    TEXT NOT NULL,
  partition_key   TEXT,                           -- optional explicit partition key
  payload         JSONB NOT NULL,
  headers         JSONB NOT NULL DEFAULT '{}'::jsonb,
  trace_id        TEXT,
  status          SMALLINT NOT NULL DEFAULT 0,    -- 0=pending 1=processing 2=done 3=failed 4=dead
  attempts        INT NOT NULL DEFAULT 0,
  claimed_at      TIMESTAMPTZ,
  next_retry_at   TIMESTAMPTZ,
  created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
  processed_at    TIMESTAMPTZ
);

Indexes drop in alongside the table:

  • idx_outbox_claim — partial index over status IN (0,3) ordered by (aggregate_id, id). The claim query rides this.
  • idx_outbox_aggregate_head on (aggregate_id, id), used by the head-of-aggregate filter.
  • idx_outbox_done_processed_at — partial over status = 2, used by purgeDone.

The table name is yours to pick. Multiple outboxes per database (e.g. outbox_orders, outbox_users) are fine — each gets its own PostgresStore instance.


PostgresStore

import { PostgresStore } from "@eventferry/postgres";

const store = new PostgresStore({
  pool,                        // pg.Pool
  table: "outbox",             // matches createMigrationSql arg
  claimTimeoutMs: 60_000,      // default — see below
  claimFailedOnly: false,      // default — claim pending OR due-failed
});

| Option | Default | What it does |
| --- | --- | --- |
| pool | required | pg's Pool (or anything pool-shaped). The store calls pool.connect for the reaper window and pool.query for everything else. |
| table | "outbox" | The table name passed to createMigrationSql. Quoted automatically. |
| claimTimeoutMs | 60_000 | Rows whose claimed_at is older than this are eligible for reclaim. Tune above your slowest typical publish; otherwise a slow batch gets reclaimed mid-flight. |
| claimFailedOnly | false | If true, the claim query only matches status=3 AND next_retry_at <= now(). Useful for a dedicated "retry-only" relay instance. |
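
For readers wondering what "quoted automatically" usually means for a table name, here is a minimal sketch of PostgreSQL identifier quoting. The helper names quoteIdent and quoteQualified are hypothetical and are not part of @eventferry/postgres. The library's own quoting may differ in detail.

```typescript
// Hypothetical helpers, not the library's implementation: quote a name as a
// PostgreSQL identifier before splicing it into SQL text.
function quoteIdent(name: string): string {
  if (name.length === 0) throw new Error("identifier must not be empty");
  if (name.includes("\0")) throw new Error("identifier must not contain NUL");
  // Inside a quoted identifier, a double quote is escaped by doubling it.
  return `"${name.replace(/"/g, '""')}"`;
}

// Schema-qualified names are quoted part by part, then joined with a dot.
function quoteQualified(schema: string, table: string): string {
  return `${quoteIdent(schema)}.${quoteIdent(table)}`;
}
```

Keep in mind that quoted identifiers are case-sensitive in PostgreSQL, so "Outbox" and "outbox" name two different tables. Sticking to lowercase names such as outbox_orders avoids surprises.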

Enqueue inside your transaction

const client = await pool.connect();
try {
  await client.query("BEGIN");

  // ...business changes...

  await store.enqueue(client, {
    topic: "orders.created",
    aggregateType: "order",
    aggregateId: order.id,
    payload: { orderId: order.id, total: order.total },
    headers: { "x-tenant": tenantId },         // optional
    key: order.customerId,                      // optional, becomes Kafka message key
    messageId: crypto.randomUUID(),             // optional, generated if absent
    traceId: getCurrentTraceId(),               // optional
  });

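  await client.query("COMMIT");
} catch (err) {
  // Undo the business changes and the outbox row together, then surface the error.
  await client.query("ROLLBACK");
  throw err;
} finally {
  client.release();
}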

enqueue takes the same client that owns your business transaction. The outbox row commits or rolls back with it — no separate transaction, no risk of one side surviving the other.
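
If the BEGIN / COMMIT / ROLLBACK dance shows up in many places, it can be wrapped once. The following is a minimal sketch, assuming only that the pool exposes connect() and the client exposes query() and release(), as pg does. withTransaction is a hypothetical helper, not something @eventferry/postgres ships.

```typescript
// Minimal structural types so the sketch does not depend on pg's typings.
interface TxClient {
  query(sql: string): Promise<unknown>;
  release(): void;
}
interface TxPool<C extends TxClient> {
  connect(): Promise<C>;
}

// Hypothetical helper: run `work` inside one transaction on one client.
async function withTransaction<C extends TxClient, T>(
  pool: TxPool<C>,
  work: (client: C) => Promise<T>,
): Promise<T> {
  const client = await pool.connect();
  try {
    await client.query("BEGIN");
    const result = await work(client);
    await client.query("COMMIT");
    return result;
  } catch (err) {
    // Business changes and the outbox row are rolled back together.
    await client.query("ROLLBACK");
    throw err;
  } finally {
    // Always hand the connection back to the pool.
    client.release();
  }
}
```

With this in place, the business writes and the store.enqueue(client, ...) call both go inside the work callback and share its client.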


The claim query

The heart of strict head-of-aggregate ordering. Roughly:

SELECT id, ...
FROM outbox o
WHERE
  -- Eligible: pending, or failed-and-due
  (o.status = 0 OR (o.status = 3 AND o.next_retry_at <= now()))
  -- Reaper: pick up rows whose claim expired
  AND (o.claimed_at IS NULL OR o.claimed_at < now() - interval '60 seconds')
  -- Strict head-of-aggregate: skip if an earlier row for the same aggregate
  -- is still in flight
  AND NOT EXISTS (
    SELECT 1 FROM outbox e
    WHERE e.aggregate_id = o.aggregate_id
      AND e.id < o.id
      AND e.status IN (1, 3)
  )
ORDER BY o.aggregate_id, o.id
LIMIT $1
FOR UPDATE SKIP LOCKED;

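-- Runs in the same transaction as the SELECT, while its row locks are still held.
UPDATE outbox SET status = 1, claimed_at = now()
WHERE id = ANY($1);  -- $1 = array of ids returned by the SELECT above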

Things to notice:

  • SKIP LOCKED means concurrent relays never block each other: a row locked by one relay is skipped by the others, so two relays never claim the same row at the same time.
  • The NOT EXISTS clause is what enforces per-aggregate ordering. If aggregate A's event #1 is processing, event #2 stays unclaimable until #1 lands in done (or is reaped).
  • The reaper window is server-side (now() - interval '60 seconds') — application-clock skew between the relay process and the DB host is irrelevant.
  • claimFailedOnly: true adds AND o.status = 3 to specialize for the retry-only path.
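
To make the filter semantics concrete, here is an in-memory model of the SELECT above, written as a sketch. It mirrors the three WHERE conditions, the ordering, and the limit, but it does not model row locking or the follow-up UPDATE. The OutboxRow shape and the selectClaimable name are illustrative, not the store's internals.

```typescript
// Status codes as documented in the schema comment.
const PENDING = 0, PROCESSING = 1, FAILED = 3;

interface OutboxRow {
  id: number;
  aggregateId: string;
  status: number;
  claimedAt: number | null;   // epoch ms
  nextRetryAt: number | null; // epoch ms
}

// In-memory model of the claim SELECT (FOR UPDATE SKIP LOCKED is not modelled).
function selectClaimable(
  rows: OutboxRow[],
  now: number,
  limit: number,
  claimTimeoutMs = 60_000,
): OutboxRow[] {
  return rows
    .filter((o) => {
      // Eligible: pending, or failed and due for retry.
      const eligible =
        o.status === PENDING ||
        (o.status === FAILED && o.nextRetryAt !== null && o.nextRetryAt <= now);
      // Claim window: never claimed, or the claim has expired.
      const claimFree = o.claimedAt === null || o.claimedAt < now - claimTimeoutMs;
      // NOT EXISTS: an earlier row of the same aggregate is processing or failed.
      const blocked = rows.some(
        (e) =>
          e.aggregateId === o.aggregateId &&
          e.id < o.id &&
          (e.status === PROCESSING || e.status === FAILED),
      );
      return eligible && claimFree && !blocked;
    })
    .sort((a, b) =>
      a.aggregateId === b.aggregateId
        ? a.id - b.id
        : a.aggregateId < b.aggregateId ? -1 : 1,
    )
    .slice(0, limit);
}
```

In this model, a pending row behind a processing row of the same aggregate is never returned, while rows of unrelated aggregates are unaffected.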

Three relays — pick by latency vs setup cost

Relay (polling, from @eventferry/core)

The default. Wakes every pollIntervalMs, runs the claim query, processes the batch, sleeps.

import { Relay } from "@eventferry/core";

const relay = new Relay({
  store,
  publisher,
  batchSize: 100,
  pollIntervalMs: 200,
  claimTimeoutMs: 60_000,
  retry: { maxAttempts: 5, initialBackoffMs: 1_000, factor: 2 },
});
await relay.start();

Use when: you can tolerate 100–500ms enqueue-to-publish latency. Zero cluster config required — works on any Postgres.
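
The retry option in the config above reads like a standard exponential backoff. As a sketch of that reading, and assuming the delay after the n-th failed attempt is initialBackoffMs × factor^(n−1) with no jitter or cap (the relay's actual schedule may differ), the numbers work out as follows. nextBackoffMs is a hypothetical helper.

```typescript
interface RetryPolicy {
  maxAttempts: number;
  initialBackoffMs: number;
  factor: number;
}

// Delay before the next try after `attempts` failed attempts,
// or null once the attempt budget is used up.
function nextBackoffMs(policy: RetryPolicy, attempts: number): number | null {
  if (attempts < 1) throw new RangeError("attempts must be >= 1");
  if (attempts >= policy.maxAttempts) return null;
  return policy.initialBackoffMs * Math.pow(policy.factor, attempts - 1);
}
```

Under this assumption, { maxAttempts: 5, initialBackoffMs: 1_000, factor: 2 } waits 1 s, 2 s, 4 s, and 8 s between attempts and gives up after the fifth failure.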

PostgresNotifyWaker

LISTEN/NOTIFY wakes the polling relay on INSERT. The polling claim path is unchanged; you just don't sit idle waiting for the next tick when work has arrived.

import { Relay } from "@eventferry/core";
import { PostgresStore, PostgresNotifyWaker } from "@eventferry/postgres";

const waker = new PostgresNotifyWaker({ pool, channel: "outbox_notify" });
const relay = new Relay({ store, publisher, waker });
await relay.start();

The createMigrationSql migration installs the trigger that fires pg_notify('outbox_notify', '') on INSERT. If you keep the default channel name, no extra setup. If you rename, pass the same name to both the migration helper and the waker.

Use when: you want sub-second latency on a vanilla Postgres without touching wal_level.
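
Conceptually, a waker turns the relay's fixed sleep into a sleep that a notification can cut short. The sketch below shows that idea in isolation. WakeSignal is a hypothetical class, not the PostgresNotifyWaker implementation, and it has no database dependency.

```typescript
type WakeReason = "woken" | "timeout";

// Hypothetical primitive: a sleep that wake() can interrupt.
class WakeSignal {
  private resolveCurrent: ((reason: WakeReason) => void) | null = null;
  private pendingWake = false;

  // Resolves with "woken" if wake() arrives first, otherwise "timeout".
  wait(timeoutMs: number): Promise<WakeReason> {
    // A wake that arrived while nobody was waiting is consumed here.
    if (this.pendingWake) {
      this.pendingWake = false;
      return Promise.resolve("woken");
    }
    return new Promise((resolve) => {
      const timer = setTimeout(() => {
        this.resolveCurrent = null;
        resolve("timeout");
      }, timeoutMs);
      this.resolveCurrent = (reason) => {
        clearTimeout(timer);
        this.resolveCurrent = null;
        resolve(reason);
      };
    });
  }

  // Intended to be called from the notification handler.
  wake(): void {
    if (this.resolveCurrent) this.resolveCurrent("woken");
    else this.pendingWake = true; // remember it for the next wait()
  }
}
```

With pg, a dedicated client that has issued LISTEN on the channel emits "notification" events, so the handler for that event is the natural place to call wake(). The timeout path keeps the regular polling tick as a fallback.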

PostgresStreamingRelay

Logical replication. The relay consumes a replication slot through the pgoutput plugin and reacts to INSERT events on the outbox table, so there is no claim query in the hot path and no polling at all.

import { PostgresStore, PostgresStreamingRelay } from "@eventferry/postgres";

const streaming = new PostgresStreamingRelay({
  store,
  publisher,
  slot: "outbox_slot",                    // logical replication slot name
  publication: "outbox_pub",              // logical publication name (auto-created)
  pollIntervalMs: 60_000,                 // fallback poll for missed events
});
await streaming.start();

Requirements:

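  • wal_level=logical on the cluster (postgresql.conf or ALTER SYSTEM SET wal_level = logical). Changing it requires a server restart.
  • A role with REPLICATION privilege.
  • max_replication_slots and max_wal_senders headroom for one extra slot and one extra replication connection.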

Use when: sub-millisecond latency matters and you control the cluster config. The streaming relay still does the claim query as a fallback every pollIntervalMs so a missed replication event doesn't strand a row — set it to something high (60s+) since it's the safety net, not the hot path.
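
Two of the requirements listed above can be verified from a normal connection before the streaming relay is started. This is a minimal preflight sketch, assuming a client whose query(sql) resolves to an object with a rows array, as pg's does. checkStreamingPrereqs is a hypothetical helper and is not part of the package.

```typescript
interface Queryable {
  query(sql: string): Promise<{ rows: Array<Record<string, unknown>> }>;
}

// Hypothetical preflight: returns a list of problems, empty when both checks pass.
async function checkStreamingPrereqs(db: Queryable): Promise<string[]> {
  const problems: string[] = [];

  // SHOW returns one row with a column named after the setting.
  const wal = await db.query("SHOW wal_level");
  const level = wal.rows[0]?.wal_level;
  if (level !== "logical") {
    problems.push(`wal_level is ${String(level)}, expected logical`);
  }

  // Superusers may replicate even without the REPLICATION attribute.
  const role = await db.query(
    "SELECT rolreplication OR rolsuper AS can_replicate " +
      "FROM pg_roles WHERE rolname = current_user",
  );
  if (role.rows[0]?.can_replicate !== true) {
    problems.push("current role lacks the REPLICATION attribute");
  }

  return problems;
}
```

Failing fast with a readable message is usually easier to debug than a replication connection that is refused at startup.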


purgeDone retention

The store ships a batched delete for done rows older than a cutoff:

await store.purgeDone({
  olderThanMs: 7 * 24 * 60 * 60 * 1000,   // 7 days
  batchSize: 1_000,                       // delete in chunks to avoid long locks
});

Run it from cron / a sidecar / a startup task — eventferry has no built-in scheduler. The idx_outbox_done_processed_at partial index makes this cheap.
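
When the purge is driven by a timer inside a long-running process, one thing worth guarding against is a slow run overlapping the next tick. A minimal sketch of such a guard follows. nonOverlapping is a hypothetical helper, and the way purgeDone is wired in (shown in the closing comment) is an assumption about usage rather than a documented pattern.

```typescript
// Hypothetical wrapper: skip a run if the previous one has not finished.
function nonOverlapping(task: () => Promise<void>): () => Promise<boolean> {
  let running = false;
  // Resolves to true if the task ran, false if this call was skipped.
  return async () => {
    if (running) return false;
    running = true;
    try {
      await task();
      return true;
    } finally {
      running = false;
    }
  };
}

// Possible wiring (assumes `store` from the snippet above):
//   const purge = nonOverlapping(async () => {
//     await store.purgeDone({ olderThanMs: 7 * 24 * 60 * 60 * 1000, batchSize: 1_000 });
//   });
//   setInterval(() => { purge().catch(console.error); }, 60 * 60 * 1000);
```

An external cron job gets the same effect for free as long as the job runner refuses to start a second instance.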

Why a hard delete instead of a partition? Most apps run small outboxes. If yours grows past 10M rows, the right move is partitioning by processed_at month and dropping old partitions. That's beyond what eventferry ships, and it needs schema changes first: PostgreSQL requires the partition key to be part of the primary key, and processed_at is nullable in the schema above.


Observability

PostgresStore and the relays don't log on their own. Pass a Logger (the same interface used across @eventferry/core) to route diagnostics:

const relay = new Relay({
  store,
  publisher,
  logger: pinoAdapter,                    // anything with { debug, info, warn, error }
  hooks: {
    onBatchClaimed: (n) => metrics.gauge("outbox.claim_size", n),
    onBatchPublished: (results) =>
      metrics.counter("outbox.publish_ok", results.filter((r) => r.ok).length),
    onFailed: (record, err, willRetry) =>
      metrics.counter("outbox.failed", 1, { willRetry }),
  },
});

Full hook list and tracing setup → Observability.
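
The metrics object in the hooks snippet is not defined on this page. For local testing, a stand-in with the same two call shapes (gauge(name, value) and counter(name, value, tags)) might look like the sketch below. InMemoryMetrics is hypothetical, and a real setup would forward to StatsD, Prometheus, or a similar client instead.

```typescript
type Tags = Record<string, string | number | boolean>;

// Hypothetical in-memory stand-in for the `metrics` object used in the hooks.
class InMemoryMetrics {
  readonly gauges = new Map<string, number>();
  readonly counters = new Map<string, number>();

  // A gauge keeps only the latest value.
  gauge(name: string, value: number): void {
    this.gauges.set(name, value);
  }

  // A counter accumulates; tags become part of the series key.
  counter(name: string, value: number, tags: Tags = {}): void {
    const suffix = Object.keys(tags)
      .sort()
      .map((k) => `${k}=${String(tags[k])}`)
      .join(",");
    const key = suffix ? `${name}{${suffix}}` : name;
    this.counters.set(key, (this.counters.get(key) ?? 0) + value);
  }
}

const metrics = new InMemoryMetrics();
```

Asserting on metrics.counters in a test is a cheap way to check that the hooks fire as expected.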


What's next
