Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions schemaregistry/serde/json.ts
Original file line number Diff line number Diff line change
Expand Up @@ -404,13 +404,15 @@ async function transformField(ctx: RuleContext, path: string, propName: string,
try {
ctx.enterField(msg, fullName, propName, getType(propSchema), getInlineTags(propSchema))
let value = msg[propName]
const newVal = await transform(ctx, propSchema, fullName, value, fieldTransform)
if (ctx.rule.kind === 'CONDITION') {
if (newVal === false) {
throw new RuleConditionError(ctx.rule)
if (value != null) {
const newVal = await transform(ctx, propSchema, fullName, value, fieldTransform)
if (ctx.rule.kind === 'CONDITION') {
if (newVal === false) {
throw new RuleConditionError(ctx.rule)
}
} else {
msg[propName] = newVal
}
} else {
msg[propName] = newVal
}
} finally {
ctx.leaveField()
Expand Down Expand Up @@ -453,6 +455,9 @@ function getType(schema: DereferencedJSONSchema): FieldType {
return FieldType.NULL
}
if (schema.type == null) {
if (schema.properties != null && Object.keys(schema.properties).length > 0) {
return FieldType.RECORD
}
return FieldType.NULL
}
if (Array.isArray(schema.type)) {
Expand Down
117 changes: 117 additions & 0 deletions schemaregistry/test/serde/json.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,75 @@ const defSchema = `
"type" : "object"
}
`
const messageSchema = `
{

"type": "object",
"properties": {
"messageType": {
"type": "string"
},
"version": {
"type": "string"
},
"payload": {
"type": "object",
"oneOf": [
{
"$ref": "#/$defs/authentication_request"
},
{
"$ref": "#/$defs/authentication_status"
}
]
}
},
"required": [
"payload",
"messageType",
"version"
],
"$defs": {
"authentication_request": {
"properties": {
"messageId": {
"type": "string",
"confluent:tags": ["PII"]
},
"timestamp": {
"type": "integer",
"minimum": 0
},
"requestId": {
"type": "string"
}
},
"required": [
"messageId",
"timestamp"
]
},
"authentication_status": {
"properties": {
"messageId": {
"type": "string",
"confluent:tags": ["PII"]
},
"authType": {
"type": [
"string",
"null"
]
}
},
"required": [
"messageId",
"authType"
]
}
}
}
`

describe('JsonSerializer', () => {
afterEach(async () => {
Expand Down Expand Up @@ -543,6 +612,54 @@ describe('JsonSerializer', () => {
expect(obj2.boolField).toEqual(obj.boolField);
expect(obj2.bytesField).toEqual(obj.bytesField);
})
it('cel field transform with union of refs', async () => {
let conf: ClientConfig = {
baseURLs: [baseURL],
cacheCapacity: 1000
}
let client = SchemaRegistryClient.newClient(conf)
let serConfig: JsonSerializerConfig = {
useLatestVersion: true,
}
let ser = new JsonSerializer(client, SerdeType.VALUE, serConfig)

let encRule: Rule = {
name: 'test-cel',
kind: 'TRANSFORM',
mode: RuleMode.WRITE,
type: 'CEL_FIELD',
expr: "name == 'messageId' ; value + '-suffix'"
}
let ruleSet: RuleSet = {
domainRules: [encRule]
}

let info: SchemaInfo = {
schemaType: 'JSON',
schema: messageSchema,
ruleSet
}

await client.register(subject, info, false)

let obj = {
messageType: 'authentication_request',
version: '1.0',
payload: {
messageId: '12345',
timestamp: 123456789
}
}
let bytes = await ser.serialize(topic, obj)

let deserConfig: JsonDeserializerConfig = {}
let deser = new JsonDeserializer(client, SerdeType.VALUE, deserConfig)
let obj2 = await deser.deserialize(topic, bytes)
expect(obj2.messageType).toEqual(obj.messageType);
expect(obj2.version).toEqual(obj.version);
expect(obj2.payload.messageId).toEqual('12345-suffix');
expect(obj2.payload.timestamp).toEqual(obj.payload.timestamp);
})
it('basic encryption', async () => {
let conf: ClientConfig = {
baseURLs: [baseURL],
Expand Down