From b628238f541810705c35f42900fa6cd1984a4a7b Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Wed, 10 Sep 2025 14:55:59 -0700 Subject: [PATCH 1/3] Add better Avro validation --- schemaregistry/serde/avro.ts | 9 +++++++- schemaregistry/test/serde/avro.spec.ts | 29 ++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/schemaregistry/serde/avro.ts b/schemaregistry/serde/avro.ts index 984d8bfb..43ebb946 100644 --- a/schemaregistry/serde/avro.ts +++ b/schemaregistry/serde/avro.ts @@ -3,7 +3,7 @@ import { FieldTransform, FieldType, Migration, RefResolver, RuleConditionError, - RuleContext, SchemaId, SerdeType, + RuleContext, SchemaId, SerdeType, SerializationError, Serializer, SerializerConfig } from "./serde"; import { @@ -93,6 +93,13 @@ export class AvroSerializer extends Serializer implements AvroSerde { const subject = this.subjectName(topic, info) msg = await this.executeRules( subject, topic, RuleMode.WRITE, null, info, msg, getInlineTags(info, deps)) + if (!avroType.isValid(msg, {errorHook: (path, any, type) => { + throw new SerializationError( + `Invalid message at ${path.join('.')}, expected ${type}, got ${stringify(any)}`) + }})) { + throw new SerializationError( + `Invalid message, expected ${avroType}, got ${stringify(msg)}`) + } let msgBytes = avroType.toBuffer(msg) msgBytes = await this.executeRulesWithPhase( subject, topic, RulePhase.ENCODING, RuleMode.WRITE, null, info, msgBytes, null) diff --git a/schemaregistry/test/serde/avro.spec.ts b/schemaregistry/test/serde/avro.spec.ts index 7580939e..a48e21c1 100644 --- a/schemaregistry/test/serde/avro.spec.ts +++ b/schemaregistry/test/serde/avro.spec.ts @@ -75,6 +75,18 @@ const demoSchema = ` ] } ` +const nameSchema = ` +{ + "type": "record", + "namespace": "examples", + "name": "NameSchema", + "fields": [ + { "name": "fullName", "type": "string" }, + { "name": "lastName", "type": "string" } + ], + "version": "1" +} +` const demoSchemaSingleTag = ` { "name": "DemoSchema", @@ -369,6 +381,23 @@ describe('AvroSerializer', () => { expect(obj2.boolField).toEqual(obj.boolField); expect(obj2.bytesField).toEqual(obj.bytesField); }) + it('bad serialization', async () => { + const conf: ClientConfig = { + baseURLs: [baseURL] + }; + const client = SchemaRegistryClient.newClient(conf); + const ser = new AvroSerializer(client, SerdeType.VALUE, { useLatestVersion: true }); + const info = { + schemaType: 'AVRO', + schema: nameSchema + }; + await client.register(subject, info, false); + try { + await ser.serialize(topic, { lastName: "lastName" }); + } catch (err) { + expect(err).toBeInstanceOf(SerializationError) + } + }) it('guid in header', async () => { let conf: ClientConfig = { baseURLs: [baseURL], From f299bc6b33821b4a9a1eff14b54d9dbc13ca11db Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Wed, 10 Sep 2025 14:59:20 -0700 Subject: [PATCH 2/3] Minor cleanup --- schemaregistry/serde/avro.ts | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/schemaregistry/serde/avro.ts b/schemaregistry/serde/avro.ts index 43ebb946..54bc262f 100644 --- a/schemaregistry/serde/avro.ts +++ b/schemaregistry/serde/avro.ts @@ -93,13 +93,10 @@ export class AvroSerializer extends Serializer implements AvroSerde { const subject = this.subjectName(topic, info) msg = await this.executeRules( subject, topic, RuleMode.WRITE, null, info, msg, getInlineTags(info, deps)) - if (!avroType.isValid(msg, {errorHook: (path, any, type) => { + avroType.isValid(msg, {errorHook: (path, any, type) => { throw new SerializationError( `Invalid message at ${path.join('.')}, expected ${type}, got ${stringify(any)}`) - }})) { - throw new SerializationError( - `Invalid message, expected ${avroType}, got ${stringify(msg)}`) - } + }}) let msgBytes = avroType.toBuffer(msg) msgBytes = await this.executeRulesWithPhase( subject, topic, RulePhase.ENCODING, RuleMode.WRITE, null, info, msgBytes, null) From 39da52dbb762678a14b3b36428ba2f5adb278cea Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Wed, 10 Sep 2025 15:21:15 -0700 Subject: [PATCH 3/3] Minor cleanup --- schemaregistry/test/serde/avro.spec.ts | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/schemaregistry/test/serde/avro.spec.ts b/schemaregistry/test/serde/avro.spec.ts index a48e21c1..7d450af3 100644 --- a/schemaregistry/test/serde/avro.spec.ts +++ b/schemaregistry/test/serde/avro.spec.ts @@ -382,18 +382,19 @@ describe('AvroSerializer', () => { expect(obj2.bytesField).toEqual(obj.bytesField); }) it('bad serialization', async () => { - const conf: ClientConfig = { - baseURLs: [baseURL] - }; - const client = SchemaRegistryClient.newClient(conf); - const ser = new AvroSerializer(client, SerdeType.VALUE, { useLatestVersion: true }); - const info = { + let conf: ClientConfig = { + baseURLs: [baseURL], + cacheCapacity: 1000 + } + let client = SchemaRegistryClient.newClient(conf) + let ser = new AvroSerializer(client, SerdeType.VALUE, {useLatestVersion: true}) + let info = { schemaType: 'AVRO', schema: nameSchema - }; - await client.register(subject, info, false); + } + await client.register(subject, info, false) try { - await ser.serialize(topic, { lastName: "lastName" }); + await ser.serialize(topic, { lastName: "lastName" }) } catch (err) { expect(err).toBeInstanceOf(SerializationError) }