Consuming Events
eventferry is publisher-only — your consumer is whatever Kafka client you already use. This page covers the two helpers that make the consumer side ergonomic and the canonical patterns for typed payloads, trace continuation, and DLQ routing.
Both helpers are imported from the `@eventferry/kafka/consume` subpath. It is producer-free, so your consumer doesn't pull in kafkajs or confluent producer code through the main entry.
| Helper | What |
|---|---|
| `decode(message, { decoder })` | Normalize the raw message shape (key/value/headers/timestamp/offset/partition) that both kafkajs and confluent deliver. Built-in decoders: `json` (default), `utf8`, `none`, or a custom `(bytes) => V`. |
| `extractTraceContext(headers)` | Strict W3C `traceparent` / `tracestate` parse. Accepts both `Buffer` and string header shapes. Returns `null` on absent or invalid headers. |
```typescript
import { Kafka } from "kafkajs";
import { decode, extractTraceContext } from "@eventferry/kafka/consume";
import { defineOutbox } from "@eventferry/core";
import { registry } from "./outbox-registry";

const events = defineOutbox(registry); // no store: consumer side

// `brokers`, `startConsumerSpan`, and `handle` are application-defined.
const consumer = new Kafka({ brokers }).consumer({ groupId: "orders-worker" });
await consumer.connect();
await consumer.subscribe({ topic: "orders.created", fromBeginning: false });

await consumer.run({
  eachMessage: async ({ message, topic }) => {
    // 1. Normalize the raw kafkajs/confluent shape.
    const m = decode(message, { decoder: "utf8" });
    if (m.value === null) return; // tombstone: nothing to validate or handle

    // 2. Continue the W3C trace context (optional but recommended).
    const trace = extractTraceContext(message.headers);
    if (trace) startConsumerSpan(trace.traceId, trace.spanId);

    // 3. Typed + validated payload via the SAME registry as the producer.
    const event = await events.decode(topic as keyof typeof registry, m.value);
    //    ^? typed per topic
    await handle(event);
  },
});
```

`decode` returns this normalized shape:

```typescript
interface DecodedMessage<V> {
  key: string | null;
  value: V | null; // null when value is a tombstone (compacted topic)
  headers: Record<string, string>;
  timestamp?: number; // epoch ms
  offset?: string; // stringified: Kafka offsets can exceed 2^53
  partition?: number;
}
```

An empty or null value comes back as `value: null` for every decoder, because tombstones are first-class on compacted topics. Check for `null` before passing the value to your handler.
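One way to make the tombstone check hard to forget is to convert the nullable value into an explicit upsert/delete change before it reaches business logic. The sketch below is a minimal illustration, not part of eventferry: `Change`, `toChange`, and `applyChange` are hypothetical names, and the local `DecodedMessage` interface simply mirrors the shape documented above.

```typescript
// Local copy of the documented shape so the sketch is self-contained.
interface DecodedMessage<V> {
  key: string | null;
  value: V | null;
  headers: Record<string, string>;
  timestamp?: number;
  offset?: string;
  partition?: number;
}

// Hypothetical change type: a tombstone becomes an explicit "delete".
type Change<V> =
  | { kind: "upsert"; key: string | null; value: V }
  | { kind: "delete"; key: string | null };

function toChange<V>(m: DecodedMessage<V>): Change<V> {
  if (m.value === null) {
    return { kind: "delete", key: m.key };
  }
  return { kind: "upsert", key: m.key, value: m.value };
}

// Example consumer-side view: a map keyed by message key.
function applyChange<V>(view: Map<string, V>, change: Change<V>): void {
  const key = change.key;
  if (key === null) return; // unkeyed records have nothing to upsert or delete
  if (change.kind === "delete") {
    view.delete(key);
  } else {
    view.set(key, change.value);
  }
}
```

With this shape, a handler has to pick a branch for deletes instead of reaching for a non-null assertion.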
| Decoder | Use |
|---|---|
| `"json"` (default) | `JSON.parse(value.toString("utf8"))`. Throws an error labelled `JSON.parse failed` on malformed payloads. |
| `"utf8"` | Raw text. Pair with `defineOutbox(registry).decode()` for a typed payload. |
| `"none"` | Raw `Buffer`. Pair with binary deserializers (Avro / Protobuf / MessagePack). |
| `(bytes: Buffer) => V` | Custom, e.g. `(b) => avroSchema.fromBuffer(b)`. |
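As a rough illustration of the custom `(bytes: Buffer) => V` form, here is a sketch of a decoder for a fixed binary layout. The 12-byte layout, the `OrderTotal` type, and both function names are assumptions made up for this example. The function is typed on `Uint8Array`, which `Buffer` extends, so it should be usable wherever a `(bytes: Buffer) => V` decoder is expected.

```typescript
interface OrderTotal {
  orderId: number;
  total: number;
}

// Hypothetical wire layout (an assumption for this sketch):
//   bytes 0-3  : orderId, unsigned 32-bit, big-endian
//   bytes 4-11 : total, 64-bit float, big-endian
function decodeOrderTotal(bytes: Uint8Array): OrderTotal {
  if (bytes.byteLength !== 12) {
    throw new Error(`expected 12 bytes, got ${bytes.byteLength}`);
  }
  // Respect byteOffset: a Node Buffer is often a view into a larger pooled ArrayBuffer.
  const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength);
  return {
    orderId: view.getUint32(0, false),
    total: view.getFloat64(4, false),
  };
}

// Encoder counterpart, included only so the sketch can round-trip.
function encodeOrderTotal(order: OrderTotal): Uint8Array {
  const out = new Uint8Array(12);
  const view = new DataView(out.buffer);
  view.setUint32(0, order.orderId, false);
  view.setFloat64(4, order.total, false);
  return out;
}
```

Passing the `byteOffset` and `byteLength` to `DataView` is the detail most worth copying: reading from `bytes.buffer` directly can return data from a neighbouring allocation when the input is a slice.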
When your consumer lives in the same monorepo as the producer, hand the decoded value to the same `defineOutbox(registry)` the producer used. `events.decode` validates against the topic's Standard Schema and returns the typed payload:
```typescript
import { defineOutbox } from "@eventferry/core";
import { decode } from "@eventferry/kafka/consume";
import { registry } from "./outbox-registry";

const events = defineOutbox(registry);

// `consumer` and `handle` are application-defined.
await consumer.run({
  eachMessage: async ({ message, topic }) => {
    const m = decode(message, { decoder: "utf8" });
    if (m.value === null) return; // tombstone: nothing to validate
    const event = await events.decode(topic as keyof typeof registry, m.value);
    //    ^? { orderId: string; total: number }
    await handle(event);
  },
});
```

`events.decode(topic, bytes)` throws `OutboxValidationError` if the topic isn't in the registry or the payload doesn't match the schema. Both sides share one registry, so schema drift between producer and consumer is ruled out by construction when they are built from the same source.
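To show why a shared registry yields per-topic types, here is a hand-rolled sketch of the idea. It is not eventferry's implementation: `Validator`, `makeDecoder`, and `sketchRegistry` are hypothetical names, and the validator is a plain function rather than a Standard Schema.

```typescript
// A validator either returns the typed payload or throws.
type Validator<T> = (input: unknown) => T;

interface OrderCreated {
  orderId: string;
  total: number;
}

const validateOrderCreated: Validator<OrderCreated> = (input) => {
  if (typeof input !== "object" || input === null) {
    throw new Error("payload must be an object");
  }
  const { orderId, total } = input as Record<string, unknown>;
  if (typeof orderId !== "string" || typeof total !== "number") {
    throw new Error("payload does not match the orders.created schema");
  }
  return { orderId, total };
};

// Hypothetical registry: topic name -> validator. Producer and consumer would
// both import this one object.
const sketchRegistry = {
  "orders.created": validateOrderCreated,
};

// The return type is looked up from the registry by topic name, so each topic
// decodes to its own payload type.
function makeDecoder<R extends Record<string, Validator<unknown>>>(registry: R) {
  return function decodeTyped<K extends keyof R & string>(
    topic: K,
    raw: string,
  ): ReturnType<R[K]> {
    const validate = registry[topic];
    if (!validate) throw new Error(`unknown topic: ${topic}`);
    return validate(JSON.parse(raw)) as ReturnType<R[K]>;
  };
}

const decodeTyped = makeDecoder(sketchRegistry);
```

Calling `decodeTyped("orders.created", raw)` is typed as `OrderCreated`, and a topic name missing from the registry is a compile-time error for callers that pass a literal.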
For cross-language consumers (Go, Java, Python), skip this companion and use Confluent Schema Registry → Schema Registry.
The publisher's tracer.inject writes traceparent / tracestate headers. On the consumer, parse them with extractTraceContext and start a CONSUMER span as a child of the producer span:
```typescript
import {
  context as otelContext,
  propagation,
  SpanKind,
  SpanStatusCode,
  trace,
} from "@opentelemetry/api";
import { decode, extractTraceContext } from "@eventferry/kafka/consume";

const otel = trace.getTracer("orders-worker");

// `consumer` and `handle` are application-defined.
await consumer.run({
  eachMessage: async ({ message, topic }) => {
    // Only forward a traceparent that passed strict validation.
    const traceCtx = extractTraceContext(message.headers);
    const headers = traceCtx ? { traceparent: traceCtx.traceparent } : {};
    const parentCtx = propagation.extract(otelContext.active(), headers);

    await otelContext.with(parentCtx, async () => {
      // startSpan picks up the active context, which is parentCtx here.
      const span = otel.startSpan(`${topic} process`, { kind: SpanKind.CONSUMER });
      try {
        const m = decode(message);
        await handle(m.value);
        span.setStatus({ code: SpanStatusCode.OK });
      } catch (err) {
        span.recordException(err as Error);
        span.setStatus({ code: SpanStatusCode.ERROR });
        throw err;
      } finally {
        span.end();
      }
    });
  },
});
```

`extractTraceContext` validates strictly per the W3C spec: it rejects all-zero IDs, version `ff`, and malformed hex. It returns `null` in each of those cases, so your consumer falls back cleanly to starting a fresh trace.
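For a sense of what strict `traceparent` validation involves, the following is an independent sketch written against the W3C Trace Context format. It is not eventferry's code: `parseTraceparent` and `ParsedTraceparent` are hypothetical names, and the sketch handles only string headers, not `Buffer` values or `tracestate`.

```typescript
interface ParsedTraceparent {
  version: string;
  traceId: string;
  spanId: string;
  sampled: boolean;
}

// version(2 hex)-trace-id(32 hex)-parent-id(16 hex)-flags(2 hex), lowercase only.
// The optional tail allows for extra fields in versions newer than 00.
const TRACEPARENT_RE =
  /^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})(-.*)?$/;

function parseTraceparent(header: string): ParsedTraceparent | null {
  const match = TRACEPARENT_RE.exec(header);
  if (match === null) return null; // malformed hex, wrong lengths, uppercase

  const [, version = "", traceId = "", spanId = "", flags = "", rest] = match;
  if (version === "ff") return null; // ff is reserved as invalid
  if (version === "00" && rest !== undefined) return null; // 00 has exactly four fields
  if (/^0+$/.test(traceId) || /^0+$/.test(spanId)) return null; // all-zero IDs

  return {
    version,
    traceId,
    spanId,
    sampled: (parseInt(flags, 16) & 0x01) === 0x01, // lowest flag bit
  };
}
```

Returning `null` for every rejection keeps the caller's fallback simple: no parent means start a fresh trace.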
If your producer uses transactional: true, configure consumers with isolation.level=read_committed so they see all-or-nothing batches:
```typescript
// kafkajs driver:
const consumer = new Kafka({ brokers }).consumer({
  groupId: "orders-worker",
  readUncommitted: false, // kafkajs equivalent of read_committed; false is the kafkajs default
});

// confluent driver (use one or the other):
const confluentConsumer = new Kafka({ brokers, /* ... */ }).consumer({
  groupId: "orders-worker",
  // librdkafka config:
  // "isolation.level": "read_committed" // via rawConsumerConfig
});
```

Without `read_committed`, consumers see in-flight transactional records that might still abort. Half-published batches become visible, which defeats EOS on the consumer side.
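The difference between the two isolation levels can be sketched with a toy model of a partition log. This is a simplified simulation, not Kafka client code: `LogEntry`, `visibleValues`, and `exampleLog` are hypothetical, each transaction ID is assumed to be used once, and real brokers track this with control records and the last stable offset.

```typescript
type LogEntry =
  | { kind: "record"; offset: number; txnId: string; value: string }
  | { kind: "commit" | "abort"; offset: number; txnId: string };

type Isolation = "read_uncommitted" | "read_committed";

function visibleValues(log: LogEntry[], isolation: Isolation): string[] {
  const records = log.filter(
    (e): e is Extract<LogEntry, { kind: "record" }> => e.kind === "record",
  );
  if (isolation === "read_uncommitted") {
    return records.map((r) => r.value); // everything, including aborted and open
  }

  // Outcome per transaction, taken from its marker if one has been written.
  const outcome = new Map<string, "commit" | "abort">();
  for (const e of log) {
    if (e.kind !== "record") outcome.set(e.txnId, e.kind);
  }

  // Last stable offset: first record of the earliest transaction still open.
  let lastStableOffset = Infinity;
  for (const r of records) {
    if (!outcome.has(r.txnId)) {
      lastStableOffset = Math.min(lastStableOffset, r.offset);
    }
  }

  // Deliver only committed records below the last stable offset.
  return records
    .filter((r) => r.offset < lastStableOffset && outcome.get(r.txnId) === "commit")
    .map((r) => r.value);
}

// A committed, B aborted, C still open, D committed after C started.
const exampleLog: LogEntry[] = [
  { kind: "record", offset: 0, txnId: "A", value: "a1" },
  { kind: "record", offset: 1, txnId: "A", value: "a2" },
  { kind: "commit", offset: 2, txnId: "A" },
  { kind: "record", offset: 3, txnId: "B", value: "b1" },
  { kind: "abort", offset: 4, txnId: "B" },
  { kind: "record", offset: 5, txnId: "C", value: "c1" },
  { kind: "record", offset: 6, txnId: "D", value: "d1" },
  { kind: "commit", offset: 7, txnId: "D" },
];
```

In this model, `read_uncommitted` yields `a1, a2, b1, c1, d1`, while `read_committed` yields only `a1, a2`: the aborted `b1` is filtered out, and `d1` is held back until the open transaction C resolves.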
DLQ records carry enriched headers — route by dlq-error-class, not by parsing dlq-reason:
```typescript
import { Kafka } from "kafkajs";
import { decode } from "@eventferry/kafka/consume";

// `brokers`, `ticket`, `alert`, and `retryQueue` are application-defined.
const dlqConsumer = new Kafka({ brokers }).consumer({ groupId: "dlq-handler" });
await dlqConsumer.connect();
await dlqConsumer.subscribe({ topic: "orders.created.dlq" });

await dlqConsumer.run({
  eachMessage: async ({ message }) => {
    const m = decode(message);
    const errClass = m.headers["dlq-error-class"];
    const reason = m.headers["dlq-reason"];
    const failedAt = m.headers["dlq-failed-at"];
    const attempts = Number(m.headers["dlq-attempts"] ?? "0");

    if (errClass === "KafkaJSProtocolError" && reason?.includes("MESSAGE_TOO_LARGE")) {
      // Poison record: open a ticket for a human.
      await ticket.create({
        title: `Oversized DLQ from ${m.headers["dlq-original-topic"]}`,
        body: reason,
      });
    } else if (errClass === "TopicAuthorizationException") {
      // Auth fix needed: page on-call.
      await alert.page("kafka.auth_fail", { reason, originalTopic: m.headers["dlq-original-topic"] });
    } else {
      // Transient: send back to a retry queue.
      await retryQueue.put({
        payload: m.value,
        topic: m.headers["dlq-original-topic"]!,
        attemptsSoFar: attempts,
      });
    }
  },
});
```

Full DLQ guide → Dead-Letter Queue.
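The routing decision in the handler above can also be pulled out into a pure function, which makes it unit-testable without a broker. This is a sketch under the same assumptions as the example: the header names and error classes come from the snippet above, while `DlqRoute`, `routeDlq`, and `parseAttempts` are hypothetical names.

```typescript
type DlqRoute = "ticket" | "page" | "retry";

type DlqHeaders = Record<string, string | undefined>;

// Same branches as the handler, returned as data instead of side effects.
function routeDlq(headers: DlqHeaders): DlqRoute {
  const errClass = headers["dlq-error-class"];
  const reason = headers["dlq-reason"];
  if (errClass === "KafkaJSProtocolError" && reason?.includes("MESSAGE_TOO_LARGE")) {
    return "ticket"; // poison record
  }
  if (errClass === "TopicAuthorizationException") {
    return "page"; // auth fix needed
  }
  return "retry"; // treated as transient
}

// Number("abc") is NaN, so guard the attempts header before doing arithmetic on it.
function parseAttempts(headers: DlqHeaders): number {
  const parsed = Number(headers["dlq-attempts"] ?? "0");
  return Number.isInteger(parsed) && parsed >= 0 ? parsed : 0;
}
```

The consumer callback then reduces to decoding the message, calling `routeDlq(m.headers)`, and switching on the result.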
- DLQ routing details + replay patterns → Dead-Letter Queue
- OTel span setup → Observability
- Typed registry on the producer side → Type-Safe Events
Repository · Issues · npm: @eventferry/all · MIT