Skip to content

KAFKA-20385: Add consumer-aware methods to ConsumerRebalanceListener#22242

Open
adikou wants to merge 2 commits into
apache:trunkfrom
adikou:akousik/KAFKA-20385
Open

KAFKA-20385: Add consumer-aware methods to ConsumerRebalanceListener#22242
adikou wants to merge 2 commits into
apache:trunkfrom
adikou:akousik/KAFKA-20385

Conversation

@adikou
Copy link
Copy Markdown

@adikou adikou commented May 8, 2026

Introduce RebalanceConsumer, a restricted view of the Consumer passed to
rebalance callbacks that exposes only operations safe during a rebalance
(seek, commit, position, pause/resume, etc.) and is invalidated after
the callback returns.

Add two-argument variants to each ConsumerRebalanceListener callback
that receive a RebalanceConsumer, with defaults delegating to the
existing one-argument methods for backward compatibility.

Add setConsumerRebalanceListener() to Consumer as the new way to
register a listener, decoupling listener registration from
subscription. Internally, ConsumerAwareRebalanceListener wraps the
user listener and manages DelegatingRebalanceConsumer view lifecycle,
with the Consumer reference provided at registration time.

Deprecate the Optional subscribe() variants
on SubscriptionState in favor of listenerless overloads plus
setConsumerRebalanceListener(listener, consumer).

The subscribe(topics, listener) overloads on Consumer are not yet
annotated @deprecated to avoid
-Werror across the repo to keep the diff minimal.

Reviewers: Andrew Schofield aschofield@confluent.io

adikou added 2 commits April 30, 2026 07:47
…hods (KIP-1306)

  Introduce RebalanceConsumer, a restricted view of the Consumer passed to rebalance callbacks that exposes only operations safe during a rebalance (seek, commit, position, pause/resume, etc.) and is
  invalidated after the callback returns. Add two-argument variants to each ConsumerRebalanceListener callback that receive a RebalanceConsumer, with defaults delegating to the existing one-argument
  methods for backward compatibility. Add setConsumerRebalanceListener() to Consumer as the new way to register a listener, decoupling listener registration from subscription. Internally,
  ConsumerAwareRebalanceListener wraps the user listener and manages DelegatingRebalanceConsumer view lifecycle, with the Consumer reference provided at registration time to eliminate the this-escape
  pattern. Deprecate the Optional<ConsumerRebalanceListener> subscribe() variants on SubscriptionState in favor of listenerless overloads plus setConsumerRebalanceListener(listener, consumer). The
  subscribe(topics, listener) overloads on Consumer are not yet annotated @deprecated to avoid -Werror across the repo; a follow-on PR will migrate all internal callers and add the annotation.
# Conflicts:
#	clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
#	clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java
#	clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
@github-actions github-actions Bot added triage PRs from the community streams consumer clients and removed triage PRs from the community labels May 8, 2026
@adikou
Copy link
Copy Markdown
Author

adikou commented May 8, 2026

Hi @AndrewJSchofield @lianetm would you be able to review this PR? Thanks.

Copy link
Copy Markdown
Member

@AndrewJSchofield AndrewJSchofield left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. I will do a proper review and so far I just had a quick look to see the scope of the work. I do notice that you're calling subscribe() and then setConsumerRebalanceListener(). I would expect them strictly to be the other way around to make sure your listener does not ever missing any callback events.

AtomicReference<Collection<TopicPartition>> partitionsFromCallback = new AtomicReference<>();
consumer.subscribe(topics, new ConsumerRebalanceListener() {
consumer.subscribe(topics);
consumer.setConsumerRebalanceListener(new ConsumerRebalanceListener() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd set the rebalance listener before subscribing. Otherwise, you have a tiny race condition.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I'll go over the diff to swap the calls.
But, is there a concern since the KafkaConsumer is explicitly called out to be from single threads and we acquireAndEnsureOpen to check same threaded access in each call post subscription?

@lianetm
Copy link
Copy Markdown
Member

lianetm commented May 8, 2026

Thanks for the PR @adikou! I will also take a look next week, but wonder if it would be helpful to split this PR into smaller ones to make progress better?

E.g, we could try smaller PRs for the new components/func introduced with the KIP at the lower level (without wiring them into the consumers): RebalanceConsumer/ConsumerRebalanceListener/RebalanceDelegate, their tests. Then maybe worth splitting the wiring into the Consumers (Classic and Async). Also can consider handling other components separately (e.g, MockConsumer). Up to you that have the code in hand, just suggesting for consideration in case it helps. Thanks!

@adikou
Copy link
Copy Markdown
Author

adikou commented May 8, 2026

Thanks for the PR @adikou! I will also take a look next week, but wonder if it would be helpful to split this PR into smaller ones to make progress better?

E.g, we could try smaller PRs for the new components/func introduced with the KIP at the lower level (without wiring them into the consumers): RebalanceConsumer/ConsumerRebalanceListener/RebalanceDelegate, their tests. Then maybe worth splitting the wiring into the Consumers (Classic and Async). Also can consider handling other components separately (e.g, MockConsumer). Up to you that have the code in hand, just suggesting for consideration in case it helps. Thanks!

Thanks Lianet. I was leaning towards smaller PRs and generally prefer it that way. I wasn't sure was the preferred method for the community: can I open each PR against the same JIRA? Do you prefer squash and merge as opposed to smaller thread of PRs?

@lianetm
Copy link
Copy Markdown
Member

lianetm commented May 8, 2026

Nice! In this case you can keep the same jira if you prefer I would say, and just prefix the PRs with "KAFKA-20385 [1/N]: ...". I don't expect we need too many, but it will help review/delivery. Thanks!

@adikou
Copy link
Copy Markdown
Author

adikou commented May 12, 2026

Hi @lianetm @AndrewJSchofield I've opened #22270 , #22271, and #22272 by splitting up this PR.

Please take a look when you can. #22271 and #22272 both point to trunk but are both dependent on #22270 so their compilation will break. But I wanted to leave the diffs around for review first. When I merge the changes to trunk I'll rebase and push new commits upstream.

If you'd like, I can close this PR.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants