KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll #15742
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
rebalanceListenerInvoker,
event.methodName(),
event.partitions(),
event.future()
);
applicationEventHandler.add(invokedEvent);
if (invokedEvent.error().isPresent()) {
throw invokedEvent.error().get();
I was expecting to see here the logic for wrapping the callback error into a KafkaException, but I see it is at a lower level in invokeRebalanceCallbacks, which is a bit more obfuscated, I would say. Still, I see how it's deeply tied to the ConsumerRebalanceListenerCallbackCompletedEvent, so it's OK for me to leave it as is if we feel it's clear enough.
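To make the wrapping behaviour under discussion concrete, here is a minimal sketch. It assumes the rule implied by the tests in this PR: a KafkaException passes through unchanged, and anything else is wrapped with the original kept as the cause. The KafkaException class below is a local stand-in so the snippet compiles without the Kafka client, and invokeCallback is a hypothetical helper, not the PR's actual invokeRebalanceCallbacks.

```java
import java.util.Optional;

// Sketch only: all names here are illustrative stand-ins, not Kafka's real classes.
final class CallbackErrorWrapping {

    /** Local stand-in for org.apache.kafka.common.KafkaException. */
    static class KafkaException extends RuntimeException {
        KafkaException(String message) { super(message); }
        KafkaException(String message, Throwable cause) { super(message, cause); }
    }

    /** Runs a listener callback and returns the error to surface to the user, if any. */
    static Optional<KafkaException> invokeCallback(String methodName, Runnable callback) {
        try {
            callback.run();
            return Optional.empty();
        } catch (KafkaException e) {
            // Already the type the user expects from poll(): pass it through unwrapped.
            return Optional.of(e);
        } catch (Exception e) {
            // Any other exception is wrapped; the original is preserved as the cause.
            return Optional.of(new KafkaException(
                "User rebalance callback " + methodName + " threw an error", e));
        }
    }

    public static void main(String[] args) {
        Optional<KafkaException> error = invokeCallback("onPartitionsRevoked", () -> {
            throw new IllegalStateException("listener failed");
        });
        System.out.println(error.get().getCause().getClass().getSimpleName());
    }
}
```

Doing the wrapping where the callback is invoked means the completed event already carries the final exception type, which is presumably why it ended up at the lower level.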
ConsumerRebalanceListenerCallbackNeededEvent handles 'assign', 'revoke', and 'lose' callbacks. It was my understanding (I could be wrong) that we wanted to wait to throw the exception after the reconciliation was fully processed. That is, not necessarily on the first callback 🤔
However, this implementation has the interesting property that it will both throw the exception and continue processing. It seems like this could potentially yield two exceptions, if, say, both the onPartitionsRevoked() and onPartitionsAssigned() threw exceptions. Is that the intent?
Looking at the reconciliation logic, I think if onPartitionsRevoked throws, we'll not execute onPartitionsAssigned. And the call to onPartitionsLost seems to be independent of reconciliation. So not sure how we'd end up with two exceptions.
You are right that there is a behavioral difference around finishing the reconciliation. The old consumer throws after finishing the reconciliation, while the new consumer throws on a different thread, so there is no strict time ordering between finishing the reconciliation and throwing. But I'm struggling to see how one can observe the difference. The reconciliation will have finished the next time the background thread processes any events, so in a sense, you cannot observe the difference based on the queue architecture. The difference may only be observable through shared state that breaks the queue-based architecture. SubscriptionState comes to mind here. Thinking of something like
- application thread enters poll, fails during rebalance listener execution and throws
- application thread somehow reads subscription state
- background thread updates subscription state as part of reconciliation
Now the application thread has observed an "incomplete reconciliation". But after a listener execution has failed, we don't seem to update the subscription state in the reconciliation.
So in summary - not sure if we are going to notice the different behaviors?
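The application-thread flow described here (run the listener, report the outcome to the background thread, then throw) can be sketched as follows. This is a simplified model under stated assumptions: CompletedEvent and processCallback are hypothetical stand-ins, and a plain BlockingQueue replaces the consumer's real event handler.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch only: a toy model of the queue-based hand-off, not the AsyncKafkaConsumer code.
final class ListenerErrorFlow {

    /** Hypothetical stand-in for the "callback completed" event sent to the background thread. */
    static final class CompletedEvent {
        final String methodName;
        final Optional<RuntimeException> error;

        CompletedEvent(String methodName, Optional<RuntimeException> error) {
            this.methodName = methodName;
            this.error = error;
        }
    }

    /** Application-thread side: run the callback, enqueue the result, then rethrow any error. */
    static void processCallback(BlockingQueue<CompletedEvent> backgroundQueue,
                                String methodName,
                                Runnable callback) {
        Optional<RuntimeException> error = Optional.empty();
        try {
            callback.run();
        } catch (RuntimeException e) {
            error = Optional.of(e);
        }
        CompletedEvent event = new CompletedEvent(methodName, error);
        // The background thread is told the outcome before the user sees the exception.
        backgroundQueue.add(event);
        if (event.error.isPresent()) {
            throw event.error.get();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<CompletedEvent> queue = new LinkedBlockingQueue<>();
        try {
            processCallback(queue, "onPartitionsRevoked", () -> {
                throw new IllegalStateException("listener failed");
            });
        } catch (RuntimeException e) {
            System.out.println("poll would throw: " + e.getMessage());
        }
        System.out.println("events visible to background: " + queue.size());
    }
}
```

Because the event is enqueued before the throw, the background side always learns the callback result. What is not ordered is when it processes that event relative to the user handling the exception, which is the gap the shared-state scenario above relies on.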
Kind of aligned with @lucasbru here. I totally get your concern @kirktrue, but as I see it we're in very different territory, not only with the new consumer architecture (all that @lucasbru described), but also with the new protocol (which is the only one supported here), so I lean towards keeping it simple as an initial approach, based on how we expect things to happen in practice here. With the new protocol, we get revocations first, and then new partitions in a following reconciliation loop. If the revocation callback fails, the reconciliation will continue to be retried on the next poll loop, triggering callbacks continuously (that's what will be happening in the background). At the same time, in the foreground, we'll be raising the revocation callback failure to the user (with this PR).
"But after a listener execution has failed, we don't seem to update the subscription state in the reconciliation."
Agree, just for the record, that holds true for the listeners of partitions revoked and lost (subscription state is only updated when the callbacks complete). In the case of assigned partitions, the subscription is updated before the callback, just aligning with the onPartitionsAssigned contract, which is that it is called when the rebalance completes.
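The ordering difference described in this comment can be illustrated with a toy model. It assumes only what is stated above: for revocation the owned set changes after the callback completes, and for assignment it changes before the callback runs. The class and method names are hypothetical, and a plain Set of strings stands in for SubscriptionState.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Sketch only: models update ordering, not the real reconciliation code.
final class SubscriptionUpdateOrder {

    /** Revocation: state is updated only after the callback completes without throwing. */
    static void revoke(Set<String> owned, Set<String> toRevoke, Runnable onPartitionsRevoked) {
        onPartitionsRevoked.run();   // if this throws, 'owned' is left untouched
        owned.removeAll(toRevoke);
    }

    /** Assignment: state is updated first, so the callback runs with the new partitions owned. */
    static void assign(Set<String> owned, Set<String> toAdd, Runnable onPartitionsAssigned) {
        owned.addAll(toAdd);
        onPartitionsAssigned.run();  // if this throws, 'owned' already includes 'toAdd'
    }

    public static void main(String[] args) {
        Set<String> owned = new TreeSet<>(Arrays.asList("topic-0", "topic-1"));
        try {
            revoke(owned, new TreeSet<>(Arrays.asList("topic-1")), () -> {
                throw new IllegalStateException("revoke listener failed");
            });
        } catch (IllegalStateException e) {
            System.out.println("after failed revoke: " + owned);
        }
        try {
            assign(owned, new TreeSet<>(Arrays.asList("topic-2")), () -> {
                throw new IllegalStateException("assign listener failed");
            });
        } catch (IllegalStateException e) {
            System.out.println("after failed assign: " + owned);
        }
    }
}
```

In this model a failed revocation leaves nothing for the application thread to observe, while a failed assignment exposes the state the listener contract already promises.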
@kirktrue let me know if you are okay with this, otherwise maybe we can discuss it on Thursday
Sorry for the delay!
Yes, I'm in agreement with the perspectives you and @lianetm stated. No qualms from me 😄
Thanks for the changes @lucasbru, closing an important gap! LGTM.
Thanks for the quick turnaround and simple implementation @lucasbru!
I just had the one question about how many exceptions are thrown at the user if both revoke and assign callbacks have errors.
Thanks!
Thanks for the PR @lucasbru!
Here are my comments.
Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, error, 0, 0, 1, wrappedException),

// Tests that we invoke our listener even if it encounters an exception. Special case to test that a kafka exception is not wrapped.
Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, kafkaException, 0, 0, 1, kafkaException),
Don't you need to repeat this test also for partition assigned and partition revoked?
I think I did not add them because I didn't think they'd add interesting coverage. If you think we need it, let me add it
Thanks for the update!
LGTM!
When a user-defined rebalance listener fails with an exception, the error is expected to be propagated to the user as a KafkaException and to break the poll loop (the behaviour of the legacy coordinator). The new consumer executes callbacks in the application thread and sends an event to the background thread with the callback result and the error, if any, but it does not seem to propagate the exception to the user.