Skip to content

Commit

Permalink
DGS-9387 Ensure isKey flag is passed when reusing deserializers
Browse files Browse the repository at this point in the history
In the console consumers, ensure isKey flag is passed when reusing
deserializers.
  • Loading branch information
rayokota committed Dec 7, 2023
1 parent 014e1d6 commit 1d41518
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 17 deletions.
Expand Up @@ -84,9 +84,9 @@ protected SchemaMessageDeserializer<Object> createDeserializer(Deserializer keyD
}

@Override
protected void writeTo(String topic, Headers headers, byte[] data, PrintStream output)
throws IOException {
Object object = deserializer.deserialize(topic, headers, data);
protected void writeTo(String topic, Boolean isKey, Headers headers,
byte[] data, PrintStream output) throws IOException {
Object object = deserializer.deserialize(topic, isKey, headers, data);
try {
AvroSchemaUtils.toJson(object, output);
} catch (AvroRuntimeException e) {
Expand Down Expand Up @@ -129,7 +129,7 @@ public Object deserializeKey(String topic, Headers headers, byte[] payload) {
}

@Override
public Object deserialize(String topic, Headers headers, byte[] payload)
public Object deserialize(String topic, Boolean isKey, Headers headers, byte[] payload)
throws SerializationException {
return super.deserialize(topic, isKey, headers, payload, null);
}
Expand Down
Expand Up @@ -87,9 +87,9 @@ protected SchemaMessageDeserializer<JsonNode> createDeserializer(Deserializer ke


@Override
protected void writeTo(String topic, Headers headers, byte[] data, PrintStream output)
throws IOException {
JsonNode object = deserializer.deserialize(topic, headers, data);
protected void writeTo(String topic, Boolean isKey, Headers headers,
byte[] data, PrintStream output) throws IOException {
JsonNode object = deserializer.deserialize(topic, isKey, headers, data);
output.print(objectMapper.writeValueAsString(object));
}

Expand Down Expand Up @@ -130,7 +130,7 @@ public Object deserializeKey(String topic, Headers headers, byte[] payload) {
}

@Override
public JsonNode deserialize(String topic, Headers headers, byte[] payload)
public JsonNode deserialize(String topic, Boolean isKey, Headers headers, byte[] payload)
throws SerializationException {
return (JsonNode) super.deserialize(false, topic, isKey, headers, payload);
}
Expand Down
Expand Up @@ -96,9 +96,9 @@ public void init(Properties props) {
}

@Override
protected void writeTo(String topic, Headers headers, byte[] data, PrintStream output)
throws IOException {
Message object = deserializer.deserialize(topic, headers, data);
protected void writeTo(String topic, Boolean isKey, Headers headers,
byte[] data, PrintStream output) throws IOException {
Message object = deserializer.deserialize(topic, isKey, headers, data);
try {
JsonFormat.Printer printer = JsonFormat.printer()
.includingDefaultValueFields()
Expand Down Expand Up @@ -144,7 +144,7 @@ public Object deserializeKey(String topic, Headers headers, byte[] payload) {
}

@Override
public Message deserialize(String topic, Headers headers, byte[] payload)
public Message deserialize(String topic, Boolean isKey, Headers headers, byte[] payload)
throws SerializationException {
return (Message) super.deserialize(false, topic, isKey, headers, payload);
}
Expand Down
Expand Up @@ -33,7 +33,8 @@ public interface SchemaMessageDeserializer<T> extends Closeable {

Object deserializeKey(String topic, Headers headers, byte[] payload);

T deserialize(String topic, Headers headers, byte[] payload) throws SerializationException;
T deserialize(String topic, Boolean isKey, Headers headers, byte[] payload)
throws SerializationException;

SchemaRegistryClient getSchemaRegistryClient();

Expand Down
Expand Up @@ -242,7 +242,8 @@ public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream o
: nullLiteral);
} else {
if (consumerRecord.key() != null) {
writeTo(consumerRecord.topic(), consumerRecord.headers(), consumerRecord.key(), output);
writeTo(consumerRecord.topic(), true, consumerRecord.headers(),
consumerRecord.key(), output);
} else {
output.write(nullLiteral);
}
Expand All @@ -263,7 +264,8 @@ public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream o
}
try {
if (consumerRecord.value() != null) {
writeTo(consumerRecord.topic(), consumerRecord.headers(), consumerRecord.value(), output);
writeTo(consumerRecord.topic(), false, consumerRecord.headers(),
consumerRecord.value(), output);
} else {
output.write(nullLiteral);
}
Expand All @@ -278,8 +280,8 @@ public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream o
}
}

protected abstract void writeTo(String topic, Headers headers, byte[] data, PrintStream output)
throws IOException;
protected abstract void writeTo(String topic, Boolean isKey, Headers headers,
byte[] data, PrintStream output) throws IOException;

@Override
public void close() {
Expand Down

0 comments on commit 1d41518

Please sign in to comment.