From 6da245c6066758f5c5268a3ddfe4ffcbd06ae47c Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Tue, 30 Sep 2025 12:00:31 -0700 Subject: [PATCH 1/2] DGS-22343 Fix transformation of union of JSON refs --- schemaregistry/serde/json.ts | 17 ++-- schemaregistry/test/serde/json.spec.ts | 117 +++++++++++++++++++++++++ 2 files changed, 128 insertions(+), 6 deletions(-) diff --git a/schemaregistry/serde/json.ts b/schemaregistry/serde/json.ts index 93a1c0d9..3a39c90a 100644 --- a/schemaregistry/serde/json.ts +++ b/schemaregistry/serde/json.ts @@ -404,13 +404,15 @@ async function transformField(ctx: RuleContext, path: string, propName: string, try { ctx.enterField(msg, fullName, propName, getType(propSchema), getInlineTags(propSchema)) let value = msg[propName] - const newVal = await transform(ctx, propSchema, fullName, value, fieldTransform) - if (ctx.rule.kind === 'CONDITION') { - if (newVal === false) { - throw new RuleConditionError(ctx.rule) + if (value !== null) { + const newVal = await transform(ctx, propSchema, fullName, value, fieldTransform) + if (ctx.rule.kind === 'CONDITION') { + if (newVal === false) { + throw new RuleConditionError(ctx.rule) + } + } else { + msg[propName] = newVal } - } else { - msg[propName] = newVal } } finally { ctx.leaveField() @@ -453,6 +455,9 @@ function getType(schema: DereferencedJSONSchema): FieldType { return FieldType.NULL } if (schema.type == null) { + if (schema.properties != null && Object.keys(schema.properties).length > 0) { + return FieldType.RECORD + } return FieldType.NULL } if (Array.isArray(schema.type)) { diff --git a/schemaregistry/test/serde/json.spec.ts b/schemaregistry/test/serde/json.spec.ts index 0af7fff0..0da15cb0 100644 --- a/schemaregistry/test/serde/json.spec.ts +++ b/schemaregistry/test/serde/json.spec.ts @@ -206,6 +206,75 @@ const defSchema = ` "type" : "object" } ` +const messageSchema = ` + { + + "type": "object", + "properties": { + "messageType": { + "type": "string" + }, + "version": { + "type": "string" + }, + "payload": { + "type": "object", + "oneOf": [ + { + "$ref": "#/$defs/authentication_request" + }, + { + "$ref": "#/$defs/authentication_status" + } + ] + } + }, + "required": [ + "payload", + "messageType", + "version" + ], + "$defs": { + "authentication_request": { + "properties": { + "messageId": { + "type": "string", + "confluent:tags": ["PII"] + }, + "timestamp": { + "type": "integer", + "minimum": 0 + }, + "requestId": { + "type": "string" + } + }, + "required": [ + "messageId", + "timestamp" + ] + }, + "authentication_status": { + "properties": { + "messageId": { + "type": "string", + "confluent:tags": ["PII"] + }, + "authType": { + "type": [ + "string", + "null" + ] + } + }, + "required": [ + "messageId", + "authType" + ] + } + } + } +` describe('JsonSerializer', () => { afterEach(async () => { @@ -543,6 +612,54 @@ describe('JsonSerializer', () => { expect(obj2.boolField).toEqual(obj.boolField); expect(obj2.bytesField).toEqual(obj.bytesField); }) + it('cel field transform with union of refs', async () => { + let conf: ClientConfig = { + baseURLs: [baseURL], + cacheCapacity: 1000 + } + let client = SchemaRegistryClient.newClient(conf) + let serConfig: JsonSerializerConfig = { + useLatestVersion: true, + } + let ser = new JsonSerializer(client, SerdeType.VALUE, serConfig) + + let encRule: Rule = { + name: 'test-cel', + kind: 'TRANSFORM', + mode: RuleMode.WRITE, + type: 'CEL_FIELD', + expr: "name == 'messageId' ; value + '-suffix'" + } + let ruleSet: RuleSet = { + domainRules: [encRule] + } + + let info: SchemaInfo = { + schemaType: 'JSON', + schema: messageSchema, + ruleSet + } + + await client.register(subject, info, false) + + let obj = { + messageType: 'authentication_request', + version: '1.0', + payload: { + messageId: '12345', + timestamp: 123456789 + } + } + let bytes = await ser.serialize(topic, obj) + + let deserConfig: JsonDeserializerConfig = {} + let deser = new JsonDeserializer(client, SerdeType.VALUE, deserConfig) + let obj2 = await deser.deserialize(topic, bytes) + expect(obj2.messageType).toEqual(obj.messageType); + expect(obj2.version).toEqual(obj.version); + expect(obj2.payload.messageId).toEqual('12345-suffix'); + expect(obj2.payload.timestamp).toEqual(obj.payload.timestamp); + }) it('basic encryption', async () => { let conf: ClientConfig = { baseURLs: [baseURL], From 190fb532bd8dc487cd530489a2e4d9585b0ff93e Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Tue, 30 Sep 2025 12:10:43 -0700 Subject: [PATCH 2/2] Minor cleanup --- schemaregistry/serde/json.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/schemaregistry/serde/json.ts b/schemaregistry/serde/json.ts index 3a39c90a..c361cc9a 100644 --- a/schemaregistry/serde/json.ts +++ b/schemaregistry/serde/json.ts @@ -404,7 +404,7 @@ async function transformField(ctx: RuleContext, path: string, propName: string, try { ctx.enterField(msg, fullName, propName, getType(propSchema), getInlineTags(propSchema)) let value = msg[propName] - if (value !== null) { + if (value != null) { const newVal = await transform(ctx, propSchema, fullName, value, fieldTransform) if (ctx.rule.kind === 'CONDITION') { if (newVal === false) {