Type Safe Events
defineOutbox(registry) is the single declaration that gives you a typed, validated producer surface and a symmetric consumer surface, both built from the same registry. As long as producer and consumer compile against the same registry, schema drift between them is caught at compile time.
Built on Standard Schema (Zod 3.24+, Valibot, ArkType, Effect Schema, anything). eventferry has no validator dependency — you bring your own.
import { z } from "zod";
import { defineOutbox } from "@eventferry/core";
const registry = {
"orders.created": {
aggregateType: "order",
schema: z.object({
orderId: z.string(),
total: z.number().int().positive(),
currency: z.enum(["USD", "EUR", "TRY"]),
}),
},
"orders.shipped": {
aggregateType: "order",
schema: z.object({
orderId: z.string(),
carrier: z.string(),
trackingNumber: z.string().optional(),
}),
},
"users.signed_up": {
aggregateType: "user",
schema: z.object({
userId: z.string(),
email: z.string().email(),
}),
},
} as const;
as const matters: it lets TypeScript narrow the topic literal union and infer per-topic payload types. Without it, the enqueue call below would accept any string topic.
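To make the inference concrete, here is a minimal sketch of how a helper could derive the topic union and per-topic payload types from a registry object. It is not eventferry's implementation: `Schema`, `PayloadOf`, `buildRecord`, and the hand-written schemas are hypothetical stand-ins so the example runs without a validator package.

```typescript
// Hypothetical stand-in for a validator schema; only its output type T matters here.
interface Schema<T> {
  parse(value: unknown): T;
}

// Hand-written schemas (assumption: a real registry would use Zod, Valibot, etc.).
const orderCreated: Schema<{ orderId: string; total: number }> = {
  parse(value) {
    const v = (value ?? {}) as { orderId?: unknown; total?: unknown };
    if (typeof v.orderId !== "string" || typeof v.total !== "number") {
      throw new Error("invalid orders.created payload");
    }
    return { orderId: v.orderId, total: v.total };
  },
};

const userSignedUp: Schema<{ userId: string; email: string }> = {
  parse(value) {
    const v = (value ?? {}) as { userId?: unknown; email?: unknown };
    if (typeof v.userId !== "string" || typeof v.email !== "string") {
      throw new Error("invalid users.signed_up payload");
    }
    return { userId: v.userId, email: v.email };
  },
};

const registry = {
  "orders.created": { aggregateType: "order", schema: orderCreated },
  "users.signed_up": { aggregateType: "user", schema: userSignedUp },
} as const;

type Registry = typeof registry;
type Topic = keyof Registry; // "orders.created" | "users.signed_up"

// Look up the schema for topic K and extract its output type.
type PayloadOf<K extends Topic> =
  Registry[K]["schema"] extends Schema<infer T> ? T : never;

// The payload parameter is tied to the topic literal passed as the first argument.
function buildRecord<K extends Topic>(topic: K, payload: PayloadOf<K>) {
  const entry = registry[topic];
  entry.schema.parse(payload); // runtime check mirrors the compile-time one
  return { topic, aggregateType: entry.aggregateType, payload };
}

const record = buildRecord("orders.created", { orderId: "o-1", total: 99 });

// These calls would be rejected by the compiler:
// buildRecord("orders.created", { orderId: "o-1" });      // missing `total`
// buildRecord("orders.deleted", { orderId: "o-1" });      // unknown topic
```

The key step is the `PayloadOf<K>` lookup type: once the topic argument is inferred as a string literal, the payload argument is checked against that topic's schema output and nothing else.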
Pass { store } to bind it to a database adapter; you get a typed enqueue:
const events = defineOutbox(registry, { store });
await events.enqueue(client, "orders.created", {
aggregateId: order.id,
payload: { orderId: order.id, total: 99, currency: "USD" },
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
// TypeScript checks this against the Zod schema.
// Wrong field name? Compile error. Wrong type? Compile error.
key: order.customerId, // optional
headers: { "x-tenant": tenantId }, // optional
traceId: getCurrentTraceId(), // optional
});
The schema runs on enqueue: a runtime payload that doesn't match throws OutboxValidationError and your transaction rolls back. Bad events never reach the outbox table, let alone the broker.
Pass the same client you used for your business changes. events.enqueue calls store.enqueue(client, ...) under the hood — no separate transaction.
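The validate-then-write ordering and its effect on the surrounding transaction can be illustrated with an in-memory model. Everything in this sketch is hypothetical (`Row`, `withTransaction`, `enqueue`, `placeOrder`, `ValidationError`); it models a transaction as a staging buffer and does not reflect how eventferry or any database adapter is implemented.

```typescript
// Hypothetical in-memory stand-ins; none of these names come from eventferry.
type Row = { table: string; data: unknown };

class ValidationError extends Error {}

// Runs `work` against a staging buffer and commits only if it returns normally.
function withTransaction(db: Row[], work: (tx: Row[]) => void): void {
  const staged: Row[] = [];
  work(staged); // a throw here skips the commit below, which models a rollback
  db.push(...staged);
}

// Validate first, then write the outbox row through the caller's transaction.
function enqueue(tx: Row[], topic: string, payload: { total: number }): void {
  if (!Number.isInteger(payload.total) || payload.total <= 0) {
    throw new ValidationError(`${topic}: total must be a positive integer`);
  }
  tx.push({ table: "outbox", data: { topic, payload } });
}

function placeOrder(db: Row[], total: number): void {
  withTransaction(db, (tx) => {
    tx.push({ table: "orders", data: { total } }); // business write
    enqueue(tx, "orders.created", { total }); // same transaction handle
  });
}

const db: Row[] = [];

placeOrder(db, 99); // commits the order row and the outbox row together

try {
  placeOrder(db, -1); // validation fails, so neither row is committed
} catch (err) {
  if (!(err instanceof ValidationError)) throw err;
}
```

After both calls, `db` holds only the two rows from the valid order. The invalid order leaves no business row and no outbox row behind, which is the property the shared client is meant to give you.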
Call defineOutbox(registry) without a store to get the consumer companion:
const events = defineOutbox(registry);
// Validate an already-parsed value:
const event = await events.validate("orders.created", parsedJson);
// ^? { orderId: string; total: number; currency: "USD" | "EUR" | "TRY" }
// Or JSON-parse + validate in one step:
const decoded = await events.decode("orders.created", message.value);
decode(topic, bytes):
- Accepts Buffer | Uint8Array | string.
- JSON.parses.
- Runs the topic's schema.
- Returns the inferred output type of the schema.
- Throws OutboxValidationError on unknown topic / parse failure / schema violation.
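The steps above can be sketched as a small standalone function. This is an illustration only, not the library's code: `Validator`, `DecodeError`, and `decodeWith` are hypothetical names, the validator is a plain function that throws, and the sketch is synchronous where the real decode is awaited.

```typescript
// Hypothetical validator shape for this sketch: returns the typed value or throws.
type Validator<T> = (value: unknown) => T;

// Hypothetical error type standing in for the library's validation error.
class DecodeError extends Error {
  topic: string;
  constructor(topic: string, message: string) {
    super(`${topic}: ${message}`);
    this.name = "DecodeError";
    this.topic = topic;
  }
}

function decodeWith<T>(
  validators: Map<string, Validator<T>>,
  topic: string,
  bytes: Uint8Array | string, // Node's Buffer is a Uint8Array subclass
): T {
  // 1. Unknown topics are rejected before any parsing happens.
  const validate = validators.get(topic);
  if (!validate) throw new DecodeError(topic, "unknown topic");

  // 2. Normalize bytes to text, then JSON-parse.
  const text = typeof bytes === "string" ? bytes : new TextDecoder().decode(bytes);
  let parsed: unknown;
  try {
    parsed = JSON.parse(text);
  } catch {
    throw new DecodeError(topic, "payload is not valid JSON");
  }

  // 3. Run the topic's schema and surface violations as one error type.
  try {
    return validate(parsed);
  } catch (err) {
    throw new DecodeError(topic, `schema violation: ${(err as Error).message}`);
  }
}

type OrderShipped = { orderId: string; carrier: string };

const orderShipped: Validator<OrderShipped> = (value) => {
  if (typeof value === "object" && value !== null) {
    const { orderId, carrier } = value as { orderId?: unknown; carrier?: unknown };
    if (typeof orderId === "string" && typeof carrier === "string") {
      return { orderId, carrier };
    }
  }
  throw new Error("expected { orderId: string, carrier: string }");
};

const validators = new Map<string, Validator<OrderShipped>>([
  ["orders.shipped", orderShipped],
]);

const shipped = decodeWith(
  validators,
  "orders.shipped",
  '{"orderId":"o-1","carrier":"UPS"}',
);
```

Funnelling all three failure modes into one error type keeps the consumer's catch block simple: a single check decides whether a message is malformed and should go to a dead-letter path.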
Pair with @eventferry/kafka/consume:
import { decode as kafkaDecode } from "@eventferry/kafka/consume";
await consumer.run({
eachMessage: async ({ message, topic }) => {
const m = kafkaDecode(message, { decoder: "utf8" });
const event = await events.decode(topic as keyof typeof registry, m.value!);
// ^? typed per topic
await handle(event);
},
});
The conventional way to share a contract between producer + consumer is Confluent Schema Registry (Avro / Protobuf / JSON Schema). It works, but adds infrastructure and a wire format magic byte.
defineOutbox covers the same-monorepo case without the registry:
- One TypeScript file declares topics + schemas.
- Producer + consumer import the same constant.
- Rename a payload field → the change is type-checked at both the producer and the consumer.
- Rename a topic → compile error everywhere it's used.
When your consumer is in a different language (Go, Java, Python), reach for Schema Registry. When it's the same monorepo (or another TS service that can import a shared package), defineOutbox is the cheaper, lighter answer.
Standard Schema is a shared interface that Zod, Valibot, ArkType, and Effect Schema all implement. eventferry depends on the interface, not on any validator package — your bundle stays lean and you keep your existing validator choice.
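For orientation, the interface is small enough to write out. The sketch below inlines the core shape of Standard Schema v1 (in real code the types come from the `@standard-schema/spec` package) and shows library-side code that only touches the `~standard` property. The two hand-written schemas, their vendor names, and `checkSync` are hypothetical, and the helper deliberately handles synchronous validators only.

```typescript
// Core shape of Standard Schema v1, inlined so the sketch needs no packages.
interface Issue {
  readonly message: string;
  readonly path?: ReadonlyArray<PropertyKey | { readonly key: PropertyKey }>;
}

type Result<Output> =
  | { readonly value: Output; readonly issues?: undefined }
  | { readonly issues: ReadonlyArray<Issue> };

interface StandardSchema<Output> {
  readonly "~standard": {
    readonly version: 1;
    readonly vendor: string;
    readonly validate: (value: unknown) => Result<Output> | Promise<Result<Output>>;
  };
}

// Two hand-written schemas from hypothetical vendors.
const positiveInt: StandardSchema<number> = {
  "~standard": {
    version: 1,
    vendor: "sketch-a",
    validate: (value) =>
      typeof value === "number" && Number.isInteger(value) && value > 0
        ? { value }
        : { issues: [{ message: "expected a positive integer" }] },
  },
};

const nonEmptyString: StandardSchema<string> = {
  "~standard": {
    version: 1,
    vendor: "sketch-b",
    validate: (value) =>
      typeof value === "string" && value.length > 0
        ? { value }
        : { issues: [{ message: "expected a non-empty string" }] },
  },
};

// Library-side code depends on the interface, never on a vendor API.
function checkSync<T>(schema: StandardSchema<T>, input: unknown): T {
  const result = schema["~standard"].validate(input);
  if (result instanceof Promise) {
    throw new TypeError("this sketch handles synchronous validators only");
  }
  if (result.issues) {
    throw new Error(result.issues.map((issue) => issue.message).join("; "));
  }
  return result.value;
}

const total = checkSync(positiveInt, 99);
const orderId = checkSync(nonEmptyString, "o-1");
```

Because `checkSync` never inspects the vendor, swapping one validator library for another changes only how the schema objects are built.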
// Valibot — same API surface
import * as v from "valibot";
const registry = {
"orders.created": {
aggregateType: "order",
schema: v.object({
orderId: v.string(),
total: v.pipe(v.number(), v.integer(), v.minValue(1)),
}),
},
} as const;
defineOutbox(registry, { store }); // works identically
import { OutboxValidationError } from "@eventferry/core";
try {
await events.enqueue(client, "orders.created", { aggregateId, payload });
} catch (err) {
if (err instanceof OutboxValidationError) {
log.warn("invalid event", { topic: err.topic, issues: err.issues });
throw err; // let your tx rollback
}
throw err;
}
err.issues is the Standard Schema issues array, the same format Zod gives you on parse failure. Each issue has message, optional path, optional code.
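If you want one log line per problem, the issues array flattens easily. The helper below is a hypothetical sketch, not part of eventferry; it assumes only the `message` and `path` fields, where a path segment is either a property key or an object with a `key` field, as in Standard Schema v1.

```typescript
// Issue shape assumed by this sketch: message plus optional path.
interface Issue {
  readonly message: string;
  readonly path?: ReadonlyArray<PropertyKey | { readonly key: PropertyKey }>;
}

// Turn each issue into "dotted.path: message", or just the message when
// the issue has no path (a problem with the payload as a whole).
function formatIssues(issues: ReadonlyArray<Issue>): string[] {
  return issues.map((issue) => {
    const path = (issue.path ?? [])
      .map((segment) => String(typeof segment === "object" ? segment.key : segment))
      .join(".");
    return path ? `${path}: ${issue.message}` : issue.message;
  });
}

const lines = formatIssues([
  { message: "Expected number, received string", path: ["total"] },
  { message: "Invalid email", path: [{ key: "user" }, "email"] },
  { message: "Required" },
]);
// lines:
//   "total: Expected number, received string"
//   "user.email: Invalid email"
//   "Required"
```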
await events.enqueue(client, "orders.created", {
aggregateId: order.id,
payload: { orderId: order.id, total: order.total, currency: "USD" },
key: order.customerId, // overrides default (aggregateId)
headers: {
"x-tenant": tenantId,
"x-source": "web-api",
},
messageId: crypto.randomUUID(), // optional; otherwise generated
traceId: getCurrentTraceId(), // optional; flows to W3C traceparent
});
headers map to Kafka message headers and are visible on the consumer side via decode(message).headers.
- Schema Registry for cross-language consumers → Schema Registry
- Consumer-side recipes → Consuming Events
- Where this validates inside the relay flow → Core Concepts
Repository · Issues · npm: @eventferry/all · MIT