New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-10337: await async commits in commitSync even if no offsets given #13678
Conversation
This PR is a copy of #9111 so all credits go to @thomaslee. |
The failing tests are not related to this PR. |
@philipnee Did you get a chance to look at this PR already? |
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
Outdated
Show resolved
Hide resolved
Hey @erikvanoosten, Thanks for the PR! Could you add checks for inflightCommits count gets set to 0 in a few of the callback testing function like |
Sure, I'll try. I am new in this code base and these changes are not from me, but I'll try nonetheless. |
@showuon - do you have a cycle to take a look at this issue? |
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
Show resolved
Hide resolved
Another small weirdness is that when |
I looked for all invocations of commitAsync and added additional asserts in 1fa48f53f0e6f5a2a9821075fa053e01cba6b0b2. |
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
Outdated
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
Outdated
Show resolved
Hide resolved
@erikvanoosten - Thanks for following up on this PR, I think we are really close here. Also apologize about the misleading comment. I left a few comments above. |
ef9ce53
to
1c033b6
Compare
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
Outdated
Show resolved
Hide resolved
@philipnee Shall we merge? The failing tests are not related to this PR. |
@dajac - Would you have some time to review and help Erik to merge this? I've done a pass and I think it's okay. |
@philipnee I will try to review it this week. Thanks! |
Hi @dajac, did you already get the chance to look at this PR? |
@erikvanoosten I am sorry but I haven't had the time to get to it yet. Will do! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@erikvanoosten I finally had time to make a pass over it. Overall, it looks good to me. I left one minor comment. Could you take a look?
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! I think it looks good.
The failures seem unrelated
|
🥳 What is the process now? Who will press the merge button? |
I'm leaving it for David to press that button! |
2bfedf0
to
78881f3
Compare
Commits were squashed. No further changes. |
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
Outdated
Show resolved
Hide resolved
The contract for commitSync() guarantees that the callbacks for all prior async commits will be invoked before it (successfully?) returns. Prior to this change the contract could be violated if an empty offsets map were passed in to commitSync(). Co-authored-by: Erik van Oosten <e.vanoosten@grons.nl> Co-authored-by: Philip Nee <philipnee@gmail.com>
78881f3
to
e4ba315
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks for the patch @erikvanoosten! I will merge when the build completes.
@erikvanoosten It seems that you don't have an account for jira so I can't assign the ticket to you. You should request one. |
I do have an account. My Jira userid is erikvanoosten. |
The contract for commitSync() guarantees that the callbacks for all prior async commits will be invoked before it (successfully?) returns. Prior to this change the contract could be violated if an empty offsets map were passed in to commitSync().
Committer Checklist (excluded from commit message)