
Kafka Publisher

SametGoktepe edited this page Jun 17, 2026 · 1 revision

KafkaPublisher is the Publisher interface implementation for @eventferry/kafka. It wraps either kafkajs or @confluentinc/kafka-javascript (librdkafka) and adds error classification, DLQ enrichment, OTel tracing, lifecycle hooks, admin operations, health checks, and producer-fenced recovery.

This page covers the core configuration. Deeper topics live on their own pages: security on Authentication and TLS, Schema Registry on Schema Registry, EOS on Transactions and EOS, OTel on Observability, and admin on Admin Operations.


Install + pick a driver

npm i @eventferry/kafka

# Pick exactly ONE of these:
npm i kafkajs                              # pure JS, no native build
npm i @confluentinc/kafka-javascript       # librdkafka, fine-grained tuning

Both drivers are optional peers — install only the one you'll use.

Which driver

| Need | Use |
| --- | --- |
| Default. Pure JS, runs in Lambda / Alpine / wherever Node runs | kafkajs |
| Fine producer tuning (lingerMs, batchSize, deliveryTimeoutMs, compression level) | confluent |
| librdkafka stats JSON for observability dashboards | confluent |
| AWS MSK IAM with high message rates | confluent (better SASL/OAUTHBEARER throughput) |
| Custom partitioner factory | kafkajs (librdkafka's partitioner is a C-level extension point, no JS callback) |

Switch with driver: "kafkajs" | "confluent" — default is "kafkajs".


Minimal config

import { KafkaPublisher } from "@eventferry/kafka";

const publisher = new KafkaPublisher({
  brokers: ["broker-1:9092", "broker-2:9092"],
  clientId: "my-service",
  idempotent: true,                        // dedup + ordering on the wire (default true)
});

await publisher.connect();
// hand to a relay

brokers is the bootstrap list. clientId defaults to "eventferry" — set it to your service name so broker-side logs are searchable.


Producer tuning

The publisher exposes a normalized tuning surface. Both drivers honor the universally supported knobs; the rest are confluent-only and log a one-time warning on kafkajs.

new KafkaPublisher({
  brokers,
  driver: "confluent",

  // Universal
  idempotent: true,
  acks: -1,                                // -1/"all" (default), 0, or 1
  compression: "zstd",                     // none | gzip | snappy | lz4 | zstd
  maxInFlightRequests: 5,                  // ≤5 when idempotent
  requestTimeoutMs: 30_000,
  transactionTimeoutMs: 60_000,

  // confluent only — kafkajs warns + ignores
  lingerMs: 25,                            // accumulate up to 25ms before flushing a partition batch
  batchSize: 131_072,                      // max bytes per partition batch
  deliveryTimeoutMs: 120_000,              // end-to-end record timeout
  maxRequestSize: 2_000_000,               // single record cap, ≤ broker's message.max.bytes
  compressionLevel: 9,                     // librdkafka compression.level (zstd: 1–22)
});
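The "kafkajs warns + ignores" behavior can be pictured with the sketch below. It is not eventferry's implementation: makeTuningFilter, filterTuning, and the warning text are hypothetical, and warning once per key (rather than once overall) is an assumption.

```typescript
// Hypothetical sketch; the key list mirrors the confluent-only knobs shown above.
const CONFLUENT_ONLY_KEYS = [
  "lingerMs",
  "batchSize",
  "deliveryTimeoutMs",
  "maxRequestSize",
  "compressionLevel",
];

function makeTuningFilter(warn: (message: string) => void) {
  const alreadyWarned = new Set<string>(); // remembers which keys were reported

  return function filterTuning(
    driver: "kafkajs" | "confluent",
    tuning: Record<string, unknown>,
  ): Record<string, unknown> {
    if (driver === "confluent") return { ...tuning }; // every knob is honored

    const kept: Record<string, unknown> = {};
    for (const key of Object.keys(tuning)) {
      if (CONFLUENT_ONLY_KEYS.indexOf(key) !== -1) {
        if (!alreadyWarned.has(key)) {
          alreadyWarned.add(key);
          warn(key + " is confluent-only and is ignored by the kafkajs driver");
        }
        continue; // drop the knob instead of passing it to kafkajs
      }
      kept[key] = tuning[key];
    }
    return kept;
  };
}
```

Under these assumptions, passing lingerMs to the kafkajs driver twice produces a single warning, and the universal knobs pass through untouched.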

lingerMs and the producer-buffer trade-off

lingerMs: 25 says "wait up to 25ms to accumulate more records before sending a partition batch." This is the single biggest knob for throughput vs latency on the confluent driver.

  • lingerMs: 0 (default) — ship every record immediately. Lowest latency, highest request rate on the broker.
  • lingerMs: 25 — typical compromise. Lets a busy aggregate fill a batch; minimal extra latency.
  • lingerMs: 100+ — high throughput regimes (millions of msgs/sec). Latency-sensitive workloads should never go this high.

The relay's claim batch (batchSize: 100 in Relay config) is already one layer of batching, and lingerMs adds a second at the driver level. Both are useful at high throughput; at low throughput, keep lingerMs: 0.
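To put rough numbers on this trade-off, the sketch below estimates how many records end up in one partition batch for a given linger window. It is back-of-the-envelope arithmetic, not eventferry or librdkafka code: estimateBatch and its parameters are hypothetical, and it assumes a steady arrival rate on a single partition.

```typescript
// Hypothetical helper; not part of @eventferry/kafka or librdkafka.
interface BatchEstimate {
  recordsPerBatch: number;    // records expected in one partition batch
  cappedByBatchSize: boolean; // true when the byte budget fills before lingerMs elapses
}

function estimateBatch(
  recordsPerSecond: number, // steady arrival rate on ONE partition (assumed)
  avgRecordBytes: number,   // average serialized record size (assumed)
  lingerMs: number,
  batchSizeBytes: number,
): BatchEstimate {
  // Records arriving during the linger window; at least the one that opened the batch.
  const byLinger = Math.max(1, Math.floor((recordsPerSecond * lingerMs) / 1000));
  // Records that fit in the byte budget.
  const bySize = Math.max(1, Math.floor(batchSizeBytes / avgRecordBytes));
  return {
    recordsPerBatch: Math.min(byLinger, bySize),
    cappedByBatchSize: bySize < byLinger,
  };
}
```

With 1 KiB records and a 131,072-byte batch, 2,000 records/s at lingerMs: 25 gives about 50 records per batch, while 10 records/s at the same setting still gives 1. In the low-rate case the linger window adds latency without buying any batching.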


Partitioning

By default Kafka hashes the message key to a partition. eventferry honors three levers:

  1. PublishableMessage.key: the relay sets this from OutboxMessageInput.key if provided, otherwise from aggregateId. Default behavior: same aggregate → same key → same partition.
  2. PublishableMessage.partition: explicit per-message partition override. Wins over the key. Set it when you have a stronger routing requirement than key-hash.
  3. customPartitioner (kafkajs only): factory passed straight to kafka.producer({ createPartitioner }). Overrides the partitioner preset entirely.

// Always route to partition 0 (deterministic, used by integration tests)
new KafkaPublisher({
  brokers,
  driver: "kafkajs",
  customPartitioner: () => () => 0,
});

// Default partitioner choice on kafkajs (the warning silencer)
new KafkaPublisher({
  brokers,
  partitioner: "java-compatible",          // matches Java client murmur2 hash
});

partitioner accepts "java-compatible" (recommended for greenfield), "legacy", or "default". Setting any value silences kafkajs's KafkaJSPartitionerNotSpecified warning.
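The precedence between the first two levers (explicit partition over key hash) can be sketched as follows. This is an illustration only: RoutedMessage, toyHash, and resolvePartition are hypothetical names, the hash is a toy stand-in for the murmur2 hash real clients use, and the round-robin fallback for keyless messages is an assumption. customPartitioner is not modeled here.

```typescript
// Hypothetical shape; the real PublishableMessage carries more fields.
interface RoutedMessage {
  key?: string;
  partition?: number;
}

// Toy string hash for illustration ONLY. Real clients use murmur2.
function toyHash(key: string): number {
  let h = 0;
  for (let i = 0; i < key.length; i++) {
    h = (h * 31 + key.charCodeAt(i)) >>> 0; // keep an unsigned 32-bit value
  }
  return h;
}

function resolvePartition(
  msg: RoutedMessage,
  partitionCount: number,
  nextRoundRobin: () => number, // assumed fallback for keyless messages
): number {
  if (msg.partition !== undefined) return msg.partition; // explicit override wins
  if (msg.key !== undefined) return toyHash(msg.key) % partitionCount; // same key, same partition
  return nextRoundRobin() % partitionCount;
}
```

Because the key defaults to aggregateId, two events for the same aggregate resolve to the same partition unless one of them sets partition explicitly.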


DLQ routing

When a record exhausts retry.maxAttempts or hits a fatal / poison error, the relay calls publisher.publishToDlq(message, error). The DLQ message rides on ${topic}.dlq by default and carries:

| Header | Value |
| --- | --- |
| dlq-reason | error.message |
| dlq-error-class | KafkaJSProtocolError, RecordTooLargeException, etc. Use this for routing, not dlq-reason text. |
| dlq-original-topic | The topic the record was originally destined for |
| dlq-failed-at | ISO timestamp |
| dlq-attempts | How many tries the relay made |
| dlq-stack (opt-in) | Truncated UTF-8 stack |

Full DLQ consumer recipe → Dead-Letter Queue.
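As a small illustration of routing on dlq-error-class rather than on the dlq-reason text, a DLQ consumer might branch like the sketch below. The header names come from the table above; routeDlqRecord, the three actions, and the policy attached to each error class are hypothetical. It also assumes header values have already been decoded to strings, since Kafka clients typically deliver them as bytes.

```typescript
// Hypothetical actions a DLQ consumer might take.
type DlqAction = "replay" | "park" | "alert";

function routeDlqRecord(headers: Record<string, string | undefined>): DlqAction {
  // Branch on the stable class name, never on the free-form dlq-reason message.
  switch (headers["dlq-error-class"]) {
    case "RecordTooLargeException":
      return "park";   // replaying an oversized record unchanged cannot succeed
    case "KafkaJSProtocolError":
      return "replay"; // illustrative policy only
    default:
      return "alert";  // unknown or missing class: surface to a human
  }
}
```

The error message can change between driver versions or carry record-specific detail, which is why the class name is the safer routing key.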


Error classification

Every failed PublishResult carries an errorKind:

| Kind | Relay reaction |
| --- | --- |
| retriable | Retry per backoff. Default for unclassified errors. |
| fatal | Skip retry, DLQ + dead immediately. |
| poison | Same as fatal. Reserved for record-level rejection (oversized, corrupt). |
| backpressure | Requeue WITHOUT incrementing attempts; the producer buffer is full. |
| quota | Retry with a longer backoff (retry.quotaMultiplier). |
| fenced | The broker fenced this producer epoch. See Transactions and EOS. |

The publisher classifies natively via the kafkajs / librdkafka error tables. Full mapping → Reliability and Error Handling.
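The reactions in the table can be read as a small decision function. The sketch below is not the relay's actual code: react, Reaction, RetryPolicy, baseDelayMs, the exponential backoff shape, and the "recover-producer" action are all assumptions made for illustration. Only the error kinds, retry.maxAttempts, and retry.quotaMultiplier come from this page.

```typescript
type ErrorKind = "retriable" | "fatal" | "poison" | "backpressure" | "quota" | "fenced";

interface RetryPolicy {
  maxAttempts: number;     // mirrors retry.maxAttempts
  baseDelayMs: number;     // hypothetical base backoff delay
  quotaMultiplier: number; // mirrors retry.quotaMultiplier
}

type Reaction =
  | { action: "retry"; attempts: number; delayMs: number }
  | { action: "requeue"; attempts: number }
  | { action: "dlq" }
  | { action: "recover-producer" };

// `attempts` is the number of tries recorded before this failure.
function react(kind: ErrorKind, attempts: number, policy: RetryPolicy): Reaction {
  if (kind === "fatal" || kind === "poison") return { action: "dlq" };
  if (kind === "backpressure") return { action: "requeue", attempts }; // attempts NOT incremented
  if (kind === "fenced") return { action: "recover-producer" };

  // retriable or quota: count this try, then retry or give up.
  const next = attempts + 1;
  if (next >= policy.maxAttempts) return { action: "dlq" };
  const backoff = policy.baseDelayMs * Math.pow(2, next - 1); // assumed exponential backoff
  const delayMs = kind === "quota" ? backoff * policy.quotaMultiplier : backoff;
  return { action: "retry", attempts: next, delayMs };
}
```

The detail worth noticing is the backpressure branch: a full producer buffer says nothing about the record itself, so it does not consume one of the record's attempts.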


Power-user escape hatches

When the typed surface doesn't reach a knob you need, drop down to the native client:

new KafkaPublisher({
  brokers,
  driver: "confluent",
  rawProducerConfig: {                     // librdkafka keys — wins over translated config
    "queue.buffering.max.messages": 100_000,
    "statistics.interval.ms": 5_000,
    "socket.keepalive.enable": true,
  },
});

new KafkaPublisher({
  brokers,
  driver: "kafkajs",
  rawKafkaJsProducerConfig: {              // kafkajs producer keys — wins over translated config
    retry: { retries: 7, initialRetryTime: 250 },
    metadataMaxAge: 5_000,
  },
});

Native keys win against eventferry's translated ones — that's the contract of an escape hatch.
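One way to picture that contract is a plain object merge in which the raw keys are spread last. This is a sketch of the precedence rule only: mergeProducerConfig is a hypothetical name, and how eventferry actually combines the two configs is not shown on this page.

```typescript
type ConfigMap = Record<string, string | number | boolean>;

// Hypothetical: combine the config translated from typed options with raw overrides.
function mergeProducerConfig(translated: ConfigMap, raw: ConfigMap): ConfigMap {
  return { ...translated, ...raw }; // later spread wins, so raw keys override
}

// A typed lingerMs: 25 (translated to "linger.ms") loses to a raw "linger.ms": 5.
const merged = mergeProducerConfig(
  { "linger.ms": 25, acks: -1 },
  { "linger.ms": 5, "statistics.interval.ms": 5000 },
);
```

A practical consequence: setting the same knob in both places is not an error, so a stale raw key can silently mask a typed option.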


Lifecycle hooks

new KafkaPublisher({
  brokers,
  hooks: {
    onConnect: () => log.info("publisher connected"),
    onDisconnect: () => log.info("publisher disconnected"),
    onPublish: (result, message) => {
      metrics.counter("kafka.publish", 1, { ok: result.ok, topic: message.topic });
    },
    onError: (err, message) => log.error("publish failed", { err, topic: message?.topic }),
    onTransactionAbort: (err) => log.warn("txn aborted", { err }),
    onProducerFenced: (err) => log.warn("producer fenced", { err }),
  },
});

Hooks are wrapped in try/catch — a misbehaving observer can never break publishing.
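A guard of this kind might look like the sketch below. safeInvoke and onHookError are hypothetical names, eventferry's internal wrapper may differ, and this version only covers hooks that throw synchronously.

```typescript
// Hypothetical wrapper; not eventferry's actual implementation.
function safeInvoke<A extends unknown[]>(
  hook: ((...args: A) => void) | undefined,
  onHookError: (err: unknown) => void, // e.g. a logger; must not throw itself
  ...args: A
): void {
  if (!hook) return; // hook not configured
  try {
    hook(...args);
  } catch (err) {
    // Report and swallow: an observer failure never reaches the publish path.
    onHookError(err);
  }
}
```

Called as safeInvoke(hooks.onPublish, logHookError, result, message), a throwing metrics call is reported through logHookError while the publish result is returned unchanged.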


What's next
