KAFKA-19357: AsyncConsumer#close hangs as commitAsync never completes when coordinator is missing #19914
base: trunk
@@ -99,7 +99,7 @@ public void signalClose() {
     */
    @Override
    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
        if (closing || this.coordinator != null)
This makes the consumer find the coordinator during closing, right? If the consumer doesn't have a coordinator running, does it make sense to find one during closing?
Well, we do need to be careful. There is also code to make sure that finding a coordinator does not block the progress of closing. If you start a consumer when there are no running brokers, close needs to complete promptly. If you stop the brokers when a consumer has been running and then attempt a close, it also needs to complete promptly.
@@ -99,7 +99,7 @@ public void signalClose() {
     */
    @Override
    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
-       if (closing || this.coordinator != null)
+       if (this.coordinator != null)
Thanks for the PR, left a comment.
Should we add an integration test (IT) for this scenario: issue a commitAsync while closing the consumer, and verify that the consumer shuts down properly?
Thanks for the PR @Mirai1129! As @DL1231 mentioned, because closing the AsyncKafkaConsumer is such a tricky area, we really do need to have some tests to assure us that this works and doesn't break something else.
Merge branch 'trunk' into KAFKA-19357
…inatorRequestManager`
Hi everyone, thanks for all the feedback and suggestions, and sorry for the late reply. I've updated the PR with the timeout handling solution. I'm currently working on adding comprehensive integration tests to cover this scenario and will push them shortly.
The coordinator might remain in an unknown state and probably won't recover in time.
To prevent blocking the closing process, we should complete the pending commit requests exceptionally (e.g., with CommitFailedException) if the consumer is closing and the coordinator is unknown.
This would inform the user that the last commits were not submitted successfully, and would also ensure a smooth closing process.
Thoughts?
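A minimal sketch of this suggestion, not the actual CommitRequestManager code. The class `PendingCommits`, its methods, and the stand-in `CommitFailedException` are hypothetical and use only the JDK (the real exception lives in the Kafka client library). It shows pending commit futures being completed exceptionally once the consumer is closing and no coordinator is known, so that anything waiting on them is released right away.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for Kafka's CommitFailedException, so the sketch needs only the JDK.
class CommitFailedException extends RuntimeException {
    CommitFailedException(String message) {
        super(message);
    }
}

// Hypothetical, simplified model of a manager that tracks pending async commits.
class PendingCommits {
    private final Queue<CompletableFuture<Void>> pending = new ArrayDeque<>();
    private boolean closing = false;

    // Registers a commit whose result is not yet known.
    CompletableFuture<Void> commitAsync() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        pending.add(future);
        return future;
    }

    void signalClose() {
        closing = true;
    }

    // Returns the number of pending commits failed during this poll.
    int poll(Optional<String> coordinator) {
        if (!closing || coordinator.isPresent()) {
            // Normal path: requests would be sent to the coordinator (omitted here).
            return 0;
        }
        // Closing with no known coordinator: fail everything instead of waiting.
        int failed = 0;
        CompletableFuture<Void> future;
        while ((future = pending.poll()) != null) {
            future.completeExceptionally(
                new CommitFailedException("Consumer is closing and the coordinator is unknown"));
            failed++;
        }
        return failed;
    }
}
```

In this model, a commit registered before `signalClose()` stays pending while the consumer is running, and is failed on the first `poll` after close is signalled if the coordinator is still absent.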
@Mirai1129: Thanks for the update.
Thanks @Mirai1129 for this patch; I left some comments.
LGTM, assuming CI passes.
// Try without looking up the coordinator first
var callback = new CountConsumerCommitCallback();
Why does this callback need this comment?
Actually, I don't know why; I just duplicated testCommitAsyncCompletedBeforeConsumerCloses and modified it. If it isn't needed, I'll remove it 🥹.
Looks good to me. Just a couple of questions on the test. Thanks!
@Mirai1129 could you merge trunk to pick up the fix for testMultipleRemoteFetchesInOneFetchRequest()?
consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(1L)), callback);

long startTime = System.currentTimeMillis();
consumer.close(CloseOptions.timeout(Duration.ofMillis(500)));
How about:
- consumer.close(CloseOptions.timeout(Duration.ofMillis(500)));
+ assertDoesNotThrow(() -> consumer.close(CloseOptions.timeout(Duration.ofMillis(1000))));
It doesn't add much value to this test case.
LGTM. Could you update the PR description?
@Yunyung I've updated the solution! Could you please take a look at the changes when you have a moment? Thanks!
Not very accurate; it should be "causing close() to hang until timeout".
CoordinatorRequestManager -> CommitRequestManager
@Yunyung Thank you so much, I have updated the description.
Problem: When AsyncConsumer is closing, CoordinatorRequestManager stops
looking for the coordinator, because its poll() method returns EMPTY
whenever the closing flag is true. This prevents commitAsync() and other
coordinator-dependent operations from completing, causing close() to
hang until timeout.
Solution:
Modified the closing flag check in poll() method of
CommitRequestManager to be more targeted:
return EMPTY
requests with CommitFailedException
available during shutdown, while preventing indefinite hangs when
coordinator is unreachable
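
The hang described in the problem statement can be illustrated with a small JDK-only sketch. Everything here is hypothetical (the class `CloseWaitSketch` and its method are not part of Kafka): it models close() as a bounded wait on a pending commit future. If nothing ever completes the future, the wait runs for the full timeout; if the future is failed, the wait ends immediately.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of close() waiting on a pending commit; not Kafka code.
final class CloseWaitSketch {
    private CloseWaitSketch() {
    }

    // Returns true if the pending commit finished (normally or exceptionally)
    // within the timeout, and false if the wait ran out or was interrupted.
    static boolean awaitPendingCommit(CompletableFuture<Void> pendingCommit, long timeoutMs) {
        try {
            pendingCommit.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (ExecutionException e) {
            // The commit failed, but close is no longer blocked on it.
            return true;
        } catch (TimeoutException e) {
            // The hang: nothing completed the future before the timeout.
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Under this model, failing the pending commit up front turns a wait that would otherwise last the whole close timeout into one that returns as soon as the failure is observed.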