From 1d4151812c5acab26d23a4a1657a5f411aac7125 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Thu, 7 Dec 2023 15:01:11 -0800 Subject: [PATCH] DGS-9387 Ensure isKey flag is passed when reusing deserializers In the console consumers, ensure isKey flag is passed when reusing deserializers. --- .../kafka/formatter/AvroMessageFormatter.java | 8 ++++---- .../formatter/json/JsonSchemaMessageFormatter.java | 8 ++++---- .../formatter/protobuf/ProtobufMessageFormatter.java | 8 ++++---- .../kafka/formatter/SchemaMessageDeserializer.java | 3 ++- .../kafka/formatter/SchemaMessageFormatter.java | 10 ++++++---- 5 files changed, 20 insertions(+), 17 deletions(-) diff --git a/avro-serializer/src/main/java/io/confluent/kafka/formatter/AvroMessageFormatter.java b/avro-serializer/src/main/java/io/confluent/kafka/formatter/AvroMessageFormatter.java index 0aa94e590bf..63b166abe45 100644 --- a/avro-serializer/src/main/java/io/confluent/kafka/formatter/AvroMessageFormatter.java +++ b/avro-serializer/src/main/java/io/confluent/kafka/formatter/AvroMessageFormatter.java @@ -84,9 +84,9 @@ protected SchemaMessageDeserializer createDeserializer(Deserializer keyD } @Override - protected void writeTo(String topic, Headers headers, byte[] data, PrintStream output) - throws IOException { - Object object = deserializer.deserialize(topic, headers, data); + protected void writeTo(String topic, Boolean isKey, Headers headers, + byte[] data, PrintStream output) throws IOException { + Object object = deserializer.deserialize(topic, isKey, headers, data); try { AvroSchemaUtils.toJson(object, output); } catch (AvroRuntimeException e) { @@ -129,7 +129,7 @@ public Object deserializeKey(String topic, Headers headers, byte[] payload) { } @Override - public Object deserialize(String topic, Headers headers, byte[] payload) + public Object deserialize(String topic, Boolean isKey, Headers headers, byte[] payload) throws SerializationException { return super.deserialize(topic, isKey, headers, payload, null); } diff --git a/json-schema-serializer/src/main/java/io/confluent/kafka/formatter/json/JsonSchemaMessageFormatter.java b/json-schema-serializer/src/main/java/io/confluent/kafka/formatter/json/JsonSchemaMessageFormatter.java index 1abf6cddf94..964e8fb45c4 100644 --- a/json-schema-serializer/src/main/java/io/confluent/kafka/formatter/json/JsonSchemaMessageFormatter.java +++ b/json-schema-serializer/src/main/java/io/confluent/kafka/formatter/json/JsonSchemaMessageFormatter.java @@ -87,9 +87,9 @@ protected SchemaMessageDeserializer createDeserializer(Deserializer ke @Override - protected void writeTo(String topic, Headers headers, byte[] data, PrintStream output) - throws IOException { - JsonNode object = deserializer.deserialize(topic, headers, data); + protected void writeTo(String topic, Boolean isKey, Headers headers, + byte[] data, PrintStream output) throws IOException { + JsonNode object = deserializer.deserialize(topic, isKey, headers, data); output.print(objectMapper.writeValueAsString(object)); } @@ -130,7 +130,7 @@ public Object deserializeKey(String topic, Headers headers, byte[] payload) { } @Override - public JsonNode deserialize(String topic, Headers headers, byte[] payload) + public JsonNode deserialize(String topic, Boolean isKey, Headers headers, byte[] payload) throws SerializationException { return (JsonNode) super.deserialize(false, topic, isKey, headers, payload); } diff --git a/protobuf-serializer/src/main/java/io/confluent/kafka/formatter/protobuf/ProtobufMessageFormatter.java b/protobuf-serializer/src/main/java/io/confluent/kafka/formatter/protobuf/ProtobufMessageFormatter.java index 6919d65de4e..ce58b3e09f9 100644 --- a/protobuf-serializer/src/main/java/io/confluent/kafka/formatter/protobuf/ProtobufMessageFormatter.java +++ b/protobuf-serializer/src/main/java/io/confluent/kafka/formatter/protobuf/ProtobufMessageFormatter.java @@ -96,9 +96,9 @@ public void init(Properties props) { } @Override - protected void writeTo(String topic, Headers headers, byte[] data, PrintStream output) - throws IOException { - Message object = deserializer.deserialize(topic, headers, data); + protected void writeTo(String topic, Boolean isKey, Headers headers, + byte[] data, PrintStream output) throws IOException { + Message object = deserializer.deserialize(topic, isKey, headers, data); try { JsonFormat.Printer printer = JsonFormat.printer() .includingDefaultValueFields() @@ -144,7 +144,7 @@ public Object deserializeKey(String topic, Headers headers, byte[] payload) { } @Override - public Message deserialize(String topic, Headers headers, byte[] payload) + public Message deserialize(String topic, Boolean isKey, Headers headers, byte[] payload) throws SerializationException { return (Message) super.deserialize(false, topic, isKey, headers, payload); } diff --git a/schema-serializer/src/main/java/io/confluent/kafka/formatter/SchemaMessageDeserializer.java b/schema-serializer/src/main/java/io/confluent/kafka/formatter/SchemaMessageDeserializer.java index b53e2d0dae3..50a5b4dcf44 100644 --- a/schema-serializer/src/main/java/io/confluent/kafka/formatter/SchemaMessageDeserializer.java +++ b/schema-serializer/src/main/java/io/confluent/kafka/formatter/SchemaMessageDeserializer.java @@ -33,7 +33,8 @@ public interface SchemaMessageDeserializer extends Closeable { Object deserializeKey(String topic, Headers headers, byte[] payload); - T deserialize(String topic, Headers headers, byte[] payload) throws SerializationException; + T deserialize(String topic, Boolean isKey, Headers headers, byte[] payload) + throws SerializationException; SchemaRegistryClient getSchemaRegistryClient(); diff --git a/schema-serializer/src/main/java/io/confluent/kafka/formatter/SchemaMessageFormatter.java b/schema-serializer/src/main/java/io/confluent/kafka/formatter/SchemaMessageFormatter.java index f1e97689626..48591cbf828 100644 --- a/schema-serializer/src/main/java/io/confluent/kafka/formatter/SchemaMessageFormatter.java +++ b/schema-serializer/src/main/java/io/confluent/kafka/formatter/SchemaMessageFormatter.java @@ -242,7 +242,8 @@ public void writeTo(ConsumerRecord consumerRecord, PrintStream o : nullLiteral); } else { if (consumerRecord.key() != null) { - writeTo(consumerRecord.topic(), consumerRecord.headers(), consumerRecord.key(), output); + writeTo(consumerRecord.topic(), true, consumerRecord.headers(), + consumerRecord.key(), output); } else { output.write(nullLiteral); } @@ -263,7 +264,8 @@ public void writeTo(ConsumerRecord consumerRecord, PrintStream o } try { if (consumerRecord.value() != null) { - writeTo(consumerRecord.topic(), consumerRecord.headers(), consumerRecord.value(), output); + writeTo(consumerRecord.topic(), false, consumerRecord.headers(), + consumerRecord.value(), output); } else { output.write(nullLiteral); } @@ -278,8 +280,8 @@ public void writeTo(ConsumerRecord consumerRecord, PrintStream o } } - protected abstract void writeTo(String topic, Headers headers, byte[] data, PrintStream output) - throws IOException; + protected abstract void writeTo(String topic, Boolean isKey, Headers headers, + byte[] data, PrintStream output) throws IOException; @Override public void close() {