-
-
Notifications
You must be signed in to change notification settings - Fork 0
Schema Registry
Confluent Schema Registry serializer — encode outbox payloads as Avro / Protobuf / JSON Schema in the Confluent wire format (magic byte 0x00 + 4-byte schema id + payload), instead of plain JSON. Drop in as the relay's serializer option.
This is the right answer when your consumer stack is the JVM Confluent ecosystem (kafka-streams, kafka-connect, ksqlDB) or any client that expects the registry-encoded wire format. For pure-TS same-monorepo consumers, Type-Safe Events is lighter.
npm i @eventferry/schema-registry @kafkajs/confluent-schema-registry@kafkajs/confluent-schema-registry is the optional peer that actually talks HTTP to the registry.
import { Relay } from "@eventferry/core";
import { SchemaRegistrySerializer } from "@eventferry/schema-registry";
const orderCreatedAvsc = JSON.stringify({
type: "record",
namespace: "com.example",
name: "OrderCreated",
fields: [
{ name: "orderId", type: "string" },
{ name: "total", type: "int" },
],
});
const serializer = new SchemaRegistrySerializer({
host: "http://sr.internal:8081",
schemas: {
"orders.created": { type: "AVRO", schema: orderCreatedAvsc },
},
});
new Relay({ store, publisher, serializer });schemas maps topic → schema definition. eventferry registers each schema on first use (cached after that), then encodes every record for that topic with the resolved schema id.
Two HTTP auth shapes — match Confluent Cloud + commercial registries:
// Basic — Confluent Cloud API key + secret
new SchemaRegistrySerializer({
host,
auth: {
type: "basic",
username: "<api-key>",
password: "<api-secret>",
},
});
// Bearer (OIDC, custom proxy, etc.)
new SchemaRegistrySerializer({
host,
auth: {
type: "bearer",
token: process.env.SR_BEARER_TOKEN,
},
});
// Rotating bearer — callable resolved on every request
new SchemaRegistrySerializer({
host,
auth: {
type: "bearer",
token: async () => await getCachedAccessToken(),
},
});The callable form is called on every request — cache inside your provider if rotation cost matters. auth is ignored when you inject a pre-built registry client (configure auth there yourself).
mTLS to the registry lives separately — supply a custom https.Agent on a self-constructed client and pass it via the registry option. Registry TLS is independent of broker TLS, so we don't fold it into this surface.
Confluent's three built-in strategies, exposed as a preset:
new SchemaRegistrySerializer({
host,
subjectStrategy: "TopicNameStrategy", // default
// or:
// subjectStrategy: "RecordNameStrategy",
// subjectStrategy: "TopicRecordNameStrategy",
});| Strategy | Subject pattern | When |
|---|---|---|
TopicNameStrategy (default) |
${topic}-value / ${topic}-key
|
One schema per (topic, isKey) tuple. Most setups. |
RecordNameStrategy |
${recordName} |
Same record type reused across topics. |
TopicRecordNameStrategy |
${topic}-${recordName} |
Multiple record types per topic. |
The two record-based strategies need a recordName resolver — read the namespace + name from your avsc:
new SchemaRegistrySerializer({
host,
subjectStrategy: "RecordNameStrategy",
recordName: (record) => `com.example.${record.aggregateType}.Created`,
});Skip the preset and pass an explicit subject function:
new SchemaRegistrySerializer({
host,
subject: (topic, isKey, record) =>
`acme.${record.aggregateType}.${topic}.${isKey ? "key" : "value"}`,
});The single-arg legacy form (topic) => string still works for back-compat.
By default the relay treats record.key as a plain string and Kafka sends it raw. For Avro-encoded keys, configure keySchemas and call serializeKey from your publish glue:
const serializer = new SchemaRegistrySerializer({
host,
schemas: { "orders.created": { type: "AVRO", schema: valueAvsc } },
keySchemas: { "orders.created": { type: "AVRO", schema: keyAvsc } },
});
// In your custom publish path:
const encodedValue = await serializer.serialize(record);
const encodedKey = await serializer.serializeKey(record); // null when record.key is nullThe relay does not call serializeKey automatically — Avro keys are an application-level convention, and silently changing them would break consumers expecting UTF-8 string keys. Wire it in your publish path explicitly.
Key and value subject ids cache independently — registering one doesn't affect the other.
For Confluent Cloud production or regulated environments where schemas are managed out-of-band (Terraform, CI/CD, kafka-tooling):
new SchemaRegistrySerializer({
host,
schemas: { /* ignored when autoRegister is false */ },
autoRegister: false,
});With autoRegister: false, the serializer always resolves by getLatestSchemaId(subject) — locally supplied schemas / keySchemas bytes become docs-only. Matches Confluent client's auto.register.schemas=false.
The schema must exist at the registry already; the serializer will throw Subject 'X' not found on first publish otherwise. Common deploy pattern:
- CI step registers the new schema with a guarded
registercall (only on new versions). - Application deploys with
autoRegister: false. - Schema evolution checks run registry-side, not application-side.
Decode on the consumer with the same registry client:
import { SchemaRegistry } from "@kafkajs/confluent-schema-registry";
const registry = new SchemaRegistry({ host });
await consumer.run({
eachMessage: async ({ message }) => {
const decoded = await registry.decode(message.value!);
// ^ typed by your decode-side typing (avsc-to-TS or hand-typed)
await handle(decoded);
},
});JVM consumers use the Confluent KafkaAvroDeserializer and read the schema id from the magic byte — same wire format, different client.
-
Pure JSON, same monorepo —
defineOutbox(registry).decode()is lighter and gives you typed payloads without registry infrastructure. See Type-Safe Events. -
Apicurio Registry — different REST API; needs a separate adapter (
@eventferry/schema-registry-apicuriois on the roadmap if there's demand). -
AWS Glue Schema Registry — different wire format; needs
@eventferry/schema-registry-aws(also roadmap, demand-gated).
- The lighter alternative for same-monorepo consumers → Type-Safe Events
-
Plug the serializer into the relay → Postgres Adapter (the
serializeroption flows intonew Relay) - Consumer-side decode patterns → Consuming Events
Repository · Issues · npm: @eventferry/all · MIT
Get going
Adapters
Type & schema
Security
Operational
- Transactions and EOS
- Admin Operations
- Observability
- Consuming Events
- Dead-Letter Queue
- Reliability and Error Handling
Operations
Reference