KAFKA-20332: Fix to ensure app thread not collecting records for partitions being revoked (#21897)
Conversation
AndrewJSchofield left a comment
Thanks for the PR. I'm not sure whether this is tight enough. I'm not quite the expert in this area and it's certainly better, but I'm thinking about the following:
- maybeReconcile has asynchronous actions, including committing if auto-commit is enabled.
- This auto-commit can complete asynchronously relative to maybeReconcile.
- The revocation doesn't happen until the commit has completed, but it is inevitable because it will be performed in the completion lambda for the commit.
- The new flag in the AsyncPollEvent will be set as soon as maybeReconcile returns, but that's not quite the same as it having completed.
The key seems to be that the application thread has a stable view of the SubscriptionState which it can check in order to discard records from revoked partitions. Mightn't the SubscriptionState change a little later once the commit completes?
```java
 * so the app thread can safely process to fetch/collect records.
 */
public boolean isReconciliationCheckComplete() {
    return pendingReconciliationChecked;
```
To follow the convention you used with isValidatePositionsComplete, this ought to be isReconciliationCheckComplete.
This case is already handled because we mark the partitions as non-fetchable right before issuing the commit. You're totally right that if auto-commit is enabled, the revocation will happen when that commit completes, outside the app thread scope, but that's fine, because the partitions were already marked as pending revocation, which is the key to ensuring the app thread does not fetch/collect for them while the async operations complete. Makes sense?
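To make the ordering concrete, here is a minimal sketch of the sequence described above, under assumed names: markPendingRevocation loosely mirrors the SubscriptionState idea, while commitAllConsumedAsync and revokePartitions are illustrative stand-ins rather than the actual implementation.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: partitions are marked non-fetchable *before* the async
// commit is issued, so even though the actual revocation runs later in the
// commit's completion lambda, the app thread can never fetch/collect from
// these partitions in the meantime.
class RevocationOrderingSketch {

    void revokeWithAutoCommit(Set<String> revokedPartitions) {
        // 1. Synchronously mark the partitions as pending revocation (non-fetchable).
        markPendingRevocation(revokedPartitions);

        // 2. Issue the auto-commit; it completes asynchronously.
        CompletableFuture<Void> commit = commitAllConsumedAsync();

        // 3. Revocation itself only happens once the commit completes, but by then
        //    the partitions have long been non-fetchable, so the race is closed.
        commit.whenComplete((result, error) -> revokePartitions(revokedPartitions));
    }

    void markPendingRevocation(Set<String> partitions) { /* mark non-fetchable */ }

    CompletableFuture<Void> commitAllConsumedAsync() {
        return CompletableFuture.completedFuture(null);
    }

    void revokePartitions(Set<String> partitions) { /* run listener, update assignment */ }
}
```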
AndrewJSchofield left a comment
Thanks for the PR. Looks good to me.
```java
}


/**
 *
```
Super nitpicky: extra line.
```java
if (inflightPoll != null && !inflightPoll.isReconciliationCheckComplete()) {
    return Fetch.empty();
}
```
My main concern is whether this affects CPU performance 🤔
When the reconciliation check hasn't occurred and this method returns an empty Fetch, the calling code will "fall through" to block on the FetchBuffer. Having the application thread block waiting for the background thread to produce data really hurts the CPU load metrics.
But in the end, correctness wins. It would be ideal if we could see if this change increases CPU load, and if so, by how much.
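For readers following along, a sketch of the fall-through being described; FetchBufferStub and awaitNotEmpty are illustrative stand-ins, not the exact consumer internals.

```java
import java.time.Duration;
import java.util.List;

// Sketch: when collectFetch() returns empty because the reconciliation check
// hasn't run yet, the caller falls through to waiting on the fetch buffer,
// which parks the application thread until the background produces data.
class PollFallThroughSketch {
    interface FetchBufferStub {
        void awaitNotEmpty(Duration timeout); // stand-in for the buffer's blocking wait
    }

    private final FetchBufferStub fetchBuffer;

    PollFallThroughSketch(FetchBufferStub fetchBuffer) {
        this.fetchBuffer = fetchBuffer;
    }

    List<String> pollOnce(Duration timeout) {
        List<String> fetch = collectFetch(); // empty until reconciliation is checked
        if (!fetch.isEmpty()) {
            return fetch;                    // fast path: no blocking, no scheduler cost
        }
        // Fall-through: park until the background thread fills the buffer (or the
        // timeout expires). Entering this wait at all is the CPU-load concern.
        fetchBuffer.awaitNotEmpty(timeout);
        return collectFetch();
    }

    private List<String> collectFetch() {
        return List.of(); // placeholder for draining the fetch buffer
    }
}
```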
Good call out about the empty Fetch leading to blocking on buffer data. It made me realize that even though we have to wait (for correctness), all we really need to wait for is the background thread to do the reconciliation check. Made the improvement, let me know your thoughts, thanks!
It's not necessarily the length of time we wait, it's that we enter a waiting state at all. Even if the waiting time was literally 0 ns, the overhead of the thread scheduling is what matters. So blocking on the reconciliation check is still going to incur the thread scheduler's wrath 😄
That said, I'm definitely in the correctness before performance camp, so let's go ahead and merge this to resolve the core bug. It won't tank CPU load, but it might be a small step backward in those efforts.
yeah, agree, the good thing is that here we only wait if !inflightPoll.isReconciliationCheckComplete(), and I expect that check to be done most of the time (the background thread only needs to check/set some vars to mark this done)
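A minimal sketch of what "check/set some vars" could look like on the event, assuming a CompletableFuture backs the flag; this is consistent with the reconciliationCheckFuture usage quoted later in the thread, but it is an illustrative shape, not the actual class.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical shape of the per-poll event: the background thread completes the
// future once it has checked for pending reconciliations (a cheap operation),
// and the app thread either sees it done (the common case) or waits briefly.
class AsyncPollEventSketch {
    private final CompletableFuture<Void> reconciliationCheckFuture = new CompletableFuture<>();

    // Background thread: called after the pending-reconciliation check ran and
    // any partitions being revoked were marked non-fetchable.
    void markReconciliationCheckComplete() {
        reconciliationCheckFuture.complete(null);
    }

    // App thread: expected to be true most of the time, so no blocking occurs.
    boolean isReconciliationCheckComplete() {
        return reconciliationCheckFuture.isDone();
    }

    CompletableFuture<Void> reconciliationCheckFuture() {
        return reconciliationCheckFuture;
    }
}
```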
```java
public PollResult poll(final long currentTimeMs) {
    maybeReconcile(true);
    return PollResult.EMPTY;
}
```
Is this directly related to this change? I assume that the changes made to maybeReconcile() trigger the need to add this.
Also, do we need a new unit test to validate this behavior? Or is it covered by an existing test?
This is to keep the existing behaviour in the share consumer, and it's already covered (the tests were the ones that caught this gap for me :) many share consumer tests failed when I hadn't added this initially)
```java
AtomicReference<AsyncPollEvent> capturedEvent = new AtomicReference<>();
doAnswer(invocation -> {
    AsyncPollEvent event = invocation.getArgument(0);
    capturedEvent.set(event);
```
In some cases AsyncKafkaConsumer.poll() can technically submit more than one AsyncPollEvent. Perhaps I'm being too paranoid, but defensively, this line could ensure that doesn't happen with a CAS:
```diff
- capturedEvent.set(event);
+ assertTrue(capturedEvent.compareAndSet(null, event));
```
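Spelled out as a self-contained test sketch; Event and Handler are stand-ins for the real AsyncPollEvent and application event handler, and only the CAS-capture pattern is the point.

```java
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;

// compareAndSet(null, event) fails the test if more than one event is ever
// submitted, instead of silently keeping only the last one like set() would.
class CaptureOnceSketchTest {
    static class Event { }
    interface Handler { void add(Event event); }

    @Test
    void capturesExactlyOneEvent() {
        Handler handler = mock(Handler.class);
        AtomicReference<Event> capturedEvent = new AtomicReference<>();

        doAnswer(invocation -> {
            Event event = invocation.getArgument(0);
            // CAS instead of set(): a second submission returns false and fails here.
            assertTrue(capturedEvent.compareAndSet(null, event));
            return null;
        }).when(handler).add(any(Event.class));

        handler.add(new Event()); // first (and only) submission is captured
        assertNotNull(capturedEvent.get());
    }
}
```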
Thanks for the reviews! Waiting for the build...
@kirktrue added a minor update to ensure that we don't propagate errors via the reconciliation check future (we want to leave errors flowing the same way they already flow). Will open a separate jira, because this test failure made me notice that we may be able to improve and avoid blocking on the buffer if there are metadata errors (unrelated to this PR, it's the behaviour in trunk atm). Thanks! -- update (improvement to consider separately, unrelated to this PR, applies to trunk)
```java
// safely collect records from the buffer.
if (inflightPoll != null && !inflightPoll.isReconciliationCheckComplete()) {
    // If the background hasn't had the time to check for pending reconciliation,
    // we need to wait for that check before moving on (instead of returning empty right away,
```
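Read together with the snippet quoted further down in this thread, the full wait pattern is roughly the following sketch; the types are stand-ins, and the real code uses AsyncPollEvent, deadlineMs(), and ConsumerUtils.getResult.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Sketch of the app-thread wait: instead of returning an empty Fetch right away,
// wait (bounded by the poll deadline) for the background thread to finish the
// reconciliation check, then proceed to collect from the buffer.
class ReconciliationWaitSketch {

    void awaitReconciliationCheck(CompletableFuture<Void> reconciliationCheckFuture,
                                  long deadlineMs,
                                  long nowMs) {
        long timeoutMs = deadlineMs - nowMs;
        if (timeoutMs > 0) {
            try {
                // Blocks only until the (cheap) reconciliation check is marked done.
                reconciliationCheckFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                // Per the discussion below, errors intentionally keep flowing through
                // the existing checkInflight path rather than through this future.
            }
        }
    }
}
```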
```java
public void completeExceptionally(KafkaException e) {
    // Complete reconciliation future to unblock any waiters - the error will be surfaced
    // through the normal checkInflightPoll() mechanism via the error field
    reconciliationCheckFuture.complete(null);
```
completeExceptionally completes reconciliationCheckFuture with a normal completion so collectFetch’s wait unblocks, but collectFetch doesn’t recheck inflightPoll.error() afterward. So we could still return buffered records from this poll() and only throw on the next poll() via checkInflightPoll. Is that ordering intentional for the public contract, or should we fail fast / skip collection once the wait returns and the inflight event already has an error?
We're intentionally not completing the reconciliation check exceptionally, because that would start propagating core poll errors via the reconciliation check. We want to unblock the app thread from waiting on a reconciliation trigger, but let errors keep flowing to the app thread through the same mechanism we already had in place (checkInflight errors). So we're just keeping the behaviour we had before this PR, where, as you described, if say a metadata error is discovered after the checkInflight error check, the poll returns records and throws the error on the next poll.
I had filed https://issues.apache.org/jira/browse/KAFKA-20397 to review that, but separately, as it's not directly related to this PR. From the reconciliation point of view, it's OK to return buffered records in this failure-on-poll-event case: if the poll event failed before the successful completion of the reconciliation check, then the poll event wasn't even processed, so no reconciliation was triggered and there's no risk of an ongoing revocation (this is the failure before processing events I refer to). Makes sense?
Thanks, that clears it up. I’m aligned on not completing reconciliationCheckFuture exceptionally so poll errors stay on the existing checkInflight path, and on treating the post-wait / next-poll error ordering as pre-existing. Your point about failure before the poll event is processed (no reconciliation / revocation in play) is convincing for this PR. I’ve noted the metadata-error vs. buffer-blocking improvement for KAFKA-20397 and will try to narrow it down with a small test there.
Thanks for the review @nileshkumar3!
```java
long timeoutMs = inflightPoll.deadlineMs() - time.milliseconds();
if (timeoutMs > 0) {
    try {
        ConsumerUtils.getResult(inflightPoll.reconciliationCheckFuture(), timeoutMs);
```
Should we calculate the pollTimeout after collectFetch, since collectFetch can now be a blocking operation?
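One way to do that, as a hedged sketch; the names are hypothetical, and the point is only to subtract the time spent in the now possibly-blocking collect from the overall poll budget.

```java
// Since collectFetch() can now block on the reconciliation check, compute the
// remaining poll timeout *after* it returns, so the time spent blocking is
// charged against the caller's overall poll budget.
final class PollTimeoutSketch {

    static long remainingAfterCollect(long pollBudgetMs) {
        long startMs = System.currentTimeMillis();

        collectFetch(); // may block waiting for the reconciliation check

        long elapsedMs = System.currentTimeMillis() - startMs;
        return Math.max(0, pollBudgetMs - elapsedMs); // budget left for the buffer wait
    }

    private static void collectFetch() { /* possibly blocking stand-in */ }
}
```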
KAFKA-20332: Fix to ensure app thread not collecting records for partitions being revoked (#21897)
This addresses race conditions where the app thread could collect/return records for revoked partitions.

Fix by ensuring that the app thread does not return buffered records if it hasn't checked pending reconciliations. Once it has checked pending reconciliations, we know that partitions being revoked were marked as non-fetchable (so that's when we can safely move on to fetching/collecting in the app thread). Also ensure that background reconciliations do not trigger revocations (the app thread could already have records in memory, collected from the buffer, for those partitions, which would lead to the consumer returning records for revoked partitions if the background completes the revocation before the app thread returns).

With these fixes we are sure that the app thread only collects/returns records after revoked partitions have been marked as non-fetchable.

This fix applies to the consumer only (the share consumer remains unchanged with this PR and can still trigger a full reconciliation & assignment update from the background).
Reviewers: Andrew Schofield <aschofield@confluent.io>, nileshkumar3 <nileshkumar3@gmail.com>, PoAn Yang <payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>, Kirk True <ktrue@confluent.io>