Skip to content

Commit

Permalink
DBZ-4989 Passthrough payload schema in EventRouterDelegate
Browse files Browse the repository at this point in the history
  • Loading branch information
bmorganpa committed Apr 13, 2022
1 parent 053bf60 commit af454c7
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ public R apply(R r, RecordConverter<R> recordConverter) {

final Schema structValueSchema = onlyHeadersInOutputMessage ? null
: (fieldSchemaVersion == null)
? getValueSchema(fieldPayload, eventValueSchema, eventStruct.getString(routeByField))
: getValueSchema(fieldPayload, eventValueSchema, eventStruct.getInt32(fieldSchemaVersion), eventStruct.getString(routeByField));
? getValueSchema(payloadSchema, eventValueSchema, eventStruct.getString(routeByField))
: getValueSchema(payloadSchema, eventValueSchema, eventStruct.getInt32(fieldSchemaVersion), eventStruct.getString(routeByField));

final Struct structValue = onlyHeadersInOutputMessage ? null : new Struct(structValueSchema).put(ENVELOPE_PAYLOAD, payload);

Expand Down Expand Up @@ -365,17 +365,17 @@ public void configure(Map<String, ?> configMap) {
onlyHeadersInOutputMessage = !additionalFields.stream().anyMatch(field -> field.getPlacement() == EventRouterConfigDefinition.AdditionalFieldPlacement.ENVELOPE);
}

private Schema getValueSchema(String fieldPayload, Schema debeziumEventSchema, String routedTopic) {
private Schema getValueSchema(Schema payloadSchema, Schema debeziumEventSchema, String routedTopic) {
if (defaultValueSchema == null) {
defaultValueSchema = getSchemaBuilder(fieldPayload, debeziumEventSchema, routedTopic).build();
defaultValueSchema = getSchemaBuilder(payloadSchema, debeziumEventSchema, routedTopic).build();
}

return defaultValueSchema;
}

private Schema getValueSchema(String fieldPayload, Schema debeziumEventSchema, Integer version, String routedTopic) {
private Schema getValueSchema(Schema payloadSchema, Schema debeziumEventSchema, Integer version, String routedTopic) {
if (!versionedValueSchema.containsKey(version)) {
final Schema schema = getSchemaBuilder(fieldPayload, debeziumEventSchema, routedTopic)
final Schema schema = getSchemaBuilder(payloadSchema, debeziumEventSchema, routedTopic)
.version(version)
.build();
versionedValueSchema.put(version, schema);
Expand All @@ -384,12 +384,11 @@ private Schema getValueSchema(String fieldPayload, Schema debeziumEventSchema, I
return versionedValueSchema.get(version);
}

private SchemaBuilder getSchemaBuilder(String fieldPayload, Schema debeziumEventSchema, String routedTopic) {
private SchemaBuilder getSchemaBuilder(Schema payloadSchema, Schema debeziumEventSchema, String routedTopic) {
SchemaBuilder schemaBuilder = SchemaBuilder.struct().name(getSchemaName(debeziumEventSchema, routedTopic));

// Add payload field
schemaBuilder
.field(ENVELOPE_PAYLOAD, debeziumEventSchema.field(fieldPayload).schema());
schemaBuilder.field(ENVELOPE_PAYLOAD, payloadSchema);

// Add additional fields while keeping the schema inherited from Debezium based on the table column type
additionalFields.forEach((additionalField -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,45 @@ public void shouldNotExpandJSONPayloadIfNotConfigured() {
assertThat(eventRouted.value()).isEqualTo("{\"fullName\": \"John Doe\", \"rating\": 4.9, \"age\": 42}");
}

@Test
public void canExpandJsonPayloadWithAdditionalFieldInEnvelope() {
final EventRouter<SourceRecord> router = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
config.put(
EventRouterConfigDefinition.EXPAND_JSON_PAYLOAD.name(),
"true");
config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "type:envelope");
router.configure(config);

final SourceRecord eventRecord = createEventRecord(
"da8d6de6-3b77-45ff-8f44-57db55a7a06c",
"UserCreated",
"10711fa5",
"User",
"{\"fullName\": \"John Doe\"}",
new HashMap<>(),
new HashMap<>());
final SourceRecord eventRouted = router.apply(eventRecord);

assertThat(eventRouted).isNotNull();

Schema valueSchema = eventRouted.valueSchema();
assertThat(valueSchema.type()).isEqualTo(SchemaBuilder.struct().type());
assertThat(valueSchema.fields().size()).isEqualTo(2);
assertThat(valueSchema.field("type").schema().type().getName()).isEqualTo("string");

Schema payloadSchema = valueSchema.field("payload").schema();
assertThat(payloadSchema.type()).isEqualTo(SchemaBuilder.struct().type());
assertThat(payloadSchema.fields().size()).isEqualTo(1);
assertThat(payloadSchema.field("fullName").schema().type().getName()).isEqualTo("string");

Struct valueStruct = (Struct) eventRouted.value();
assertThat(valueStruct.get("type")).isEqualTo("UserCreated");

Struct payloadStruct = valueStruct.getStruct("payload");
assertThat(payloadStruct.get("fullName")).isEqualTo("John Doe");
}

private SourceRecord createEventRecord() {
return createEventRecord(
"da8d6de6-3b77-45ff-8f44-57db55a7a06c",
Expand Down

0 comments on commit af454c7

Please sign in to comment.