KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout (#8834)

1. Split the consumer coordinator's REBALANCING state into PREPARING_REBALANCE and COMPLETING_REBALANCE. The first begins when the join-group request is sent; the second begins once the join-group response is received. In the first state we still do not send heartbeats, since the heartbeat shares the same socket with the in-flight join-group request and the group coordinator has disabled the session timeout for that phase; once we transition to the second state we start sending heartbeats in case the leader's assignment takes a long time. This also fixes KAFKA-10122.
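
For illustration only, a minimal sketch of the intended state machine and of when heartbeats are allowed; everything beyond the state names mentioned above is an assumption, not the verbatim Kafka source:

    enum MemberState {
        UNJOINED,             // the consumer is not part of a group
        PREPARING_REBALANCE,  // join-group request sent, response not yet received
        COMPLETING_REBALANCE, // join-group response received, waiting for the assignment
        STABLE;               // assignment received and processed

        // heartbeats are only sent once the join-group response is back,
        // i.e. once the broker already knows our new generation
        boolean heartbeatEnabled() {
            return this == COMPLETING_REBALANCE || this == STABLE;
        }
    }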

2. When computing coordinator#timeToNextPoll, do not include timeToNextHeartbeat if the state is UNJOINED or PREPARING_REBALANCE, since heartbeats are disabled in those states and the heartbeat timer is therefore not being updated.
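
Roughly, the intended timeout computation looks like the following hedged sketch; field and method names other than the states are illustrative:

    long timeToNextPoll(long now) {
        long timeToNextAutoCommit = nextAutoCommitTimer.remainingMs();  // hypothetical field name
        // while UNJOINED or PREPARING_REBALANCE the heartbeat timer is frozen,
        // so it must not bound the poll timeout
        if (state == MemberState.UNJOINED || state == MemberState.PREPARING_REBALANCE)
            return timeToNextAutoCommit;
        return Math.min(heartbeat.timeToNextHeartbeat(now), timeToNextAutoCommit);
    }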

3. On the broker side, accept heartbeats received during PREPARING_REBALANCE and return the NONE error code instead of REBALANCE_IN_PROGRESS. On the client side, we still need to ignore REBALANCE_IN_PROGRESS while in COMPLETING_REBALANCE, in case the consumer is talking to an older broker.
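
On the client side this amounts to something like the following in the heartbeat response handler (a simplified sketch, not the verbatim handler):

    if (error == Errors.REBALANCE_IN_PROGRESS) {
        synchronized (AbstractCoordinator.this) {
            if (state == MemberState.STABLE) {
                // the group has started rebalancing; we need to re-join
                requestRejoin();
                future.raise(error);
            } else {
                // an older broker may still answer REBALANCE_IN_PROGRESS while we are
                // already PREPARING_REBALANCE / COMPLETING_REBALANCE; do not reset the
                // generation, just treat the heartbeat as accepted
                future.complete(null);
            }
        }
    }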

4. Piggy-back a log4j improvement on the broker coordinator to log the reason a rebalance is triggered, which I found unclear during the investigation. This also subsumes the log4j improvements from #9038.

Allowing heartbeats during COMPLETING_REBALANCE is tricky in two ways: 1) a heartbeat response may reset the generation before the sync-group response is received, and it can also reset the generation after the sync-group response arrives but before the callback is triggered; both cases need to be handled by checking the generation / state. 2) With the heartbeat thread enabled, the sync-group request may be sent by the heartbeat thread even though the caller thread has not called poll() yet.
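
For 1), the fix boils down to re-validating the generation and state before acting on the sync-group result, along these lines (an illustrative sketch; generationAtJoin is a hypothetical name for the generation snapshotted when the join-group response was received):

    synchronized (this) {
        if (state != MemberState.COMPLETING_REBALANCE || !generation.equals(generationAtJoin)) {
            // a concurrent heartbeat response already reset the generation or moved us
            // out of the rebalance; discard this assignment and re-join on the next poll
            requestRejoin();
            return false;
        }
        // safe to install the assignment and move to STABLE
        state = MemberState.STABLE;
    }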

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Boyang Chen <boyang@confluent.io>, John Roesler <john@confluent.io>
guozhangwang committed Sep 10, 2020
1 parent bbccf5d commit 16ec179
Showing 10 changed files with 228 additions and 180 deletions.
@@ -1257,13 +1257,6 @@ private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetad
}
}

/**
* Visible for testing
*/
boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
return updateAssignmentMetadataIfNeeded(timer, true);
}

boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitForJoinGroup) {
if (coordinator != null && !coordinator.poll(timer, waitForJoinGroup)) {
return false;
@@ -1297,6 +1290,8 @@ private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer tim
pollTimeout = retryBackoffMs;
}

log.trace("Polling for fetches with timeout {}", pollTimeout);

Timer pollTimer = time.timer(pollTimeout);
client.poll(pollTimer, () -> {
// since a fetch might be completed by the background thread, we need this poll condition
@@ -2478,8 +2473,12 @@ private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAnd
offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
}

// Visible for testing
// Functions below are for testing only
String getClientId() {
return clientId;
}

boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
return updateAssignmentMetadataIfNeeded(timer, true);
}
}

Large diffs are not rendered by default.

@@ -452,11 +452,6 @@ void maybeUpdateSubscriptionMetadata() {
}
}

// for testing
boolean poll(Timer timer) {
return poll(timer, true);
}

/**
* Poll for coordinator events. This ensures that the coordinator is known and that the consumer
* has joined the group (if it is using group management). This also handles periodic offset commits
@@ -511,6 +506,10 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) {

// if not wait for join group, we would just use a timer of 0
if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))) {
// since we may use a different timer in the callee, we'd still need
// to update the original timer's current time after the call
timer.update(time.milliseconds());

return false;
}
}
@@ -532,7 +531,7 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) {
}

/**
* Return the time to the next needed invocation of {@link #poll(Timer)}.
* Return the time to the next needed invocation of {@link ConsumerNetworkClient#poll(Timer)}.
* @param now current time in milliseconds
* @return the maximum time in milliseconds the caller should wait before the next invocation of poll()
*/
@@ -1213,7 +1212,7 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> futu
if (generationUnchanged()) {
future.raise(error);
} else {
if (ConsumerCoordinator.this.state == MemberState.REBALANCING) {
if (ConsumerCoordinator.this.state == MemberState.PREPARING_REBALANCE) {
future.raise(new RebalanceInProgressException("Offset commit cannot be completed since the " +
"consumer member's old generation is fenced by its group instance id, it is possible that " +
"this consumer has already participated another rebalance and got a new generation"));
@@ -1242,7 +1241,7 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> futu

// only need to reset generation and re-join group if generation has not changed or we are not in rebalancing;
// otherwise only raise rebalance-in-progress error
if (!generationUnchanged() && ConsumerCoordinator.this.state == MemberState.REBALANCING) {
if (!generationUnchanged() && ConsumerCoordinator.this.state == MemberState.PREPARING_REBALANCE) {
future.raise(new RebalanceInProgressException("Offset commit cannot be completed since the " +
"consumer member's generation is already stale, meaning it has already participated another rebalance and " +
"got a new generation. You can try completing the rebalance by calling poll() and then retry commit again"));
@@ -1459,4 +1458,8 @@ public void invoke() {
RebalanceProtocol getProtocol() {
return protocol;
}

boolean poll(Timer timer) {
return poll(timer, true);
}
}
@@ -17,9 +17,12 @@
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;

import org.slf4j.Logger;

/**
* A helper class for managing the heartbeat to the coordinator
*/
@@ -30,6 +33,7 @@ public final class Heartbeat {
private final Timer heartbeatTimer;
private final Timer sessionTimer;
private final Timer pollTimer;
private final Logger log;

private volatile long lastHeartbeatSend = 0L;
private volatile boolean heartbeatInFlight = false;
@@ -44,6 +48,9 @@ public Heartbeat(GroupRebalanceConfig config,
this.sessionTimer = time.timer(config.sessionTimeoutMs);
this.maxPollIntervalMs = config.rebalanceTimeoutMs;
this.pollTimer = time.timer(maxPollIntervalMs);

final LogContext logContext = new LogContext("[Heartbeat groupID=" + config.groupId + "] ");
this.log = logContext.logger(getClass());
}

private void update(long now) {
@@ -66,12 +73,18 @@ void sentHeartbeat(long now) {
heartbeatInFlight = true;
update(now);
heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs);

if (log.isTraceEnabled()) {
log.trace("Sending heartbeat request with {}ms remaining on timer", heartbeatTimer.remainingMs());
}
}

void failHeartbeat() {
update(time.milliseconds());
heartbeatInFlight = false;
heartbeatTimer.reset(rebalanceConfig.retryBackoffMs);

log.trace("Heartbeat failed, reset the timer to {}ms remaining", heartbeatTimer.remainingMs());
}

void receiveHeartbeat() {
@@ -312,6 +312,14 @@ public Queue<ClientRequest> requests() {
return this.requests;
}

public Queue<ClientResponse> responses() {
return this.responses;
}

public Queue<FutureResponse> futureResponses() {
return this.futureResponses;
}

public void respond(AbstractResponse response) {
respond(response, false);
}
@@ -508,13 +508,17 @@ public void verifyPollTimesOutDuringMetadataUpdate() {

final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
prepareRebalance(client, node, assignor, singletonList(tp0), null);
// Since we would enable the heartbeat thread after received join-response which could
// send the sync-group on behalf of the consumer if it is enqueued, we may still complete
// the rebalance and send out the fetch; in order to avoid it we do not prepare sync response here.
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, memberId, leaderId, Errors.NONE), coordinator);

consumer.poll(Duration.ZERO);

// The underlying client should NOT get a fetch request
final Queue<ClientRequest> requests = client.requests();
Assert.assertEquals(0, requests.size());
Assert.assertEquals(0, requests.stream().filter(request -> request.apiKey().equals(ApiKeys.FETCH)).count());
}

@SuppressWarnings("deprecation")
@@ -1253,9 +1257,6 @@ public void testUnsubscribeShouldTriggerPartitionsLostWithNoGeneration() throws
time.sleep(heartbeatIntervalMs);
TestUtils.waitForCondition(heartbeatReceived::get, "Heartbeat response did not occur within timeout.");

consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
assertTrue(heartbeatReceived.get());

RuntimeException unsubscribeException = assertThrows(RuntimeException.class, consumer::unsubscribe);
assertEquals(partitionLost + singleTopicPartition, unsubscribeException.getCause().getMessage());
}
@@ -450,6 +450,26 @@ public void testNoGenerationWillNotTriggerProtocolNameCheck() {
&& syncGroupRequest.data.protocolName().equals(PROTOCOL_NAME);
}, syncGroupResponse(Errors.NONE, PROTOCOL_TYPE, wrongProtocolName));

// let the retry to complete successfully to break out of the while loop
mockClient.prepareResponse(body -> {
if (!(body instanceof JoinGroupRequest)) {
return false;
}
JoinGroupRequest joinGroupRequest = (JoinGroupRequest) body;
return joinGroupRequest.data().protocolType().equals(PROTOCOL_TYPE);
}, joinGroupFollowerResponse(1, memberId,
"memberid", Errors.NONE, PROTOCOL_TYPE));

mockClient.prepareResponse(body -> {
if (!(body instanceof SyncGroupRequest)) {
return false;
}

SyncGroupRequest syncGroupRequest = (SyncGroupRequest) body;
return syncGroupRequest.data.protocolType().equals(PROTOCOL_TYPE)
&& syncGroupRequest.data.protocolName().equals(PROTOCOL_NAME);
}, syncGroupResponse(Errors.NONE, PROTOCOL_TYPE, PROTOCOL_NAME));

// No exception shall be thrown as the generation is reset.
coordinator.joinGroupIfNeeded(mockTime.timer(100L));
}
@@ -531,7 +551,7 @@ public void testSyncGroupUnknownMemberResponseWithOldGeneration() throws Interru

final AbstractCoordinator.Generation currGen = coordinator.generation();

coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
coordinator.setNewState(AbstractCoordinator.MemberState.PREPARING_REBALANCE);
RequestFuture<ByteBuffer> future = coordinator.sendJoinGroupRequest();

TestUtils.waitForCondition(() -> {
@@ -571,7 +591,7 @@ public void testSyncGroupIllegalGenerationResponseWithOldGeneration() throws Int

final AbstractCoordinator.Generation currGen = coordinator.generation();

coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
coordinator.setNewState(AbstractCoordinator.MemberState.PREPARING_REBALANCE);
RequestFuture<ByteBuffer> future = coordinator.sendJoinGroupRequest();

TestUtils.waitForCondition(() -> {
@@ -604,6 +624,25 @@ public void testSyncGroupIllegalGenerationResponseWithOldGeneration() throws Int
assertEquals(newGen, coordinator.generation());
}

@Test
public void testHeartbeatSentWhenCompletingRebalance() throws Exception {
setupCoordinator();
joinGroup();

final AbstractCoordinator.Generation currGen = coordinator.generation();

coordinator.setNewState(AbstractCoordinator.MemberState.COMPLETING_REBALANCE);

// the heartbeat should be sent out during a rebalance
mockTime.sleep(HEARTBEAT_INTERVAL_MS);
TestUtils.waitForCondition(() -> !mockClient.requests().isEmpty(), 2000,
"The heartbeat request was not sent");
assertTrue(coordinator.heartbeat().hasInflight());

mockClient.respond(heartbeatResponse(Errors.REBALANCE_IN_PROGRESS));
assertEquals(currGen, coordinator.generation());
}

@Test
public void testHeartbeatIllegalGenerationResponseWithOldGeneration() throws InterruptedException {
setupCoordinator();
@@ -673,7 +712,7 @@ public void testHeartbeatUnknownMemberResponseWithOldGeneration() throws Interru
}

@Test
public void testHeartbeatUnknownMemberResponseDuringRebalancing() throws InterruptedException {
public void testHeartbeatRebalanceInProgressResponseDuringRebalancing() throws InterruptedException {
setupCoordinator();
joinGroup();

@@

assertTrue(coordinator.heartbeat().hasInflight());

// set the client to re-join group
mockClient.respond(heartbeatResponse(Errors.UNKNOWN_MEMBER_ID));
mockClient.respond(heartbeatResponse(Errors.REBALANCE_IN_PROGRESS));

coordinator.requestRejoin();

@@
2000,
"The heartbeat response was not received");

// the generation should be reset but the rebalance should still proceed
assertEquals(AbstractCoordinator.Generation.NO_GENERATION, coordinator.generation());
// the generation would not be reset while the rebalance is in progress
assertEquals(currGen, coordinator.generation());

mockClient.respond(joinGroupFollowerResponse(currGen.generationId, memberId, JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.NONE));
mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
@@ -1098,44 +1136,6 @@ public void testWakeupAfterJoinGroupReceivedExternalCompletion() throws Exceptio
awaitFirstHeartbeat(heartbeatReceived);
}

@Test
public void testWakeupAfterSyncGroupSent() throws Exception {
setupCoordinator();

mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE));
mockClient.prepareResponse(new MockClient.RequestMatcher() {
private int invocations = 0;
@Override
public boolean matches(AbstractRequest body) {
invocations++;
boolean isSyncGroupRequest = body instanceof SyncGroupRequest;
if (isSyncGroupRequest && invocations == 1)
// simulate wakeup after the request sent
throw new WakeupException();
return isSyncGroupRequest;
}
}, syncGroupResponse(Errors.NONE));
AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();

try {
coordinator.ensureActiveGroup();
fail("Should have woken up from ensureActiveGroup()");
} catch (WakeupException ignored) {
}

assertEquals(1, coordinator.onJoinPrepareInvokes);
assertEquals(0, coordinator.onJoinCompleteInvokes);
assertFalse(heartbeatReceived.get());

coordinator.ensureActiveGroup();

assertEquals(1, coordinator.onJoinPrepareInvokes);
assertEquals(1, coordinator.onJoinCompleteInvokes);

awaitFirstHeartbeat(heartbeatReceived);
}

@Test
public void testWakeupAfterSyncGroupSentExternalCompletion() throws Exception {
setupCoordinator();
@@
invocations++;
boolean isSyncGroupRequest = body instanceof SyncGroupRequest;
if (isSyncGroupRequest && invocations == 1)
// simulate wakeup after the request sent
throw new WakeupException();
// wakeup after the request returns
consumerClient.wakeup();
return isSyncGroupRequest;
}
}, syncGroupResponse(Errors.NONE));
@@ -2119,7 +2119,7 @@ public void testCommitOffsetIllegalGenerationWithNewGeneration() {
"memberId-new",
null);
coordinator.setNewGeneration(newGen);
coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
coordinator.setNewState(AbstractCoordinator.MemberState.PREPARING_REBALANCE);

assertTrue(consumerClient.poll(future, time.timer(30000)));
assertTrue(future.exception().getClass().isInstance(Errors.REBALANCE_IN_PROGRESS.exception()));
@@ -2174,7 +2174,7 @@ public void testCommitOffsetUnknownMemberWithNewGenearion() {
"memberId-new",
null);
coordinator.setNewGeneration(newGen);
coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
coordinator.setNewState(AbstractCoordinator.MemberState.PREPARING_REBALANCE);

assertTrue(consumerClient.poll(future, time.timer(30000)));
assertTrue(future.exception().getClass().isInstance(Errors.REBALANCE_IN_PROGRESS.exception()));
@@ -2218,7 +2218,7 @@ public void testCommitOffsetFencedInstanceWithRebalancingGenearion() {
"memberId",
null);
coordinator.setNewGeneration(currGen);
coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
coordinator.setNewState(AbstractCoordinator.MemberState.PREPARING_REBALANCE);

prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.FENCED_INSTANCE_ID);
RequestFuture<Void> future = coordinator.sendOffsetCommitRequest(singletonMap(t1p,
@@ -2818,14 +2818,18 @@ public void testConsumerRejoinAfterRebalance() {
res = coordinator.joinGroupIfNeeded(time.timer(1));

assertFalse(res);

// should have retried sending a join group request already
assertFalse(client.hasPendingResponses());
assertFalse(client.hasInFlightRequests());
assertEquals(1, client.inFlightRequestCount());

System.out.println(client.requests());

// Retry join should then succeed
client.prepareResponse(joinGroupFollowerResponse(generationId, memberId, "leader", Errors.NONE));
client.respond(joinGroupFollowerResponse(generationId, memberId, "leader", Errors.NONE));
client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));

res = coordinator.joinGroupIfNeeded(time.timer(2));
res = coordinator.joinGroupIfNeeded(time.timer(3000));

assertTrue(res);
assertFalse(client.hasPendingResponses());
