
Schema Registry

SametGoktepe edited this page Jun 17, 2026 · 1 revision

Confluent Schema Registry serializer — encode outbox payloads as Avro / Protobuf / JSON Schema in the Confluent wire format (magic byte 0x00 + 4-byte schema id + payload), instead of plain JSON. Drop in as the relay's serializer option.

This is the right answer when your consumer stack is the JVM Confluent ecosystem (kafka-streams, kafka-connect, ksqlDB) or any client that expects the registry-encoded wire format. For pure-TS same-monorepo consumers, Type-Safe Events is lighter.


Install

npm i @eventferry/schema-registry @kafkajs/confluent-schema-registry

@kafkajs/confluent-schema-registry is an optional peer dependency: it is the client that actually talks HTTP to the registry.


Minimal setup

import { Relay } from "@eventferry/core";
import { SchemaRegistrySerializer } from "@eventferry/schema-registry";

const orderCreatedAvsc = JSON.stringify({
  type: "record",
  namespace: "com.example",
  name: "OrderCreated",
  fields: [
    { name: "orderId", type: "string" },
    { name: "total", type: "int" },
  ],
});

const serializer = new SchemaRegistrySerializer({
  host: "http://sr.internal:8081",
  schemas: {
    "orders.created": { type: "AVRO", schema: orderCreatedAvsc },
  },
});

new Relay({ store, publisher, serializer });

schemas maps topic → schema definition. eventferry registers each schema on first use (cached after that), then encodes every record for that topic with the resolved schema id.
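The "register on first use, cached after that" behaviour can be pictured with a small cache keyed by subject. This is a sketch under assumptions, not the package's internals: SchemaIdCache and the register callback are hypothetical names standing in for whatever call returns a schema id from the registry.

```typescript
// Hypothetical sketch of "register on first use, then reuse the cached id".
type RegisterFn = (subject: string, schema: string) => Promise<number>;

class SchemaIdCache {
  private readonly ids = new Map<string, Promise<number>>();
  private readonly register: RegisterFn;

  constructor(register: RegisterFn) {
    this.register = register;
  }

  // Caching the promise (not the resolved id) lets concurrent first
  // publishes for the same subject share one registration request.
  resolve(subject: string, schema: string): Promise<number> {
    let pending = this.ids.get(subject);
    if (!pending) {
      pending = this.register(subject, schema).catch((err) => {
        this.ids.delete(subject); // failures are not cached; the next call retries
        throw err;
      });
      this.ids.set(subject, pending);
    }
    return pending;
  }
}
```

Storing the in-flight promise is a design choice worth noting: a burst of records for a new topic triggers one registry round trip instead of one per record.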


Authentication

Two HTTP auth shapes are supported, basic and bearer, which covers Confluent Cloud and commercial registries:

// Basic — Confluent Cloud API key + secret
new SchemaRegistrySerializer({
  host,
  auth: {
    type: "basic",
    username: "<api-key>",
    password: "<api-secret>",
  },
});

// Bearer (OIDC, custom proxy, etc.)
new SchemaRegistrySerializer({
  host,
  auth: {
    type: "bearer",
    token: process.env.SR_BEARER_TOKEN,
  },
});

// Rotating bearer — callable resolved on every request
new SchemaRegistrySerializer({
  host,
  auth: {
    type: "bearer",
    token: async () => await getCachedAccessToken(),
  },
});

The callable form is called on every request — cache inside your provider if rotation cost matters. auth is ignored when you inject a pre-built registry client (configure auth there yourself).
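As an illustration of how these shapes translate into an HTTP Authorization header, here is a minimal sketch. The RegistryAuth type and authorizationHeader helper are hypothetical and only mirror the option shapes shown above; the package may build its headers differently.

```typescript
// Hypothetical sketch: mapping the two auth shapes to an Authorization header.
type TokenSource = string | (() => string | Promise<string>);
type RegistryAuth =
  | { type: "basic"; username: string; password: string }
  | { type: "bearer"; token: TokenSource };

async function authorizationHeader(auth: RegistryAuth): Promise<string> {
  if (auth.type === "basic") {
    // btoa assumes Latin-1 input, which holds for typical API keys and secrets.
    return `Basic ${btoa(`${auth.username}:${auth.password}`)}`;
  }
  // A callable token is invoked on every call, so a rotated token is
  // picked up without rebuilding the serializer.
  const token = typeof auth.token === "function" ? await auth.token() : auth.token;
  return `Bearer ${token}`;
}
```

Because the callable runs per request, a provider that hits an identity service each time would add latency to every registry call, which is why caching inside the provider is recommended.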

mTLS to the registry lives separately — supply a custom https.Agent on a self-constructed client and pass it via the registry option. Registry TLS is independent of broker TLS, so we don't fold it into this surface.


Subject naming strategy

Confluent's three built-in strategies, exposed as a preset:

new SchemaRegistrySerializer({
  host,
  subjectStrategy: "TopicNameStrategy",                      // default
  // or:
  // subjectStrategy: "RecordNameStrategy",
  // subjectStrategy: "TopicRecordNameStrategy",
});

| Strategy | Subject pattern | When |
| --- | --- | --- |
| TopicNameStrategy (default) | ${topic}-value / ${topic}-key | One schema per (topic, isKey) tuple. Most setups. |
| RecordNameStrategy | ${recordName} | Same record type reused across topics. |
| TopicRecordNameStrategy | ${topic}-${recordName} | Multiple record types per topic. |

The two record-based strategies need a recordName resolver that returns the fully qualified record name (namespace plus name) from your avsc:

new SchemaRegistrySerializer({
  host,
  subjectStrategy: "RecordNameStrategy",
  recordName: (record) => `com.example.${record.aggregateType}.Created`,
});
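The three subject patterns can be written out as one pure function. This is only a sketch of the naming rules; subjectFor is a hypothetical helper, not an export of the package.

```typescript
// Hypothetical sketch of the three preset naming rules.
type SubjectStrategy =
  | "TopicNameStrategy"
  | "RecordNameStrategy"
  | "TopicRecordNameStrategy";

function subjectFor(
  strategy: SubjectStrategy,
  topic: string,
  isKey: boolean,
  recordName?: string,
): string {
  if (strategy === "TopicNameStrategy") {
    return `${topic}-${isKey ? "key" : "value"}`;
  }
  // Both record-based strategies need the fully qualified record name.
  if (!recordName) {
    throw new Error(`${strategy} requires a record name`);
  }
  return strategy === "RecordNameStrategy" ? recordName : `${topic}-${recordName}`;
}
```

For the orders.created topic and a record named com.example.OrderCreated, the value subjects come out as orders.created-value, com.example.OrderCreated, and orders.created-com.example.OrderCreated respectively.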

Full custom

Skip the preset and pass an explicit subject function:

new SchemaRegistrySerializer({
  host,
  subject: (topic, isKey, record) =>
    `acme.${record.aggregateType}.${topic}.${isKey ? "key" : "value"}`,
});

The single-arg legacy form (topic) => string still works for back-compat.


Avro key serialization

By default the relay treats record.key as a plain string and Kafka sends it raw. For Avro-encoded keys, configure keySchemas and call serializeKey from your publish glue:

const serializer = new SchemaRegistrySerializer({
  host,
  schemas:    { "orders.created": { type: "AVRO", schema: valueAvsc } },
  keySchemas: { "orders.created": { type: "AVRO", schema: keyAvsc } },
});

// In your custom publish path:
const encodedValue = await serializer.serialize(record);
const encodedKey   = await serializer.serializeKey(record); // null when record.key is null

The relay does not call serializeKey automatically — Avro keys are an application-level convention, and silently changing them would break consumers expecting UTF-8 string keys. Wire it in your publish path explicitly.

Key and value schema ids are cached independently per subject, so registering one doesn't affect the other.
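A custom publish path that wires in both calls might look like the sketch below. Only serialize and serializeKey come from this page; the OutboxRecord shape (including a topic field), the ProducerLike interface, and publishWithAvroKey are assumptions for illustration. ProducerLike mirrors the shape of a kafkajs producer.send() call.

```typescript
// Structural types so the sketch stands alone. Everything except
// serialize / serializeKey is an assumption made for illustration.
type Bytes = Uint8Array;

interface OutboxRecord {
  topic: string;
  key: string | null;
}

interface KeyAwareSerializer {
  serialize(record: OutboxRecord): Promise<Bytes>;
  serializeKey(record: OutboxRecord): Promise<Bytes | null>;
}

interface ProducerLike {
  send(batch: {
    topic: string;
    messages: { key: Bytes | null; value: Bytes }[];
  }): Promise<unknown>;
}

async function publishWithAvroKey(
  serializer: KeyAwareSerializer,
  producer: ProducerLike,
  record: OutboxRecord,
): Promise<void> {
  // Encode value and key concurrently; the key is null when record.key is null.
  const [value, key] = await Promise.all([
    serializer.serialize(record),
    serializer.serializeKey(record),
  ]);
  await producer.send({ topic: record.topic, messages: [{ key, value }] });
}
```

Keeping this in application code makes the switch to Avro keys an explicit, reviewable change rather than a side effect of configuring keySchemas.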


autoRegister: false — production registries

For Confluent Cloud production or regulated environments where schemas are managed out-of-band (Terraform, CI/CD, kafka-tooling):

new SchemaRegistrySerializer({
  host,
  schemas: { /* ignored when autoRegister is false */ },
  autoRegister: false,
});

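With autoRegister: false, the serializer always resolves the id via getLatestSchemaId(subject); locally supplied schemas / keySchemas entries become documentation only. This corresponds to the Confluent client's auto.register.schemas=false combined with use.latest.version=true, since the latest registered version is used rather than a lookup of the local schema.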

The schema must exist at the registry already; the serializer will throw Subject 'X' not found on first publish otherwise. Common deploy pattern:

  1. CI step registers the new schema with a guarded register call (only on new versions).
  2. Application deploys with autoRegister: false.
  3. Schema evolution checks run registry-side, not application-side.
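Step 1 of the pattern above, the guarded register call, could be sketched against the Schema Registry REST API as follows. This is an assumption-laden illustration rather than a tool shipped with the package: registerIfNew and the injected HttpPost transport are hypothetical, and authentication is left out. It relies on POST /subjects/{subject} to look a schema up without registering it, and POST /subjects/{subject}/versions to register a new version.

```typescript
// Hypothetical transport: POSTs a JSON body with the
// application/vnd.schemaregistry.v1+json content type and returns the response.
type HttpPost = (
  url: string,
  body: string,
) => Promise<{ status: number; json(): Promise<any> }>;

async function registerIfNew(
  host: string,
  subject: string,
  schema: string,
  post: HttpPost,
): Promise<{ id: number; registered: boolean }> {
  const base = `${host}/subjects/${encodeURIComponent(subject)}`;
  const body = JSON.stringify({ schemaType: "AVRO", schema });

  // Lookup only: returns 200 with the id when this exact schema is known.
  const lookup = await post(base, body);
  if (lookup.status === 200) {
    return { id: (await lookup.json()).id, registered: false };
  }
  if (lookup.status !== 404) {
    throw new Error(`Schema lookup failed with HTTP ${lookup.status}`);
  }

  // 404 means the subject or the schema is unknown: register a new version.
  const created = await post(`${base}/versions`, body);
  if (created.status !== 200) {
    throw new Error(`Schema registration failed with HTTP ${created.status}`);
  }
  return { id: (await created.json()).id, registered: true };
}
```

In a CI job the transport would typically wrap fetch with the registry credentials; a registration that violates the subject's compatibility mode is rejected by the registry, which surfaces here as a thrown error and keeps evolution checks registry-side.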

Wire format consumers

Decode on the consumer with the same registry client:

import { SchemaRegistry } from "@kafkajs/confluent-schema-registry";

const registry = new SchemaRegistry({ host });

await consumer.run({
  eachMessage: async ({ message }) => {
    const decoded = await registry.decode(message.value!);
    //    ^ typed by your decode-side typing (avsc-to-TS or hand-typed)
    await handle(decoded);
  },
});

JVM consumers use the Confluent KafkaAvroDeserializer, which checks the magic byte and reads the schema id from the 4 bytes that follow it. Same wire format, different client.
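For readers who want to see the framing itself, here is a minimal sketch of the Confluent wire format: one magic byte (0x00), a 4-byte big-endian schema id, then the encoded payload. The frame and unframe helpers are hypothetical names for illustration; in practice the registry client does this for you.

```typescript
// Hypothetical helpers illustrating the Confluent wire format:
// [0x00 magic byte][4-byte big-endian schema id][encoded payload]
const MAGIC_BYTE = 0x00;
const HEADER_LENGTH = 5;

function frame(schemaId: number, payload: Uint8Array): Uint8Array {
  const out = new Uint8Array(HEADER_LENGTH + payload.length);
  const view = new DataView(out.buffer);
  view.setUint8(0, MAGIC_BYTE);
  view.setUint32(1, schemaId, false); // false = big-endian
  out.set(payload, HEADER_LENGTH);
  return out;
}

function unframe(message: Uint8Array): { schemaId: number; payload: Uint8Array } {
  if (message.length < HEADER_LENGTH || message[0] !== MAGIC_BYTE) {
    throw new Error("Not a Confluent wire-format message");
  }
  // Respect byteOffset so this also works on Node Buffers backed by a shared pool.
  const view = new DataView(message.buffer, message.byteOffset, message.byteLength);
  return {
    schemaId: view.getUint32(1, false),
    payload: message.subarray(HEADER_LENGTH),
  };
}
```

A consumer that receives plain JSON on a topic configured for registry encoding will usually fail the magic-byte check first, which is a quick way to spot a producer that bypassed the serializer.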


When NOT to use Schema Registry

  • Pure JSON, same monorepo: defineOutbox(registry).decode() is lighter and gives you typed payloads without registry infrastructure. See Type-Safe Events.
  • Apicurio Registry — different REST API; needs a separate adapter (@eventferry/schema-registry-apicurio is on the roadmap if there's demand).
  • AWS Glue Schema Registry — different wire format; needs @eventferry/schema-registry-aws (also roadmap, demand-gated).
