
Getting Started

SametGoktepe edited this page Jun 17, 2026 · 1 revision

A working transactional outbox in 30 seconds. This page walks through Postgres → Kafka with kafkajs because that's the most common pairing. MySQL works identically with @eventferry/mysql, and @confluentinc/kafka-javascript can be swapped in for kafkajs if you prefer the Confluent (librdkafka) driver.


1. Install

# Pick your database
npm i @eventferry/core @eventferry/postgres pg
# or:
# npm i @eventferry/core @eventferry/mysql mysql2

# Pick a Kafka client (kafkajs is pure-JS, confluent is librdkafka)
npm i @eventferry/kafka kafkajs
# or:
# npm i @eventferry/kafka @confluentinc/kafka-javascript

If you want the whole toolkit in one shot:

npm i @eventferry/all pg kafkajs

@eventferry/all is a meta-package that re-exports core + postgres + mysql + kafka + schema-registry. Convenient for getting started; in production prefer the individual packages so your install footprint matches your actual surface area.


2. Create the outbox table

eventferry ships the migration SQL as a function — no separate tool, no migration runner required. Run it inside your existing schema management.

Postgres

import { createMigrationSql } from "@eventferry/postgres";

await pool.query(createMigrationSql("outbox"));

This creates an outbox table with the columns the store needs, plus the indexes that make the claim query (SKIP LOCKED over pending + due-failed rows, head-of-aggregate filter) cheap.
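To make the claim step more concrete, here is a rough sketch of the kind of query a SKIP LOCKED claim typically uses on Postgres. The column names (status, next_attempt_at), the status values, and the buildClaimSql helper are hypothetical and chosen for illustration; they are not necessarily what createMigrationSql generates, and the head-of-aggregate filter is left out for brevity.

```typescript
// Illustrative only: column names and statuses are assumptions,
// not eventferry's actual schema.
const IDENTIFIER = /^[A-Za-z_][A-Za-z0-9_]*$/;

function buildClaimSql(table: string): string {
  // The table name is interpolated into SQL, so restrict it to a plain identifier.
  if (!IDENTIFIER.test(table)) {
    throw new Error(`unsafe table name: ${table}`);
  }
  return [
    `SELECT id FROM ${table}`,
    `WHERE status = 'pending'`,
    `   OR (status = 'failed' AND next_attempt_at <= now())`,
    `ORDER BY id`,
    `LIMIT $1`,
    // Rows already locked by another relay are skipped rather than waited on,
    // so concurrent relays claim disjoint batches.
    `FOR UPDATE SKIP LOCKED`,
  ].join("\n");
}
```

A query like this only holds its row locks for the duration of the surrounding transaction, so it would be run between BEGIN and COMMIT with the batch size passed as $1.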

MySQL

import { createMigrationSql } from "@eventferry/mysql";

await connection.query(createMigrationSql("outbox"));

The MySQL version uses an id BIGINT AUTO_INCREMENT primary key and JSON columns for payload and headers. Time handling is server-side: every reaper and due check is computed in the database with INTERVAL SECOND arithmetic, so clock skew on the application server and the DB's time_zone setting do not affect when a row becomes due.
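The idea of keeping time arithmetic on the database server can be sketched as follows. The statement derives the next attempt time from MySQL's own NOW() plus a delay in seconds, so no application timestamp is ever sent. The column names, the backoff policy, and the scheduleRetry helper are hypothetical, not eventferry's actual implementation.

```typescript
interface Statement {
  sql: string;
  params: number[];
}

// Hypothetical helper: schedule a retry using only server-side time.
function scheduleRetry(table: string, id: number, attempts: number): Statement {
  if (!/^[A-Za-z_][A-Za-z0-9_]*$/.test(table)) {
    throw new Error(`unsafe table name: ${table}`);
  }
  // Assumed policy: exponential backoff in whole seconds, capped at 5 minutes.
  const delaySeconds = Math.min(2 ** attempts, 300);
  return {
    // NOW() is evaluated by MySQL; the application only supplies a duration.
    sql:
      `UPDATE ${table} SET status = 'failed', ` +
      `next_attempt_at = NOW() + INTERVAL ? SECOND WHERE id = ?`,
    params: [delaySeconds, id],
  };
}
```

With mysql2 this could be executed as connection.query(stmt.sql, stmt.params). Because only a duration leaves the application, two app servers with different clocks still schedule the same due time.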

You're free to pick the table name — pass anything to createMigrationSql(yourTable). Just remember to construct the store with the same name.


3. Enqueue inside your business transaction

The whole point of the outbox pattern: the event row lives in the same transaction as the business change. Either both commit, or neither does. No dual-write, no message loss.

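import { Pool } from "pg";
import { PostgresStore } from "@eventferry/postgres";
import { defineOutbox } from "@eventferry/core";
import { z } from "zod"; // zod is installed separately: npm i zod

// Assumption: the connection string comes from DATABASE_URL; use your own config.
const pool = new Pool({ connectionString: process.env.DATABASE_URL });
const store = new PostgresStore({ pool, table: "outbox" });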

// Optional: declare a typed registry. Same registry used by the consumer.
const events = defineOutbox(
  {
    "orders.created": {
      aggregateType: "order",
      schema: z.object({ orderId: z.string(), total: z.number() }),
    },
  },
  { store },
);

// Inside your business handler:
const client = await pool.connect();
try {
  await client.query("BEGIN");

  // 1. Business change
  const order = await client.query(
    "INSERT INTO orders (...) VALUES (...) RETURNING *",
    [...],
  );

  // 2. Outbox event — validated against the schema, written in the SAME tx
  await events.enqueue(client, "orders.created", {
    aggregateId: order.rows[0].id,
    payload: { orderId: order.rows[0].id, total: order.rows[0].total },
  });

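  await client.query("COMMIT");
} catch (err) {
  // Undo the business change and the outbox row together, then surface the error.
  await client.query("ROLLBACK");
  throw err;
} finally {
  client.release();
}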

If the INSERT fails or you ROLLBACK, the outbox row vanishes too. If the COMMIT succeeds, the event row is durable: whatever happens to the relay process next, the row stays in the table until a relay picks it up and publishes it to Kafka.
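If you enqueue from many handlers, the BEGIN / COMMIT / ROLLBACK / release bookkeeping can be factored into a small helper. The sketch below is one possible shape, not something eventferry ships: withTransaction, TxClient, and ClientPool are hypothetical names, and the interfaces only mirror the parts of a pg pool and client that the helper needs.

```typescript
// Minimal structural types so the sketch does not depend on pg's typings.
interface TxClient {
  query(sql: string, params?: unknown[]): Promise<unknown>;
  release(): void;
}
interface ClientPool {
  connect(): Promise<TxClient>;
}

// Hypothetical helper: run `work` between BEGIN and COMMIT, rolling back on any error.
async function withTransaction<T>(
  pool: ClientPool,
  work: (client: TxClient) => Promise<T>,
): Promise<T> {
  const client = await pool.connect();
  try {
    await client.query("BEGIN");
    const result = await work(client);
    await client.query("COMMIT");
    return result;
  } catch (err) {
    // Discards the business write and the outbox row together.
    await client.query("ROLLBACK");
    throw err;
  } finally {
    // Always hand the connection back to the pool.
    client.release();
  }
}
```

Inside the callback you would run the business INSERT and then events.enqueue(client, ...) with the same client, so both writes share one transaction.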


4. Wire the relay → publisher

In a separate process (or alongside your application in the same process, your choice):

import { Relay } from "@eventferry/core";
import { PostgresStore } from "@eventferry/postgres";
import { KafkaPublisher } from "@eventferry/kafka";

const store = new PostgresStore({ pool, table: "outbox" });

const publisher = new KafkaPublisher({
  brokers: ["broker:9092"],
  idempotent: true, // dedup + ordering on the wire
});
await publisher.connect();

const relay = new Relay({
  store,
  publisher,
  batchSize: 100,
  pollIntervalMs: 200,
});

await relay.start();
// process stays alive — Ctrl+C / SIGTERM triggers a graceful drain.

That's it. The relay claims a batch via SKIP LOCKED, hands it to the publisher, marks the rows done on success, schedules retries on transient failure, and routes to the DLQ on poison / max-attempts.
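The claim, publish, and settle cycle described above can be modeled in memory to show how a row moves between states. This is a simplified sketch, not the Relay implementation: the status names, the maxAttempts option, and the backoff formula are assumptions made for illustration, and an array stands in for the outbox table.

```typescript
type Status = "pending" | "failed" | "done" | "dead";

interface OutboxRow {
  id: number;
  status: Status;
  attempts: number;
  nextAttemptAt: number; // epoch milliseconds
}

type Publish = (row: OutboxRow) => Promise<void>;

// One relay tick over an in-memory "table" (a stand-in for the real store).
async function tick(
  rows: OutboxRow[],
  publish: Publish,
  opts: { batchSize: number; maxAttempts: number; now: number },
): Promise<void> {
  // Claim: pending rows plus failed rows whose retry time has arrived.
  const batch = rows
    .filter(
      (r) =>
        r.status === "pending" ||
        (r.status === "failed" && r.nextAttemptAt <= opts.now),
    )
    .slice(0, opts.batchSize);

  for (const row of batch) {
    try {
      await publish(row);
      row.status = "done";
    } catch {
      row.attempts += 1;
      if (row.attempts >= opts.maxAttempts) {
        row.status = "dead"; // a real relay would route this row to the DLQ
      } else {
        row.status = "failed";
        // Assumed policy: exponential backoff starting at 1 second.
        row.nextAttemptAt = opts.now + 1000 * 2 ** (row.attempts - 1);
      }
    }
  }
}
```

In this model a row that fails below the attempt limit becomes eligible again only once its retry time has passed, which is why a failed row with a future nextAttemptAt is left alone by the claim step.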


5. Consume what eventferry produced

eventferry is publisher-only — your consumer is whatever Kafka client you already use. Two helpers make the loop ergonomic:

import { Kafka } from "kafkajs";
import { decode, extractTraceContext } from "@eventferry/kafka/consume";
import { defineOutbox } from "@eventferry/core";
import { registry } from "./outbox-registry"; // SAME registry as the producer

const consumerEvents = defineOutbox(registry);

const consumer = new Kafka({ brokers: ["broker:9092"] }).consumer({
  groupId: "orders-worker",
});
await consumer.connect();
await consumer.subscribe({ topic: "orders.created" });

await consumer.run({
  eachMessage: async ({ message }) => {
    const m = decode(message, { decoder: "utf8" });
    const trace = extractTraceContext(message.headers);
    // startConsumerSpan is your own tracing hook, not part of eventferry.
    if (trace) startConsumerSpan(trace.traceId, trace.spanId);

    const event = await consumerEvents.decode("orders.created", m.value!);
    //    ^? { orderId: string; total: number }

    // handle is your application's event handler.
    await handle(event);
  },
});

The typed event comes straight from the same Standard Schema registry. The producer validates on enqueue; the consumer validates on decode. As long as both sides import the same registry, a payload that drifts from the schema is rejected at the boundary instead of being processed silently.
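The validate-on-both-sides idea can be shown without any library. In the sketch below a hand-rolled parser stands in for the zod schema, and the producer and consumer paths both call it. The function names are hypothetical and this is not how defineOutbox is implemented internally; it only illustrates why sharing one validator catches mismatched payloads.

```typescript
interface OrderCreated {
  orderId: string;
  total: number;
}

// Hand-rolled stand-in for the schema stored in the registry.
function parseOrderCreated(input: unknown): OrderCreated {
  if (typeof input !== "object" || input === null) {
    throw new Error("payload must be an object");
  }
  const { orderId, total } = input as Record<string, unknown>;
  if (typeof orderId !== "string") throw new Error("orderId must be a string");
  if (typeof total !== "number") throw new Error("total must be a number");
  return { orderId, total };
}

// Producer side: validate first, then serialize.
function encodeOrderCreated(payload: unknown): string {
  return JSON.stringify(parseOrderCreated(payload));
}

// Consumer side: deserialize, then validate with the SAME function.
function decodeOrderCreated(raw: string): OrderCreated {
  return parseOrderCreated(JSON.parse(raw));
}
```

A message written by some other producer with a numeric orderId would throw in decodeOrderCreated rather than reach the handler with the wrong type.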

See Consuming Events for the full consumer guide and Dead-Letter Queue for DLQ recipes.
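For the trace-context step in the consumer loop, it can help to see what such an extraction might involve. The sketch below parses a W3C traceparent header into a trace ID and span ID. It assumes the trace context travels in that format, which this page does not state, and parseTraceparent is a hypothetical function, not the implementation of extractTraceContext.

```typescript
interface TraceContext {
  traceId: string;
  spanId: string;
  sampled: boolean;
}

// W3C Trace Context, version 00: "00-<32 hex>-<16 hex>-<2 hex>".
const TRACEPARENT = /^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/;

function parseTraceparent(header: string | undefined): TraceContext | null {
  if (header === undefined) return null;
  const match = TRACEPARENT.exec(header.trim());
  if (match === null) return null;
  const [, traceId, spanId, flags] = match;
  // All-zero trace or span IDs are invalid per the specification.
  if (/^0+$/.test(traceId) || /^0+$/.test(spanId)) return null;
  // The lowest bit of the flags byte is the "sampled" flag.
  return { traceId, spanId, sampled: (parseInt(flags, 16) & 1) === 1 };
}
```

Kafka header values usually arrive as bytes, so a real consumer would decode the header to a string before passing it to a parser like this.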

