diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
index a67516ecc731..188b51c24a09 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
@@ -36,7 +36,6 @@
 import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.CloseableIterator;
 import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 
 import java.io.Closeable;
@@ -296,15 +295,13 @@ <K, V> ConsumerRecord<K, V> parseRecord(TopicPartition partition,
             long timestamp = record.timestamp();
             Headers headers = new RecordHeaders(record.headers());
             ByteBuffer keyBytes = record.key();
-            byte[] keyByteArray = keyBytes == null ? null : org.apache.kafka.common.utils.Utils.toArray(keyBytes);
-            K key = keyBytes == null ? null : fetchConfig.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
+            K key = keyBytes == null ? null : fetchConfig.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
             ByteBuffer valueBytes = record.value();
-            byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
-            V value = valueBytes == null ? null : fetchConfig.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
+            V value = valueBytes == null ? null : fetchConfig.valueDeserializer.deserialize(partition.topic(), headers, valueBytes);
             return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
                                         timestamp, timestampType,
-                                        keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
-                                        valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
+                                        keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(),
+                                        valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(),
                                         key, value, headers, leaderEpoch);
         } catch (RuntimeException e) {
             throw new RecordDeserializationException(partition, record.offset(),
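The change above hands the record's ByteBuffer straight to the deserializer instead of copying it into a byte[] first, and reports the serialized key/value sizes via remaining() rather than the copied array's length. A minimal sketch of why those two size computations agree (class and variable names here are illustrative, not from the patch):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative only: remaining() equals the length of the array that copying
// the buffer's readable span would produce, so the sizes recorded on the
// ConsumerRecord are unchanged by dropping the Utils.toArray copies.
public class RemainingEqualsCopiedLength {
    public static void main(String[] args) {
        ByteBuffer valueBytes = ByteBuffer.wrap("some-value".getBytes(StandardCharsets.UTF_8));

        byte[] copied = new byte[valueBytes.remaining()];
        valueBytes.duplicate().get(copied); // copy without moving the original position

        System.out.println(valueBytes.remaining() == copied.length); // true
    }
}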
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/BooleanDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/BooleanDeserializer.java
index 2384fc905f58..1add7ed3388b 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/BooleanDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/BooleanDeserializer.java
@@ -17,6 +17,9 @@
 package org.apache.kafka.common.serialization;
 
 import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+
+import java.nio.ByteBuffer;
 
 public class BooleanDeserializer implements Deserializer<Boolean> {
     private static final byte TRUE = 0x01;
@@ -40,4 +43,24 @@ public Boolean deserialize(final String topic, final byte[] data) {
             throw new SerializationException("Unexpected byte received by BooleanDeserializer: " + data[0]);
         }
     }
+
+    @Override
+    public Boolean deserialize(String topic, Headers headers, ByteBuffer data) {
+        if (data == null) {
+            return null;
+        }
+
+        if (data.remaining() != 1) {
+            throw new SerializationException("Size of data received by BooleanDeserializer is not 1");
+        }
+
+        final byte b = data.get(data.position());
+        if (b == TRUE) {
+            return true;
+        } else if (b == FALSE) {
+            return false;
+        } else {
+            throw new SerializationException("Unexpected byte received by BooleanDeserializer: " + b);
+        }
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferDeserializer.java
index 0dfcf5f26c32..6ef05af16fc2 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferDeserializer.java
@@ -16,13 +16,22 @@
  */
 package org.apache.kafka.common.serialization;
 
+import org.apache.kafka.common.header.Headers;
+
 import java.nio.ByteBuffer;
 
 public class ByteBufferDeserializer implements Deserializer<ByteBuffer> {
+
+    @Override
     public ByteBuffer deserialize(String topic, byte[] data) {
         if (data == null)
             return null;
 
         return ByteBuffer.wrap(data);
     }
+
+    @Override
+    public ByteBuffer deserialize(String topic, Headers headers, ByteBuffer data) {
+        return data;
+    }
 }
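For ByteBufferDeserializer the new overload is the identity function: no wrap, no copy. A consequence worth noting (this sketch is not part of the patch) is that the caller and the application now share one buffer instance:

import java.nio.ByteBuffer;

import org.apache.kafka.common.serialization.ByteBufferDeserializer;

// Sketch: the override returns its argument, so deserialization allocates nothing.
public class ByteBufferIdentityCheck {
    public static void main(String[] args) {
        ByteBuffer in = ByteBuffer.wrap(new byte[]{1, 2, 3});
        try (ByteBufferDeserializer deserializer = new ByteBufferDeserializer()) {
            ByteBuffer out = deserializer.deserialize("topic", null, in);
            System.out.println(out == in); // true: same instance, zero copies
        }
    }
}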
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
index eb56485abce3..9fd8d50be744 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
@@ -17,8 +17,10 @@
 package org.apache.kafka.common.serialization;
 
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.utils.Utils;
 
 import java.io.Closeable;
+import java.nio.ByteBuffer;
 import java.util.Map;
 
 /**
@@ -60,6 +62,17 @@ default T deserialize(String topic, Headers headers, byte[] data) {
         return deserialize(topic, data);
     }
 
+    /**
+     * Deserialize a record value from a ByteBuffer into a value or object.
+     * @param topic topic associated with the data
+     * @param headers headers associated with the record; may be empty.
+     * @param data serialized ByteBuffer; may be null; implementations are recommended to handle null by returning a value or null rather than throwing an exception.
+     * @return deserialized typed data; may be null
+     */
+    default T deserialize(String topic, Headers headers, ByteBuffer data) {
+        return deserialize(topic, headers, Utils.toNullableArray(data));
+    }
+
     /**
      * Close this deserializer.
      * <p>
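The default implementation keeps every existing Deserializer working: it copies the buffer to a byte[] with Utils.toNullableArray (null in, null out) and delegates to the header-aware byte[] overload. Implementations that want the zero-copy path override it, along the lines of this hypothetical example (PointDeserializer and its 16-byte format are invented for illustration):

import java.nio.ByteBuffer;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical deserializer that decodes two longs directly from the buffer,
// funneling the legacy byte[] entry point into the same code path.
public class PointDeserializer implements Deserializer<long[]> {
    @Override
    public long[] deserialize(String topic, byte[] data) {
        return data == null ? null : deserialize(topic, null, ByteBuffer.wrap(data));
    }

    @Override
    public long[] deserialize(String topic, Headers headers, ByteBuffer data) {
        if (data == null)
            return null;
        if (data.remaining() != 16)
            throw new IllegalArgumentException("Expected 16 bytes, got " + data.remaining());
        int p = data.position();
        // Absolute reads: the buffer's position is left untouched for the caller.
        return new long[] {data.getLong(p), data.getLong(p + 8)};
    }
}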
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java
index 0fa1cce4d740..8b3d8b698872 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java
@@ -17,6 +17,9 @@
 package org.apache.kafka.common.serialization;
 
 import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+
+import java.nio.ByteBuffer;
 
 public class DoubleDeserializer implements Deserializer<Double> {
 
@@ -35,4 +38,16 @@ public Double deserialize(String topic, byte[] data) {
         }
         return Double.longBitsToDouble(value);
     }
+
+    @Override
+    public Double deserialize(String topic, Headers headers, ByteBuffer data) {
+        if (data == null) {
+            return null;
+        }
+
+        if (data.remaining() != 8) {
+            throw new SerializationException("Size of data received by DoubleDeserializer is not 8");
+        }
+        return data.getDouble(data.position());
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java
index 09031779426c..0389cacdcf88 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java
@@ -17,6 +17,9 @@
 package org.apache.kafka.common.serialization;
 
 import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+
+import java.nio.ByteBuffer;
 
 public class FloatDeserializer implements Deserializer<Float> {
     @Override
@@ -34,4 +37,16 @@ public Float deserialize(final String topic, final byte[] data) {
         }
         return Float.intBitsToFloat(value);
     }
+
+    @Override
+    public Float deserialize(String topic, Headers headers, ByteBuffer data) {
+        if (data == null) {
+            return null;
+        }
+
+        if (data.remaining() != 4) {
+            throw new SerializationException("Size of data received by FloatDeserializer is not 4");
+        }
+        return data.getFloat(data.position());
+    }
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java
index 20ca63f02241..f9ac21374d7c 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java
@@ -17,8 +17,12 @@
 package org.apache.kafka.common.serialization;
 
 import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+
+import java.nio.ByteBuffer;
 
 public class IntegerDeserializer implements Deserializer<Integer> {
+    @Override
     public Integer deserialize(String topic, byte[] data) {
         if (data == null)
             return null;
@@ -33,4 +37,16 @@ public Integer deserialize(String topic, byte[] data) {
         }
         return value;
     }
+
+    @Override
+    public Integer deserialize(String topic, Headers headers, ByteBuffer data) {
+        if (data == null) {
+            return null;
+        }
+
+        if (data.remaining() != 4) {
+            throw new SerializationException("Size of data received by IntegerDeserializer is not 4");
+        }
+        return data.getInt(data.position());
+    }
 }
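The Double, Float, and Integer overloads above (and the Long and Short ones that follow) share one pattern: validate data.remaining() instead of data.length, then read with an absolute get at data.position() so the buffer is not consumed. That matters because CompletedFetch.parseRecord still calls keyBytes.remaining() and valueBytes.remaining() after deserialization to fill in the record sizes. A sketch of the difference:

import java.nio.ByteBuffer;

// Sketch of why the overrides use absolute reads: getInt(position()) leaves
// position/limit untouched, so a caller that later inspects remaining()
// still sees the full span; a relative getInt() would drain the buffer.
public class AbsoluteReadDemo {
    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(4).putInt(42);
        buf.flip();

        int viaAbsolute = buf.getInt(buf.position());
        System.out.println(viaAbsolute + " remaining=" + buf.remaining()); // 42 remaining=4

        int viaRelative = buf.getInt(); // advances position by 4
        System.out.println(viaRelative + " remaining=" + buf.remaining()); // 42 remaining=0
    }
}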
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java
index 1e445d2452f0..38cbfd982707 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java
@@ -17,8 +17,12 @@
 package org.apache.kafka.common.serialization;
 
 import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+
+import java.nio.ByteBuffer;
 
 public class LongDeserializer implements Deserializer<Long> {
+    @Override
     public Long deserialize(String topic, byte[] data) {
         if (data == null)
             return null;
@@ -33,4 +37,16 @@ public Long deserialize(String topic, byte[] data) {
         }
         return value;
     }
+
+    @Override
+    public Long deserialize(String topic, Headers headers, ByteBuffer data) {
+        if (data == null) {
+            return null;
+        }
+
+        if (data.remaining() != 8) {
+            throw new SerializationException("Size of data received by LongDeserializer is not 8");
+        }
+        return data.getLong(data.position());
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java
index 7814a7bd7122..42924fb77af8 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java
@@ -17,9 +17,13 @@
 package org.apache.kafka.common.serialization;
 
 import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+
+import java.nio.ByteBuffer;
 
 public class ShortDeserializer implements Deserializer<Short> {
 
+    @Override
     public Short deserialize(String topic, byte[] data) {
         if (data == null)
             return null;
@@ -34,4 +38,16 @@ public Short deserialize(String topic, byte[] data) {
         }
         return value;
     }
+
+    @Override
+    public Short deserialize(String topic, Headers headers, ByteBuffer data) {
+        if (data == null) {
+            return null;
+        }
+
+        if (data.remaining() != 2) {
+            throw new SerializationException("Size of data received by ShortDeserializer is not 2");
+        }
+        return data.getShort(data.position());
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java
index 3d8b7bbd9806..935c58889c59 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java
@@ -17,8 +17,11 @@
 package org.apache.kafka.common.serialization;
 
 import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.utils.Utils;
 
 import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
@@ -50,4 +53,20 @@ public String deserialize(String topic, byte[] data) {
             throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
         }
     }
+
+    @Override
+    public String deserialize(String topic, Headers headers, ByteBuffer data) {
+        if (data == null) {
+            return null;
+        }
+
+        try {
+            if (data.hasArray()) {
+                return new String(data.array(), data.position() + data.arrayOffset(), data.remaining(), encoding);
+            }
+            return new String(Utils.toArray(data), encoding);
+        } catch (UnsupportedEncodingException e) {
+            throw new SerializationException("Error when deserializing ByteBuffer to string due to unsupported encoding " + encoding);
+        }
+    }
 }
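StringDeserializer distinguishes two cases: a heap buffer exposes its backing array, so the String can be decoded in place starting at arrayOffset() + position(); a direct buffer has no accessible array and needs one copy via Utils.toArray. A small illustration of which path a given buffer takes:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch of the two paths the override distinguishes: heap buffers can be
// decoded straight from their backing array, direct buffers cannot.
public class HeapVsDirectDecode {
    public static void main(String[] args) {
        byte[] bytes = "hello".getBytes(StandardCharsets.UTF_8);

        ByteBuffer heap = ByteBuffer.wrap(bytes);
        System.out.println(heap.hasArray());   // true: decode in place, no copy

        ByteBuffer direct = ByteBuffer.allocateDirect(bytes.length);
        direct.put(bytes).flip();
        System.out.println(direct.hasArray()); // false: copy out first
    }
}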
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/UUIDDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/UUIDDeserializer.java
index 779a9bd1529d..f1f1403cc925 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/UUIDDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/UUIDDeserializer.java
@@ -17,8 +17,11 @@
 package org.apache.kafka.common.serialization;
 
 import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.utils.Utils;
 
 import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import java.util.UUID;
@@ -53,4 +56,22 @@ public UUID deserialize(String topic, byte[] data) {
             throw new SerializationException("Error parsing data into UUID", e);
         }
     }
+
+    @Override
+    public UUID deserialize(String topic, Headers headers, ByteBuffer data) {
+        try {
+            if (data == null) {
+                return null;
+            }
+
+            if (data.hasArray()) {
+                return UUID.fromString(new String(data.array(), data.arrayOffset() + data.position(), data.remaining(), encoding));
+            }
+            return UUID.fromString(new String(Utils.toArray(data), encoding));
+        } catch (UnsupportedEncodingException e) {
+            throw new SerializationException("Error when deserializing ByteBuffer to UUID due to unsupported encoding " + encoding, e);
+        } catch (IllegalArgumentException e) {
+            throw new SerializationException("Error parsing data into UUID", e);
+        }
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/VoidDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/VoidDeserializer.java
index 08ff57a7bb28..1f0d3b0c2962 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/VoidDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/VoidDeserializer.java
@@ -16,6 +16,10 @@
  */
 package org.apache.kafka.common.serialization;
 
+import org.apache.kafka.common.header.Headers;
+
+import java.nio.ByteBuffer;
+
 public class VoidDeserializer implements Deserializer<Void> {
     @Override
     public Void deserialize(String topic, byte[] data) {
@@ -24,4 +28,12 @@ public Void deserialize(String topic, byte[] data) {
         }
         return null;
     }
+
+    @Override
+    public Void deserialize(String topic, Headers headers, ByteBuffer data) {
+        if (data != null) {
+            throw new IllegalArgumentException("Data should be null for a VoidDeserializer.");
+        }
+        return null;
+    }
 }
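Taken together, the new overloads behave exactly like their byte[] counterparts, including null handling and the strict size checks. A quick sketch against IntegerDeserializer (null headers are fine; the topic name is arbitrary):

import java.nio.ByteBuffer;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.IntegerDeserializer;

// Sketch: the ByteBuffer overload mirrors the byte[] behaviour, rejecting
// buffers whose readable span is not exactly four bytes.
public class IntegerByteBufferCheck {
    public static void main(String[] args) {
        try (IntegerDeserializer deserializer = new IntegerDeserializer()) {
            ByteBuffer four = ByteBuffer.allocate(4).putInt(7);
            four.flip();
            System.out.println(deserializer.deserialize("topic", null, four)); // 7

            try {
                deserializer.deserialize("topic", null, ByteBuffer.allocate(3));
            } catch (SerializationException e) {
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }
}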
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index b55c6822b0f4..c2e654a5f0bd 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -55,6 +55,7 @@
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.message.HeartbeatResponseData;
@@ -302,6 +303,16 @@ public String deserialize(String topic, byte[] data) {
                     return super.deserialize(topic, data);
                 }
             }
+
+            @Override
+            public String deserialize(String topic, Headers headers, ByteBuffer data) {
+                if (i == recordIndex) {
+                    throw new SerializationException();
+                } else {
+                    i++;
+                    return super.deserialize(topic, headers, data);
+                }
+            }
         };
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
index 6607678dedff..2fd28037c81b 100644
--- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
@@ -34,6 +34,7 @@
 import java.util.Stack;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.kafka.common.utils.Utils.wrapNullable;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -45,18 +46,20 @@ public class SerializationTest {
 
     final private String topic = "testTopic";
     final private Map<Class<?>, List<Object>> testData = new HashMap<Class<?>, List<Object>>() {
         {
-            put(String.class, Arrays.asList("my string"));
-            put(Short.class, Arrays.asList((short) 32767, (short) -32768));
-            put(Integer.class, Arrays.asList(423412424, -41243432));
-            put(Long.class, Arrays.asList(922337203685477580L, -922337203685477581L));
-            put(Float.class, Arrays.asList(5678567.12312f, -5678567.12341f));
-            put(Double.class, Arrays.asList(5678567.12312d, -5678567.12341d));
-            put(byte[].class, Arrays.asList("my string".getBytes()));
-            put(ByteBuffer.class, Arrays.asList(ByteBuffer.wrap("my string".getBytes()),
+            put(String.class, Arrays.asList(null, "my string"));
+            put(Short.class, Arrays.asList(null, (short) 32767, (short) -32768));
+            put(Integer.class, Arrays.asList(null, 423412424, -41243432));
+            put(Long.class, Arrays.asList(null, 922337203685477580L, -922337203685477581L));
+            put(Float.class, Arrays.asList(null, 5678567.12312f, -5678567.12341f));
+            put(Double.class, Arrays.asList(null, 5678567.12312d, -5678567.12341d));
+            put(byte[].class, Arrays.asList(null, "my string".getBytes()));
+            put(ByteBuffer.class, Arrays.asList(
+                null,
+                ByteBuffer.wrap("my string".getBytes()),
                 ByteBuffer.allocate(10).put("my string".getBytes()),
                 ByteBuffer.allocateDirect(10).put("my string".getBytes())));
-            put(Bytes.class, Arrays.asList(new Bytes("my string".getBytes())));
-            put(UUID.class, Arrays.asList(UUID.randomUUID()));
+            put(Bytes.class, Arrays.asList(null, new Bytes("my string".getBytes())));
+            put(UUID.class, Arrays.asList(null, UUID.randomUUID()));
         }
     };
@@ -69,8 +72,17 @@ public void allSerdesShouldRoundtripInput() {
         for (Map.Entry<Class<?>, List<Object>> test : testData.entrySet()) {
             try (Serde<Object> serde = Serdes.serdeFrom((Class<Object>) test.getKey())) {
                 for (Object value : test.getValue()) {
-                    assertEquals(value, serde.deserializer().deserialize(topic, serde.serializer().serialize(topic, value)),
+                    final byte[] serialized = serde.serializer().serialize(topic, value);
+                    assertEquals(value, serde.deserializer().deserialize(topic, serialized),
                         "Should get the original " + test.getKey().getSimpleName() + " after serialization and deserialization");
+
+                    if (value instanceof byte[]) {
+                        assertArrayEquals((byte[]) value, (byte[]) serde.deserializer().deserialize(topic, null, (byte[]) value),
+                            "Should get the original " + test.getKey().getSimpleName() + " after serialization and deserialization");
+                    } else {
+                        assertEquals(value, serde.deserializer().deserialize(topic, null, wrapNullable(serialized)),
+                            "Should get the original " + test.getKey().getSimpleName() + " after serialization and deserialization");
+                    }
                 }
             }
         }
@@ -84,6 +96,8 @@ public void allSerdesShouldSupportNull() {
                 "Should support null in " + cls.getSimpleName() + " serialization");
             assertNull(serde.deserializer().deserialize(topic, null),
                 "Should support null in " + cls.getSimpleName() + " deserialization");
+            assertNull(serde.deserializer().deserialize(topic, null, (ByteBuffer) null),
+                "Should support null in " + cls.getSimpleName() + " deserialization");
         }
     }
 }
@@ -361,6 +375,23 @@ public void voidDeserializerShouldThrowOnNotNullValues() {
         }
     }
 
+    @Test
+    public void stringDeserializerSupportByteBuffer() {
+        final String data = "Hello, ByteBuffer!";
+        try (Serde<String> serde = Serdes.String()) {
+            final Serializer<String> serializer = serde.serializer();
+            final Deserializer<String> deserializer = serde.deserializer();
+            final byte[] serializedBytes = serializer.serialize(topic, data);
+            final ByteBuffer heapBuff = ByteBuffer.allocate(serializedBytes.length << 1).put(serializedBytes);
+            heapBuff.flip();
+            assertEquals(data, deserializer.deserialize(topic, null, heapBuff));
+
+            final ByteBuffer directBuff = ByteBuffer.allocateDirect(serializedBytes.length << 2).put(serializedBytes);
+            directBuff.flip();
+            assertEquals(data, deserializer.deserialize(topic, null, directBuff));
+        }
+    }
+
     private Serde<String> getStringSerde(String encoder) {
         Map<String, Object> serializerConfigs = new HashMap<>();
         serializerConfigs.put("key.serializer.encoding", encoder);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SerdeThatDoesntHandleNull.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SerdeThatDoesntHandleNull.java
index 03e0c3a9a53f..38eb1a7c71a0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SerdeThatDoesntHandleNull.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SerdeThatDoesntHandleNull.java
@@ -16,12 +16,15 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
+import java.nio.ByteBuffer;
+
 class SerdeThatDoesntHandleNull implements Serde<String> {
     @Override
     public Serializer<String> serializer() {
@@ -38,6 +41,14 @@ public String deserialize(final String topic, final byte[] data) {
             }
             return super.deserialize(topic, data);
         }
+
+        @Override
+        public String deserialize(final String topic, final Headers headers, final ByteBuffer data) {
+            if (data == null) {
+                throw new NullPointerException();
+            }
+            return super.deserialize(topic, headers, data);
+        }
     };
 }
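The updated SerializationTest exercises the new path by wrapping the serialized bytes with Utils.wrapNullable, which maps null to null and anything else to ByteBuffer.wrap. A condensed version of that round trip:

import java.nio.ByteBuffer;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

import static org.apache.kafka.common.utils.Utils.wrapNullable;

// Condensed form of the round trip the updated test performs: serialize to
// byte[], then deserialize through the new ByteBuffer overload.
public class RoundTripDemo {
    public static void main(String[] args) {
        try (Serde<String> serde = Serdes.String()) {
            byte[] serialized = serde.serializer().serialize("topic", "my string");
            String back = serde.deserializer().deserialize("topic", null, wrapNullable(serialized));
            System.out.println("my string".equals(back)); // true
        }
    }
}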