New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-16507 Add raw record into RecordDeserialisationException #15691
base: trunk
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @fred-ro!
Since parseRecord
catches all RuntimeException
s, it's very forgiving. No doubt most of the time the errors it catches are related to deserialization issues. However, with this change, the catch
block would now also call methods on the incoming Record
. Is it possible that an error could be thrown in our efforts to create a ConsumerRecord
for the exception?
Perhaps we can rely on the existing upstream code that creates the Record
to prevent that?
Anyway, just wondering 🤔
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java
Outdated
Show resolved
Hide resolved
3d88fcf
to
1e2dcdf
Compare
1e2dcdf
to
eee8b2b
Compare
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/common/errors/KeyDeserializationException.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java
Show resolved
Hide resolved
2938be3
to
c3ccd58
Compare
…xception with record content
c3ccd58
to
526803e
Compare
log.error("Deserializers with error: {}", deserializers); | ||
throw new RecordDeserializationException(partition, record.offset(), | ||
"Error deserializing key/value for partition " + partition + | ||
throw new RecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.KEY, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about using a helper method like this to throw the exception?
private void throwRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin origin,
TopicPartition partition,
TimestampType timestampType,
Record record,
RuntimeException e) {
throw new RecordDeserializationException(origin, partition, record.offset(), record.timestamp(), timestampType, record.key(),
record.value(), new RecordHeaders(record.headers()), "Error deserializing " + origin.toString() + " for partition "
+ partition + " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", e);
}
Add raw ConsumerRecord data to RecordDeserialisationException to make DLQ implementation easier
Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.
Committer Checklist (excluded from commit message)