Transactions and EOS
Idempotent vs transactional producers, the fenced restart path, multi-instance EOS strategy, and the transactional.id rules you cannot break without losing exactly-once.
eventferry's at-least-once contract works everywhere. Exactly-once requires opting into the transactional producer and following the multi-instance rules below.
```typescript
new KafkaPublisher({
  brokers,
  idempotent: true, // default true
  maxInFlightRequests: 5, // must be ≤5 when idempotent
});
```

Kafka's idempotent producer assigns a producer id (PID) at first connect and a per-partition sequence number to every record. The broker dedups on (PID, partition, sequence), so a record that is retried after a timeout or transient error is written exactly once: no duplicates on the wire.
Limits of idempotent alone:
- Dedup scope is per producer instance. If your producer crashes and restarts, it gets a fresh PID, and the broker doesn't know the new producer is the same logical service. Outbox-side dedup (the done-status transition) handles this.
- Dedup scope is per partition. It doesn't help across partitions.
- No cross-record atomicity. You can't "publish 5 records or none."
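To make the scope limits concrete, here is a toy in-memory model of the broker's (PID, partition, sequence) check. It is a simplification: a real broker also tracks a small window of recent batches per producer and rejects sequence gaps. DedupLog and RecordKey are hypothetical names, not a Kafka or eventferry API.

```typescript
// Toy model of the broker's idempotent-producer dedup, for illustration only.
// Real brokers also track a window of recent batches and reject sequence gaps.

interface RecordKey {
  pid: number; // producer id assigned at connect
  partition: number;
  sequence: number; // per-partition sequence number
}

class DedupLog {
  // Highest sequence accepted so far for each "pid:partition" pair.
  private lastSeq = new Map<string, number>();
  readonly appended: RecordKey[] = [];

  append(r: RecordKey): "appended" | "duplicate" {
    const key = `${r.pid}:${r.partition}`;
    const last = this.lastSeq.get(key) ?? -1;
    if (r.sequence <= last) return "duplicate"; // a retry of something already written
    this.lastSeq.set(key, r.sequence);
    this.appended.push(r);
    return "appended";
  }
}

const log = new DedupLog();
log.append({ pid: 1, partition: 0, sequence: 0 }); // "appended"
log.append({ pid: 1, partition: 0, sequence: 0 }); // "duplicate": the retry is dropped
// After a crash the restarted producer gets a fresh PID, so the same logical
// record is not recognised and is written a second time.
log.append({ pid: 2, partition: 0, sequence: 0 }); // "appended"
```

The last call is exactly the gap the first bullet describes: only the outbox's done-status transition can catch that duplicate.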
For most outbox workloads, idempotent + at-least-once is the right contract. Reach for the transactional producer when you need cross-record atomicity OR when you publish multiple records that downstream consumers must see as a unit.
```typescript
new KafkaPublisher({
  brokers,
  transactional: true,
  transactionalId: "orders-publisher",
  idempotent: true, // forced true when transactional
});
```

The transactional producer wraps each sendBatch in a Kafka transaction (begin → send → commit, or abort on failure). Consumers running with isolation.level=read_committed see all-or-nothing batches: no half-published batch ever becomes visible.
Each eventferry sendBatch call is exactly one Kafka transaction, so the relay's claim batch (batchSize, 100 by default) is also the atomic unit on the broker.
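A rough model of what a read_committed consumer sees, assuming one transaction per sendBatch as described above. The Entry shape and readCommitted function are hypothetical; the model mimics Kafka's behaviour of skipping aborted records and stopping at the first still-open transaction (the last stable offset).

```typescript
// Toy model of one partition with transaction markers (illustration only).
type Entry =
  | { kind: "data"; txn: string; value: string }
  | { kind: "marker"; txn: string; outcome: "commit" | "abort" };

// Returns the values a read_committed consumer would see, in log order.
function readCommitted(entries: Entry[]): string[] {
  const outcome = new Map<string, "commit" | "abort">();
  for (const e of entries) {
    if (e.kind === "marker") outcome.set(e.txn, e.outcome);
  }
  const visible: string[] = [];
  for (const e of entries) {
    if (e.kind !== "data") continue;
    const o = outcome.get(e.txn);
    if (o === undefined) break; // open transaction: reading stops here (last stable offset)
    if (o === "commit") visible.push(e.value); // aborted records are skipped
  }
  return visible;
}

// Each txn below stands for one sendBatch call.
const sample: Entry[] = [
  { kind: "data", txn: "t1", value: "a" },
  { kind: "data", txn: "t1", value: "b" },
  { kind: "marker", txn: "t1", outcome: "commit" },
  { kind: "data", txn: "t2", value: "c" },
  { kind: "marker", txn: "t2", outcome: "abort" },
  { kind: "data", txn: "t3", value: "d" }, // t3 is still open
  { kind: "data", txn: "t4", value: "e" },
  { kind: "marker", txn: "t4", outcome: "commit" },
];

console.log(readCommitted(sample)); // [ 'a', 'b' ]
```

Batch t1 appears whole, the aborted batch t2 never appears, and nothing past the open batch t3 is delivered until t3 resolves.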
transactionalId must be:
- Stable across a single instance's restarts — so the broker recognizes the restarted producer as the same logical actor and lets it abort any pending transaction from the previous epoch.
- Unique across instances — so two replicas of your service don't fence each other in a loop.
These two rules are in tension. A literal constant ("orders-publisher") is stable but not unique. A random UUID per process is unique but not stable. Neither works.
The right pattern: derive it from per-pod runtime context. Use the callable form:
```typescript
new KafkaPublisher({
  brokers,
  transactional: true,
  // replicaIndex() stands for your own helper returning this instance's replica ordinal.
  transactionalId: () =>
    `${process.env.POD_NAME}-${process.env.HOSTNAME}-${replicaIndex()}`,
});
```

Use the pod name on Kubernetes, the ECS task ID on Fargate, or AZ + replica index on a StatefulSet: anything that's deterministic per instance and unique across instances. The callable is resolved at connect() time, so context that only becomes available after process start is fine.
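A minimal sketch of such a derivation, assuming your platform exposes the instance identity through environment variables. deriveTransactionalId, POD_NAME and INSTANCE_ID are assumptions for illustration, not eventferry names.

```typescript
// Hypothetical helper: build a transactionalId that is stable across one
// instance's restarts and unique across instances. POD_NAME and INSTANCE_ID
// are assumed environment variables; wire them up from your platform.
type Env = Record<string, string | undefined>;

function deriveTransactionalId(service: string, env: Env): string {
  const instance = env.POD_NAME ?? env.INSTANCE_ID;
  if (!instance) {
    // Fail fast instead of falling back to a random UUID: a random id is
    // unique but not stable, so the previous epoch's open transaction
    // would not be aborted on restart.
    throw new Error(`${service}: no stable per-instance identity found`);
  }
  return `${service}-${instance}`;
}
```

With the callable form, transactionalId: () => deriveTransactionalId("orders-publisher", process.env) defers the lookup to connect() time. A StatefulSet pod name (orders-0, orders-1, ...) satisfies both rules because it survives restarts and rescheduling.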
PRODUCER_FENCED and INVALID_PRODUCER_EPOCH errors are classified as errorKind: "fenced", a kind distinct from fatal because some fences are transient:
- Broker restart (rolling deploy on MSK).
- Network partition recovery.
- A real multi-instance collision.
eventferry can't tell the cause from the wire; it surfaces the kind and lets you decide.
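One way to decide on your side is to look at the fence rate rather than at individual fences. The sketch below is hypothetical: FenceRateTracker, isFenceError and the thresholds are illustrative assumptions, and how you read the error name from eventferry's error object depends on its actual shape. You might call record() from the onProducerFenced hook.

```typescript
// Hypothetical user-side policy for fences. An isolated fence is probably a
// broker restart or network blip; repeated fences in a short window suggest
// two instances sharing a transactionalId. Thresholds are illustrative.
const FENCE_ERROR_NAMES = new Set(["PRODUCER_FENCED", "INVALID_PRODUCER_EPOCH"]);

function isFenceError(name: string): boolean {
  return FENCE_ERROR_NAMES.has(name);
}

class FenceRateTracker {
  private readonly windowMs: number;
  private readonly threshold: number;
  private timestamps: number[] = [];

  constructor(windowMs: number, threshold: number) {
    this.windowMs = windowMs;
    this.threshold = threshold;
  }

  // Records a fence at nowMs and returns true once the number of fences
  // inside the sliding window reaches the threshold (likely a collision).
  record(nowMs: number): boolean {
    this.timestamps.push(nowMs);
    this.timestamps = this.timestamps.filter((t) => nowMs - t < this.windowMs);
    return this.timestamps.length >= this.threshold;
  }
}
```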
Opt in to a single transparent reconnect-and-retry:
```typescript
new KafkaPublisher({
  brokers,
  transactional: true,
  transactionalId: "orders-publisher",
  autoRecoverFromFence: true,
});
```

When a publish batch reports a fence:
- The onProducerFenced(error) hook fires (regardless of the recovery flag; it is informational).
- The driver disconnects and reconnects (initTransactions re-runs for transactional producers).
- The same batch is resent once.
- Still fenced? The publisher gives up and surfaces the failures unchanged.
Concurrent fenced publishes share a single in-flight reconnect — the producer is never torn down twice while a recovery is in progress.
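A minimal sketch of that recover-once flow, assuming a driver with connect, disconnect and sendBatch methods. Driver, BatchResult and FenceRecoveringPublisher are hypothetical stand-ins, not eventferry's internals; the point is the shared reconnect promise that keeps concurrent fenced publishes from tearing the driver down twice.

```typescript
// Sketch of a recover-once publish with a shared in-flight reconnect.
// All names here are hypothetical stand-ins, not eventferry internals.
interface BatchResult {
  fenced: boolean;
  error?: Error;
}

interface Driver {
  connect(): Promise<void>; // a transactional driver would re-run initTransactions here
  disconnect(): Promise<void>;
  sendBatch(records: string[]): Promise<BatchResult>;
}

class FenceRecoveringPublisher {
  private readonly driver: Driver;
  private readonly onProducerFenced: (error: Error) => void;
  private reconnecting: Promise<void> | null = null;

  constructor(driver: Driver, onProducerFenced: (error: Error) => void) {
    this.driver = driver;
    this.onProducerFenced = onProducerFenced;
  }

  private async doReconnect(): Promise<void> {
    await this.driver.disconnect();
    await this.driver.connect();
  }

  // Concurrent callers share one reconnect, so the driver is torn down once.
  private reconnectOnce(): Promise<void> {
    if (this.reconnecting === null) {
      const clear = () => { this.reconnecting = null; };
      this.reconnecting = this.doReconnect().then(clear, (err) => { clear(); throw err; });
    }
    return this.reconnecting;
  }

  async publish(records: string[]): Promise<BatchResult> {
    const first = await this.driver.sendBatch(records);
    if (!first.fenced) return first;
    this.onProducerFenced(first.error ?? new Error("producer fenced")); // informational
    await this.reconnectOnce();
    // Resend exactly once; a second fence is returned unchanged.
    return this.driver.sendBatch(records);
  }
}
```

Sharing one promise among concurrent callers (often called "single-flight") is a common way to deduplicate async work; the slot is cleared once the reconnect settles, so a later fence starts a fresh recovery.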
Enable it when you're running a single producer instance against brokers that restart or blip (rolling restarts, network partitions). In that setup a fence is almost certainly a transient epoch mismatch.
Leave it OFF when you're running multiple producer instances. A cross-instance fence is the broker telling the losing instance to stop; silently retrying makes the instances fence each other back and forth. Use a unique transactionalId per instance instead, and let the fence propagate so observability catches the collision.
Default is false to preserve the safer multi-instance semantics.
```text
Are you running multiple replicas with the same workload?
│
├─ No (single instance) ──► transactionalId can be a constant.
│                           autoRecoverFromFence: true is safe.
│
└─ Yes ───────────────────► transactionalId MUST be unique per instance.
                            Callable form, derived from pod/replica context.
                            autoRecoverFromFence: false (default).
                            Monitor onProducerFenced: a non-zero rate means a
                            transactionalId collision worth investigating.
```
| Don't | Why |
|---|---|
| transactionalId: "publisher" on 3 replicas | All 3 fence each other in a loop. |
| transactionalId: crypto.randomUUID() | Loses transaction recovery on restart: a fresh UUID means a fresh producer, so nothing aborts the previous epoch's open transaction until it times out. |
| Forgetting transactionalId while transactional: true | Throws on driver construction; eventferry refuses. |
| Single instance + transactional: true without enabling autoRecoverFromFence | Every rolling broker restart shows up as DLQ noise instead of recovery. |
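The rows above, together with the decision tree, can be checked mechanically at startup. This lint is a sketch: lintEosConfig and its replicas argument are hypothetical, the option names mirror the KafkaPublisher options on this page, and the random-UUID row is left out because it can't be detected from the options alone.

```typescript
// Hypothetical startup lint for the EOS anti-patterns; not part of eventferry.
interface EosOptions {
  transactional?: boolean;
  transactionalId?: string | (() => string);
  autoRecoverFromFence?: boolean;
}

function lintEosConfig(opts: EosOptions, replicas: number): string[] {
  const problems: string[] = [];
  if (!opts.transactional) return problems; // rules only apply to transactional producers
  if (opts.transactionalId === undefined) {
    problems.push("transactional: true requires a transactionalId");
    return problems;
  }
  if (replicas > 1) {
    // A plain string is the same on every replica, so they fence each other.
    if (typeof opts.transactionalId === "string") {
      problems.push("constant transactionalId shared by multiple replicas");
    }
    if (opts.autoRecoverFromFence) {
      problems.push("autoRecoverFromFence should stay false with multiple replicas");
    }
  } else if (!opts.autoRecoverFromFence) {
    problems.push("single instance: enable autoRecoverFromFence to ride out broker restarts");
  }
  return problems;
}
```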
```typescript
new KafkaPublisher({ brokers, acks: -1 }); // default
```

acks: -1 (or "all") waits for all in-sync replicas before acking. It is the strongest durability setting: the record survives a broker crash mid-write.
acks: 1 waits only for the leader. Faster, but loses the record if the leader crashes before replication. Acceptable for non-critical workloads.
acks: 0 is fire-and-forget. Never use it with the outbox pattern: the relay would mark rows done without proof that the broker accepted them.
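The reason acks matters to the outbox is that the relay treats a resolved publish as proof of delivery. A minimal sketch of that relay step, assuming a publish function that resolves only once the broker acknowledges the record; OutboxRow, relayOnce and the status values are hypothetical.

```typescript
// Hypothetical outbox relay step: a row becomes "done" only after publish
// resolves, so publish must resolve only on real broker acknowledgement.
type Status = "pending" | "done";

interface OutboxRow {
  id: number;
  payload: string;
  status: Status;
}

async function relayOnce(
  rows: OutboxRow[],
  publish: (payload: string) => Promise<void>,
): Promise<void> {
  for (const row of rows) {
    if (row.status !== "pending") continue;
    try {
      // With acks: -1 a Kafka send resolves after all in-sync replicas have
      // the record; with acks: 0 it resolves without any broker response.
      await publish(row.payload);
      row.status = "done";
    } catch {
      // Leave the row pending so the next pass retries it (at-least-once).
    }
  }
}
```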
```typescript
new KafkaPublisher({
  brokers,
  transactional: true,
  transactionalId: "...",
  transactionTimeoutMs: 60_000, // default
});
```

transactionTimeoutMs is the broker-side ceiling on how long a transaction can stay open before the broker aborts it. The default is 60s, and the value must not exceed the broker's transaction.max.timeout.ms (a cluster-wide setting, 15 min by default).
Set it higher when your sendBatch legitimately takes minutes (massive batches, slow links). Otherwise leave the default: a longer timeout just delays detection of a stuck producer.
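If you raise the timeout, a startup check can catch values the broker will reject. A sketch, assuming Kafka's default transaction.max.timeout.ms of 900000 ms (15 min); checkTransactionTimeout is a hypothetical helper, and you should pass your cluster's real limit if it differs.

```typescript
// Hypothetical pre-flight check for transactionTimeoutMs. 900_000 ms is
// Kafka's default transaction.max.timeout.ms; clusters may override it.
const KAFKA_DEFAULT_TXN_MAX_TIMEOUT_MS = 900_000;

function checkTransactionTimeout(
  timeoutMs: number,
  brokerMaxMs: number = KAFKA_DEFAULT_TXN_MAX_TIMEOUT_MS,
): void {
  if (!Number.isInteger(timeoutMs) || timeoutMs <= 0) {
    throw new RangeError(`transactionTimeoutMs must be a positive integer, got ${timeoutMs}`);
  }
  if (timeoutMs > brokerMaxMs) {
    // The broker rejects a transactional producer whose requested timeout
    // exceeds its maximum, so fail at startup instead.
    throw new RangeError(
      `transactionTimeoutMs ${timeoutMs} exceeds transaction.max.timeout.ms ${brokerMaxMs}`,
    );
  }
}
```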
```typescript
new KafkaPublisher({
  brokers,
  hooks: {
    // log and metrics are your application's logger and metrics client.
    onTransactionAbort: (err) => {
      log.warn("txn aborted", { err: err.message });
      metrics.counter("kafka.txn_abort", 1);
    },
  },
});
```

onTransactionAbort fires when sendBatch's inner abort path runs (mid-batch driver error, broker rejection). The driver still aborts the underlying transaction and returns per-record failures; the hook is best-effort observability.
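"Best-effort" here means a misbehaving hook must not change the publish outcome. The sketch below shows that pattern in isolation; invokeHookSafely and abortWithHook are hypothetical names, not eventferry's implementation.

```typescript
// Sketch of best-effort hook invocation around an abort; hypothetical names.
type Hook = (err: Error) => void;

function invokeHookSafely(hook: Hook | undefined, err: Error): void {
  if (!hook) return;
  try {
    hook(err);
  } catch {
    // Swallow hook failures: observability must never change publish outcomes.
  }
}

async function abortWithHook(
  abortTransaction: () => Promise<void>,
  hook: Hook | undefined,
  cause: Error,
): Promise<void> {
  invokeHookSafely(hook, cause); // observe first
  await abortTransaction(); // then abort regardless of what the hook did
}
```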
- General error classification → Reliability and Error Handling
- Consumer-side read_committed recommendation → Consuming Events
- Production rollout checklist → Operations Guide
Repository · Issues · npm: @eventferry/all · MIT