Dead Letter Queue
Records that can't be published end up on a dead-letter queue topic with enriched headers describing why. This page covers what triggers DLQ routing, what the headers carry, and the canonical replay / triage recipes.
The relay routes to DLQ in three cases:
- `errorKind: "fatal"`: authentication denied or ACL refused. Retrying cannot help.
- `errorKind: "poison"`: record-level rejection by every broker (`MESSAGE_TOO_LARGE`, `CORRUPT_MESSAGE`, schema-registry refused encoding).
- `attempts > retry.maxAttempts`: the retry budget is exhausted.
For backpressure / quota / retriable errors, the relay retries per the configured backoff first — DLQ only after the budget is gone.
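The routing rules above can be sketched as a small pure function. This is an illustration only: `ErrorKind`, `RoutingInput`, and `decideRouting` are hypothetical names, not part of the relay's API, and `maxAttempts` stands in for `retry.maxAttempts`.

```typescript
// Hypothetical types; the relay's real classification lives inside the library.
type ErrorKind = "fatal" | "poison" | "retriable";

interface RoutingInput {
  errorKind: ErrorKind;
  attempts: number;    // tries made so far, including the one that just failed
  maxAttempts: number; // stands in for retry.maxAttempts
}

type Decision = "dlq" | "retry";

function decideRouting({ errorKind, attempts, maxAttempts }: RoutingInput): Decision {
  // Fatal and poison errors go straight to the DLQ: retrying cannot help.
  if (errorKind === "fatal" || errorKind === "poison") return "dlq";
  // Retriable errors (backpressure, quota, ...) are retried until the budget is gone.
  return attempts > maxAttempts ? "dlq" : "retry";
}
```

Under these assumptions, a retriable error with `attempts` equal to `maxAttempts` is still retried, and the first failure past the budget is dead-lettered.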
Once a record is DLQ'd, the row's status flips to dead. The reaper does not reclaim dead rows. They stay in the outbox table until purgeDone (or a manual cleanup) removes them.
Every DLQ record carries:
| Header | Value | Use |
|---|---|---|
| `dlq-reason` | `error.message` | Human-readable. Do NOT route on this. |
| `dlq-error-class` | `KafkaJSProtocolError`, `RecordTooLargeException`, `TopicAuthorizationException`, … | Route here. Stable across kafkajs versions. |
| `dlq-original-topic` | The topic the record was originally destined for | For replay targeting. |
| `dlq-failed-at` | ISO 8601 timestamp | Incident correlation. |
| `dlq-attempts` | How many tries the relay made | Replay-queue accounting. |
| `dlq-stack` (optional) | Truncated UTF-8 stack | Opt-in via `dlq.includeStackTraces`. |
The original message headers, key, and value are preserved unchanged — a replay reconstructs the original PublishableMessage directly from the DLQ message.
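To make the header set concrete, here is a minimal sketch of how such headers could be assembled from an error. The header names come from the table above; `buildDlqHeaders`, `truncateUtf8`, and `DlqHeaderOptions` are hypothetical helpers, and mapping `dlq-error-class` to `err.name` is an assumption rather than the relay's documented behavior.

```typescript
// Hypothetical options mirroring the stack-trace settings; not the library's types.
interface DlqHeaderOptions {
  includeStackTraces?: boolean;
  maxStackBytes?: number;
}

// Truncate to at most maxBytes of UTF-8 without splitting a multi-byte character.
function truncateUtf8(text: string, maxBytes: number): string {
  const bytes = new TextEncoder().encode(text);
  if (bytes.length <= maxBytes) return text;
  let end = maxBytes;
  // Continuation bytes look like 10xxxxxx; back up until `end` sits on a character start.
  while (end > 0 && (bytes[end] & 0xc0) === 0x80) end--;
  return new TextDecoder().decode(bytes.subarray(0, end));
}

function buildDlqHeaders(
  err: Error,
  originalTopic: string,
  attempts: number,
  failedAt: Date,
  opts: DlqHeaderOptions = {},
): Record<string, string> {
  const headers: Record<string, string> = {
    "dlq-reason": err.message,
    "dlq-error-class": err.name, // assumption: the class header mirrors err.name
    "dlq-original-topic": originalTopic,
    "dlq-failed-at": failedAt.toISOString(),
    "dlq-attempts": String(attempts),
  };
  // The stack is opt-in and capped in bytes, not characters.
  if (opts.includeStackTraces && err.stack) {
    headers["dlq-stack"] = truncateUtf8(err.stack, opts.maxStackBytes ?? 8 * 1024);
  }
  return headers;
}
```

Counting bytes rather than characters matters because Kafka header values are byte arrays, so a cap expressed in characters could still overshoot for non-ASCII stacks.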
```ts
new Relay({
  store,
  publisher,
  dlq: {
    topicSuffix: ".dlq",      // default; DLQ topic = `${topic}${suffix}`
    includeStackTraces: true, // default false
    maxStackBytes: 8 * 1024,  // default 8KB; bytes counted in UTF-8
  },
});
```

| Knob | Default | Notes |
|---|---|---|
| `topicSuffix` | `".dlq"` | DLQ topic is `${originalTopic}${suffix}`. Set to `""` and configure a custom `topic` function for non-suffix DLQ patterns. |
| `topic` | `` (t) => `${t}.dlq` `` | Function form for full control, e.g. `(t) => "deadletter"` for a single shared DLQ. |
| `includeStackTraces` | `false` | Opt in. Stacks can carry PII or be large. |
| `maxStackBytes` | `8192` | Truncated at a UTF-8 char boundary so the header stays valid. |
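How `topicSuffix` and `topic` interact can be sketched as follows. `resolveDlqTopic` and `DlqTopicConfig` are hypothetical names, and the precedence shown (an explicit `topic` function wins over the suffix) is an assumption; check the relay's behavior before relying on it.

```typescript
// Hypothetical config shape covering only the two topic-related knobs.
interface DlqTopicConfig {
  topicSuffix?: string;
  topic?: (originalTopic: string) => string;
}

function resolveDlqTopic(originalTopic: string, cfg: DlqTopicConfig = {}): string {
  // Assumption: an explicit topic function takes precedence over the suffix.
  if (cfg.topic) return cfg.topic(originalTopic);
  // Otherwise append the suffix, falling back to the documented default ".dlq".
  return `${originalTopic}${cfg.topicSuffix ?? ".dlq"}`;
}
```

With no configuration, `resolveDlqTopic("orders.created")` yields `"orders.created.dlq"`; with `topic: () => "deadletter"`, every original topic maps to the same shared DLQ.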
Route by dlq-error-class, never parse dlq-reason text:
```ts
import { Kafka } from "kafkajs";
import { decode } from "@eventferry/kafka/consume";

const dlqConsumer = new Kafka({ brokers }).consumer({ groupId: "dlq-handler" });
await dlqConsumer.subscribe({ topic: "orders.created.dlq" });

await dlqConsumer.run({
  eachMessage: async ({ message }) => {
    const m = decode(message);
    const errClass = m.headers["dlq-error-class"];
    const reason = m.headers["dlq-reason"];
    const originalTopic = m.headers["dlq-original-topic"];
    const attempts = Number(m.headers["dlq-attempts"] ?? "0");

    switch (errClass) {
      case "RecordTooLargeException":
      case "KafkaJSProtocolError": // MESSAGE_TOO_LARGE et al
        await ticket.create({
          title: `Oversized DLQ from ${originalTopic}`,
          body: `${reason}\n\nAttempts: ${attempts}`,
        });
        break;

      case "TopicAuthorizationException":
      case "ClusterAuthorizationException":
        await alert.page("kafka.auth_fail", { originalTopic, reason });
        break;

      case "OutboxValidationError":
        // Producer-side schema mismatch: the relay caught it before publish.
        // The payload is broken; quarantine for manual investigation.
        await quarantine.put(m);
        break;

      default:
        // Transient: retry queue with capped attempts
        if (attempts < 20) {
          await retryQueue.put({
            payload: m.value,
            topic: originalTopic!,
            attemptsSoFar: attempts,
            scheduleAt: Date.now() + Math.min(60_000, 1000 * Math.pow(2, attempts)),
          });
        } else {
          await ticket.create({
            title: `Exhausted retries on ${originalTopic}`,
            body: reason,
          });
        }
    }
  },
});
```

A retry queue handler that re-enqueues to the original topic:
```ts
import { randomUUID } from "node:crypto";
import { KafkaPublisher } from "@eventferry/kafka";
import { decode } from "@eventferry/kafka/consume";

const publisher = new KafkaPublisher({ brokers });
await publisher.connect();

// ConsumedMessage is whatever message type your DLQ consumer hands to decode().
async function replayDlqMessage(dlqMsg: ConsumedMessage) {
  const m = decode(dlqMsg);
  const originalTopic = m.headers["dlq-original-topic"];
  if (!originalTopic) throw new Error("missing dlq-original-topic header");

  // Strip dlq-* headers before replaying so we don't pollute the original topic
  const cleanHeaders = Object.fromEntries(
    Object.entries(m.headers).filter(([k]) => !k.startsWith("dlq-")),
  );

  await publisher.publish([{
    topic: originalTopic,
    key: m.key,
    value: Buffer.from(m.value as string), // adjust per your decoder
    headers: cleanHeaders,
    recordId: randomUUID(),
    // Reuse the original message id when the header survived; otherwise mint a new one.
    messageId: cleanHeaders["x-message-id"] ?? randomUUID(),
  }]);
}
```

Replay is separate from the outbox: it bypasses `store.enqueue`. That's intentional, because a replay is an operator decision, not a domain event. The replayed record gets a fresh `recordId` and no outbox row. Its `messageId` is reused from the `x-message-id` header when that header is present and freshly generated otherwise.
For high-cardinality topic spaces (hundreds of topics), per-topic DLQs are operational noise. Use a single shared DLQ and route off dlq-original-topic:
```ts
new Relay({
  store,
  publisher,
  dlq: {
    topic: () => "deadletter", // every failure goes here
  },
});
```

Your DLQ consumer reads from one topic and fans out on `dlq-original-topic`. Trade-off: a single DLQ is a single failure domain, so a poison record can backpressure unrelated retries. Per-topic DLQs are the default for a reason.
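One way to structure that fan-out is a small registry keyed by the original topic. This is a sketch under stated assumptions: `DlqFanOut`, `DecodedDlqMessage`, and `DlqHandler` are hypothetical names, and the message shape only models the decoded headers and value used on this page.

```typescript
// Hypothetical shape of a decoded DLQ message; only the header name comes from this page.
interface DecodedDlqMessage {
  headers: Record<string, string | undefined>;
  value: unknown;
}

type DlqHandler = (msg: DecodedDlqMessage) => Promise<void>;

class DlqFanOut {
  private readonly handlers = new Map<string, DlqHandler>();
  private readonly fallback: DlqHandler;

  constructor(fallback: DlqHandler) {
    this.fallback = fallback;
  }

  // Register a handler for records that were originally destined for `originalTopic`.
  on(originalTopic: string, handler: DlqHandler): this {
    this.handlers.set(originalTopic, handler);
    return this;
  }

  // Pick the handler for a message; unknown or missing topics go to the fallback.
  resolve(msg: DecodedDlqMessage): DlqHandler {
    const topic = msg.headers["dlq-original-topic"];
    const handler = topic === undefined ? undefined : this.handlers.get(topic);
    return handler ?? this.fallback;
  }

  async dispatch(msg: DecodedDlqMessage): Promise<void> {
    await this.resolve(msg)(msg);
  }
}
```

Inside `eachMessage`, a consumer of the shared topic could call `fanOut.dispatch(decode(message))`. Keeping a fallback handler means a record from an unregistered topic is still triaged instead of silently dropped.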
In a healthy system the DLQ is always empty outside incident windows. If you see steady DLQ flow with dlq-error-class: KafkaJSConnectionError, your retry budget is too small or your broker is unstable — fix at the source, don't accept DLQ traffic as normal.
The onFailed(record, err, willRetry) hook with willRetry: false is your real-time DLQ-rate signal:
```ts
new Relay({
  store,
  publisher,
  hooks: {
    onFailed: (record, err, willRetry) => {
      if (!willRetry) {
        metrics.counter("outbox.dlq_routed", 1, { topic: record.topic });
      }
    },
  },
});
```

Alert on the rate of change, not the absolute count: a single DLQ event is usually noise, while sustained flow is an incident.
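If your metrics backend cannot express a rate-based alert, a sliding-window check in process is one possible stand-in. The sketch below is hypothetical: `DlqRateMonitor`, the window size, and the threshold are illustrative choices, not part of the relay.

```typescript
// Hypothetical sliding-window detector; thresholds and names are illustrative.
class DlqRateMonitor {
  private timestamps: number[] = [];
  private readonly windowMs: number;
  private readonly threshold: number;

  constructor(windowMs: number, threshold: number) {
    this.windowMs = windowMs;
    this.threshold = threshold;
  }

  // Record one DLQ event at time `now` (ms) and report whether the number of
  // events inside the trailing window has reached the threshold.
  record(now: number): boolean {
    this.timestamps.push(now);
    const cutoff = now - this.windowMs;
    // Drop events that have aged out of the window.
    this.timestamps = this.timestamps.filter((t) => t > cutoff);
    return this.timestamps.length >= this.threshold;
  }
}
```

Calling `monitor.record(Date.now())` from `onFailed` when `willRetry` is `false` would flag sustained flow (for example three events within a minute) while ignoring isolated events spread over hours.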
- The full retry / classification picture → Reliability and Error Handling
- Decode patterns for the DLQ consumer → Consuming Events
- Operational hooks + alerting → Observability
Repository · Issues · npm: @eventferry/all · MIT