KAFKA-14944: Reduce CompletedFetch#parseRecord() memory copy (#12545)
This implements KIP-863: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=225152035
Deserialize directly from the record's ByteBuffer instead of copying it into a byte[] first.

Reviewers: Luke Chen <showuon@gmail.com>, Kirk True <kirk@kirktrue.pro>
LinShunKang committed Apr 27, 2023
1 parent c1b5c75 commit dd6690a
Showing 15 changed files with 243 additions and 18 deletions.
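
The change is easiest to see from the consumer side: with KIP-863, parseRecord() hands the key/value ByteBuffers straight to the configured deserializers, and only implementations that never override the new ByteBuffer overload fall back to a byte[] copy. The sketch below is not part of this commit; the class name PriceDeserializer and the expected payload size are illustrative. It shows a custom deserializer overriding the new overload so it reads the payload in place.

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.ByteBuffer;

// Hypothetical example, not part of this commit: a deserializer that overrides the
// ByteBuffer overload introduced by KIP-863 so it reads the record payload in place
// instead of requiring a defensive byte[] copy.
public class PriceDeserializer implements Deserializer<Double> {

    @Override
    public Double deserialize(String topic, byte[] data) {
        // byte[] path kept for callers that still pass arrays.
        if (data == null)
            return null;
        if (data.length != 8)
            throw new SerializationException("Expected 8 bytes but received " + data.length);
        return ByteBuffer.wrap(data).getDouble();
    }

    @Override
    public Double deserialize(String topic, Headers headers, ByteBuffer data) {
        if (data == null)
            return null;
        if (data.remaining() != 8)
            throw new SerializationException("Expected 8 bytes but received " + data.remaining());
        // Absolute read at the current position: no copy, and the buffer is not consumed.
        return data.getDouble(data.position());
    }
}
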
@@ -36,7 +36,6 @@
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

import java.io.Closeable;
@@ -296,15 +295,13 @@ ConsumerRecord<K, V> parseRecord(TopicPartition partition,
long timestamp = record.timestamp();
Headers headers = new RecordHeaders(record.headers());
ByteBuffer keyBytes = record.key();
-byte[] keyByteArray = keyBytes == null ? null : org.apache.kafka.common.utils.Utils.toArray(keyBytes);
-K key = keyBytes == null ? null : fetchConfig.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
+K key = keyBytes == null ? null : fetchConfig.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
ByteBuffer valueBytes = record.value();
-byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
-V value = valueBytes == null ? null : fetchConfig.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
+V value = valueBytes == null ? null : fetchConfig.valueDeserializer.deserialize(partition.topic(), headers, valueBytes);
return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
timestamp, timestampType,
-keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
-valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
+keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(),
+valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(),
key, value, headers, leaderEpoch);
} catch (RuntimeException e) {
throw new RecordDeserializationException(partition, record.offset(),
@@ -17,6 +17,9 @@
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;

import java.nio.ByteBuffer;

public class BooleanDeserializer implements Deserializer<Boolean> {
private static final byte TRUE = 0x01;
@@ -40,4 +43,24 @@ public Boolean deserialize(final String topic, final byte[] data) {
throw new SerializationException("Unexpected byte received by BooleanDeserializer: " + data[0]);
}
}

@Override
public Boolean deserialize(String topic, Headers headers, ByteBuffer data) {
if (data == null) {
return null;
}

if (data.remaining() != 1) {
throw new SerializationException("Size of data received by BooleanDeserializer is not 1");
}

final byte b = data.get(data.position());
if (b == TRUE) {
return true;
} else if (b == FALSE) {
return false;
} else {
throw new SerializationException("Unexpected byte received by BooleanDeserializer: " + b);
}
}
}
@@ -16,13 +16,22 @@
*/
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.header.Headers;

import java.nio.ByteBuffer;

public class ByteBufferDeserializer implements Deserializer<ByteBuffer> {

@Override
public ByteBuffer deserialize(String topic, byte[] data) {
if (data == null)
return null;

return ByteBuffer.wrap(data);
}

@Override
public ByteBuffer deserialize(String topic, Headers headers, ByteBuffer data) {
return data;
}
}
@@ -17,8 +17,10 @@
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.utils.Utils;

import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.Map;

/**
@@ -60,6 +62,17 @@ default T deserialize(String topic, Headers headers, byte[] data) {
return deserialize(topic, data);
}

/**
* Deserialize a record value from a ByteBuffer into a value or object.
* @param topic topic associated with the data
* @param headers headers associated with the record; may be empty.
* @param data serialized ByteBuffer; may be null; implementations are recommended to handle null by returning a value or null rather than throwing an exception.
* @return deserialized typed data; may be null
*/
default T deserialize(String topic, Headers headers, ByteBuffer data) {
return deserialize(topic, headers, Utils.toNullableArray(data));
}

/**
* Close this deserializer.
* <p>
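
The default implementation above is what keeps existing deserializers compatible: an implementation that only provides the byte[] overloads still works when the consumer passes a ByteBuffer, because the new default copies the buffer into an array via Utils.toNullableArray and delegates. A minimal sketch of that fallback follows (illustrative class, topic, and values; not from this commit):

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.ByteBuffer;

public class FallbackSketch {
    public static void main(String[] args) {
        // A legacy-style deserializer that only knows about byte[].
        Deserializer<Integer> legacy = (topic, data) ->
                data == null ? null : ByteBuffer.wrap(data).getInt();

        ByteBuffer payload = ByteBuffer.allocate(4);
        payload.putInt(42);
        payload.flip();

        Headers headers = new RecordHeaders();

        // Resolves to the default ByteBuffer overload, which copies the buffer into a
        // byte[] (Utils.toNullableArray) and then calls the byte[] method.
        Integer value = legacy.deserialize("demo-topic", headers, payload);
        System.out.println(value); // prints 42
    }
}
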
@@ -17,6 +17,9 @@
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;

import java.nio.ByteBuffer;

public class DoubleDeserializer implements Deserializer<Double> {

@@ -35,4 +38,16 @@ public Double deserialize(String topic, byte[] data) {
}
return Double.longBitsToDouble(value);
}

@Override
public Double deserialize(String topic, Headers headers, ByteBuffer data) {
if (data == null) {
return null;
}

if (data.remaining() != 8) {
throw new SerializationException("Size of data received by DoubleDeserializer is not 8");
}
return data.getDouble(data.position());
}
}
Expand Up @@ -17,6 +17,9 @@
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;

import java.nio.ByteBuffer;

public class FloatDeserializer implements Deserializer<Float> {
@Override
@@ -34,4 +37,16 @@ public Float deserialize(final String topic, final byte[] data) {
}
return Float.intBitsToFloat(value);
}

@Override
public Float deserialize(String topic, Headers headers, ByteBuffer data) {
if (data == null) {
return null;
}

if (data.remaining() != 4) {
throw new SerializationException("Size of data received by Deserializer is not 4");
}
return data.getFloat(data.position());
}
}
@@ -17,8 +17,12 @@
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;

import java.nio.ByteBuffer;

public class IntegerDeserializer implements Deserializer<Integer> {
@Override
public Integer deserialize(String topic, byte[] data) {
if (data == null)
return null;
@@ -33,4 +37,16 @@ public Integer deserialize(String topic, byte[] data) {
}
return value;
}

@Override
public Integer deserialize(String topic, Headers headers, ByteBuffer data) {
if (data == null) {
return null;
}

if (data.remaining() != 4) {
throw new SerializationException("Size of data received by IntegerDeserializer is not 4");
}
return data.getInt(data.position());
}
}
@@ -17,8 +17,12 @@
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;

import java.nio.ByteBuffer;

public class LongDeserializer implements Deserializer<Long> {
@Override
public Long deserialize(String topic, byte[] data) {
if (data == null)
return null;
@@ -33,4 +37,16 @@ public Long deserialize(String topic, byte[] data) {
}
return value;
}

@Override
public Long deserialize(String topic, Headers headers, ByteBuffer data) {
if (data == null) {
return null;
}

if (data.remaining() != 8) {
throw new SerializationException("Size of data received by LongDeserializer is not 8");
}
return data.getLong(data.position());
}
}
@@ -17,9 +17,13 @@
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;

import java.nio.ByteBuffer;

public class ShortDeserializer implements Deserializer<Short> {

@Override
public Short deserialize(String topic, byte[] data) {
if (data == null)
return null;
@@ -34,4 +38,16 @@ public Short deserialize(String topic, byte[] data) {
}
return value;
}

@Override
public Short deserialize(String topic, Headers headers, ByteBuffer data) {
if (data == null) {
return null;
}

if (data.remaining() != 2) {
throw new SerializationException("Size of data received by ShortDeserializer is not 2");
}
return data.getShort(data.position());
}
}
@@ -17,8 +17,11 @@
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.utils.Utils;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

@@ -50,4 +53,20 @@ public String deserialize(String topic, byte[] data) {
throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
}
}

@Override
public String deserialize(String topic, Headers headers, ByteBuffer data) {
if (data == null) {
return null;
}

try {
if (data.hasArray()) {
return new String(data.array(), data.position() + data.arrayOffset(), data.remaining(), encoding);
}
return new String(Utils.toArray(data), encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when deserializing ByteBuffer to string due to unsupported encoding " + encoding);
}
}
}
@@ -17,8 +17,11 @@
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.utils.Utils;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
@@ -53,4 +56,22 @@ public UUID deserialize(String topic, byte[] data) {
throw new SerializationException("Error parsing data into UUID", e);
}
}

@Override
public UUID deserialize(String topic, Headers headers, ByteBuffer data) {
try {
if (data == null) {
return null;
}

if (data.hasArray()) {
return UUID.fromString(new String(data.array(), data.arrayOffset() + data.position(), data.remaining(), encoding));
}
return UUID.fromString(new String(Utils.toArray(data), encoding));
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when deserializing ByteBuffer to UUID due to unsupported encoding " + encoding, e);
} catch (IllegalArgumentException e) {
throw new SerializationException("Error parsing data into UUID", e);
}
}
}
@@ -16,6 +16,10 @@
*/
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.header.Headers;

import java.nio.ByteBuffer;

public class VoidDeserializer implements Deserializer<Void> {
@Override
public Void deserialize(String topic, byte[] data) {
@@ -24,4 +28,12 @@ public Void deserialize(String topic, byte[] data) {

return null;
}

@Override
public Void deserialize(String topic, Headers headers, ByteBuffer data) {
if (data != null) {
throw new IllegalArgumentException("Data should be null for a VoidDeserializer.");
}
return null;
}
}
@@ -55,6 +55,7 @@
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.HeartbeatResponseData;
@@ -302,6 +303,16 @@ public String deserialize(String topic, byte[] data) {
return super.deserialize(topic, data);
}
}

@Override
public String deserialize(String topic, Headers headers, ByteBuffer data) {
if (i == recordIndex) {
throw new SerializationException();
} else {
i++;
return super.deserialize(topic, headers, data);
}
}
};
}

