-
-
Notifications
You must be signed in to change notification settings - Fork 0
MySQL Adapter
MySQL 8+ / MariaDB 10.6+ store, migration SQL, and the binlog-driven relay. Mirrors @eventferry/postgres surface — same OutboxStore contract, MySQL-flavored claim query and reaper.
npm i @eventferry/mysql mysql2
# Optional, only for MysqlBinlogRelay:
npm i @vlasky/zongjiBoth mysql2 and @vlasky/zongji are optional peer dependencies.
The claim query uses SELECT ... FOR UPDATE SKIP LOCKED. That's MySQL 8.0+ and MariaDB 10.6+. Older versions can't run the batched claim without blocking concurrent relays — and eventferry's whole concurrency story rests on SKIP LOCKED. If you're stuck on MySQL 5.7 or MariaDB <10.6, the right move is to upgrade rather than work around the missing feature.
import { createMigrationSql } from "@eventferry/mysql";
await connection.query(createMigrationSql("outbox"));Returns DDL covering the table + indexes as a single statement string.
CREATE TABLE `outbox` (
`id` BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
`message_id` VARCHAR(255) NOT NULL,
`topic` VARCHAR(255) NOT NULL,
`aggregate_type` VARCHAR(255) NOT NULL,
`aggregate_id` VARCHAR(255) NOT NULL,
`partition_key` VARCHAR(255),
`payload` JSON NOT NULL,
`headers` JSON NOT NULL,
`trace_id` VARCHAR(255),
`status` TINYINT UNSIGNED NOT NULL DEFAULT 0,
`attempts` INT UNSIGNED NOT NULL DEFAULT 0,
`claimed_at` DATETIME(3),
`next_retry_at` DATETIME(3),
`created_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
`processed_at` DATETIME(3),
INDEX `idx_outbox_claim` (`status`, `aggregate_id`, `id`),
INDEX `idx_outbox_aggregate_head` (`aggregate_id`, `id`),
INDEX `idx_outbox_done_processed_at` (`status`, `processed_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;-
DATETIME(3)for millisecond precision onclaimed_at/next_retry_at(the reaper window is tens of seconds; millisecond precision is for monitoring, not correctness). -
utf8mb4so unicode + emoji round-trip cleanly. The integration test suite hammers this with"merhaba — 你好 — שלום — 🎉"payloads on every run. -
InnoDBis required forSKIP LOCKED. MyISAM is unsupported and unsafe for any concurrent workload.
import { MysqlStore } from "@eventferry/mysql";
const store = new MysqlStore({
pool, // mysql2/promise Pool
table: "outbox",
claimTimeoutMs: 60_000,
claimFailedOnly: false,
});Options mirror PostgresStore. The pool must be the mysql2/promise flavor — eventferry uses async / await throughout.
const conn = await pool.getConnection();
try {
await conn.beginTransaction();
// ...business changes...
await store.enqueue(conn, {
topic: "orders.created",
aggregateType: "order",
aggregateId: order.id,
payload: { orderId: order.id, total: order.total },
headers: { "x-tenant": tenantId },
key: order.customerId,
});
await conn.commit();
} catch (err) {
await conn.rollback();
throw err;
} finally {
conn.release();
}The conn you pass to enqueue is your own transactional connection. Outbox row commits or rolls back atomically with the business change.
Same shape as Postgres, MySQL flavor:
SELECT id, ...
FROM `outbox` o
WHERE
(o.status = 0 OR (o.status = 3 AND o.next_retry_at <= NOW(3)))
-- Server-side TZ-safe reaper: INTERVAL SECOND uses the DB's clock
AND (o.claimed_at IS NULL OR o.claimed_at < NOW(3) - INTERVAL 60 SECOND)
AND NOT EXISTS (
SELECT 1 FROM `outbox` e
WHERE e.aggregate_id = o.aggregate_id
AND e.id < o.id
AND e.status IN (1, 3)
)
ORDER BY o.aggregate_id, o.id
LIMIT ?
FOR UPDATE SKIP LOCKED;
UPDATE `outbox` SET status = 1, claimed_at = NOW(3)
WHERE id IN (?);Server-side INTERVAL SECOND — the reaper computes the cutoff using the database's clock, never the application's. This dodges a real failure mode where mysql2 returns a Date object that the application server converts in its local timezone before sending back as a query parameter — drift between the two timezones would either stall the reaper or trigger it too eagerly.
SKIP LOCKED gives you the same concurrency story as Postgres: parallel relays never block, never double-claim.
import { MysqlStore, MysqlBinlogRelay } from "@eventferry/mysql";
const relay = new MysqlBinlogRelay({
store,
publisher,
mysql: {
host: "db.internal",
port: 3306,
user: "eventferry",
password: process.env.MYSQL_PASSWORD,
},
serverId: 1042, // unique replica id
filter: { databases: ["app"], tables: ["outbox"] },
pollIntervalMs: 60_000, // fallback safety net
});
await relay.start();Driven by ROW-mode binlog events via @vlasky/zongji. As INSERTs land in the outbox table, the relay claims and publishes them with no polling tax.
[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=ROW
binlog-row-image=FULL
gtid-mode=ON
enforce-gtid-consistency=ONAnd the relay user needs replication grants:
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'eventferry'@'%';
FLUSH PRIVILEGES;The integration suite spins this up automatically via withCopyContentToContainer so the docker container boots with the right config — see packages/integration/test/setup/containers.ts for the canonical setup.
- Managed MySQL where you can't enable binlog (rare on RDS / Aurora / Cloud SQL — they support it, but require flag toggles).
- A read-replica without binlog forwarding.
- You're already running Debezium / Kafka Connect for CDC. eventferry's binlog relay is intentionally lighter (no Connect cluster), but if you have one running, integrate via Connect instead.
Polling Relay is always the fallback — same MysqlStore, just import { Relay } from "@eventferry/core".
await store.purgeDone({
olderThanMs: 7 * 24 * 60 * 60 * 1000,
batchSize: 1_000,
});Server-side DATETIME(3) - INTERVAL ? SECOND, batched delete. Backed by idx_outbox_done_processed_at partial-ish index (MySQL doesn't have true partial indexes, but the index is leading on status so the planner uses an index range scan).
Schedule from cron / a sidecar.
Same Logger + hook interface as Postgres:
const relay = new Relay({
store,
publisher,
logger,
hooks: {
onBatchClaimed: (n) => metrics.gauge("outbox.claim_size", n),
onFailed: (record, err, willRetry) =>
metrics.counter("outbox.failed", 1, { willRetry }),
},
});Full coverage → Observability.
- The Postgres equivalent for comparison → Postgres Adapter
- Configure the publisher → Kafka Publisher
- Production tuning → Operations Guide
Repository · Issues · npm: @eventferry/all · MIT
Get going
Adapters
Type & schema
Security
Operational
- Transactions and EOS
- Admin Operations
- Observability
- Consuming Events
- Dead-Letter Queue
- Reliability and Error Handling
Operations
Reference