Skip to content

MySQL Adapter

SametGoktepe edited this page Jun 17, 2026 · 1 revision

MySQL 8+ / MariaDB 10.6+ store, migration SQL, and the binlog-driven relay. Mirrors @eventferry/postgres surface — same OutboxStore contract, MySQL-flavored claim query and reaper.


Install

npm i @eventferry/mysql mysql2
# Optional, only for MysqlBinlogRelay:
npm i @vlasky/zongji

Both mysql2 and @vlasky/zongji are optional peer dependencies.


Why MySQL 8+ specifically

The claim query uses SELECT ... FOR UPDATE SKIP LOCKED. That's MySQL 8.0+ and MariaDB 10.6+. Older versions can't run the batched claim without blocking concurrent relays — and eventferry's whole concurrency story rests on SKIP LOCKED. If you're stuck on MySQL 5.7 or MariaDB <10.6, the right move is to upgrade rather than work around the missing feature.


Migration

import { createMigrationSql } from "@eventferry/mysql";

await connection.query(createMigrationSql("outbox"));

Returns DDL covering the table + indexes as a single statement string.

What the table looks like

CREATE TABLE `outbox` (
  `id`              BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
  `message_id`      VARCHAR(255) NOT NULL,
  `topic`           VARCHAR(255) NOT NULL,
  `aggregate_type`  VARCHAR(255) NOT NULL,
  `aggregate_id`    VARCHAR(255) NOT NULL,
  `partition_key`   VARCHAR(255),
  `payload`         JSON NOT NULL,
  `headers`         JSON NOT NULL,
  `trace_id`        VARCHAR(255),
  `status`          TINYINT UNSIGNED NOT NULL DEFAULT 0,
  `attempts`        INT UNSIGNED NOT NULL DEFAULT 0,
  `claimed_at`      DATETIME(3),
  `next_retry_at`   DATETIME(3),
  `created_at`      DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
  `processed_at`    DATETIME(3),
  INDEX `idx_outbox_claim` (`status`, `aggregate_id`, `id`),
  INDEX `idx_outbox_aggregate_head` (`aggregate_id`, `id`),
  INDEX `idx_outbox_done_processed_at` (`status`, `processed_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
  • DATETIME(3) for millisecond precision on claimed_at / next_retry_at (the reaper window is tens of seconds; millisecond precision is for monitoring, not correctness).
  • utf8mb4 so unicode + emoji round-trip cleanly. The integration test suite hammers this with "merhaba — 你好 — שלום — 🎉" payloads on every run.
  • InnoDB is required for SKIP LOCKED. MyISAM is unsupported and unsafe for any concurrent workload.

MysqlStore

import { MysqlStore } from "@eventferry/mysql";

const store = new MysqlStore({
  pool,                        // mysql2/promise Pool
  table: "outbox",
  claimTimeoutMs: 60_000,
  claimFailedOnly: false,
});

Options mirror PostgresStore. The pool must be the mysql2/promise flavor — eventferry uses async / await throughout.

Enqueue inside your transaction

const conn = await pool.getConnection();
try {
  await conn.beginTransaction();

  // ...business changes...

  await store.enqueue(conn, {
    topic: "orders.created",
    aggregateType: "order",
    aggregateId: order.id,
    payload: { orderId: order.id, total: order.total },
    headers: { "x-tenant": tenantId },
    key: order.customerId,
  });

  await conn.commit();
} catch (err) {
  await conn.rollback();
  throw err;
} finally {
  conn.release();
}

The conn you pass to enqueue is your own transactional connection. Outbox row commits or rolls back atomically with the business change.


The claim query

Same shape as Postgres, MySQL flavor:

SELECT id, ...
FROM `outbox` o
WHERE
  (o.status = 0 OR (o.status = 3 AND o.next_retry_at <= NOW(3)))
  -- Server-side TZ-safe reaper: INTERVAL SECOND uses the DB's clock
  AND (o.claimed_at IS NULL OR o.claimed_at < NOW(3) - INTERVAL 60 SECOND)
  AND NOT EXISTS (
    SELECT 1 FROM `outbox` e
    WHERE e.aggregate_id = o.aggregate_id
      AND e.id < o.id
      AND e.status IN (1, 3)
  )
ORDER BY o.aggregate_id, o.id
LIMIT ?
FOR UPDATE SKIP LOCKED;

UPDATE `outbox` SET status = 1, claimed_at = NOW(3)
WHERE id IN (?);

Server-side INTERVAL SECOND — the reaper computes the cutoff using the database's clock, never the application's. This dodges a real failure mode where mysql2 returns a Date object that the application server converts in its local timezone before sending back as a query parameter — drift between the two timezones would either stall the reaper or trigger it too eagerly.

SKIP LOCKED gives you the same concurrency story as Postgres: parallel relays never block, never double-claim.


MysqlBinlogRelay — sub-millisecond latency

import { MysqlStore, MysqlBinlogRelay } from "@eventferry/mysql";

const relay = new MysqlBinlogRelay({
  store,
  publisher,
  mysql: {
    host: "db.internal",
    port: 3306,
    user: "eventferry",
    password: process.env.MYSQL_PASSWORD,
  },
  serverId: 1042,                              // unique replica id
  filter: { databases: ["app"], tables: ["outbox"] },
  pollIntervalMs: 60_000,                      // fallback safety net
});
await relay.start();

Driven by ROW-mode binlog events via @vlasky/zongji. As INSERTs land in the outbox table, the relay claims and publishes them with no polling tax.

Required MySQL config

[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=ROW
binlog-row-image=FULL
gtid-mode=ON
enforce-gtid-consistency=ON

And the relay user needs replication grants:

GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'eventferry'@'%';
FLUSH PRIVILEGES;

The integration suite spins this up automatically via withCopyContentToContainer so the docker container boots with the right config — see packages/integration/test/setup/containers.ts for the canonical setup.

When NOT to use the binlog relay

  • Managed MySQL where you can't enable binlog (rare on RDS / Aurora / Cloud SQL — they support it, but require flag toggles).
  • A read-replica without binlog forwarding.
  • You're already running Debezium / Kafka Connect for CDC. eventferry's binlog relay is intentionally lighter (no Connect cluster), but if you have one running, integrate via Connect instead.

Polling Relay is always the fallback — same MysqlStore, just import { Relay } from "@eventferry/core".


purgeDone retention

await store.purgeDone({
  olderThanMs: 7 * 24 * 60 * 60 * 1000,
  batchSize: 1_000,
});

Server-side DATETIME(3) - INTERVAL ? SECOND, batched delete. Backed by idx_outbox_done_processed_at partial-ish index (MySQL doesn't have true partial indexes, but the index is leading on status so the planner uses an index range scan).

Schedule from cron / a sidecar.


Observability

Same Logger + hook interface as Postgres:

const relay = new Relay({
  store,
  publisher,
  logger,
  hooks: {
    onBatchClaimed: (n) => metrics.gauge("outbox.claim_size", n),
    onFailed: (record, err, willRetry) =>
      metrics.counter("outbox.failed", 1, { willRetry }),
  },
});

Full coverage → Observability.


What's next

Clone this wiki locally