Skip to content

Consuming Events

SametGoktepe edited this page Jun 17, 2026 · 1 revision

eventferry is publisher-only — your consumer is whatever Kafka client you already use. This page covers the two helpers that make the consumer side ergonomic and the canonical patterns for typed payloads, trace continuation, and DLQ routing.


The two helpers

Imported via the @eventferry/kafka/consume subpath — producer-free, so your consumer doesn't pull in kafkajs/confluent producer code through the main entry.

Helper What
decode(message, { decoder }) Normalize the raw message shape (key/value/headers/timestamp/offset/partition) both kafkajs and confluent deliver. Built-in decoders: json (default), utf8, none, or a custom (bytes) => V.
extractTraceContext(headers) Strict W3C traceparent / tracestate parse — accepts both Buffer and string header shapes. Returns null on absent / invalid headers.

Canonical consumer loop

import { Kafka } from "kafkajs";
import { decode, extractTraceContext } from "@eventferry/kafka/consume";
import { defineOutbox } from "@eventferry/core";
import { registry } from "./outbox-registry";

const events = defineOutbox(registry);  // no store — consumer side

const consumer = new Kafka({ brokers }).consumer({ groupId: "orders-worker" });
await consumer.connect();
await consumer.subscribe({ topic: "orders.created", fromBeginning: false });

await consumer.run({
  eachMessage: async ({ message, topic }) => {
    // 1. Normalize the raw kafkajs/confluent shape.
    const m = decode(message, { decoder: "utf8" });

    // 2. Continue the W3C trace context (optional but recommended).
    const trace = extractTraceContext(message.headers);
    if (trace) startConsumerSpan(trace.traceId, trace.spanId);

    // 3. Typed + validated payload via the SAME registry as the producer.
    const event = await events.decode(topic as keyof typeof registry, m.value!);
    //    ^? typed per topic

    await handle(event);
  },
});

What decode returns

interface DecodedMessage<V> {
  key: string | null;
  value: V | null;           // null when value is a tombstone (compacted topic)
  headers: Record<string, string>;
  timestamp?: number;        // epoch ms
  offset?: string;           // stringified — Kafka offsets exceed 2^53
  partition?: number;
}

Empty / null value comes back as value: null for every decoder — tombstones are first-class on compacted topics. Test before passing to your handler.

Decoder choices

Decoder Use
"json" (default) JSON.parse(value.toString("utf8")). Throws labelled JSON.parse failed on malformed payloads.
"utf8" Raw text. Pair with defineOutbox.decode() for typed payload.
"none" Raw Buffer. Pair with binary deserializers (Avro / Protobuf / MessagePack).
(bytes: Buffer) => V Custom. e.g. (b) => avroSchema.fromBuffer(b).

Typed payload via the producer-side registry

When your consumer lives in the same monorepo as the producer, hand the decoded bytes to the same defineOutbox(registry) the producer used. decode validates against the topic's Standard Schema and returns the typed payload:

import { defineOutbox } from "@eventferry/core";
import { decode } from "@eventferry/kafka/consume";
import { registry } from "./outbox-registry";

const events = defineOutbox(registry);

await consumer.run({
  eachMessage: async ({ message, topic }) => {
    const m = decode(message, { decoder: "utf8" });
    const event = await events.decode(topic as keyof typeof registry, m.value!);
    //    ^? { orderId: string; total: number }
    await handle(event);
  },
});

events.decode(topic, bytes) throws OutboxValidationError if the topic isn't in the registry or the payload doesn't match the schema. Same registry, both sides — schema drift is impossible by construction.

For cross-language consumers (Go, Java, Python), skip this companion and use Confluent Schema Registry → Schema Registry.


W3C trace context — full example

The publisher's tracer.inject writes traceparent / tracestate headers. On the consumer, parse them with extractTraceContext and start a CONSUMER span as a child of the producer span:

import { trace, SpanKind, propagation } from "@opentelemetry/api";
import { decode, extractTraceContext } from "@eventferry/kafka/consume";

const otel = trace.getTracer("orders-worker");

await consumer.run({
  eachMessage: async ({ message, topic }) => {
    const traceCtx = extractTraceContext(message.headers);
    const headers = traceCtx ? { traceparent: traceCtx.traceparent } : {};
    const parentCtx = propagation.extract(otelContext.active(), headers);

    await otelContext.with(parentCtx, async () => {
      const span = otel.startSpan(`${topic} process`, { kind: SpanKind.CONSUMER });
      try {
        const m = decode(message);
        await handle(m.value);
        span.setStatus({ code: SpanStatusCode.OK });
      } catch (err) {
        span.recordException(err as Error);
        span.setStatus({ code: SpanStatusCode.ERROR });
        throw err;
      } finally {
        span.end();
      }
    });
  },
});

extractTraceContext strictly validates per the W3C spec — rejects all-zero IDs, version: ff, malformed hex. Returns null on any of those, so your consumer falls back to starting a fresh trace cleanly.


isolation.level for EOS consumers

If your producer uses transactional: true, configure consumers with isolation.level=read_committed so they see all-or-nothing batches:

const consumer = new Kafka({ brokers }).consumer({
  groupId: "orders-worker",
  readUncommitted: false,                 // kafkajs equivalent — default true
});

// confluent driver:
const consumer = new Kafka({ brokers, /* ... */ }).consumer({
  groupId: "orders-worker",
  // librdkafka config:
  // "isolation.level": "read_committed"  // via rawConsumerConfig
});

Without read_committed, consumers see in-flight transactional records that might still abort. Half-published batches become visible — defeats EOS on the consumer side.


DLQ consumer

DLQ records carry enriched headers — route by dlq-error-class, not by parsing dlq-reason:

const dlqConsumer = new Kafka({ brokers }).consumer({ groupId: "dlq-handler" });
await dlqConsumer.subscribe({ topic: "orders.created.dlq" });

await dlqConsumer.run({
  eachMessage: async ({ message }) => {
    const m = decode(message);
    const errClass = m.headers["dlq-error-class"];
    const reason = m.headers["dlq-reason"];
    const failedAt = m.headers["dlq-failed-at"];
    const attempts = Number(m.headers["dlq-attempts"] ?? "0");

    if (errClass === "KafkaJSProtocolError" && reason?.includes("MESSAGE_TOO_LARGE")) {
      // Poison record — human ticket
      await ticket.create({
        title: `Oversized DLQ from ${m.headers["dlq-original-topic"]}`,
        body: reason,
      });
    } else if (errClass === "TopicAuthorizationException") {
      // Auth fix needed — page on-call
      await alert.page("kafka.auth_fail", { reason, originalTopic: m.headers["dlq-original-topic"] });
    } else {
      // Transient — back to a retry queue
      await retryQueue.put({
        payload: m.value,
        topic: m.headers["dlq-original-topic"]!,
        attemptsSoFar: attempts,
      });
    }
  },
});

Full DLQ guide → Dead-Letter Queue.


What's next

Clone this wiki locally