Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16514: backdoor for modifying internal leave.group.on.close config #15819

Closed

Conversation

ableegoldman
Copy link
Contributor

POC for one option to fix the issue described in KAFKA-16514

This is definitely a hacky approach to introduce a backdoor solely for Kafka Streams to implement its special close semantics. But to be fair, that is exactly the same thing that happens right now, except that instead of a backdoor internal method we have a backdoor internal ConsumerConfig (ie "leave.group.on.close"). This just adds a way to modify that config so Kafka Streams operators can decide at close time whether they want to leave the group or not.

For example, a Streams application may want to scale in by permanently stopping some nodes, in which case the consumers should all immediately leave the group. However it may also want to simply bounce the node, in which case it makes sense to not leave the group when closed. Both are valid scenarios but having to choose one up front when the consumer is first created due to the immutability of configs means that in practice, users are forced to guess which one they will want/need and can't decide at close time.

@ableegoldman ableegoldman changed the title [DO NOT MERGE] KAFKA-16514: backdoor for modifying internal leave.group.on.close config KAFKA-16514: backdoor for modifying internal leave.group.on.close config Apr 26, 2024
Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ableegoldman thanks for this patch. I feel the CloseOption is a good way to make the close more flexible.

@@ -1469,6 +1469,10 @@ private Thread shutdownHelper(final boolean error, final long timeoutMs, final b
// save the current thread so that if it is a stream thread
// we don't attempt to join it and cause a deadlock
return new Thread(() -> {
if (leaveGroup) {
processStreamThread(streamThread -> streamThread.setLeaveGroupOnClose(leaveGroup));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

leaveGroup is always true, so setLeaveGroupOnClose could be renamed to leaveGroupOnClose. Also, it does not need the argument.

if (leaveGroupOnClose) {
log.info("Leaving consumer group on close");
}
((LegacyKafkaConsumer<?, ?>) mainConsumer).overrideLeaveGroupOnClose();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the casting is a acceptable way, we can enhance LegacyKafkaConsumer#close to take CloseOption right now. LegacyKafkaConsumer is not a public class so the change is ok. Also, that would be a compatible way with new feature in the future.

@ableegoldman
Copy link
Contributor Author

Will do this via a KIP since there's some agreement that it makes sense for this to be configurable for plain consumer apps as well as Streams

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants