Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-5682: Include partitions in exceptions raised during consumer record deserialization/validation #5410

Closed
wants to merge 6 commits into from

Conversation

stanislavkozlovski
Copy link
Contributor

@stanislavkozlovski stanislavkozlovski commented Jul 21, 2018

JIRA: KAFKA-5682
KIP: KIP-334
Mailing Thread: thread

  • Introduce new Exception - RecordDeserializationException - it extends SerializationException for backwards compatibility. Throw that exception where appropriate with attached partition/offset.
  • Introduce new Exception - InoperativeRecordException - it extends KafkaException for backwards compatibility. Throw that exception where appropriate with attached partition/offset.
  • Introduce new Interface - UnconsumableRecordException. InoperativeRecordException and RecordDeserializationException implement it
  • Make InvalidRecordException extend ApiException. It previously extended CorruptRecordException and thrown only for corrupt record scenarios. (as it's not in a public package).
  • Add some tests for KafkaConsumer#poll()

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

…ionException in consumers

Consumers would previously raise `SerializationException` with a message recommending the user seek past the failed offset at the given partition. The partition and offset was not given to the user and he had no way to know which partition caused the errors apart from manually parsing the message.
This new exception type contains the topic, partition and offset data in it
…ptRecordException` (public)

Also replace some usages to raise `CorruptRecordException`. this will allow subsequently raised `KafkaException` to add the exception as the cause
@stanislavkozlovski
Copy link
Contributor Author

@hachikuji Please review when possible

else if (cachedRecordException instanceof RecordDeserializationException)
corruptOffset = ((RecordDeserializationException) cachedRecordException).offset();
else
corruptOffset = nextFetchOffset;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not absolutely sure if setting it to the nextFetchOffset is correct, but I figured it's the only thing to do.
We enter this if (corruptLastRecord) block if Fetcher#nextFetchedRecord() throws

…tion` when consumer encounters corrupted/invalid record

This new exception contains the partition and offset information, allowing the user to skip past the record
* Make `RecordDeserializationException` and `InoperativeRecordException` internal classes
* Add new interface that exposes topic and partition for the given exception
…erializationException` for `FaultyRecordException`
@stanislavkozlovski
Copy link
Contributor Author

Closing in favor of #7499

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
1 participant