KAFKA-15562: ensure commit request manager handles errors correctly #14639

philipnee
Collaborator

@philipnee philipnee commented Oct 25, 2023

The current commitRequestManager lacks logic for handling various failures. This patch addresses the following gaps:

  1. A network disconnection should disconnect the coordinator node
  2. Handle and test various fatal and retriable error scenarios
  3. Ensure the time returned in the poll result is Long.MAX_VALUE or the minimum of all the inflight requests' remainingBackoffMs, because we need to respect the backoff.
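As a minimal sketch of point 3, the poll time can be computed as below. The class and method names (`InflightRequestSketch`, `PollTimeSketch`, `timeUntilNextPollMs`) are hypothetical stand-ins for illustration, not the classes touched by this patch; only `remainingBackoffMs` is a name taken from the description.

```java
import java.util.List;

/** Hypothetical stand-in for an in-flight request's backoff state; not the Kafka class. */
final class InflightRequestSketch {
    private final long lastAttemptMs;
    private final long backoffMs;

    InflightRequestSketch(long lastAttemptMs, long backoffMs) {
        this.lastAttemptMs = lastAttemptMs;
        this.backoffMs = backoffMs;
    }

    /** Milliseconds left before this request may be sent again; never negative. */
    long remainingBackoffMs(long nowMs) {
        return Math.max(0L, lastAttemptMs + backoffMs - nowMs);
    }
}

final class PollTimeSketch {
    /**
     * Returns Long.MAX_VALUE when no request is waiting, otherwise the smallest
     * remaining backoff, so the caller does not wake up before a retry is allowed.
     */
    static long timeUntilNextPollMs(List<InflightRequestSketch> requests, long nowMs) {
        long result = Long.MAX_VALUE;
        for (InflightRequestSketch request : requests) {
            result = Math.min(result, request.remainingBackoffMs(nowMs));
        }
        return result;
    }
}
```

Starting from Long.MAX_VALUE means an empty list of in-flight requests needs no special case.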

@philipnee philipnee marked this pull request as ready for review October 25, 2023 21:03
@philipnee philipnee added the ctr label Oct 25, 2023
@lucasbru
Member

Thanks @philipnee, I left some comments

@philipnee
Collaborator Author

Hi @lucasbru - Thanks for taking the time to review my PR. I addressed all but 2 comments:

  1. Mockito: Could you be more specific on how you expect to mock the response object?
  2. Error handling: Essentially all errors in continueHandlePartitionErrors can only happen in the response. I understand there is some redundancy there and it can be a bit confusing. But a request can fail with either a hard failure (response = null and throwable = non-null) or a server-side error (response = non-null). That is why the two were kept separate. If you find it unclear, how can I make it more readable?
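The two failure shapes described in point 2 can be sketched as follows. This is an illustration only: `ResponseOutcomeSketch`, its `Response` type, and `classify` are hypothetical names, and the real response objects carry far more than an error name.

```java
final class ResponseOutcomeSketch {
    enum Outcome { SUCCESS, HARD_FAILURE, SERVER_ERROR }

    /** Hypothetical response type: carries only an error name, with "NONE" meaning success. */
    static final class Response {
        final String error;

        Response(String error) {
            this.error = error;
        }
    }

    /**
     * A hard failure arrives as a non-null throwable with no response;
     * a server-side error arrives as a non-null response carrying an error code.
     */
    static Outcome classify(Response response, Throwable throwable) {
        if (throwable != null) {
            return Outcome.HARD_FAILURE;
        }
        if (response == null) {
            throw new IllegalArgumentException("response and throwable are both null");
        }
        return "NONE".equals(response.error) ? Outcome.SUCCESS : Outcome.SERVER_ERROR;
    }
}
```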

log.debug("Offset fetch failed: {}", responseError.message());

// TODO: should we retry on COORDINATOR_NOT_AVAILABLE as well ?
if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
Member

nit: merge first two branches now?

@lucasbru
Member

Changes look good to me. Did I understand David correctly that we want to add more changes to this PR?

  1. Mockito: Could you be more specific on how you expect to mock the response object?

That was just "out of interest", since you are already using mockito to mock things, why not use it for the response object as well. But I realize that the object is a pretty plain data container, so the answer I guess is that it's easier.

  2. Error handling: Essentially all errors in continueHandlePartitionErrors can only happen in the response. I understand there is some redundancy there and it can be a bit confusing. But a request can fail with either a hard failure (response = null and throwable = non-null) or a server-side error (response = non-null). That is why the two were kept separate. If you find it unclear, how can I make it more readable?

That's not what I meant. I just found the flow of onResponse a bit hard to read, so I suggested structuring it a bit differently. In particular, the name continueHandlePartitionErrors sounds like it determines whether to continue handling errors, but it actually completes the future inside.

I think the control flow may also lead to logging in a way that may not be intended? For fencing, we get one generic log.error message and one very similar log.info message with some specific details. For topic authorization errors, we get one log.error message for each partition, and then another aggregated log.error message, listing the partitions again. However, if we have 3 unauthorized topic errors and then one other error, we get three generic error messages for the topics, but we do not get the aggregated error message. Is that trying to be consistent with the old consumer?

I imagined something like:

if (error == Errors.NONE) {
    log.debug("OffsetCommit {} for partition {}", offset, tp);
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
    // Collect all unauthorized topics before failing
    unauthorizedTopics.add(tp.topic());
} else if (error.exception() instanceof RetriableException) {
    log.warn("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, error.message());
    handleRetriableError(error);
    retry(responseTime);
    return;
} else {
    log.error("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, error.message());
    handleFatalError(error);
    return;
}

But I'm not super familiar with the code style in the new consumer, and consistency with the rest of the code and the old consumer is also important. So I just wanted to give this to you as an inspiration, but you are probably in the best position to come up with the best way to implement it.
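For readers who want to run the control flow suggested above, here is a self-contained sketch. Everything in it is an assumption made for illustration: the `Err` enum and its `retriable` flag stand in for `Errors` and the `error.exception() instanceof RetriableException` check, and `PartitionResult`, `Decision`, and `handle` are hypothetical names, not code from this PR.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class CommitResponseSketch {
    /** Hypothetical error codes; the flag stands in for the RetriableException check. */
    enum Err {
        NONE(false),
        TOPIC_AUTHORIZATION_FAILED(false),
        COORDINATOR_LOAD_IN_PROGRESS(true),
        UNKNOWN_SERVER_ERROR(false);

        final boolean retriable;

        Err(boolean retriable) {
            this.retriable = retriable;
        }
    }

    enum Outcome { SUCCESS, RETRY, FATAL, UNAUTHORIZED }

    static final class PartitionResult {
        final String topic;
        final int partition;
        final Err error;

        PartitionResult(String topic, int partition, Err error) {
            this.topic = topic;
            this.partition = partition;
            this.error = error;
        }
    }

    static final class Decision {
        final Outcome outcome;
        final Set<String> unauthorizedTopics;

        Decision(Outcome outcome, Set<String> unauthorizedTopics) {
            this.outcome = outcome;
            this.unauthorizedTopics = unauthorizedTopics;
        }
    }

    static Decision handle(List<PartitionResult> results) {
        Set<String> unauthorized = new TreeSet<>();
        for (PartitionResult result : results) {
            if (result.error == Err.NONE) {
                continue;
            }
            if (result.error == Err.TOPIC_AUTHORIZATION_FAILED) {
                // Collect all unauthorized topics before failing.
                unauthorized.add(result.topic);
                continue;
            }
            // The first retriable or fatal error decides the outcome immediately.
            Outcome outcome = result.error.retriable ? Outcome.RETRY : Outcome.FATAL;
            return new Decision(outcome, unauthorized);
        }
        Outcome outcome = unauthorized.isEmpty() ? Outcome.SUCCESS : Outcome.UNAUTHORIZED;
        return new Decision(outcome, unauthorized);
    }
}
```

Returning the topics collected so far even on the early-return paths lets a caller log them in one place, so a response with several unauthorized topics followed by another error does not lose the aggregated list.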

@philipnee
Collaborator Author

Hi @lucasbru - Thanks for the response. To your first question: Yes, what is left is correcting the error handling at the user API level. At the time I implemented this, I wrapped all errors in KafkaException (I forgot the reason why); however, we should throw the exact exception instead. So I will follow up with a second PR to complete this Jira issue.
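To illustrate why wrapping matters at the user API level, here is a small sketch using hypothetical exception types (`GenericClientException` and `TopicAuthSketchException` are stand-ins, not Kafka classes). Once the specific exception is wrapped, a caller can no longer match it by type and has to inspect the cause instead.

```java
final class ExceptionSurfaceSketch {
    /** Hypothetical stand-in for a generic client exception. */
    static class GenericClientException extends RuntimeException {
        GenericClientException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Hypothetical stand-in for a specific failure type that callers may want to catch. */
    static class TopicAuthSketchException extends GenericClientException {
        TopicAuthSketchException(String message) {
            super(message, null);
        }
    }

    /** Wrapping hides the specific type behind the generic one. */
    static RuntimeException surfaceWrapped(RuntimeException cause) {
        return new GenericClientException("commit failed", cause);
    }

    /** Surfacing the exact exception keeps type-based handling possible. */
    static RuntimeException surfaceExact(RuntimeException cause) {
        return cause;
    }
}
```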

I get what you meant for onResponse and I think your suggestion makes a lot of sense. I'll patch it accordingly.

Member

@lucasbru lucasbru left a comment

I think we're having duplicate logging now.

continue;
}

if (error.exception() instanceof RetriableException) {
Member

I think you want to remove this if/else block completely now

Collaborator Author

Thanks, as well as the error == Errors.NONE

@philipnee
Collaborator Author

@lucasbru - Thanks for taking the time to review this PR. I addressed your previous comment. Let me know if anything in this PR is unclear.

@philipnee philipnee force-pushed the kafka-15562-ensure-commit-request-manager-handles-errors-correctly branch from 72a6de9 to 91f44f6 Compare October 31, 2023 18:48
trigger test
@philipnee
Collaborator Author

Test failures seem unrelated, but I'm retriggering the tests because there is a failed build

Member

@lucasbru lucasbru left a comment

LGTM. Thanks, @philipnee !

@lucasbru
Member

lucasbru commented Nov 2, 2023

Test failures unrelated

@lucasbru lucasbru merged commit 7b0c076 into apache:trunk Nov 2, 2023
1 check failed
yyu1993 pushed a commit to yyu1993/kafka that referenced this pull request Feb 15, 2024
…pache#14639)

The current commitRequestManager lacks logic for handling various failures. This patch addresses the following gaps:

 * A network disconnection should disconnect the coordinator node
 * Handle and test various fatal and retriable error scenarios
 * Ensure the time returned in the poll result is Long.MAX_VALUE or the minimum of all the inflight requests' remainingBackoffMs, because we need to respect the backoff.

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
AnatolyPopov pushed a commit to aiven/kafka that referenced this pull request Feb 16, 2024
…pache#14639)
