KAFKA-5211: Do not skip a corrupted record in consumer #3114

Closed
wants to merge 5 commits

Conversation

@becketqin (Contributor)

No description provided.

@becketqin (Contributor Author)

ping @lindong28 @hachikuji

@asfbot commented May 22, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4235/
Test FAILed (JDK 8 and Scala 2.12).

@hachikuji

@becketqin Thanks for the patch. Can you clarify the expected behavior? Do we only avoid skipping records on checksum failures?

@becketqin (Contributor Author)

@hachikuji I was not sure about that when I was writing the patch. There are two kinds of exceptions thrown here: message corruption and deserialization errors. Corrupted messages we should absolutely handle. Deserialization errors could be handled either by the consumer or by the deserializer; I feel it is more intuitive to let the deserializer handle them, i.e. retry if parsing fails.

That being said, it seems that in the past we did not skip over deserialization errors either, so keeping it that way is probably also fine. Do you prefer not skipping messages in either case?
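
To make the two failure modes concrete, here is a rough sketch of how they surface to an application (the poll loop and its handlers are purely illustrative; only the exception types come from the Kafka clients library):

```java
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.SerializationException;

// Hypothetical poll loop illustrating the two failure modes discussed above.
public class PollLoopSketch {
    public static void pollLoop(KafkaConsumer<String, String> consumer) {
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                System.out.println("fetched " + records.count() + " records");
            } catch (SerializationException e) {
                // Deserialization error: the bytes arrived intact but the configured
                // Deserializer could not parse them. Retrying may succeed later, e.g.
                // if a missing schema gets registered.
                System.err.println("parse failure: " + e.getMessage());
            } catch (KafkaException e) {
                // Record/batch corruption (e.g. CRC mismatch). With this patch the
                // consumer does not advance past the bad record, so poll() keeps
                // failing until the data is refetched cleanly or the application
                // seeks past the offset.
                System.err.println("corrupt record: " + e.getMessage());
            }
        }
    }
}
```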

@hachikuji

@becketqin I've been thinking about it and I'm still not sure what the best option is. The point you made in the JIRA about skipping over corrupted transaction markers is a good one, so I feel we're restricted in that case. For parsing errors, it's a bit trickier. One option that we've toyed around with on several occasions is making the parsing lazy (i.e. moving it into ConsumerRecord). In that case, the consumer's offset would be advanced when the record is returned, and it would only be when the user calls key() or value() that an exception is raised. This is effectively like skipping over the failed records. The risk there is that the user will now be expected to rewind the offset to avoid having it committed (if that is indeed the behavior they want).

Personally, I don't like having the consumer skip records internally because it introduces an inconsistency in the behavior of the consumer given its current position (i.e. what I tried to explain before: that the result of a fetch depends not only on the current state, but also on the previous history). So if we want to let the consumer skip past parsing errors, then I think the lazy parsing would be a better option. However, I'd be a little reluctant to implement this for the current release, so my preference is probably to not do any skipping for now. What do you think?
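
For what it's worth, a minimal sketch of the lazy-parsing idea described above (purely hypothetical: the real ConsumerRecord deserializes eagerly, and this class does not exist in the codebase):

```java
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical record wrapper that defers deserialization until key()/value() is called.
public class LazyRecord<K, V> {
    private final String topic;
    private final byte[] keyBytes;
    private final byte[] valueBytes;
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;

    public LazyRecord(String topic, byte[] keyBytes, byte[] valueBytes,
                      Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        this.topic = topic;
        this.keyBytes = keyBytes;
        this.valueBytes = valueBytes;
        this.keyDeserializer = keyDeserializer;
        this.valueDeserializer = valueDeserializer;
    }

    public K key() {
        // The consumer's position has already moved past this record by the time the
        // user calls this, so a parse failure here is effectively "skipped" unless the
        // user rewinds the offset before committing.
        return keyDeserializer.deserialize(topic, keyBytes);
    }

    public V value() {
        return valueDeserializer.deserialize(topic, valueBytes);
    }
}
```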

@becketqin (Contributor Author)

@hachikuji Thanks for the explanation. That makes sense. I'll change the patch to just block on both message corruption and parsing error.

@asfbot commented May 24, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4336/
Test FAILed (JDK 7 and Scala 2.11).

@asfbot commented May 24, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4323/
Test PASSed (JDK 8 and Scala 2.12).

@hachikuji left a comment

@becketqin Thanks for the patch and sorry for the delay reviewing. I left a few comments.

@@ -559,8 +545,6 @@ private long endTimestamp() {
} catch (KafkaException e) {
if (fetched.isEmpty())
throw e;
// To be thrown in the next call of this method
nextInLineExceptionMetadata = new ExceptionMetadata(fetchedPartition, fetchedOffset, e);

@hachikuji

Good idea to consolidate the error handling (I was also wondering if it was possible). Since we're catching the exception in PartitionRecords.fetchRecords, do we still need this try/catch?

@becketqin (Contributor Author)

This try/catch is needed because when PartitionRecords.fetchRecords throws, some records may already have been fetched from previous PartitionRecords. In that case we need to return those fetched records and delay the exception.

@hachikuji

Ack. Makes sense.
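
For readers following the thread, a condensed sketch of the delay-and-rethrow shape being discussed (only ExceptionMetadata and nextInLineExceptionMetadata come from the diff; the surrounding class and method are heavily simplified):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

// Condensed, hypothetical version of the fetcher logic discussed in this thread.
class DeferredExceptionSketch<K, V> {

    // Holds an exception raised while draining one partition, to be rethrown later.
    private static class ExceptionMetadata {
        final TopicPartition partition;
        final long offset;
        final KafkaException exception;

        ExceptionMetadata(TopicPartition partition, long offset, KafkaException exception) {
            this.partition = partition;
            this.offset = offset;
            this.exception = exception;
        }
    }

    private ExceptionMetadata nextInLineExceptionMetadata = null;

    Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
        // If the previous call returned records and deferred an exception, throw it now
        // before fetching anything else, so the error is not silently dropped.
        if (nextInLineExceptionMetadata != null) {
            ExceptionMetadata metadata = nextInLineExceptionMetadata;
            nextInLineExceptionMetadata = null;
            throw metadata.exception;
        }

        Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
        TopicPartition fetchedPartition = null;
        long fetchedOffset = -1L;
        try {
            // ... drain records from each completed fetch into `fetched`,
            // updating fetchedPartition and fetchedOffset as we go ...
        } catch (KafkaException e) {
            if (fetched.isEmpty())
                throw e; // nothing useful to return, so fail immediately
            // Some partitions were drained successfully: return them and surface
            // the failure on the next call instead.
            nextInLineExceptionMetadata = new ExceptionMetadata(fetchedPartition, fetchedOffset, e);
        }
        return fetched;
    }
}
```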

currentBatch.ensureValid();
} else {
currentBatch = batches.next();
maybeEnsureValid(currentBatch);

@hachikuji

Should we set lastRecord to null here? Otherwise it seems like we might misinterpret which record had the error.

// We do not cache the exception thrown last time because the stack trace could be different
// from last time.
if (hasExceptionInLastFetch) {
maybeEnsureValid(lastRecord);

@hachikuji

One of the possibilities for a corrupt record is an error transmitting over the network (the TCP checksum is only 2 bytes). We could recover from this error by discarding the current batch and refetching from the failed offset. The downside is, well, we have to refetch. In practice, I assume this would be super rare, but maybe it's still worth allowing for the possibility? For parsing errors, refetching may not help, so caching the error seems fair.

@becketqin (Contributor Author)

Yeah, I was wondering about that too. We do the refetch for the replica fetchers on the brokers. I did not do the refetch in this patch mostly because of the additional complexity (backoff time, maximum retries, etc.). Given that this is an improvement for a very rare edge case and we need this fix for 0.11.0, maybe we can do it in a follow-up patch?

@hachikuji

We can do it in a follow-up if you prefer. I was thinking it was as simple as setting isFetched to true, but I could be wrong.

@becketqin (Contributor Author)

Yes, you are right. I was thinking we might need to allow a couple of retries, with a backoff for each retry, before throwing the exception. But it seems we can just retry immediately and only once. In that case we don't have to worry about the backoff/retries.

if (!currentBatch.isControlBatch())
return record;
// If an exception was thrown from the last record. We should throw the same exception here again.
// We do not cache the exception thrown last time because the stack trace could be different

@hachikuji

I think caching the exception may actually be OK. For both SerializationException and InvalidRecordException, we've overridden fillInStackTrace anyway.

@becketqin (Contributor Author)

Ah, I didn't notice that. Personally I still slightly prefer going through the stack again because the deserializer is user code, so it may not throw an exception when parsing the same record again. For example, maybe a schema is registered after the last parse.
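
For context on the fillInStackTrace point above: a generic sketch of an exception type that skips stack-trace capture, which is what makes caching and rethrowing the same instance harmless from a stack-trace perspective (the class name here is made up, not Kafka's):

```java
// Generic sketch: overriding fillInStackTrace() means the exception never records a
// stack trace, so a cached instance can be rethrown repeatedly without carrying a
// stale or misleading trace. Per the comment above, Kafka's SerializationException
// and InvalidRecordException already do this.
public class CacheableException extends RuntimeException {

    public CacheableException(String message) {
        super(message);
    }

    @Override
    public synchronized Throwable fillInStackTrace() {
        // Skip the relatively expensive stack walk entirely.
        return this;
    }
}
```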

@becketqin left a comment

@hachikuji Thanks for the review. Just updated the patch to address the comments.


@asfbot commented May 29, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4522/
Test FAILed (JDK 7 and Scala 2.11).

@asfbot commented May 29, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4507/
Test PASSed (JDK 8 and Scala 2.12).

} else {
lastRecord = null;
currentBatch = batches.next();
maybeEnsureValid(currentBatch);
@hachikuji (May 31, 2017)

Wasn't sure if you were pushing another change, but I'll go ahead and leave a comment. I'm unclear what should happen if the checksum fails on the batch. It seems like we would still catch the exception and set hasExceptionInLastFetch to true. But then we would fail because we are trying to call maybeEnsureValid on lastRecord which is now null.

@becketqin (Contributor Author)

If batch validation failed, the next time we come into the loop we will go to line 1001 and hit the same validation error again, right? So we will not proceed to record validation. But it is true that we should do that check before line 995. I'll update the patch to fix that.

@asfbot commented May 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4601/
Test FAILed (JDK 7 and Scala 2.11).

@becketqin (Contributor Author)

@hachikuji Thanks for the review. I updated the patch and added a unit test for an invalid DefaultRecordBatch. I gave the refetch a shot as well, but I feel it might be a little too much, for a few reasons:

  1. In order to ensure it was the same batch that failed validation, we need to keep track of this state and clear it up when necessary.
  2. Now we would need to always check whether a batch is a refetch or not. It is cheap, but it still feels a little weird to do that for such an edge case.
  3. Regardless of whether we do the refetch or not, users who care about message loss will always have to handle the exceptions.

So I left the refetch out of this patch.

@asfbot commented May 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4586/
Test PASSed (JDK 8 and Scala 2.12).

@hachikuji left a comment

LGTM. Thanks for the patch and the test cases.

@hachikuji

I think the failing test case has a dependence on HashMap ordering. I will fix it when I merge.

asfgit pushed a commit that referenced this pull request May 31, 2017
Author: Jiangjie Qin <becket.qin@gmail.com>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #3114 from becketqin/KAFKA-5211

(cherry picked from commit d082563)
Signed-off-by: Jason Gustafson <jason@confluent.io>
@asfgit closed this in d082563 on May 31, 2017