Postgres Adapter
PostgreSQL 12+ store, migration SQL, and three relay flavors (polling, notify-waker, streaming). This page covers configuration, the schema, the claim query semantics, and the trade-offs between the relay variants.
```bash
npm i @eventferry/postgres pg
# Optional, only for PostgresStreamingRelay:
npm i pg-logical-replication
```

`pg` and `pg-logical-replication` are optional peer dependencies: you install what you actually use.
```ts
import { createMigrationSql } from "@eventferry/postgres";

await pool.query(createMigrationSql("outbox"));
```

`createMigrationSql(table)` returns a single SQL string covering the table, its indexes, and the notify trigger. Run it in your migration tool (Prisma, Drizzle, Kysely, plain `pg`, whatever you use).
```sql
CREATE TABLE outbox (
  id             BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
  message_id     TEXT NOT NULL,                       -- producer-side idempotency key
  topic          TEXT NOT NULL,
  aggregate_type TEXT NOT NULL,
  aggregate_id   TEXT NOT NULL,
  partition_key  TEXT,                                -- optional explicit partition key
  payload        JSONB NOT NULL,
  headers        JSONB NOT NULL DEFAULT '{}'::jsonb,
  trace_id       TEXT,
  status         SMALLINT NOT NULL DEFAULT 0,         -- 0=pending 1=processing 2=done 3=failed 4=dead
  attempts       INT NOT NULL DEFAULT 0,
  claimed_at     TIMESTAMPTZ,
  next_retry_at  TIMESTAMPTZ,
  created_at     TIMESTAMPTZ NOT NULL DEFAULT now(),
  processed_at   TIMESTAMPTZ
);
```

Indexes drop in alongside the table:
- `idx_outbox_claim`: partial index over `status IN (0,3)`, ordered by `(aggregate_id, id)`. The claim query rides this.
- `idx_outbox_aggregate_head`: `(aggregate_id, id)`, for the head-of-aggregate filter.
- `idx_outbox_done_processed_at`: partial index over `status = 2`, used by `purgeDone`.
The table name is yours to pick. Multiple outboxes per database (e.g. outbox_orders, outbox_users) are fine — each gets its own PostgresStore instance.
```ts
import pg from "pg";
import { PostgresStore } from "@eventferry/postgres";

const pool = new pg.Pool({ connectionString: process.env.DATABASE_URL });

const store = new PostgresStore({
  pool,                   // pg.Pool
  table: "outbox",        // matches the createMigrationSql argument
  claimTimeoutMs: 60_000, // default; see below
  claimFailedOnly: false, // default: claim pending OR due-failed rows
});
```

| Option | Default | What it does |
|---|---|---|
| `pool` | required | pg's `Pool` (or anything pool-shaped). The store calls `pool.connect` for the reaper window and `pool.query` for everything else. |
| `table` | `"outbox"` | The table name passed to `createMigrationSql`. Quoted automatically. |
| `claimTimeoutMs` | `60_000` | Rows whose `claimed_at` is older than this many milliseconds are eligible for reclaim. Set it above your slowest typical publish; otherwise a slow batch gets reclaimed mid-flight. |
| `claimFailedOnly` | `false` | If `true`, the claim query only matches `status = 3 AND next_retry_at <= now()`. Useful for a dedicated "retry-only" relay instance. |
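"Quoted automatically" has a practical consequence: a quoted identifier is case-sensitive in Postgres, while unquoted names are folded to lower case. If you create the table yourself as `CREATE TABLE Outbox (...)`, Postgres stores it as `outbox`, and `table: "Outbox"` would point at a different, nonexistent table. The sketch below shows standard always-quote Postgres identifier quoting (wrap in double quotes, double any embedded quote); `quoteIdent` is a hypothetical helper, not eventferry's actual implementation.

```ts
// Hypothetical helper: always-quote form of Postgres identifier quoting.
// Not eventferry's actual code.
function quoteIdent(name: string): string {
  if (name.length === 0) throw new Error("identifier must not be empty");
  // Embedded double quotes are escaped by doubling them.
  return `"${name.replace(/"/g, '""')}"`;
}

// Quoted identifiers are case-sensitive; unquoted ones are folded to lower case.
console.log(quoteIdent("outbox"));   // "outbox"
console.log(quoteIdent("Outbox"));   // "Outbox" (a different table from outbox)
console.log(quoteIdent('odd"name')); // "odd""name"
```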
```ts
const client = await pool.connect();
try {
  await client.query("BEGIN");
  // ...business changes on the same client...
  await store.enqueue(client, {
    topic: "orders.created",
    aggregateType: "order",
    aggregateId: order.id,
    payload: { orderId: order.id, total: order.total },
    headers: { "x-tenant": tenantId }, // optional
    key: order.customerId,             // optional, becomes the Kafka message key
    messageId: crypto.randomUUID(),    // optional, generated if absent
    traceId: getCurrentTraceId(),      // optional
  });
  await client.query("COMMIT");
} catch (err) {
  // Undo both the business changes and the outbox row before releasing the client.
  await client.query("ROLLBACK");
  throw err;
} finally {
  client.release();
}
```

`enqueue` takes the same client that owns your business transaction. The outbox row commits or rolls back with it: there is no separate transaction, so one side can never survive without the other.
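The `BEGIN`/`COMMIT`/`ROLLBACK`/`release` choreography above is easy to get wrong when it is repeated across handlers. A minimal sketch of a reusable wrapper, assuming only that the pool and client expose node-postgres-style `connect`, `query`, and `release` methods; `withTransaction`, `TxClient`, and `TxPool` are hypothetical names, not part of eventferry.

```ts
// Minimal structural types so the sketch does not depend on @types/pg.
interface TxClient {
  query(sql: string): Promise<unknown>;
  release(): void;
}
interface TxPool<C extends TxClient> {
  connect(): Promise<C>;
}

// Hypothetical helper: runs `work` inside one transaction on one pooled client.
// Business writes and store.enqueue(client, ...) made inside `work` commit or roll back together.
async function withTransaction<C extends TxClient, T>(
  pool: TxPool<C>,
  work: (client: C) => Promise<T>,
): Promise<T> {
  const client = await pool.connect();
  try {
    await client.query("BEGIN");
    const result = await work(client);
    await client.query("COMMIT");
    return result;
  } catch (err) {
    await client.query("ROLLBACK");
    throw err;
  } finally {
    client.release(); // always hand the connection back to the pool
  }
}
```

Call sites then shrink to `await withTransaction(pool, async (client) => { /* business writes */ await store.enqueue(client, event); })`, and a thrown error anywhere inside discards both writes.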
The claim query is the heart of strict head-of-aggregate ordering. Roughly:
```sql
SELECT id, ...
FROM outbox o
WHERE
  -- Eligible: pending, or failed-and-due
  (o.status = 0 OR (o.status = 3 AND o.next_retry_at <= now()))
  -- Reaper: pick up rows whose claim expired (claimTimeoutMs, 60 s by default)
  AND (o.claimed_at IS NULL OR o.claimed_at < now() - interval '60 seconds')
  -- Strict head-of-aggregate: skip if an earlier row for the same aggregate
  -- is still in flight
  AND NOT EXISTS (
    SELECT 1 FROM outbox e
    WHERE e.aggregate_id = o.aggregate_id
      AND e.id < o.id
      AND e.status IN (1, 3)
  )
ORDER BY o.aggregate_id, o.id
LIMIT $1
FOR UPDATE SKIP LOCKED;

-- Same transaction: mark the locked rows as claimed.
-- $1 here is the array of ids returned by the SELECT above.
UPDATE outbox SET status = 1, claimed_at = now()
WHERE id = ANY($1::bigint[]);
```

Things to notice:
- `SKIP LOCKED` means concurrent relays never block each other and never claim the same row twice.
- The `NOT EXISTS` clause is what enforces per-aggregate ordering. If aggregate `A`'s event #1 is `processing`, event #2 stays unclaimable until #1 lands in `done` (or is reaped).
- The reaper window is server-side (`now() - interval '60 seconds'`), so application-clock skew between the relay process and the DB host is irrelevant.
- `claimFailedOnly: true` adds `AND o.status = 3` to specialize the query for the retry-only path.
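To make the three predicates concrete, here is an in-memory model of the same filter, ordering, and limit: eligibility, the reaper window, and the head-of-aggregate `NOT EXISTS`. It is a hypothetical sketch for reasoning about which rows a claim would return, not eventferry code; timestamps are plain epoch milliseconds, and it ignores row locking (`SKIP LOCKED`) and database collation.

```ts
// Status codes from the schema comment: 0=pending 1=processing 2=done 3=failed 4=dead
type Status = 0 | 1 | 2 | 3 | 4;

interface OutboxRow {
  id: number;
  aggregateId: string;
  status: Status;
  claimedAt: number | null;   // epoch ms, null if never claimed
  nextRetryAt: number | null; // epoch ms, only meaningful for failed rows
}

// Hypothetical in-memory model of the claim query; returns the ids it would claim.
function claimableIds(
  rows: OutboxRow[],
  now: number,
  claimTimeoutMs: number,
  limit: number,
  claimFailedOnly = false,
): number[] {
  const picked = rows.filter((o) => {
    // Eligible: pending, or failed and due (a NULL next_retry_at never matches in SQL)
    const due =
      (!claimFailedOnly && o.status === 0) ||
      (o.status === 3 && o.nextRetryAt !== null && o.nextRetryAt <= now);
    // Reaper: never claimed, or the previous claim is older than the timeout
    const claimFree = o.claimedAt === null || o.claimedAt < now - claimTimeoutMs;
    // Head-of-aggregate: no earlier row of the same aggregate is processing or failed
    const blocked = rows.some(
      (e) => e.aggregateId === o.aggregateId && e.id < o.id && (e.status === 1 || e.status === 3),
    );
    return due && claimFree && !blocked;
  });
  // ORDER BY aggregate_id, id (JS string order stands in for the DB collation)
  picked.sort((a, b) =>
    a.aggregateId === b.aggregateId ? a.id - b.id : a.aggregateId < b.aggregateId ? -1 : 1,
  );
  return picked.slice(0, limit).map((r) => r.id);
}

const now = 1_000_000;
const rows: OutboxRow[] = [
  { id: 1, aggregateId: "A", status: 1, claimedAt: now - 1_000, nextRetryAt: null },   // in flight
  { id: 2, aggregateId: "A", status: 0, claimedAt: null, nextRetryAt: null },          // blocked by #1
  { id: 3, aggregateId: "B", status: 0, claimedAt: null, nextRetryAt: null },
  { id: 4, aggregateId: "B", status: 0, claimedAt: null, nextRetryAt: null },
  { id: 5, aggregateId: "C", status: 3, claimedAt: null, nextRetryAt: now - 1 },       // due retry
  { id: 6, aggregateId: "C", status: 3, claimedAt: null, nextRetryAt: now + 10_000 },  // not due yet
  { id: 7, aggregateId: "D", status: 0, claimedAt: now - 120_000, nextRetryAt: null }, // claim expired
  { id: 8, aggregateId: "E", status: 0, claimedAt: now - 1_000, nextRetryAt: null },   // claim still fresh
];
console.log(claimableIds(rows, now, 60_000, 10)); // [3, 4, 5, 7]
```

In this model #4 is claimable next to #3 because the `IN (1, 3)` condition only treats processing and failed predecessors as blocking; a pending predecessor does not block.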
Polling relay (the default): wakes every `pollIntervalMs`, runs the claim query, processes the batch, then sleeps.
```ts
import { Relay } from "@eventferry/core";

const relay = new Relay({
  store,
  publisher,
  batchSize: 100,
  pollIntervalMs: 200,
  claimTimeoutMs: 60_000,
  retry: { maxAttempts: 5, initialBackoffMs: 1_000, factor: 2 },
});
await relay.start();
```

Use when: you can tolerate 100–500ms enqueue-to-publish latency. Zero cluster config required; works on any Postgres.
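The `retry` block reads like a standard exponential backoff. A minimal sketch, assuming the delay after the *n*-th failed attempt is `initialBackoffMs * factor^(n - 1)` and that a row goes to `dead` (status 4) once `maxAttempts` is reached; the exact formula eventferry uses (jitter, caps, off-by-one on attempts) is not documented on this page, so treat `nextRetryDelay` as hypothetical.

```ts
interface RetryPolicy {
  maxAttempts: number;
  initialBackoffMs: number;
  factor: number;
}

type RetryDecision =
  | { kind: "retry"; delayMs: number } // e.g. status = 3, next_retry_at = now + delayMs
  | { kind: "dead" };                  // e.g. status = 4

// Hypothetical: `attempts` counts publish attempts so far, including the one that just failed.
function nextRetryDelay(attempts: number, policy: RetryPolicy): RetryDecision {
  if (attempts >= policy.maxAttempts) return { kind: "dead" };
  return { kind: "retry", delayMs: policy.initialBackoffMs * policy.factor ** (attempts - 1) };
}

// The policy from the Relay example above.
const policy: RetryPolicy = { maxAttempts: 5, initialBackoffMs: 1_000, factor: 2 };
for (let attempts = 1; attempts <= 5; attempts++) {
  console.log(attempts, nextRetryDelay(attempts, policy));
}
// delays: 1000, 2000, 4000, 8000 ms, then dead on the 5th failure
```

Under these assumptions a row that keeps failing spends about 15 s in backoff in total before it is dead-lettered, which is worth comparing against `claimTimeoutMs` and your alerting windows.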
Notify waker: `LISTEN`/`NOTIFY` wakes the polling relay on `INSERT`. The polling claim path is unchanged; the relay just doesn't sit idle waiting for the next tick when work has arrived.
```ts
import { Relay } from "@eventferry/core";
import { PostgresStore, PostgresNotifyWaker } from "@eventferry/postgres";

const waker = new PostgresNotifyWaker({ pool, channel: "outbox_notify" });
const relay = new Relay({ store, publisher, waker });
await relay.start();
```

The `createMigrationSql` migration installs the trigger that fires `pg_notify('outbox_notify', '')` on `INSERT`. If you keep the default channel name, no extra setup is needed. If you rename it, pass the same name to both the migration helper and the waker.
Use when: you want sub-second latency on a vanilla Postgres without touching wal_level.
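All a waker has to do is cut the idle sleep short. A minimal sketch of that idea, assuming nothing about `PostgresNotifyWaker`'s internals: `WakeableSleep` is a hypothetical helper whose `sleep()` resolves on timeout or on `wake()`, and which remembers a wake that arrives while the relay is busy so the next sleep returns immediately. With node-postgres, `wake()` would be called from the `notification` event of a dedicated, long-lived client that has run `LISTEN outbox_notify`.

```ts
type WakeReason = "timeout" | "woken";

// Hypothetical helper: an idle sleep that a notification can cut short.
// Assumes a single relay loop calls sleep(); it is not safe for concurrent sleepers.
class WakeableSleep {
  private timer: ReturnType<typeof setTimeout> | null = null;
  private resolve: ((reason: WakeReason) => void) | null = null;
  private pendingWake = false; // a wake() that arrived while nobody was sleeping

  sleep(ms: number): Promise<WakeReason> {
    if (this.pendingWake) {
      // Work arrived while the relay was busy: don't wait for the next tick.
      this.pendingWake = false;
      return Promise.resolve("woken");
    }
    return new Promise((resolve) => {
      this.resolve = resolve;
      this.timer = setTimeout(() => this.finish("timeout"), ms);
    });
  }

  // Call this from the LISTEN connection's notification handler.
  wake(): void {
    if (this.resolve) this.finish("woken");
    else this.pendingWake = true;
  }

  private finish(reason: WakeReason): void {
    if (this.timer !== null) clearTimeout(this.timer);
    const resolve = this.resolve;
    this.timer = null;
    this.resolve = null;
    resolve?.(reason);
  }
}

// Relay loop shape (claimAndPublish is hypothetical):
// while (running) { await claimAndPublish(); await sleeper.sleep(pollIntervalMs); }
```

Remembering the pending wake matters: without it, a `NOTIFY` that lands during a batch would be lost and the new row would wait a full `pollIntervalMs`, which is exactly the latency the waker is meant to remove.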
Streaming relay: uses logical replication. The relay subscribes through the `pgoutput` plugin and reacts to `INSERT` events on the outbox table, so there is no claim query and no polling in the hot path.
```ts
import { PostgresStore, PostgresStreamingRelay } from "@eventferry/postgres";

const streaming = new PostgresStreamingRelay({
  store,
  publisher,
  slot: "outbox_slot",       // logical replication slot name
  publication: "outbox_pub", // publication name (auto-created)
  pollIntervalMs: 60_000,    // fallback poll for missed events
});
await streaming.start();
```

Requirements:
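- `wal_level = logical` on the cluster (set in `postgresql.conf` or via `ALTER SYSTEM SET wal_level = logical`; changing it requires a server restart).
- A role with the `REPLICATION` privilege.
- Headroom for one extra replication slot (`max_replication_slots`) and one extra WAL sender connection (`max_wal_senders`).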
Use when: the lowest enqueue-to-publish latency matters and you control the cluster config. The streaming relay still runs the claim query as a fallback every `pollIntervalMs`, so a missed replication event doesn't strand a row. Set it high (60s+), since it's the safety net, not the hot path.
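The requirements list translates into a preflight check you could run before starting the streaming relay. A hypothetical sketch: `streamingPreflight` and `ReplicationSettings` are illustrative names, and the input values are assumed to come from queries such as `SHOW wal_level`, `SHOW max_replication_slots`, `SHOW max_wal_senders`, `SELECT count(*) FROM pg_replication_slots`, `SELECT count(*) FROM pg_stat_replication`, and the `rolreplication`/`rolsuper` columns of `pg_roles` for the relay's role.

```ts
interface ReplicationSettings {
  walLevel: string;             // SHOW wal_level
  maxReplicationSlots: number;  // SHOW max_replication_slots
  usedReplicationSlots: number; // SELECT count(*) FROM pg_replication_slots
  slotExists: boolean;          // is the relay's slot (e.g. "outbox_slot") already there?
  maxWalSenders: number;        // SHOW max_wal_senders
  activeWalSenders: number;     // SELECT count(*) FROM pg_stat_replication
  canReplicate: boolean;        // pg_roles.rolreplication OR rolsuper for the relay's role
}

// Hypothetical preflight for PostgresStreamingRelay; returns human-readable problems.
function streamingPreflight(s: ReplicationSettings): string[] {
  const problems: string[] = [];
  if (s.walLevel !== "logical") {
    problems.push(`wal_level is '${s.walLevel}', needs 'logical' (restart required after changing it)`);
  }
  if (!s.canReplicate) {
    problems.push("relay role lacks the REPLICATION privilege");
  }
  // An existing slot is reused, so only a new slot needs free capacity.
  if (!s.slotExists && s.usedReplicationSlots >= s.maxReplicationSlots) {
    problems.push("no free replication slot: raise max_replication_slots");
  }
  if (s.activeWalSenders >= s.maxWalSenders) {
    problems.push("no free WAL sender: raise max_wal_senders");
  }
  return problems;
}
```

An empty array means the cluster looks ready; otherwise you can fail fast at startup, or fall back to the notify waker, instead of discovering the problem when the replication connection is refused.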
The store ships a batched delete for done rows older than a cutoff:
```ts
await store.purgeDone({
  olderThanMs: 7 * 24 * 60 * 60 * 1000, // 7 days
  batchSize: 1_000,                     // delete in chunks to avoid long locks
});
```

Run it from cron, a sidecar, or a startup task; eventferry has no built-in scheduler. The `idx_outbox_done_processed_at` partial index makes this cheap.
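Postgres has no `DELETE ... LIMIT`, so a batched purge is usually a `DELETE` over an `id IN (SELECT ... LIMIT n)` subquery, repeated until a batch comes back short. A hypothetical sketch of that loop (not `purgeDone`'s actual SQL), with the query function injected so it works with `pool.query` or a test double. Each batch runs as its own autocommit statement, which is what keeps locks short.

```ts
// The slice of pool.query's result this sketch needs.
type QueryFn = (sql: string, params: unknown[]) => Promise<{ rowCount: number | null }>;

// Hypothetical batched purge of done rows; returns the total number of rows deleted.
async function purgeDoneBatched(
  query: QueryFn,
  table: string,
  olderThanMs: number,
  batchSize: number,
): Promise<number> {
  const t = `"${table.replace(/"/g, '""')}"`; // quote the table identifier
  const sql = `
    DELETE FROM ${t}
    WHERE id IN (
      SELECT id FROM ${t}
      WHERE status = 2
        AND processed_at < now() - make_interval(secs => $1)
      ORDER BY processed_at
      LIMIT $2
    )`;
  let total = 0;
  for (;;) {
    // Each call is a separate statement (autocommit), so no lock outlives one batch.
    const { rowCount } = await query(sql, [olderThanMs / 1000, batchSize]);
    const deleted = rowCount ?? 0;
    total += deleted;
    if (deleted < batchSize) return total; // a short batch means no eligible rows remained
  }
}
```

With node-postgres this could be called as `purgeDoneBatched((sql, params) => pool.query(sql, params), "outbox", 7 * 24 * 60 * 60 * 1000, 1_000)`.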
Why a hard delete instead of partitioning? Most apps run small outboxes. If yours grows past 10M rows, the right move is partitioning by `processed_at` month and dropping old partitions. That's beyond what eventferry ships, and it needs a schema change: Postgres requires the primary key of a partitioned table to include the partition key column, so `id` alone can't remain the primary key.
PostgresStore and the relays don't log on their own. Pass a Logger (the same interface used across @eventferry/core) to route diagnostics:
```ts
const relay = new Relay({
  store,
  publisher,
  logger: pinoAdapter, // anything with { debug, info, warn, error }
  hooks: {
    onBatchClaimed: (n) => metrics.gauge("outbox.claim_size", n),
    onBatchPublished: (results) =>
      metrics.counter("outbox.publish_ok", results.filter((r) => r.ok).length),
    onFailed: (record, err, willRetry) =>
      metrics.counter("outbox.failed", 1, { willRetry }),
  },
});
```

Full hook list and tracing setup → Observability.
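The `logger` option accepts anything with `debug`, `info`, `warn`, and `error`. A minimal level-filtering adapter, assuming each method takes a message plus an optional metadata object; check the `Logger` type exported by `@eventferry/core` for the real signature before relying on this. `createLevelLogger`, `LoggerLike`, and `LogLine` are hypothetical names.

```ts
type Level = "debug" | "info" | "warn" | "error";
// Assumed method shape: (message, optional metadata). Verify against @eventferry/core's Logger.
type LogFn = (msg: string, meta?: Record<string, unknown>) => void;
type LoggerLike = Record<Level, LogFn>;

interface LogLine {
  level: Level;
  msg: string;
  meta?: Record<string, unknown>;
}

const LEVEL_ORDER: Record<Level, number> = { debug: 10, info: 20, warn: 30, error: 40 };

// Hypothetical adapter: drops lines below `minLevel` and hands the rest to `sink`
// (console, a file, or your own structured logger).
function createLevelLogger(minLevel: Level, sink: (line: LogLine) => void): LoggerLike {
  const make = (level: Level): LogFn => (msg, meta) => {
    if (LEVEL_ORDER[level] >= LEVEL_ORDER[minLevel]) sink({ level, msg, meta });
  };
  return { debug: make("debug"), info: make("info"), warn: make("warn"), error: make("error") };
}

// Usage: JSON lines on stdout, debug noise suppressed.
const logger = createLevelLogger("info", (line) => console.log(JSON.stringify(line)));
logger.debug("claim query issued");           // dropped
logger.warn("publish retry", { attempt: 2 }); // printed
```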
- The MySQL counterpart → MySQL Adapter
- Configure the publisher → Kafka Publisher
- Run multiple relays in production → Operations Guide
- Understand the claim invariants → Core Concepts
Repository · Issues · npm: @eventferry/all · MIT