Observability
OpenTelemetry tracing, lifecycle hooks, librdkafka stats, health checks. Everything you'd wire into your dashboards, alerts, and on-call runbooks.
eventferry depends on no observability package. You bring the SDK; we expose typed adapter points.
KafkaPublisher emits one span per publish() call following the current stable OpenTelemetry messaging semantic conventions.
```typescript
import { trace, SpanKind, SpanStatusCode, context as otelContext, propagation } from "@opentelemetry/api";
import type { KafkaTracer, SpanLike } from "@eventferry/kafka";

const otel = trace.getTracer("@eventferry/kafka");

const tracer: KafkaTracer = {
  startPublishSpan(name, attributes) {
    const span = otel.startSpan(name, { kind: SpanKind.PRODUCER, attributes });
    return {
      setAttribute: (k, v) => span.setAttribute(k, v),
      setAttributes: (a) => span.setAttributes(a),
      setStatus: (s) =>
        span.setStatus({
          code: s.code === "ok" ? SpanStatusCode.OK : SpanStatusCode.ERROR,
          message: s.message,
        }),
      recordException: (e) => span.recordException(e),
      end: () => span.end(),
    } satisfies SpanLike;
  },
  // Propagate W3C trace context to message headers so consumers can continue the trace:
  inject(_span, headers) {
    propagation.inject(otelContext.active(), headers);
  },
};

new KafkaPublisher({ brokers, tracer });
```

Span attributes (set by eventferry):
| Attribute | Value |
|---|---|
| messaging.system | "kafka" |
| messaging.operation.type | "publish" |
| messaging.destination.name | First topic in the batch |
| messaging.batch.message_count | Batch size (including 1) |
Span name: "{topic} publish". Multi-topic batches use the first topic — split your batches upstream if per-topic spans matter.
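Because a multi-topic batch is named after its first topic, one way to get per-topic spans is to group messages by topic before each publish() call. The following is a minimal sketch under stated assumptions: `OutboundMessage` and `groupByTopic` are hypothetical names that are not part of eventferry, and the computed span names only mirror the "{topic} publish" convention described above.

```typescript
// Hypothetical message shape for illustration; eventferry's real type may differ.
interface OutboundMessage {
  topic: string;
  value: string;
}

// Group messages by topic, preserving the original order within each topic.
function groupByTopic<T extends { topic: string }>(messages: readonly T[]): Map<string, T[]> {
  const groups = new Map<string, T[]>();
  for (const message of messages) {
    const bucket = groups.get(message.topic);
    if (bucket) {
      bucket.push(message);
    } else {
      groups.set(message.topic, [message]);
    }
  }
  return groups;
}

const mixed: OutboundMessage[] = [
  { topic: "orders.created", value: "a" },
  { topic: "orders.cancelled", value: "b" },
  { topic: "orders.created", value: "c" },
];

// Each group would be handed to its own publish() call, so every span
// carries exactly one topic. Here we only compute what the span would look like.
const expectedSpans: Array<{ spanName: string; messageCount: number }> = [];
groupByTopic(mixed).forEach((batch, topic) => {
  expectedSpans.push({ spanName: `${topic} publish`, messageCount: batch.length });
});
```

The trade-off is more publish() calls (and more spans) per relay tick in exchange for accurate messaging.destination.name values.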
The optional KafkaTracer.inject(span, headers) hook is called once per message AFTER the batch span is created and BEFORE records hit the wire. It writes W3C traceparent / tracestate headers; the consumer extracts them with extractTraceContext:
```typescript
import { extractTraceContext } from "@eventferry/kafka/consume";

await consumer.run({
  eachMessage: async ({ message }) => {
    const trace = extractTraceContext(message.headers);
    if (trace) {
      // Start a CONSUMER span linked to the producer span (trace.traceId / trace.spanId)
      const span = otel.startSpan("orders.created process", {
        kind: SpanKind.CONSUMER,
        links: [{ context: { traceId: trace.traceId, spanId: trace.spanId, traceFlags: trace.sampled ? 1 : 0 } }],
      });
      // ...
    }
  },
});
```

The publisher clones each outbound message before injecting; your PublishableMessage reference is never mutated, so the relay's retry path stays correct.
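To make the header format concrete, here is a minimal sketch of parsing a W3C traceparent value into the traceId / spanId / sampled fields used above. It is not eventferry's implementation of extractTraceContext; `parseTraceparent` and `ParsedTraceContext` are hypothetical names, and the sketch assumes the header value has already been decoded to a string (Kafka clients may hand it over as bytes).

```typescript
// Illustrative shape only; the object returned by extractTraceContext may differ.
interface ParsedTraceContext {
  traceId: string;
  spanId: string;
  sampled: boolean;
}

// version "-" trace-id (32 hex) "-" parent-id (16 hex) "-" flags (2 hex), all lowercase.
// This accepts only the four-field layout defined for version 00.
const TRACEPARENT = /^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/;

function parseTraceparent(header: string): ParsedTraceContext | null {
  const match = TRACEPARENT.exec(header.trim());
  if (!match) return null;
  const [, version = "", traceId = "", spanId = "", flags = ""] = match;

  // Version ff is reserved as invalid; all-zero IDs are invalid too.
  if (version === "ff") return null;
  if (/^0+$/.test(traceId) || /^0+$/.test(spanId)) return null;

  // Bit 0 of the flags byte is the "sampled" flag.
  const sampled = (parseInt(flags, 16) & 0x01) === 0x01;
  return { traceId, spanId, sampled };
}

const parsed = parseTraceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01");
```

Returning null for malformed input lets the consumer fall back to starting a fresh trace instead of failing the message.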
Lifecycle hooks surface every operational signal worth alerting on. Each hook call is wrapped in try/catch, so a misbehaving hook can never break publishing.
```typescript
new KafkaPublisher({
  brokers,
  hooks: {
    onConnect: () => log.info("publisher connected"),
    onDisconnect: () => log.info("publisher disconnected"),
    onPublish: (result, message) => {
      metrics.counter("kafka.publish", 1, {
        ok: result.ok,
        topic: message.topic,
        errorKind: result.errorKind ?? "ok",
      });
    },
    onError: (err, message) => {
      log.error("publish failed", {
        err: err.message,
        topic: message?.topic,
        recordId: message?.recordId,
      });
    },
    onTransactionAbort: (err) => {
      log.warn("txn aborted", { err: err.message });
      metrics.counter("kafka.txn_abort", 1);
    },
    onProducerFenced: (err) => {
      log.warn("producer fenced", { err: err.message });
      metrics.counter("kafka.fenced", 1);
    },
  },
});
```

```typescript
new Relay({
  store,
  publisher,
  hooks: {
    onBatchClaimed: (n) => metrics.gauge("outbox.claim_size", n),
    onBatchPublished: (results) => {
      const okCount = results.filter((r) => r.ok).length;
      metrics.counter("outbox.publish_ok", okCount);
      metrics.counter("outbox.publish_fail", results.length - okCount);
    },
    onFailed: (record, err, willRetry) => {
      metrics.counter("outbox.failed", 1, { willRetry });
      log.warn("relay record failed", {
        recordId: record.id, willRetry, err: err.message,
      });
    },
    onError: (err) => log.error("relay error", { err: err.message }),
  },
});
```

| Hook | When |
|---|---|
| onBatchClaimed(n) | After a claim returns N rows. Useful for queue-depth dashboards. |
| onBatchPublished(results) | After publisher returns. Per-record success/fail breakdown. |
| onFailed(record, err, willRetry) | Per-record failure. willRetry: false means DLQ / dead next. |
| onError(err) | Batch-level error not attributable to a single record (DB error, publisher crash). |
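The guarantee that a throwing hook cannot break publishing can be pictured with a small guard like the one below. This is a sketch of the general pattern only, not eventferry's internal code; `safeInvoke` and its `onHookError` parameter are hypothetical names.

```typescript
// Invoke an optional hook and report, rather than rethrow, anything it throws.
function safeInvoke<Args extends unknown[]>(
  hook: ((...args: Args) => void) | undefined,
  onHookError: (err: unknown) => void,
  ...args: Args
): void {
  if (!hook) return;
  try {
    hook(...args);
  } catch (err) {
    onHookError(err);
  }
}

const hookErrors: unknown[] = [];
const delivered: string[] = [];
const reportHookError = (err: unknown): void => {
  hookErrors.push(err);
};

// A hook that throws: the error is captured and execution continues.
safeInvoke(
  (topic: string) => {
    throw new Error(`metrics backend down for ${topic}`);
  },
  reportHookError,
  "orders.created",
);

// A well-behaved hook still runs normally afterwards.
safeInvoke(
  (topic: string) => {
    delivered.push(topic);
  },
  reportHookError,
  "orders.created",
);
```

Note that a synchronous try/catch like this does not catch a rejected promise returned by an async hook; the source does not say how eventferry treats async hooks, so keeping hook bodies synchronous and cheap is the safer assumption.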
Pipe queue depth, broker latency, broker timeout counts, and per-topic/per-partition counters into your metrics stack:
```typescript
new KafkaPublisher({
  brokers,
  driver: "confluent",
  onStats: (stats) => {
    promClient.gauge("kafka_msg_cnt").set(stats.msg_cnt as number);
    promClient.gauge("kafka_txmsgs").set(stats.txmsgs as number);
    // librdkafka stats are opaque (huge schema, evolves per version),
    // so reach for the fields you care about.
  },
  statsIntervalMs: 30_000, // default when onStats is set
});
```

| Knob | Default | Notes |
|---|---|---|
| onStats | unset | Receives parsed JSON, swallows exceptions + parse failures so misbehaving observers don't crash the producer loop. |
| statsIntervalMs | 30_000 when onStats is set, OFF otherwise | librdkafka CPU-bills the JSON serialization; we don't enable it silently. |
Reference: librdkafka STATISTICS.md.
Both options are no-ops on the kafkajs driver, which logs a one-time warning and ignores them.
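Since the stats payload is loosely typed and changes between librdkafka versions, it can help to read fields defensively instead of casting. The sketch below pulls per-broker round-trip time and request timeouts; the field names `brokers`, `rtt.avg` (microseconds) and `req_timeouts` follow librdkafka's STATISTICS.md, while `readNumber`, `brokerSamples` and `BrokerSample` are hypothetical helpers, not eventferry exports.

```typescript
// Loosely typed view of the stats payload; only the fields read below are assumed.
type StatsObject = Record<string, unknown>;

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null;
}

// Return source[key] only when it is a finite number; otherwise undefined.
function readNumber(source: unknown, key: string): number | undefined {
  if (!isRecord(source)) return undefined;
  const value = source[key];
  return typeof value === "number" && Number.isFinite(value) ? value : undefined;
}

interface BrokerSample {
  broker: string;
  rttAvgUs: number | undefined;
  reqTimeouts: number | undefined;
}

// One sample per entry under stats.brokers; malformed entries yield undefined fields.
function brokerSamples(stats: StatsObject): BrokerSample[] {
  const brokers = stats["brokers"];
  if (!isRecord(brokers)) return [];
  return Object.keys(brokers).map((name) => {
    const entry = brokers[name];
    return {
      broker: name,
      rttAvgUs: readNumber(isRecord(entry) ? entry["rtt"] : undefined, "avg"),
      reqTimeouts: readNumber(entry, "req_timeouts"),
    };
  });
}

// Hand-written sample payload for illustration, not real broker output.
const sampleStats: StatsObject = {
  msg_cnt: 3,
  brokers: {
    "broker-1:9092/1": { rtt: { avg: 1200 }, req_timeouts: 2 },
    bogus: "not an object",
  },
};

const samples = brokerSamples(sampleStats);
const queueDepth = readNumber(sampleStats, "msg_cnt");
```

Inside onStats, each sample would typically be written to a gauge labelled by broker; skipping undefined values avoids publishing NaN when a field disappears in a future version.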
Cheap reachability probe usable as /healthz / /readyz:
```typescript
app.get("/healthz", async (_req, res) => {
  const status = await publisher.healthCheck({ timeoutMs: 3_000 });
  res.status(status.ok ? 200 : 503).json({
    ok: status.ok,
    latencyMs: status.latencyMs,
    error: status.error?.message,
  });
});
```

Returns { ok, latencyMs, timestamp, error? }. Default timeoutMs: 5_000; timeoutMs: 0 disables the timer.
What it proves: the broker is reachable AND the configured credentials still authenticate. What it does NOT prove: the producer's send path is fully operational. A fenced transactional producer would still answer healthy here. Use a separate "last successful publish" gauge for that signal.
The borrowed admin is always closed (try/finally); admin-side close errors are swallowed — health checks aren't the place to crash.
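The "last successful publish" signal recommended above can be tracked with a few lines of state fed from an onPublish hook. This is a minimal sketch; `PublishFreshness` is a hypothetical helper, and the thresholds are placeholders to tune for your traffic.

```typescript
// Tracks when the last successful publish happened, so a readiness endpoint can
// report a stale send path even while healthCheck() still answers ok.
class PublishFreshness {
  private lastOkAt: number | undefined;

  // Call from an onPublish hook with the result's ok flag.
  record(ok: boolean, now: number = Date.now()): void {
    if (ok) this.lastOkAt = now;
  }

  // Milliseconds since the last success, or undefined if none has been seen yet.
  ageMs(now: number = Date.now()): number | undefined {
    return this.lastOkAt === undefined ? undefined : now - this.lastOkAt;
  }

  // True when a success was recorded within the last maxAgeMs milliseconds.
  isFresh(maxAgeMs: number, now: number = Date.now()): boolean {
    const age = this.ageMs(now);
    return age !== undefined && age <= maxAgeMs;
  }
}

// Explicit timestamps keep the example deterministic.
const freshness = new PublishFreshness();
freshness.record(true, 1_000);  // success at t = 1s
freshness.record(false, 5_000); // a failure does not refresh the timestamp
const ageAtSixSeconds = freshness.ageMs(6_000);
```

A service with no outbound traffic will look stale under this check, so the threshold should account for idle periods, or the gauge should be exported as a metric and alerted on rather than wired directly into /readyz.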
eventferry routes its own diagnostics (driver warnings, hook failures) through a structured logger when you provide one. Same interface across packages:
```typescript
interface Logger {
  debug(msg: string, ctx?: object): void;
  info(msg: string, ctx?: object): void;
  warn(msg: string, ctx?: object): void;
  error(msg: string, ctx?: object): void;
}
```

Pass it everywhere:

```typescript
new KafkaPublisher({ brokers, logger: pinoAdapter });
new Relay({ store, publisher, logger: pinoAdapter });
```

Without a logger, the kafkajs driver falls back to console.warn for its diagnostics (preserves prior behavior); the relay is silent.
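The source does not show how pinoAdapter is built, so here is one possible shape. pino-style loggers take the context object first and the message second, which is the reverse of the Logger interface above; the adapter only swaps the arguments. `PinoLike`, `toLogger` and `fakePino` are hypothetical names, and a stand-in logger is used so the sketch runs without pino installed.

```typescript
// Same shape as the Logger interface above, repeated so the sketch is self-contained.
interface Logger {
  debug(msg: string, ctx?: object): void;
  info(msg: string, ctx?: object): void;
  warn(msg: string, ctx?: object): void;
  error(msg: string, ctx?: object): void;
}

// Minimal subset of a pino-style logger: merging object first, message second.
interface PinoLike {
  debug(obj: object, msg?: string): void;
  info(obj: object, msg?: string): void;
  warn(obj: object, msg?: string): void;
  error(obj: object, msg?: string): void;
}

// Swap (msg, ctx) into (ctx, msg); a missing ctx becomes an empty object.
function toLogger(pino: PinoLike): Logger {
  return {
    debug: (msg, ctx) => pino.debug(ctx ?? {}, msg),
    info: (msg, ctx) => pino.info(ctx ?? {}, msg),
    warn: (msg, ctx) => pino.warn(ctx ?? {}, msg),
    error: (msg, ctx) => pino.error(ctx ?? {}, msg),
  };
}

// Stand-in for a real pino instance that records what it was asked to log.
const captured: Array<{ level: string; obj: object; msg: string | undefined }> = [];
const fakePino: PinoLike = {
  debug: (obj, msg) => { captured.push({ level: "debug", obj, msg }); },
  info: (obj, msg) => { captured.push({ level: "info", obj, msg }); },
  warn: (obj, msg) => { captured.push({ level: "warn", obj, msg }); },
  error: (obj, msg) => { captured.push({ level: "error", obj, msg }); },
};

const pinoAdapter: Logger = toLogger(fakePino);
pinoAdapter.warn("hook failed", { hook: "onPublish" });
pinoAdapter.info("publisher connected");
```

In a real service, fakePino would be replaced by an actual pino instance, which satisfies the same four-method shape.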
- End-to-end trace propagation on the consumer → Consuming Events
- Error classification (errorKind) → Reliability and Error Handling
- DLQ enrichment headers → Dead-Letter Queue
Repository · Issues · npm: @eventferry/all · MIT