diff --git a/schemaregistry/serde/avro.ts b/schemaregistry/serde/avro.ts index 984d8bfb..54bc262f 100644 --- a/schemaregistry/serde/avro.ts +++ b/schemaregistry/serde/avro.ts @@ -3,7 +3,7 @@ import { FieldTransform, FieldType, Migration, RefResolver, RuleConditionError, - RuleContext, SchemaId, SerdeType, + RuleContext, SchemaId, SerdeType, SerializationError, Serializer, SerializerConfig } from "./serde"; import { @@ -93,6 +93,10 @@ export class AvroSerializer extends Serializer implements AvroSerde { const subject = this.subjectName(topic, info) msg = await this.executeRules( subject, topic, RuleMode.WRITE, null, info, msg, getInlineTags(info, deps)) + avroType.isValid(msg, {errorHook: (path, any, type) => { + throw new SerializationError( + `Invalid message at ${path.join('.')}, expected ${type}, got ${stringify(any)}`) + }}) let msgBytes = avroType.toBuffer(msg) msgBytes = await this.executeRulesWithPhase( subject, topic, RulePhase.ENCODING, RuleMode.WRITE, null, info, msgBytes, null) diff --git a/schemaregistry/test/serde/avro.spec.ts b/schemaregistry/test/serde/avro.spec.ts index 7580939e..7d450af3 100644 --- a/schemaregistry/test/serde/avro.spec.ts +++ b/schemaregistry/test/serde/avro.spec.ts @@ -75,6 +75,18 @@ const demoSchema = ` ] } ` +const nameSchema = ` +{ + "type": "record", + "namespace": "examples", + "name": "NameSchema", + "fields": [ + { "name": "fullName", "type": "string" }, + { "name": "lastName", "type": "string" } + ], + "version": "1" +} +` const demoSchemaSingleTag = ` { "name": "DemoSchema", @@ -369,6 +381,24 @@ describe('AvroSerializer', () => { expect(obj2.boolField).toEqual(obj.boolField); expect(obj2.bytesField).toEqual(obj.bytesField); }) + it('bad serialization', async () => { + let conf: ClientConfig = { + baseURLs: [baseURL], + cacheCapacity: 1000 + } + let client = SchemaRegistryClient.newClient(conf) + let ser = new AvroSerializer(client, SerdeType.VALUE, {useLatestVersion: true}) + let info = { + schemaType: 'AVRO', + schema: nameSchema + } + await client.register(subject, info, false) + try { + await ser.serialize(topic, { lastName: "lastName" }) + } catch (err) { + expect(err).toBeInstanceOf(SerializationError) + } + }) it('guid in header', async () => { let conf: ClientConfig = { baseURLs: [baseURL],