Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15592: Allow member to not sending a heartbeat when a group ID is configured #14390

Closed
wants to merge 12 commits into from

Conversation

philipnee
Copy link
Contributor

@philipnee philipnee commented Sep 14, 2023

The PR consists of two changes

  1. Replace the GroupState with the membershipManager
  2. Introduce an "inactive" state

We are planning to replace GroupState with MembershipManager as the latter already includes the necessary fields (groupId, groupInstanceId). This patch is a minor refactor of swapping the two components as the follow up of #14364

However, during the patching, we've realized the heartbeat will be sent continuously unless the member enters a fatal state. This needs to be patched up because the member should be able to join/leave a group at anytime during its life cycle.

@philipnee philipnee added the KIP-848 The Next Generation of the Consumer Rebalance Protocol label Sep 14, 2023
@@ -180,19 +180,19 @@ CompletableFuture<ClientResponse> sendAutoCommit(final Map<TopicPartition, Offse
private class OffsetCommitRequestState {
private final Map<TopicPartition, OffsetAndMetadata> offsets;
private final String groupId;
private final GroupState.Generation generation;
private final int memberEpoch;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my understanding, how do we plan to update the member epoch when stale member epoch is received? Are we going to recreate a new OffsetCommitRequestState?

Copy link
Contributor Author

@philipnee philipnee Sep 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. Stale member epcoh error isn't retriable, so the commit request would fail. since the request state is tight to individual request, so the subsequent request will get a new state.

Though - When we receive such error - It seems like we should also update membershipManager which is not done here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stale member epcoh error isn't retriable

A stale member epoch error is retriable... but it should be retried with the new epoch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could do it either way - recreate this RequestState object or resend the same object with the updated epoch.

.setTopics(new ArrayList<>(requestTopicDataMap.values())));
OffsetCommitRequestData data = new OffsetCommitRequestData()
.setGroupId(groupId)
.setGenerationIdOrMemberEpoch(membershipManager.memberEpoch())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We end up taking the epoch, memberId, and groupInstanceId directly from the membershipMgr, even though they are defined as local variables (unused) in this OffsetCommitRequestState class. In the case of the groupId though, we do use the local var. Let's review and make it consistent.

config,
coordinatorRequestManager,
membershipManager.get());
MembershipManager membershipManager = new MembershipManagerImpl(groupId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is redundant here, already created above.

@philipnee
Copy link
Contributor Author

Hello @dajac @lianetm @kirktrue - I think the scope of this PR extended a bit beyond "replacing GroupState" because we've realized the member may join/leave the group anytime during its lifecycle. In particular, when the member hasn't joined/ has already left the group, we should stop sending the heartbeat.

The current implementation assumes the member should always try to join the group if the groupId is provided, due to the fact that the member's initial state is UNJOINED. However, as previously mentioned:

  1. the user may only want to commit offsets
  2. the user may unsubscribe and then assign (so we should keep the consumer alive and allows it to continue to commit offset)
  3. the user might leave the group due to lack of polling (At least I believe that's the current behavior)
  4. The user can unsubscribe, and then subscribe.

In any of the scenarios above, I think we wouldn't want the member to actively send heartbeats to the GC.

As the original intent of the membershipManager is to manage the state only when the member is in a group. However, due to the fact that the member may join/leave the group any time, I feel it might be sensible to always keep an instance active and introduce an inactive state such as NOT_IN_GROUP to indicate the member does not actively try to join a group.

However, it confuses with UNJOINED a bit. I wonder if we should rename the states.

@philipnee philipnee changed the title MINOR: Replace GroupState with MembershipManager KAFKA-15592: Allow member to not sending a heartbeat when a group ID is configured Oct 12, 2023
private final CompletableFuture<Void> future;
private int memberEpoch;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be Optional here too, right? Just as the memberId and groupInstanceId (groupId is the only required one to be able to commit offsets)

.setTopics(new ArrayList<>(requestTopicDataMap.values())));
OffsetCommitRequestData data = new OffsetCommitRequestData()
.setGroupId(groupId)
.setGenerationIdOrMemberEpoch(memberEpoch)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aligned with the idea that memberEpoch may not be present, I expect we should only set this if we have one (like it is done for the memberID)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Updated.

public OffsetFetchRequestState(final Set<TopicPartition> partitions,
final GroupState.Generation generation,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do need member epoch (and member ID), to be included in the request if we have them, exactly as it is done for the OffsetCommit.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My comment is really only to make sure we are on the same page regarding the need of epoch here. But ok if we don't include the changes in this PR, because I also have them in the changes for OffsetFetch v9 that will follow this. I added id and epoch here not only to send it in the request, but also to differentiate the requests that will be retried.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the current impl, it sends a generation of -1 and id of "". I see both fields are "ignorable", not sure if that means we don't need to send them, @dajac - I assume ignorable means we don't need to send them if we don't have one?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, agree we don't need to send them if we don't have them (member not using the group management capabilities), but the point is member ID/epoch should be here to be included if the member has them.


@Override
public void leaveGroup() {
resetEpoch();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get the full idea here. What's the intended usage for this? If the intention is to use the transition as a trigger to send the last HB, then resetEpoch is incorrect, because we need epoch -1 (or -2 if static member). If the intention is just to call this after the last HB has been sent, then how are you seeing we'll trigger that last HB? (ex. from the unsubsbcribe)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I leave it as TODO, because I'm not sure how leaveGroup would integrate as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I remember why I wrote this. I think the epoch should never be -1/-2, it is only used for sending a leave group request. When we need/want to leave the group, then I think we would first send the heartbeat with negative epoch, then transition the state to not_in_group, it should also join with epoch = 0.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of reset the Epoch, which seems confusing, I made the epcoh = OptionalInt.empty() so that when the member tries to send a commit, it won't send the epoch after leaving the group.

Ensure initial states and states are set correctly after updating

make epoch optional
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
KIP-848 The Next Generation of the Consumer Rebalance Protocol
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants