Skip to content

KAFKA-19357: AsyncConsumer#close hangs as commitAsync never completes when coordinator is missing #19914

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 26 commits into
base: trunk
Choose a base branch
from

Conversation

Mirai1129
Copy link
Contributor

@Mirai1129 Mirai1129 commented Jun 6, 2025

Problem: When AsyncConsumer is closing, CoordinatorRequestManager stops
looking for coordinator by returning EMPTY in poll() method when closing
flag is true. This prevents commitAsync() and other
coordinator-dependent operations from completing, causing close() to
hang until timeout.

Solution:
Modified the closing flag check in poll() method of
CommitRequestManager to be more targeted:

  • When both coordinators are unknown and the consumer is closing, only
    return EMPTY
  • When this condition is met, proactively fail all pending commit
    requests with CommitFailedException
  • This allows coordinator lookup to continue when coordinator is
    available during shutdown, while preventing indefinite hangs when
    coordinator is unreachable

@github-actions github-actions bot added triage PRs from the community consumer clients small Small PRs labels Jun 6, 2025
@@ -99,7 +99,7 @@ public void signalClose() {
*/
@Override
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
if (closing || this.coordinator != null)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes the consumer find the coordinator during closing, right? If the consumer doesn't have a coordinator running, does it make sense to find one during closing?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, we do need to be careful. There is also code to make sure that finding a coordinator does not block the progress of closing. If you start a consumer when there are no running brokers, close needs to complete promptly. If you stop the brokers when a consumer has been running and then attempt a close, it also needs to complete promptly.

@@ -99,7 +99,7 @@ public void signalClose() {
*/
@Override
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
if (closing || this.coordinator != null)
if (this.coordinator != null)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, left a comment.
Should we add an IT for this scenario: when closing the consumer, commitAsync, and verify that the consumer can shut down properly?

@github-actions github-actions bot removed the triage PRs from the community label Jun 7, 2025
Copy link
Contributor

@kirktrue kirktrue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @Mirai1129! As @DL1231 mentioned, because closing the AsyncKafkaConsumer is such a tricky area, we really do need to have some tests to assure us that this works and doesn't break something else.

@Mirai1129
Copy link
Contributor Author

Hi everyone,

Thanks for all the feedback and suggestions! And sorry for late reply. I've updated the PR with the timeout handling solution - when CommitRequestManager times out, it will now immediately clear remaining callbacks to prevent system deadlock.

I'm currently working on adding comprehensive integration tests to cover this scenario and will push them shortly.

Copy link
Contributor

@frankvicky frankvicky left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that the coordinator might keep an unknown state and probably won't recover in time.
To prevent blocking the closing process, we should complete the pending commits request exceptionally (e.g., CommitFailedException) if the consumer is closing and the coordinator is unknown.
This could inform the user that the last commits have not been submitted correctly and also ensure a smooth closing process.
Thoughts?

Copy link
Contributor

@frankvicky frankvicky left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Mirai1129: Thanks for the update.

Copy link
Collaborator

@m1a2st m1a2st left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank @Mirai1129 for this patch, left some comments

@Mirai1129 Mirai1129 requested a review from chia7712 July 14, 2025 09:24
Copy link
Collaborator

@TaiJuWu TaiJuWu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM assuming CI pass

Comment on lines 467 to 468
// Try without looking up the coordinator first
var callback = new CountConsumerCommitCallback();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this callback need this comment?

Copy link
Contributor Author

@Mirai1129 Mirai1129 Jul 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually idk why, cuz I just duplicated testCommitAsyncCompletedBeforeConsumerCloses and modified it. But if it doesn't need let me remove it 🥹.

Copy link
Contributor

@kirktrue kirktrue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me. Just a couple of questions on the test. Thanks!

@TaiJuWu
Copy link
Collaborator

TaiJuWu commented Jul 17, 2025

@Mirai1129 could you merge trunk to get fix testMultipleRemoteFetchesInOneFetchRequest()?

consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(1L)), callback);

long startTime = System.currentTimeMillis();
consumer.close(CloseOptions.timeout(Duration.ofMillis(500)));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about

Suggested change
consumer.close(CloseOptions.timeout(Duration.ofMillis(500)));
assertDoesNotThrow(() -> consumer.close(CloseOptions.timeout(Duration.ofMillis(1000))));

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't add much value to this test case.

Copy link
Collaborator

@Yunyung Yunyung left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Could you update the PR description?

@Mirai1129
Copy link
Contributor Author

@Yunyung I've updated the solution! Could you please take a look at the changes when you have a moment? Thanks!

@Yunyung
Copy link
Collaborator

Yunyung commented Jul 18, 2025

causing close() to hang indefinitely.

Not very accurate, it should be "causing close() to hang until timeout".

Modified the closing flag check in poll() method of
CoordinatorRequestManager to be more targeted:

CoordinatorRequestManager -> CommitRequestManager

@Mirai1129
Copy link
Contributor Author

@Yunyung Thank you so much, I have updated the description.

@Mirai1129 Mirai1129 changed the title KAFKA-19357: AsyncConsumer#close hangs during closing because the commitAsync request never completes due to a missing coordinator KAFKA-19357: AsyncConsumer#close hangs as commitAsync never completes when coordinator is missing Jul 18, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants