Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15555: Ensure wakeups are handled correctly in poll() #14746

Merged
merged 12 commits into from
Nov 23, 2023

Conversation

cadonna
Copy link
Contributor

@cadonna cadonna commented Nov 13, 2023

We need to be careful when aborting a long poll with wakeup() since the consumer might never return records if the poll is interrupted after the consumer position has been updated but the records have not been returned to the caller of poll().

This PR avoids wake-ups during this critical period in the PrototypeAsyncConsumer, similarly as in the current consumer.

More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.

Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@@ -335,10 +336,17 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
}

final CompletableFuture<Void> wakeupFuture = setupWakeupTrigger();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I needed to create a dedicated future since fetches are not passed to the background thread as events since fetches happen continuously on the background thread.

@cadonna
Copy link
Contributor Author

cadonna commented Nov 13, 2023

I will add tests after somebody confirms that this makes sense.

Copy link
Collaborator

@kirktrue kirktrue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, @cadonna!

I believe poll() should throw a WakeupException, right @philipnee? If so, the ExecutionException's should be inspected for being a WakeupException. If it's not a WakeupException, I'm not sure what to do. I also don't remember how we handle interruptions, though I believe that's in Javadoc comments somewhere.

@philipnee
Copy link
Contributor

Hi @cadonna - Thank you for putting time into this PR. Based on my understanding this PR does 2 things: if wakeup() is invoked before calling poll(), the consumer will return immediately. If wakeup() is invoked during poll(), we should get a wakeupException and return. Overall I think it looks right.

*while writing this I think @kirktrue has asked the questions I wanted to ask.

@philipnee
Copy link
Contributor

Another point I want to make here is that the wakeup call also wakes-up the blocking client. I wonder if we also need to do that to the network thread - @kirktrue

@cadonna
Copy link
Contributor Author

cadonna commented Nov 14, 2023

@kirktrue I am not sure, I understand your comment about ExecutionException. The future is not bound to any task, so can an ExecutionException ever be thrown?

@philipnee
Copy link
Contributor

Hi @cadonna - When the consumer is woken up. The WakeupTrigger should complete the future exceptionally with WakeupException. To rethrow that exception during future.get(), you will need to examine the ExecutionException kind of like this:

catch (ExecutionException e) {
            Throwable t = e.getCause();

            if (t instanceof WakeupException)
                throw new WakeupException();
            else if (t instanceof KafkaException)
                throw (KafkaException) t;
            else
                throw new KafkaException(t);

@philipnee
Copy link
Contributor

Basically future.get() API only return 3 types of exceptions: ExecutionException, InterruptedException, and Cancellation per documentation.

@cadonna
Copy link
Contributor Author

cadonna commented Nov 15, 2023

@philipnee @kirktrue Thanks for your comments and explanation! I totally missed that if the future is completed exceptionally it throws an ExecutionException. The javadocs of completeExceptionally() state:

If not already completed, causes invocations of get() and related methods to throw the given exception.

It does not say anything about wrapping. I just saw now that the javadocs of CompletableFuture state it more clearly:

In case of exceptional completion with a CompletionException, methods get() and get(long, TimeUnit) throw an ExecutionException with the same cause as held in the corresponding CompletionException.

I misunderstood the javadocs on get(). I modified the code to use getNow() which makes the handling a bit simpler.

Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cadonna Thanks for the PR! Generally looks good to me, but I had an idea for a slightly different implementation, and a question about testing.


private CompletableFuture<Void> setupWakeupTrigger() {
final CompletableFuture<Void> wakeupFuture = new CompletableFuture<>();
return wakeupTrigger.setActiveTask(wakeupFuture);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to scratch my head a little bit over this.

Since we already have the Wakeupable interface, I wonder if it wouldn't be cleaner to have a new static subclass:

/** Placeholder for a task that delays wake-up until it's manually triggered */
DelayedWakeupTask(final boolean wakeUpMarker)

Then replace DelayedWakeupTask(false) by DelayedWakeupTask(true) in WakeupTrigger.wakeup and move maybeTriggerWakeup to WakeupTrigger, rename it to maybeTriggerDelayedWakeup.

Then we'd have fewer code paths to worry about (no non-exceptional completion, no other possible exceptions where we ourselves don't know what to do and just throw and IllegalStateException).

Its just an idea, could be that I overlooked something.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it to not use a future, but a little bit different from you proposal. The code became indeed simpler.

@@ -415,6 +439,27 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
return ConsumerRecords.empty();
} finally {
kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
wakeupTrigger.clearActiveTask();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I'm not 100% sure we are providing the guarantee stated in the javadoc of the consumer here.

This would be:

     * If no thread is blocking in a method which can throw {@link org.apache.kafka.common.errors.WakeupException}, the next call to such a method will raise it instead.

But if the current fetch returns records, we will never throw a WakeupException, not from this poll nor the next. Should we keep the "activeTask" around strictly until we are ready to throw a WakeupException?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are right @lucasbru - We need to ensure WakeupException will also be thrown when wakeup() is invoked. I wonder if we need to wrap the whole poll() in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed it. We no longer need it.

Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates, that's indeed a lot cleaner without the Future! LGTM

@cadonna
Copy link
Contributor Author

cadonna commented Nov 15, 2023

I am still not sure what the conclusion is regarding: #14746 (comment)
Do we need/ Can we wake up the network thread.

@philipnee
Copy link
Contributor

hi @cadonna - I think we probably don't have to worry about it here. I have one comment: Is it possible to test poll would return normally after the consumer is woken up and invoke poll again?

@cadonna
Copy link
Contributor Author

cadonna commented Nov 16, 2023

Is it possible to test poll would return normally after the consumer is woken up and invoke poll

That is a good idea!

@cadonna
Copy link
Contributor Author

cadonna commented Nov 16, 2023

@philipnee @kirktrue @lucasbru I am concerned that when one calls wakeup() the application thread might stay blocked in waiting for a non-empty fetch buffer.
See https://github.com/apache/kafka/blob/a16cde3b1f67ea3248466444fd3562878428fecf/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L968
Currently, I do not know how to interrupt the application thread with the current code. I cannot do it in AsyncKafkaConsumer#wakeup() because I do not have a handle on the application thread and I cannot do it in WakeupTrigger#maybeTriggerWakeup() because there it is too late. Any ideas?

@lucasbru
Copy link
Member

@philipnee @kirktrue @lucasbru I am concerned that when one calls wakeup() the application thread might stay blocked in waiting for a non-empty fetch buffer.

Yes. The application thread remains blocked, so this doesn't really wake the application thread up.

Currently, I do not know how to interrupt the application thread with the current code. I cannot do it in AsyncKafkaConsumer#wakeup() because I do not have a handle on the application thread and I cannot do it in WakeupTrigger#maybeTriggerWakeup() because there it is too late. Any ideas?

I'd say we signal notEmptyCondition in FetchBuffer. That could possibly be achieved this way:

  • We implement a new subclass of Wakeupable that is a FetchAction(FetchBuffer), similar to my idea above.
  • In AsynConsumer.poll, FetchAction is registered before the poll and removed after the poll.
  • In WakeupTrigger.wakeup(), FetchAction is replaced by a WakeupFuture, and FetchBuffer.wakeup is called.
  • FetchBuffer.wakeup calls notEmptyCondition.signalAll().

There are other ideas, but I think they are more problematic:

  1. If we wanted to stick more to the original architecture, we could attempt to solve "waiting for a non-empty fetch buffer" by putting a future in the background queue and let the background thread poll loop complete the future. But that would be a significant rewrite and may have other problems.

  2. I'm implementing a change where only one thread can use the consumer API, just like in the original consumer. In principle, you could store a reference to the application thread in the consumer when it is opened and interrupt it that way.

@cadonna
Copy link
Contributor Author

cadonna commented Nov 16, 2023

Thanks for the options, @lucasbru!

In the meanwhile, I also had an idea to store the reference to the application thread in a wakeupable task similar to your FetchAction task. A call to wakeup() would then interrupt the application thread stored in the wakeupable task. However, I will try your signal idea since that seems more safe to me than interrupting the application thread.

@kirktrue
Copy link
Collaborator

Can we add the ctr label here for easier discovery?

@philipnee philipnee added the ctr Consumer Threading Refactor (KIP-848) label Nov 17, 2023
@philipnee
Copy link
Contributor

Hi @cadonna - I was about to reply with the same idea you proposed. I think that would work. I wonder if we could just use a BlockingQueue for the fetchBuffer because fetchBuffer.poll(time) blocks until non-empty or timeout.

Copy link
Contributor

@philipnee philipnee left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Continue on my comment - Do you think it is worth adding a ticket about using the BackgroundEventQueue for the fetches? Let me know if you have a concrete idea about how to implement this.

@philipnee
Copy link
Contributor

@cadonna - There are some build failures. Can we rerun the tests?

@cadonna
Copy link
Contributor Author

cadonna commented Nov 17, 2023

I wonder if we could just use a BlockingQueue for the fetchBuffer because fetchBuffer.poll(time) blocks until non-empty or timeout.

As far as I see from the javadocs of a BlockingQueue there is no way to wake up from poll() other than interrupting the thread. I think signalling on a condition variable is safer than interrupting a thread since its scope is well-defined.

@cadonna
Copy link
Contributor Author

cadonna commented Nov 17, 2023

Do you think it is worth adding a ticket about using the BackgroundEventQueue for the fetches? Let me know if you have a concrete idea about how to implement this.

Let me think about it.

Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks a lot for solving this tricky problem. I added two nits, use at your own discretion

@@ -52,6 +53,8 @@ public class FetchBuffer implements AutoCloseable {
private final Condition notEmptyCondition;
private final IdempotentCloser idempotentCloser = new IdempotentCloser();

private final AtomicBoolean wakedUp = new AtomicBoolean(false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: wokenUp?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, boy! How embarrassing!

@@ -185,6 +188,16 @@ void awaitNotEmpty(Timer timer) {
}
}

void wakeup() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: wakeup vs. wakeUp capitalization

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used the same format as on the Consumer interface. Same for my recent change to wokenup.

@@ -166,7 +169,7 @@ void awaitNotEmpty(Timer timer) {
try {
lock.lock();

while (isEmpty()) {
while (isEmpty() && !wakedUp.compareAndSet(true, false)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! I was first convinced that we have an interleaving here that misses the wakeup, but wakedUp takes care.

@cadonna cadonna force-pushed the AK15555-consumer-wakeups branch 2 times, most recently from 643662b to c5c19d6 Compare November 20, 2023 17:51
…nsumer.poll()

We need to be careful when aborting a long poll with wakeup() since the
consumer might never return records if the poll is interrupted after the
consumer position has been updated but the records have not been returned
to the caller of poll().

This PR avoid wake-ups during this critical period.
@cadonna
Copy link
Contributor Author

cadonna commented Nov 23, 2023

Build failures are unrelated.

@cadonna cadonna merged commit 75572f9 into apache:trunk Nov 23, 2023
1 check failed
@cadonna cadonna deleted the AK15555-consumer-wakeups branch November 23, 2023 10:35
yyu1993 pushed a commit to yyu1993/kafka that referenced this pull request Feb 15, 2024
…4746)

We need to be careful when aborting a long poll with wakeup() since the
consumer might never return records if the poll is interrupted after the
consumer position has been updated but the records have not been returned
to the caller of poll().

This PR avoid wake-ups during this critical period.

Reviewers: Philip Nee <pnee@confluent.io>, Kirk True <ktrue@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
AnatolyPopov pushed a commit to aiven/kafka that referenced this pull request Feb 16, 2024
…4746)

We need to be careful when aborting a long poll with wakeup() since the
consumer might never return records if the poll is interrupted after the
consumer position has been updated but the records have not been returned
to the caller of poll().

This PR avoid wake-ups during this critical period.

Reviewers: Philip Nee <pnee@confluent.io>, Kirk True <ktrue@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
clolov pushed a commit to clolov/kafka that referenced this pull request Apr 5, 2024
…4746)

We need to be careful when aborting a long poll with wakeup() since the
consumer might never return records if the poll is interrupted after the
consumer position has been updated but the records have not been returned
to the caller of poll().

This PR avoid wake-ups during this critical period.

Reviewers: Philip Nee <pnee@confluent.io>, Kirk True <ktrue@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
ctr Consumer Threading Refactor (KIP-848)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants