
KAFKA-17066: new consumer updateFetchPositions all in background thread #16885

Merged
40 commits merged into apache:trunk on Sep 15, 2024

Conversation

@lianetm (Contributor) commented Aug 15, 2024

Fix for the known issue that the logic for updating fetch positions in the new consumer was being performed partly in the app thread and partly in the background thread, potentially leading to race conditions on the subscription state.

This PR moves the logic for updateFetchPositions to the background thread as a single event (instead of triggering separate events to validate, fetchOffsets, listOffsets). A new UpdateFetchPositionsEvent is triggered from the app thread and processed in the background, where it performs those same operations and updates the subscription state accordingly, without blocking the background thread.

This PR maintains the existing logic of keeping a pendingOffsetFetchRequest that did not complete within the lifetime of the updateFetchPositions attempt, so that it may be reused on the next call to updateFetchPositions.
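
For illustration, a rough sketch of the overall pattern (hypothetical, simplified types; not the exact classes in this PR): the app thread enqueues a single event and waits on its future, while the background thread runs validate, fetch committed offsets, and list offsets end to end.

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Hypothetical sketch of the single-event pattern described above.
    final class UpdateFetchPositionsSketchEvent {
        final long deadlineMs;
        final CompletableFuture<Boolean> future = new CompletableFuture<>();
        UpdateFetchPositionsSketchEvent(long deadlineMs) { this.deadlineMs = deadlineMs; }
    }

    final class AppThreadSide {
        private final BlockingQueue<UpdateFetchPositionsSketchEvent> eventQueue = new LinkedBlockingQueue<>();

        // Application thread: submit one event and block on its future for at most the poll timeout.
        boolean updateFetchPositions(long timeoutMs) throws Exception {
            UpdateFetchPositionsSketchEvent event =
                    new UpdateFetchPositionsSketchEvent(System.currentTimeMillis() + timeoutMs);
            eventQueue.add(event); // picked up and completed by the background thread
            return event.future.get(timeoutMs, TimeUnit.MILLISECONDS);
        }
    }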

@lianetm lianetm added consumer KIP-848 The Next Generation of the Consumer Rebalance Protocol labels Aug 15, 2024
@lianetm lianetm changed the title [WIP]: updateFetchPositions event processed in background thread KAFKA-17066: new consumer updateFetchPositions all in background thread Aug 16, 2024
@lianetm lianetm marked this pull request as ready for review August 16, 2024 15:01
@kirktrue (Collaborator) left a comment

Thanks for this PR, @lianetm! Very tricky 😄

I think it's the correct approach, but I'm still wrapping my head around it. I left some basic feedback, but I'll do a follow-up review in a day or two.

Thanks.

// has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
// request, retrieve the partition end offsets, and validate the current position
// against it. It will throw an exception if log truncation is detected.
requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
Collaborator

validatePositionsIfNeeded() returns a CompletableFuture. We need to wait for it to be completed before continuing, right?

Collaborator

Yes, I think it should be chained. Also, I don't think it does throw an exception if log truncation is detected. Completing the future exceptionally is not quite the same thing.

Contributor Author

It throws an exception (here) when it checks for the partitions it needs to validate. Even though it does return a future, this is one of the parts I was trying to simplify with this refactoring. This is my reasoning:

  1. Validating positions is an operation whose only purpose is to detect log truncation. It fires a request and, when it gets a response, looks for truncation. If truncation is detected, the exception is saved to be thrown on the next call to validate (this concept and behaviour are common to both consumers up to this point).
  2. Based on that conceptual definition, the classic consumer triggers it as an async operation and does not wait for a response before moving on to attempt to reset positions with committed offsets or partition offsets.
    So, with the async consumer now doing all the updates in the background, it seemed easy to simplify and do the same: trigger validation asynchronously (no waiting for the result future to complete), carry on with the reset, and throw any log truncation error on the next call.

Note that one fair concern with not chaining the validate request is how to ensure it won't be storming the broker with requests. That does not happen because the pace of requests is already set based on the subscriptionState allowedRetries (see here), which ensures that whenever a validation request is sent, it waits up to the requestTimeout before sending a new one.

Makes sense? I could be missing something, but it seems to me that we already get the behaviour we want without having to play with the futures here.
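
For illustration, a rough sketch of that fire-and-forget pattern with the cached error (hypothetical names; in the real code the request pacing is handled via the subscription state, as noted above):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicReference;

    final class ValidationSketch {
        // Hypothetical field: error found by a previous async validation, surfaced on the next call.
        private final AtomicReference<RuntimeException> cachedValidationError = new AtomicReference<>();

        void validatePositionsIfNeeded() {
            RuntimeException previous = cachedValidationError.getAndSet(null);
            if (previous != null) {
                throw previous; // e.g. log truncation detected by an earlier response
            }
            // Fire the async validation without chaining; the caller carries on with the reset.
            sendOffsetsForLeaderEpochRequest().whenComplete((response, error) -> {
                if (error instanceof RuntimeException) {
                    cachedValidationError.set((RuntimeException) error); // thrown on the next call
                }
            });
        }

        // Placeholder for the real request; throttling/pacing is handled elsewhere.
        private CompletableFuture<Void> sendOffsetsForLeaderEpochRequest() {
            return new CompletableFuture<>();
        }
    }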

Collaborator

@lianetm—thanks for the explanation. We don't want to require the position validation to finish before performing the rest of the logic.

Out of curiosity, regarding "storming the broker with requests," does the OffsetRequestManager already handle duplicate concurrent requests?

Comment on lines 64 to 72
/**
* OffsetFetch request triggered to update fetch positions. The request is kept. It will be
* cleared every time a response with the committed offsets is received and used to update
* fetch positions. If the response cannot be used because the UpdateFetchPositions expired,
* it will be kept to be used on the next attempt to update fetch positions if partitions
* remain the same.
*/
private FetchCommittedOffsetsEvent pendingOffsetFetchEvent;

Collaborator

We've introduced a lot of logic and state in this class. I'd really like to consider moving the bulk of process(), initWithPartitionOffsetsIfNeeded(), and initWithPartitionOffsetsIfNeeded() to a dedicated class. I'd prefer to keep ApplicationEventProcessor focused on dispatching events to their corresponding RequestManager method(s).

@lianetm (Contributor Author), Aug 21, 2024

Yeap, totally agree; I debated that too. It would be a new UpdatePositionsEventProcessor that still "dispatches events to their corresponding managers", but I agree that in this case it requires more logic to achieve that.

I didn't go with a separate class from the start just to see how others felt about this. I saw these options:

  1. leave it in the same processor where all events are indeed processed
  2. have a new UpdatePositionsEventProcessor (just to tie it to no specific manager, and mix all the ones needed here)
  3. have it in the offsetsRequestManager, given that most of the logic to update positions is about partition offsets, mixing in a bit of committed offsets.

I do like the 3rd one, mainly because I find it consistent with the MembershipManager, which does what it needs and may use the commitRequestManager. I could be biased here of course :), but @AndrewJSchofield also suggested option 3, so if it makes sense to everyone I'll go for it.

Collaborator

Option 3 makes the most sense. I don't think we need to be afraid of making a dedicated class for this, if needed.

Contributor Author

Done, all moved to the OffsetsRequestManager (option 3), and it looks much better indeed. Let me know.

// (with potentially a longer timeout) and stored. The event is used for the first attempt, but in the
// case it times out, subsequent attempts will also use the event in order to wait for the results.
if (!canReusePendingOffsetFetchEvent(initializingPartitions)) {
final long deadlineMs = Math.max(updateFetchPositionsEvent.deadlineMs(), updateFetchPositionsEvent.fetchOffsetsDeadlineMs());
Collaborator

It seems like we could possibly calculate the fetch offsets deadline here instead of including two deadlines in the event. Sorry, I'm kind of hung up on the two deadlines in the event 😞

Contributor Author

I totally get your point. The alternative was to pass the "defaultApiTimeout" config into the UpdateFetchPositionsEvent (that's what we need for the 2nd deadline). This is changing now anyway because of the changes to address other comments, so we won't have it anymore.

}
});
} catch (Exception e) {
updateFetchPositionsEvent.future().completeExceptionally(e);
Collaborator

Do we need to wrap this exception?

Contributor Author

Hmm, I wasn't completely sure about this, but after checking in detail it seems to make sense to wrap here. I was concerned that we might mistakenly wrap something that was expected from the API, given that we now perform in the background lots of operations that were in the app thread before. But we do handle the expected errors in the updatePositions flow and requests, and use them to complete the future. So this would really be for unexpected errors in the flow, and it's ok to wrap. Done.
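
For reference, roughly what the wrapping amounts to inside the event processing (a sketch; processUpdateFetchPositions is a hypothetical stand-in for the dispatch):

    try {
        processUpdateFetchPositions(updateFetchPositionsEvent); // hypothetical dispatch into the managers
    } catch (Exception e) {
        // Expected errors were already used to complete the future inside the flow; anything that
        // reaches this catch is unexpected and gets wrapped as a KafkaException.
        Throwable wrapped = (e instanceof org.apache.kafka.common.KafkaException)
                ? e
                : new org.apache.kafka.common.KafkaException(e);
        updateFetchPositionsEvent.future().completeExceptionally(wrapped);
    }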

@AndrewJSchofield (Collaborator) left a comment

Thanks for the PR. This is definitely a nice design change to make in one of the more tricky areas of the consumer.

I wonder whether the timeout handling needs to work quite like this. There's the amount of time that the consumer's application thread is prepared to wait, and there's the amount of time that the offsets request manager is prepared to spend fetching and validating offsets. Unless the set of partitions to process changes, I can't see why it wouldn't just continue until it has an answer, which it can cache. We are trying to guard against a situation where KafkaConsumer.poll(Duration) is being issued repeatedly with a timeout so short that the offset management cannot complete. Couldn't the OffsetsRequestManager simply make the sequence of RPCs to complete the little dance, regardless of a timeout? I don't like caching the event, because that seems to me to apply to a single invocation from the AsyncKafkaConsumer, rather than being an embodiment of the operation in hand.


@kirktrue (Collaborator) left a comment

Thanks for the updates @lianetm! I did another really quick review to read through your responses. I have not yet gotten as far as reviewing the tests 😞



@lianetm (Contributor Author) commented Aug 22, 2024

Hey! I just pushed all the requested changes. The most important change is addressing @AndrewJSchofield's comment regarding timeout handling; totally agree.

Couldn't the OffsetsRequestManager simply make the sequence of RPCs to complete the little dance, regardless of a timeout?

Yes, I think we could, so I went ahead and did it. Basically, updatePositions runs its sequence of async operations (not blocking the background thread) and ends up actually updating the positions when it has all the info it needs, but only if the set of initializing partitions (partitions requiring positions) is still the same. The only consideration is that errors could also occur, so we need to keep them to be thrown on the next call to update positions if the triggering updatePositionsEvent has already expired. Note that this is exactly the same logic already in place for reset/validate positions, in both consumers. Now we just need it at the updatePositions level as well, given that it moves on and attempts to do what it needs in the background.
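
A rough sketch of that behaviour (hypothetical fields and helpers):

    // The background operation keeps running even past the event's deadline. The result is applied
    // only if the set of partitions still needing positions is unchanged, and errors are cached to be
    // thrown on the next updateFetchPositions attempt if the triggering event already expired.
    fetchCommittedOffsetsFuture.whenComplete((offsets, error) -> {
        if (error != null) {
            cachedUpdatePositionsError.set(error);   // hypothetical cache, surfaced on the next attempt
            return;
        }
        if (subscriptionState.initializingPartitions().equals(requestedPartitions)) {
            applyCommittedOffsets(offsets);          // hypothetical helper that sets the positions
        }
    });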

I'm currently adding more tests, but all existing tests (unit and integration) pass, so please take a look and let me know. Thank you both!

@kirktrue (Collaborator) left a comment

@lianetm—thanks for all of the updates! I'm scraping the bottom of the barrel with most of my change requests, sorry 😞

applicationEventHandler.addAndGet(new ResetPositionsEvent(calculateDeadlineMs(timer)));
return true;
UpdateFetchPositionsEvent updateFetchPositionsEvent = new UpdateFetchPositionsEvent(calculateDeadlineMs(timer));
wakeupTrigger.setActiveTask(updateFetchPositionsEvent.future());
Collaborator

Why do we need to involve the wakeup trigger for this case?

@lianetm (Contributor Author), Sep 3, 2024

To ensure that we throw the WakeupException if we get a wakeup call while poll is blocked waiting for the event to complete (part of the contract of poll).

This logic was already in place btw (here), closer to the OffsetFetch, but since it's all in one event now it got moved here. Makes sense?
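
A sketch of how that looks around the event (based on the lines quoted above; the clean-up in finally is an assumption, mirroring other blocking calls):

    // Registering the event's future as the wakeup trigger's active task lets a concurrent
    // consumer.wakeup() complete that future with a WakeupException while poll() is blocked on it.
    UpdateFetchPositionsEvent updateFetchPositionsEvent = new UpdateFetchPositionsEvent(calculateDeadlineMs(timer));
    wakeupTrigger.setActiveTask(updateFetchPositionsEvent.future());
    try {
        cachedSubscriptionHasAllFetchPositions = applicationEventHandler.addAndGet(updateFetchPositionsEvent);
    } finally {
        wakeupTrigger.clearTask(); // assumed clean-up once the event completes or times out
    }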

Comment on lines +385 to +391
private boolean canReusePendingOffsetFetchEvent(Set<TopicPartition> partitions) {
if (pendingOffsetFetchEvent == null) {
return false;
}

return pendingOffsetFetchEvent.requestedPartitions.equals(partitions);
}
Collaborator

The corresponding code in AsyncKafkaConsumer.canReusePendingOffsetFetchEvent also checked that the pendingOffsetFetchEvent’s deadline hadn't passed. Is that no longer a concern after the refactor?

Comment on lines -93 to +91
private MockTime time;
private final Time time = mock(Time.class);
Collaborator

Out of curiosity, what prompted this change?

Contributor Author

Just a simplification: I noticed that none of the tests really need to advance the time (that's when we usually bring in the custom Time implementation that MockTime provides), so a mock is enough.

@kirktrue kirktrue added the ctr Consumer Threading Refactor (KIP-848) label Aug 27, 2024
@AndrewJSchofield (Collaborator) left a comment

Thanks for the update. I do like putting the logic into the offsets request manager although it's getting a bit hard to follow now.

// Need to generate a new request to fetch committed offsets
final long fetchCommittedDeadlineMs = Math.max(deadlineMs, time.milliseconds() + defaultApiTimeoutMs);
fetchCommittedFuture = commitRequestManager.fetchOffsets(initializingPartitions, fetchCommittedDeadlineMs);
pendingOffsetFetchEvent = new PendingFetchCommittedRequest(initializingPartitions,
Collaborator

nit: This would be more legible as a single line.

@@ -194,14 +402,15 @@ public CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchO
* an error is received in the response, it will be saved to be thrown on the next call to
* this function (ex. {@link org.apache.kafka.common.errors.TopicAuthorizationException})
*/
public CompletableFuture<Void> resetPositionsIfNeeded() {
protected CompletableFuture<Void> resetPositionsIfNeeded() {
Collaborator

I always feel that protected has an implication of subclassing. I don't believe this is being subclassed, so I'd prefer to see package-private which should give accessibility for testing. Of course, this is a matter of taste so feel free to ignore this :)

Contributor Author

I totally agree with you on the subclassing expectation, I missed it here. Changed.

// positions.
applicationEventHandler.addAndGet(new ResetPositionsEvent(calculateDeadlineMs(timer)));
return true;
UpdateFetchPositionsEvent updateFetchPositionsEvent = new UpdateFetchPositionsEvent(calculateDeadlineMs(timer));
Collaborator

I believe that this will create an UpdateFetchPositionsEvent for every iteration of the inner loop in AsyncKafkaConsumer.poll(Duration). Previously, cachedSubscriptionHasAllFetchPositions was an attempt at optimising this area, which was probably not entirely satisfactory, but it was well intentioned. Generally, once the positions have been validated, there's no need for any of this unless something nasty happens. I feel there's an optimisation waiting here. We know when the set of assigned partitions changed. We also know when an error such as log truncation occurred. We know when an UpdateFetchPositionsEvent completed successfully. So I think we can probably be a bit more deliberate about creating the event only when it's needed.

Contributor Author

Good point. The trick is that we do need to generate an event on every iteration of the poll, just because we need to go to the background thread (even if we end up finding that we have all the positions already). This is to:

  1. get errors that may have been found in the background on a previous update-positions attempt
  2. trigger validate positions (we do it on every poll, as the classic consumer does, and the common validate logic internally determines whether it really needs to do something or not)
  3. check subscriptions.hasAllFetchPositions() (we want to do this in the background to avoid races on the assignment it checks)

And I'll stop here, just to point out that this is what we expect to happen on every poll iteration under happy/regular scenarios (all partitions have valid positions). This is what the UpdateFetchPositionsEvent is needed for (not really updating anything, but doing what it needs to determine that it doesn't need to). It's probably more of a MaybeUpdateFetchPositionsEvent (weird or better?).

Then, to cater for the scenarios where something fails/changes, the same event gets more interesting. If it does not "hasAllFetchPositions", it will attempt to retrieve committed offsets or partition offsets as needed; that's the meaty part.

This is the reasoning behind having an event on every poll. What do you think?
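
A rough sketch of that happy path (hypothetical helper names):

    // Each poll goes to the background once: surface any cached error, trigger validation
    // (internally a no-op if nothing needs validating), and return immediately when every
    // assigned partition already has a valid position.
    public CompletableFuture<Boolean> updateFetchPositions(long deadlineMs) {
        throwCachedErrorIfAny();                               // errors from a previous background attempt
        validatePositionsIfNeeded();                           // async, fire-and-forget
        if (subscriptionState.hasAllFetchPositions()) {
            return CompletableFuture.completedFuture(true);    // nothing to update
        }
        return initPositionsFromCommittedOrPartitionOffsets(deadlineMs); // the "meaty" part
    }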

Collaborator

Sounds good to me. Thanks for the explanation. We're well on the way to removing the SubscriptionState references from the application thread entirely.

return true;
UpdateFetchPositionsEvent updateFetchPositionsEvent = new UpdateFetchPositionsEvent(calculateDeadlineMs(timer));
wakeupTrigger.setActiveTask(updateFetchPositionsEvent.future());
cachedSubscriptionHasAllFetchPositions = applicationEventHandler.addAndGet(updateFetchPositionsEvent);
Collaborator

Previously, this variable was being set based on subscriptions.hasAllFetchPositions() and did not rely on successfully retrieving the result of the new event. I wonder whether it has become stickier as a result which might not be desirable. Perhaps you should set it to false before waiting for the result of the event so that it's false if the result does not arrive. What do you think the "default" value would be here?

Contributor Author

Good point. The default is false, but it could definitely stick to a true result if the UpdateFetchPositions fails afterwards (which would be wrong). I added your suggestion of setting it to false before waiting; seems sensible to me.
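
A sketch of the resulting shape (based on the lines quoted in this thread):

    // Default to false before waiting, so a failed or timed-out event cannot leave a stale true behind.
    cachedSubscriptionHasAllFetchPositions = false;
    cachedSubscriptionHasAllFetchPositions = applicationEventHandler.addAndGet(updateFetchPositionsEvent);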

return false;
}

public CompletableFuture<Boolean> updateFetchPositions(long deadlineMs) {
Collaborator

This CompletableFuture<Boolean> is a slightly tricky thing. The boolean, I think, refers to whether we have all of the offset information we need. But there are a lot of "results" flying around and I'm finding it tricky to follow this boolean through the various methods. I wonder whether a specific variable name for the future that carries this flag, used consistently wherever it appears (whether as a variable or a method argument), would help my tiny brain.

@lianetm (Contributor Author), Aug 29, 2024

Yeap, I agree it's not straightforward. The boolean only indicates whether the event hasAllFetchPositions when it starts (meaning that it didn't have to fetch offsets). The results flying around have no direct impact on that value (only an indirect one: they set a position, so the next hasAllFetchPositions check will find it).

Definitely, having specific var names for the result will help; going for it now...

@chia7712 (Contributor) left a comment

@lianetm this is a nice patch, and I have only a few minor comments :)

return false;
}

return pendingOffsetFetchEvent.requestedPartitions.equals(partitions);
Contributor

Could we use containsAll instead of equals?

Contributor Author

It makes sense to me. Just for the record, when the changes for caching the request were initially implemented (in the app thread), a separate Jira was created for this (https://issues.apache.org/jira/browse/KAFKA-16966). Still, I'm ok with changing to containsAll here and keeping that other Jira, maybe just to review this in the classic consumer (I won't get to that in this PR).

Collaborator

I get squeamish about changing things like this as part of a refactor. I'm OK to change it here, but as @lianetm mentioned, KAFKA-16966 was split out separately as an optimization.

Contributor

Yes, we can address that in KAFKA-16966.

// been assigned a position manually)
if (error == null) {
Map<TopicPartition, OffsetAndMetadata> offsetsToApply = offsetsForInitializingPartitions(offsets);
refreshCommittedOffsets(offsetsToApply, metadata, subscriptionState);
Contributor

Is it legal to call refreshCommittedOffsets multiple times when there are many calls reusing the "first attempt"? i.e. we register many actions with whenComplete.

Contributor Author

Good catch. I would say it's harmless because there is a check that makes it a no-op if the partitions already got positions, but it's definitely conceptually wrong. We want to trigger the OffsetFetch, keep it as pending, and call refreshCommittedOffsets only once when we get a response (what we need to do multiple times is not refreshCommittedOffsets; it's only calling "complete" on all the attempts that may be waiting for that operation). I made the change.
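
A rough sketch of the intended shape (hypothetical variable names, based on the snippets in this thread):

    // Refresh positions exactly once, when the pending OffsetFetch response arrives.
    pendingResult = fetchCommittedFuture.whenComplete((offsets, error) -> {
        if (error == null) {
            refreshCommittedOffsets(offsetsForInitializingPartitions(offsets), metadata, subscriptionState);
        }
    });

    // Any later attempt that reuses the pending request only chains onto its completion.
    pendingResult.whenComplete((offsets, error) -> {
        if (error == null) result.complete(null);
        else result.completeExceptionally(error);
    });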

@lianetm (Contributor Author) commented Sep 4, 2024

Hey @chia7712, thanks for the review! All comments addressed.

@chia7712 (Contributor) left a comment

@lianetm thanks for the updated patch!

return false;
}

return pendingOffsetFetchEvent.requestedPartitions.containsAll(partitions);
Contributor

As discussed (#16885 (comment)), could you please keep using equals 😄

// this case, on the first attempt to fetch the committed offsets, a FetchCommittedOffsetsEvent is created
// (with potentially a longer timeout) and stored. The event is used for the first attempt, but in the
// case it times out, subsequent attempts will also use the event in order to wait for the results.
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> refreshWithCommitted;
Contributor

It seems we don't need refreshWithCommitted.

        if (!canReusePendingOffsetFetchEvent(initializingPartitions)) {
            // Generate a new OffsetFetch request and update positions when a response is received
            final long fetchCommittedDeadlineMs = Math.max(deadlineMs, time.milliseconds() + defaultApiTimeoutMs);
            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchOffsets =
                    commitRequestManager.fetchOffsets(initializingPartitions, fetchCommittedDeadlineMs);
            pendingOffsetFetchEvent = new PendingFetchCommittedRequest(initializingPartitions,
                    refreshOffsetsAndCompleteResultOnResponseReceived(fetchOffsets, result));
        } else {
            // Reuse pending OffsetFetch request
            pendingOffsetFetchEvent.result.whenComplete((__, error) -> {
                if (error == null) {
                    result.complete(null);
                } else {
                    result.completeExceptionally(error);
                }
            });
        }

Contributor Author

This was to ensure that we only move on to resetting positions (with partition offsets) after refreshing offsets (not merely after receiving the response to the OffsetFetch).

That being said, I did remove refreshWithCommitted and followed your suggestion here, taking the above into account, and it led to what seems to me a better version; the flow is clearer. Thanks! Let me know.

final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchCommittedFuture,
final CompletableFuture<Void> result) {
return fetchCommittedFuture.whenComplete((offsets, error) -> {
pendingOffsetFetchEvent = null;
Contributor

If fetchCommittedFuture is an already-completed future, whenComplete will be evaluated before the return. That means pendingOffsetFetchEvent will never be null.
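
A minimal standalone illustration of the hazard (not the PR code):

    import java.util.concurrent.CompletableFuture;

    public class AlreadyCompletedDemo {
        static Object pendingEvent;

        public static void main(String[] args) {
            CompletableFuture<String> fetchCommittedFuture = CompletableFuture.completedFuture("offsets");
            // The callback runs immediately on this thread because the future is already complete...
            fetchCommittedFuture.whenComplete((offsets, error) -> pendingEvent = null);
            // ...so the assignment below overwrites the null and the field is never observed as cleared.
            pendingEvent = new Object();
            System.out.println(pendingEvent != null); // prints true
        }
    }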

Contributor Author

Yes, this changed shape with the latest changes, so I believe we don't have that situation anymore. Now the pending event is set to null here (even if the OffsetFetch future is somehow already done).

@lianetm (Contributor Author) commented Sep 6, 2024

Hello @chia7712, thanks for the comments! All addressed.

@lianetm (Contributor Author) commented Sep 11, 2024

Hey @chia7712, I just added a small fix (2957cc4) after noticing some suspicious test failures following the previous changes. I was indeed missing the fact that we need to allow resetting positions after refreshing committed offsets (even if no OffsetFetch request is generated). That was affecting several tests (probably the best one to see the flow is testOffsetOfPausedPartitions, which now passes consistently locally). I'll keep an eye on the build to check the test results again. Thanks!

@lianetm (Contributor Author) commented Sep 12, 2024

Last build completed with 3 unrelated failures. I just rebased and will check the next one too, but I would say this is ready for another look when you have a chance @chia7712. Thanks a lot!

refreshOffsetsAndResetPositionsStillMissing(offsets, error, result);
});
pendingOffsetFetchEvent = new PendingFetchCommittedRequest(initializingPartitions, fetchOffsets);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchOffsetsAndRefresh =
Contributor Author

I introduced this var here just to make sure that, when reusing a fetch, we only complete the whole operation and move on to resetting positions once we have applied the retrieved offsets. Without this var, I was adding 2 whenComplete calls to the same future (ln 376 and ln 384), and those would execute in reverse order of addition.
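
A small standalone illustration of that ordering (the CompletableFuture API does not guarantee any order for dependents, but current JDKs stack them, so callbacks registered before completion typically run in reverse registration order):

    import java.util.concurrent.CompletableFuture;

    public class WhenCompleteOrderDemo {
        public static void main(String[] args) {
            CompletableFuture<Void> future = new CompletableFuture<>();
            future.whenComplete((v, e) -> System.out.println("registered first"));
            future.whenComplete((v, e) -> System.out.println("registered second"));
            // Typically prints "registered second" then "registered first" on current JDKs.
            future.complete(null);
        }
    }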

@chia7712 (Contributor) left a comment

@lianetm thanks for the updates.

Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate =
offsetFetcherUtils.getPartitionsToValidate();
void validatePositionsIfNeeded() {
Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate = offsetFetcherUtils.getPartitionsToValidate();
Contributor

offsetFetcherUtils.getPartitionsToValidate can throw an exception from the cache, so should we propagate the exception via the CompletableFuture?

Contributor Author

You're right, and that's the intention, but it's done in the catch further down (an exception here short-circuits the flow as we want, and we complete exceptionally in the catch):

result.completeExceptionally(maybeWrapAsKafkaException(e));

Makes sense?

}

return sendOffsetsForLeaderEpochRequestsAndValidatePositions(partitionsToValidate);
sendOffsetsForLeaderEpochRequestsAndValidatePositions(partitionsToValidate);
Contributor

The return value of sendOffsetsForLeaderEpochRequestsAndValidatePositions is useless now. Could you please do a bit of cleanup for it?

Contributor Author

sure, done

Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate =
offsetFetcherUtils.getPartitionsToValidate();
void validatePositionsIfNeeded() {
Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate = offsetFetcherUtils.getPartitionsToValidate();
if (partitionsToValidate.isEmpty()) {
Contributor

How do we make sure all partitions get validated? partitionsToValidate could be empty if there are in-flight OffsetsForLeaderEpochRequests, right?

Contributor Author

You're totally right that here we could have empty partitions to validate (only because we may have some, but there's already an OffsetsForLeaderEpochRequest in flight for them). But my expectation is that even in that case, all partitions end up being validated because of how we handle the OffsetsForLeaderEpochRequest response:

Makes sense?

@lianetm (Contributor Author) commented Sep 13, 2024

Hello @chia7712 , thanks for the review! All comments addressed.

@chia7712 (Contributor) left a comment

LGTM

@lianetm (Contributor Author) commented Sep 13, 2024

Thanks @chia7712! Build completed with 3 unrelated failures.

(Filed https://issues.apache.org/jira/browse/KAFKA-17554 for the one in ConsumerNetworkClientTest, classic consumer, which I see has been flaky for a while but wasn't reported yet.)

@chia7712 chia7712 merged commit 6744a71 into apache:trunk Sep 15, 2024
6 of 9 checks passed