Skip to content

Commit

Permalink
KAFKA-16507 Add KeyDeserializationException and ValueDeserializationE…
Browse files Browse the repository at this point in the history
…xception with record content
  • Loading branch information
fred-ro committed Apr 22, 2024
1 parent f22ad66 commit 2938be3
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 20 deletions.
5 changes: 5 additions & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@
<allow pkg="org.apache.kafka.common" />
</subpackage>

<subpackage name="errors">
<allow class="org.apache.kafka.common.header.Headers" />
<allow class="org.apache.kafka.common.record.TimestampType" />
</subpackage>

</subpackage>

<subpackage name="clients">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.KeyDeserializationException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.ValueDeserializationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.message.FetchResponseData;
Expand Down Expand Up @@ -311,25 +312,33 @@ <K, V> ConsumerRecord<K, V> parseRecord(Deserializers<K, V> deserializers,
Optional<Integer> leaderEpoch,
TimestampType timestampType,
Record record) {
long offset = record.offset();
long timestamp = record.timestamp();
ByteBuffer keyBytes = record.key();
ByteBuffer valueBytes = record.value();
Headers headers = new RecordHeaders(record.headers());
K key;
V value;
try {
long offset = record.offset();
long timestamp = record.timestamp();
Headers headers = new RecordHeaders(record.headers());
ByteBuffer keyBytes = record.key();
K key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
ByteBuffer valueBytes = record.value();
V value = valueBytes == null ? null : deserializers.valueDeserializer.deserialize(partition.topic(), headers, valueBytes);
return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
timestamp, timestampType,
keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(),
valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(),
key, value, headers, leaderEpoch);
key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
} catch (RuntimeException e) {
log.error("Deserializers with error: {}", deserializers);
throw new RecordDeserializationException(partition, record.offset(),
"Error deserializing key/value for partition " + partition +
throw new KeyDeserializationException(partition, offset, keyBytes, valueBytes, headers, timestampType, record.timestamp(),
"Error deserializing key for partition " + partition +
" at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", e);
}
try {
value = valueBytes == null ? null : deserializers.valueDeserializer.deserialize(partition.topic(), headers, valueBytes);
} catch (RuntimeException e) {
throw new ValueDeserializationException(partition, offset, keyBytes, valueBytes, headers, timestampType, record.timestamp(),
"Error deserializing value for partition " + partition +
" at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", e);
}
return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
timestamp, timestampType,
keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(),
valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(),
key, value, headers, leaderEpoch);

}

private Optional<Integer> maybeLeaderEpoch(int leaderEpoch) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;

import java.nio.ByteBuffer;

public class KeyDeserializationException extends RecordDeserializationException {
private final static long serialVersionUID = 1L;
public KeyDeserializationException(TopicPartition partition,
long offset,
ByteBuffer key,
ByteBuffer value,
Headers headers,
TimestampType timestampType,
long timestamp,
String message,
Throwable cause) {
super(partition, offset, key, value, headers, timestampType, timestamp, message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,60 @@
package org.apache.kafka.common.errors;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Utils;

import java.nio.ByteBuffer;

/**
* This exception is raised for any error that occurs while deserializing records received by the consumer using
* the configured {@link org.apache.kafka.common.serialization.Deserializer}.
*/
public class RecordDeserializationException extends SerializationException {

private static final long serialVersionUID = 1L;
private static final long serialVersionUID = 2L;
private final TopicPartition partition;
private final long offset;
private final TimestampType timestampType;
private final long timestamp;
private final ByteBuffer key;
private final ByteBuffer value;
private final Headers headers;

public RecordDeserializationException(TopicPartition partition, long offset, String message, Throwable cause) {
@Deprecated
public RecordDeserializationException(TopicPartition partition,
long offset,
String message,
Throwable cause) {
super(message, cause);
this.partition = partition;
this.offset = offset;
this.timestampType = TimestampType.NO_TIMESTAMP_TYPE;
this.timestamp = 0;
this.key = null;
this.value = null;
this.headers = null;
}

// New constructor
protected RecordDeserializationException(TopicPartition partition,
long offset,
ByteBuffer key,
ByteBuffer value,
Headers headers,
TimestampType timestampType,
long timestamp,
String message,
Throwable cause) {
super(message, cause);
this.offset = offset;
this.timestampType = timestampType;
this.timestamp = timestamp;
this.partition = partition;
this.key = key;
this.value = value;
this.headers = headers;
}

public TopicPartition topicPartition() {
Expand All @@ -41,4 +80,24 @@ public TopicPartition topicPartition() {
public long offset() {
return offset;
}

public TimestampType timestampType() {
return timestampType;
}

public long timestamp() {
return timestamp;
}

public byte[] key() {
return Utils.toNullableArray(key);
}

public byte[] value() {
return Utils.toNullableArray(value);
}

public Headers headers() {
return headers;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;

import java.nio.ByteBuffer;

public class ValueDeserializationException extends RecordDeserializationException {
private final static long serialVersionUID = 1L;
public ValueDeserializationException(TopicPartition partition,
long offset,
ByteBuffer key,
ByteBuffer value,
Headers headers,
TimestampType timestampType,
long timestamp,
String message,
Throwable cause) {
super(partition, offset, key, value, headers, timestampType, timestamp, message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.KeyDeserializationException;
import org.apache.kafka.common.errors.ValueDeserializationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
Expand All @@ -48,7 +51,9 @@
import java.util.List;
import java.util.UUID;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class CompletedFetchTest {
Expand Down Expand Up @@ -161,6 +166,10 @@ public void testCorruptedMessage() {
final UUIDSerializer serializer = new UUIDSerializer()) {
builder.append(new SimpleRecord(serializer.serialize(TOPIC_NAME, UUID.randomUUID())));
builder.append(0L, "key".getBytes(), "value".getBytes());
builder.append(new SimpleRecord(serializer.serialize(TOPIC_NAME, UUID.randomUUID())));
Headers headers = new RecordHeaders();
headers.add("hkey", "hvalue".getBytes());
builder.append(10L, serializer.serialize("key", UUID.randomUUID()), "otherValue".getBytes(), headers.toArray());
Records records = builder.build();

FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
Expand All @@ -176,8 +185,27 @@ public void testCorruptedMessage() {

completedFetch.fetchRecords(fetchConfig, deserializers, 10);

assertThrows(RecordDeserializationException.class,
KeyDeserializationException thrown = assertThrows(KeyDeserializationException.class,
() -> completedFetch.fetchRecords(fetchConfig, deserializers, 10));
assertEquals(1, thrown.offset());
assertEquals(TOPIC_NAME, thrown.topicPartition().topic());
assertEquals(0, thrown.topicPartition().partition());
assertEquals(0, thrown.timestamp());
assertArrayEquals("key".getBytes(), thrown.key());
assertArrayEquals("value".getBytes(), thrown.value());
assertEquals(0, thrown.headers().toArray().length);

CompletedFetch completedFetch2 = newCompletedFetch(2, partitionData);
completedFetch2.fetchRecords(fetchConfig, deserializers, 10);
ValueDeserializationException valueThrown = assertThrows(ValueDeserializationException.class,
() -> completedFetch2.fetchRecords(fetchConfig, deserializers, 10));
assertEquals(3, valueThrown.offset());
assertEquals(TOPIC_NAME, valueThrown.topicPartition().topic());
assertEquals(0, valueThrown.topicPartition().partition());
assertEquals(10L, valueThrown.timestamp());
assertNotNull(valueThrown.key());
assertArrayEquals("otherValue".getBytes(), valueThrown.value());
assertEquals(headers, valueThrown.headers());
}
}
}
Expand Down

0 comments on commit 2938be3

Please sign in to comment.