Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16599: LegacyConsumer should always await pending async commits on commitSync and close #15693

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from

Conversation

lucasbru
Copy link
Member

The javadoc for KafkaConsumer.commitSync says:

Note that asynchronous offset commits sent previously with the {https://github.com/link #commitAsync(OffsetCommitCallback)}
(or similar) are guaranteed to have their callbacks invoked prior to completion of this method.

This is not always true in the legacy consumer, when the set of offsets is empty, the execution of the commit callback is not always awaited. There are also various races possible that can avoid callback handler execution.

Similarly, there is code in the legacy consumer to await the completion of the commit callback before closing, however, the code doesn't cover all cases and the behavior is therefore inconsistent. While the javadoc doesn't explicitly promise callback execution, it promises "completing commits", which one would reasonably expect to include callback execution. Either way, the current behavior of the legacy consumer is inconsistent.

This change proposed a number of fixes to clean up the callback execution guarantees:

  • We also need to await async commits that are "pending" instead of "in-flight", because we do not know the coordinator yet.
  • In close, we need do not only execute the commit listeners of "pending" commits, but also those of "in-flight" commits.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@lucasbru
Copy link
Member Author

@lianetm You have reviewed those changes already as part of #15613 .

lucasbru added a commit that referenced this pull request Apr 22, 2024
…itSync and close (#15613)

The javadoc for KafkaConsumer.commitSync says:

Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
(or similar) are guaranteed to have their callbacks invoked prior to completion of this method.

This is not always true in the async consumer, where there is no code at all to make sure that the callback is executed before commitSync returns.

Similarly, the async consumer is also missing logic to await callback execution in close. While the javadoc doesn't explicitly promise callback execution, it promises "completing commits", which one would reasonably expect to include callback execution. Also, the legacy consumer contains some code to execute callbacks before closing.

This change proposed a number of fixes to clean up the callback execution guarantees in the async consumer:

We keep track of the incomplete async commit
futures and wait for them to complete before returning from
commitSync or close (if there is time).
Since we need to block to make sure that our previous commits are
completed, we allow the consumer to wake up.
Some similar gaps are addressed in the legacy consumer, see #15693

Testing
Two new integration tests and a couple of unit tests.

Reviewers: Bruno Cadonna <cadonna@apache.org>, Kirk True <ktrue@confluent.io>, Lianet Magrans <lianetmr@gmail.com>
@lucasbru lucasbru changed the title KAFKA-16103: LegacyConsumer should always await pending async commits on commitSync and close KAFKA-16599: LegacyConsumer should always await pending async commits on commitSync and close Apr 22, 2024
@lucasbru lucasbru force-pushed the kip848_await_async_commit_legacy branch from a666908 to 59b2e10 Compare April 22, 2024 16:03
Copy link
Contributor

@kirktrue kirktrue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, @lucasbru.

Just some brief comments/questions, nothing concerning.

Comment on lines +987 to +988
// Super-class close may wait for more commit callbacks to complete.
invokeCompletedOffsetCommitCallbacks();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar enough with this code. What situations would we want to invoke callbacks after we've closed the coordinator?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There may async commits that are completed during coordinator close. This is done to provide the guarantee that we want to complete async commits in Consumer.close (as long as we don't timeout). We also provide the guarantee that if an async commit completes, the corresponding callback is executed. So then we need to execute callbacks here.

This change could cause issues if we somehow interact with the consumer within the commit callback, but the consumer is already partially closed. Note that callback is already called during close (further above) where the coordinator isn't called yet. So essentially, this change will just make it "more commonplace" to call the callbacks during close.

But it would be an option to just leave the legacy consumer to have somewhat inconsistent behavior here, and just change it for the new consumer. WDYT?

Comment on lines -1043 to +1053
pendingAsyncCommits.decrementAndGet();
completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets,
try {
completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets,
new RetriableCommitFailedException(e)));
} finally {
pendingAsyncCommits.decrementAndGet();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand moving the counter decrement to a finally block, but are we expecting that the add() method or constructor would throw an exception?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this is just defensive programming

commitException = new RetriableCommitFailedException(e);
}
completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
if (commitException instanceof FencedInstanceIdException) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know you didn't write the original logic, but it seems funny to compare the exception against a FencedInstanceIdException here if up above commitException was converted to a RetriableCommitFailedException. I guess there's not a better way to construct this without potentially changing the logic to be incorrect.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FencedInstanceIdException is not retriable, so this shouldn't make a difference, right?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
2 participants