Skip to content

Commit

Permalink
KAFKA-16528: Client HB timing fix (#15698)
Browse files Browse the repository at this point in the history
Fix for resetting HB timer when the request is sent, rather than when a response is received. This ensures a more accurate timing of the HB, so that a member always sends HB on the interval (not in the interval + any delay in receiving the response).
This change, along with the logic already in place for checking in-flights, ensures that if the interval expires but there is a HB in-flight, the next HB is only send after the response for the in-flight is received, without waiting for another full interval. This is btw consistent with the timer reset & inflight behaviour for the auto-commit interval.

Reviewers: Kirk True <ktrue@confluent.io>, Bruno Cadonna <cadonna@apache.org>
  • Loading branch information
lianetm committed Apr 26, 2024
1 parent 4958777 commit 82844c0
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {

boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight();
if (!heartbeatRequestState.canSendRequest(currentTimeMs) && !heartbeatNow) {
return new NetworkClientDelegate.PollResult(heartbeatRequestState.nextHeartbeatMs(currentTimeMs));
return new NetworkClientDelegate.PollResult(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
}

NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs, false);
Expand Down Expand Up @@ -246,7 +246,7 @@ public long maximumTimeToWait(long currentTimeMs) {
) {
return 0L;
}
return Math.min(pollTimer.remainingMs() / 2, heartbeatRequestState.nextHeartbeatMs(currentTimeMs));
return Math.min(pollTimer.remainingMs() / 2, heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
}

/**
Expand All @@ -269,6 +269,7 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long curr
heartbeatRequestState.onSendAttempt(currentTimeMs);
membershipManager.onHeartbeatRequestSent();
metricsManager.recordHeartbeatSentMs(currentTimeMs);
heartbeatRequestState.resetTimer();
return request;
}

Expand Down Expand Up @@ -325,7 +326,6 @@ private void onResponse(final ConsumerGroupHeartbeatResponse response, long curr
if (Errors.forCode(response.data().errorCode()) == Errors.NONE) {
heartbeatRequestState.updateHeartbeatIntervalMs(response.data().heartbeatIntervalMs());
heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
heartbeatRequestState.resetTimer();
membershipManager.onHeartbeatSuccess(response.data());
return;
}
Expand Down Expand Up @@ -380,7 +380,7 @@ private void onErrorResponse(final ConsumerGroupHeartbeatResponse response,
break;

case UNRELEASED_INSTANCE_ID:
logger.error("GroupHeartbeatRequest failed due to the instance id {} was not released: {}",
logger.error("GroupHeartbeatRequest failed due to unreleased instance id {}: {}",
membershipManager.groupInstanceId().orElse("null"), errorMessage);
handleFatalFailure(Errors.UNRELEASED_INSTANCE_ID.exception(errorMessage));
break;
Expand All @@ -389,7 +389,7 @@ private void onErrorResponse(final ConsumerGroupHeartbeatResponse response,
case GROUP_MAX_SIZE_REACHED:
case UNSUPPORTED_ASSIGNOR:
case UNSUPPORTED_VERSION:
logger.error("GroupHeartbeatRequest failed due to error: {}", error);
logger.error("GroupHeartbeatRequest failed due to {}: {}", error, errorMessage);
handleFatalFailure(error.exception(errorMessage));
break;

Expand All @@ -413,7 +413,7 @@ private void onErrorResponse(final ConsumerGroupHeartbeatResponse response,

default:
// If the manager receives an unknown error - there could be a bug in the code or a new error code
logger.error("GroupHeartbeatRequest failed due to unexpected error: {}", error);
logger.error("GroupHeartbeatRequest failed due to unexpected error {}: {}", error, errorMessage);
handleFatalFailure(error.exception(errorMessage));
break;
}
Expand Down Expand Up @@ -469,19 +469,33 @@ public void resetTimer() {
this.heartbeatTimer.reset(heartbeatIntervalMs);
}

/**
* Check if a heartbeat request should be sent on the current time. A heartbeat should be
* sent if the heartbeat timer has expired, backoff has expired, and there is no request
* in-flight.
*/
@Override
public boolean canSendRequest(final long currentTimeMs) {
update(currentTimeMs);
return heartbeatTimer.isExpired() && super.canSendRequest(currentTimeMs);
}

public long nextHeartbeatMs(final long currentTimeMs) {
if (heartbeatTimer.remainingMs() == 0) {
public long timeToNextHeartbeatMs(final long currentTimeMs) {
if (heartbeatTimer.isExpired()) {
return this.remainingBackoffMs(currentTimeMs);
}
return heartbeatTimer.remainingMs();
}

public void onFailedAttempt(final long currentTimeMs) {
// Reset timer to allow sending HB after a failure without waiting for the interval.
// After a failure, a next HB may be needed with backoff (ex. errors that lead to
// retries, like coordinator load error), or immediately (ex. errors that lead to
// rejoining, like fencing errors).
heartbeatTimer.reset(0);
super.onFailedAttempt(currentTimeMs);
}

private void updateHeartbeatIntervalMs(final long heartbeatIntervalMs) {
if (this.heartbeatIntervalMs == heartbeatIntervalMs) {
// no need to update the timer if the interval hasn't changed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_RETRY_BACKOFF_MS;
import static org.apache.kafka.common.utils.Utils.mkSortedSet;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
Expand Down Expand Up @@ -164,6 +165,35 @@ public void testHeartbeatOnStartup() {
assertEquals(0, result2.unsentRequests.size());
}

@Test
public void testSuccessfulHeartbeatTiming() {
mockStableMember();
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(0, result.unsentRequests.size(),
"No heartbeat should be sent while interval has not expired");
assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()), result.timeUntilNextPollMs);
assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);

result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(1, result.unsentRequests.size(), "A heartbeat should be sent when interval expires");
NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0);
assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS,
heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()),
"Heartbeat timer was not reset to the interval when the heartbeat request was sent.");

long partOfInterval = DEFAULT_HEARTBEAT_INTERVAL_MS / 3;
time.sleep(partOfInterval);
result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(0, result.unsentRequests.size(),
"No heartbeat should be sent while only part of the interval has passed");
assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - partOfInterval,
heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()),
"Time to next interval was not properly updated.");

inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE));
assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS - partOfInterval);
}

@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
public void testFirstHeartbeatIncludesRequiredInfoToJoinGroupAndGetAssignments(short version) {
Expand Down Expand Up @@ -231,6 +261,35 @@ public void testTimerNotDue() {
assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs);
}

@Test
public void testHeartbeatNotSentIfAnotherOneInFlight() {
mockStableMember();
time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);

// Heartbeat sent (no response received)
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(1, result.unsentRequests.size());
NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0);

result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent while a " +
"previous one is in-flight");

time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent when the " +
"interval expires if there is a previous HB request in-flight");

// Receive response for the inflight after the interval expired. The next HB should be sent
// on the next poll waiting only for the minimal backoff.
inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE));
time.sleep(DEFAULT_RETRY_BACKOFF_MS);
result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(1, result.unsentRequests.size(), "A next heartbeat should be sent on " +
"the first poll after receiving a response that took longer than the interval, " +
"waiting only for the minimal backoff.");
}

@Test
public void testHeartbeatOutsideInterval() {
when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
Expand Down Expand Up @@ -402,38 +461,32 @@ public void testHeartbeatResponseOnErrorHandling(final Errors error, final boole
switch (error) {
case NONE:
verify(membershipManager).onHeartbeatSuccess(mockResponse.data());
assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, heartbeatRequestState.nextHeartbeatMs(time.milliseconds()));
assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
break;

case COORDINATOR_LOAD_IN_PROGRESS:
verify(backgroundEventHandler, never()).add(any());
assertEquals(DEFAULT_RETRY_BACKOFF_MS,
heartbeatRequestState.nextHeartbeatMs(time.milliseconds()), "Request should " +
"backoff after receiving a coordinator load in progress error. ");
assertNextHeartbeatTiming(DEFAULT_RETRY_BACKOFF_MS);
break;

case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
verify(backgroundEventHandler, never()).add(any());
verify(coordinatorRequestManager).markCoordinatorUnknown(any(), anyLong());
assertEquals(0, heartbeatRequestState.nextHeartbeatMs(time.milliseconds()),
"Request should not apply backoff so that the next heartbeat is sent " +
"as soon as the new coordinator is discovered.");
assertNextHeartbeatTiming(0);
break;
case UNKNOWN_MEMBER_ID:
case FENCED_MEMBER_EPOCH:
verify(backgroundEventHandler, never()).add(any());
assertEquals(0, heartbeatRequestState.nextHeartbeatMs(time.milliseconds()),
"Request should not apply backoff so that the next heartbeat to rejoin is " +
"sent as soon as the fenced member releases its assignment.");
assertNextHeartbeatTiming(0);
break;
default:
if (isFatal) {
// The memberStateManager should have stopped heartbeat at this point
ensureFatalError();
} else {
verify(backgroundEventHandler, never()).add(any());
assertEquals(0, heartbeatRequestState.nextHeartbeatMs(time.milliseconds()));
assertNextHeartbeatTiming(0);
}
break;
}
Expand All @@ -446,6 +499,16 @@ public void testHeartbeatResponseOnErrorHandling(final Errors error, final boole
}
}

private void assertNextHeartbeatTiming(long expectedTimeToNextHeartbeatMs) {
long currentTimeMs = time.milliseconds();
assertEquals(expectedTimeToNextHeartbeatMs, heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
if (expectedTimeToNextHeartbeatMs != 0) {
assertFalse(heartbeatRequestState.canSendRequest(currentTimeMs));
time.sleep(expectedTimeToNextHeartbeatMs);
}
assertTrue(heartbeatRequestState.canSendRequest(time.milliseconds()));
}

@Test
public void testHeartbeatState() {
// The initial ConsumerGroupHeartbeatRequest sets most fields to their initial empty values
Expand Down

0 comments on commit 82844c0

Please sign in to comment.