Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16528: Client HB timing fix #15698

Merged
merged 15 commits into from
Apr 26, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {

boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight();
if (!heartbeatRequestState.canSendRequest(currentTimeMs) && !heartbeatNow) {
return new NetworkClientDelegate.PollResult(heartbeatRequestState.nextHeartbeatMs(currentTimeMs));
return new NetworkClientDelegate.PollResult(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
}

NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs, false);
Expand Down Expand Up @@ -246,7 +246,7 @@ public long maximumTimeToWait(long currentTimeMs) {
) {
return 0L;
}
return Math.min(pollTimer.remainingMs() / 2, heartbeatRequestState.nextHeartbeatMs(currentTimeMs));
return Math.min(pollTimer.remainingMs() / 2, heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
}

/**
Expand All @@ -269,6 +269,7 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long curr
heartbeatRequestState.onSendAttempt(currentTimeMs);
membershipManager.onHeartbeatRequestSent();
metricsManager.recordHeartbeatSentMs(currentTimeMs);
heartbeatRequestState.resetTimer();
return request;
}

Expand Down Expand Up @@ -325,7 +326,6 @@ private void onResponse(final ConsumerGroupHeartbeatResponse response, long curr
if (Errors.forCode(response.data().errorCode()) == Errors.NONE) {
heartbeatRequestState.updateHeartbeatIntervalMs(response.data().heartbeatIntervalMs());
heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
heartbeatRequestState.resetTimer();
membershipManager.onHeartbeatSuccess(response.data());
return;
}
Expand Down Expand Up @@ -380,7 +380,7 @@ private void onErrorResponse(final ConsumerGroupHeartbeatResponse response,
break;

case UNRELEASED_INSTANCE_ID:
logger.error("GroupHeartbeatRequest failed due to the instance id {} was not released: {}",
logger.error("GroupHeartbeatRequest failed due to unreleased instance id {}: {}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QQ: are these logging changes of the ‘I'll just clean this up as long as I'm in here?’ variety, or dow it have some bearing on the correctness of the logs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

both. The change for the default log is needed: it was not including the errorMessage, and that makes it hard to know what happened when you get errors like INVALID_REQUEST (I personally got it and lost time investigating, so fixed it). The other log changes are just improvements because I was already there.

membershipManager.groupInstanceId().orElse("null"), errorMessage);
handleFatalFailure(Errors.UNRELEASED_INSTANCE_ID.exception(errorMessage));
break;
Expand All @@ -389,7 +389,7 @@ private void onErrorResponse(final ConsumerGroupHeartbeatResponse response,
case GROUP_MAX_SIZE_REACHED:
case UNSUPPORTED_ASSIGNOR:
case UNSUPPORTED_VERSION:
logger.error("GroupHeartbeatRequest failed due to error: {}", error);
logger.error("GroupHeartbeatRequest failed due to {}: {}", error, errorMessage);
handleFatalFailure(error.exception(errorMessage));
break;

Expand All @@ -413,7 +413,7 @@ private void onErrorResponse(final ConsumerGroupHeartbeatResponse response,

default:
// If the manager receives an unknown error - there could be a bug in the code or a new error code
logger.error("GroupHeartbeatRequest failed due to unexpected error: {}", error);
logger.error("GroupHeartbeatRequest failed due to unexpected error {}: {}", error, errorMessage);
handleFatalFailure(error.exception(errorMessage));
break;
}
Expand Down Expand Up @@ -469,19 +469,33 @@ public void resetTimer() {
this.heartbeatTimer.reset(heartbeatIntervalMs);
}

/**
* Check if a heartbeat request should be sent on the current time. A heartbeat should be
* sent if the heartbeat timer has expired, backoff has expired, and there is no request
* in-flight.
*/
@Override
public boolean canSendRequest(final long currentTimeMs) {
update(currentTimeMs);
return heartbeatTimer.isExpired() && super.canSendRequest(currentTimeMs);
}

public long nextHeartbeatMs(final long currentTimeMs) {
if (heartbeatTimer.remainingMs() == 0) {
public long timeToNextHeartbeatMs(final long currentTimeMs) {
if (heartbeatTimer.isExpired()) {
return this.remainingBackoffMs(currentTimeMs);
}
return heartbeatTimer.remainingMs();
}

public void onFailedAttempt(final long currentTimeMs) {
// Reset timer to allow sending HB after a failure without waiting for the interval.
// After a failure, a next HB may be needed with backoff (ex. errors that lead to
// retries, like coordinator load error), or immediately (ex. errors that lead to
// rejoining, like fencing errors).
heartbeatTimer.reset(0);
super.onFailedAttempt(currentTimeMs);
Comment on lines +491 to +496
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the decision to reset the heartbeat timer depend on what type of error is received?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, it doesn't. The timer is only to indicate that an interval should be respected. In cases of failures, we don't want to follow the interval (so we reset timer to 0). Each error will :

  • send a next HB based on other conditions (ex. as soon as the coordinator is discovered, when releasing assignment finishes after getting fenced)
  • not send a next HB at all (fatal errors)

}

private void updateHeartbeatIntervalMs(final long heartbeatIntervalMs) {
if (this.heartbeatIntervalMs == heartbeatIntervalMs) {
// no need to update the timer if the interval hasn't changed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_RETRY_BACKOFF_MS;
import static org.apache.kafka.common.utils.Utils.mkSortedSet;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
Expand Down Expand Up @@ -164,6 +165,35 @@ public void testHeartbeatOnStartup() {
assertEquals(0, result2.unsentRequests.size());
}

@Test
public void testSuccessfulHeartbeatTiming() {
mockStableMember();
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(0, result.unsentRequests.size(),
"No heartbeat should be sent while interval has not expired");
assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()), result.timeUntilNextPollMs);
assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);

result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(1, result.unsentRequests.size(), "A heartbeat should be sent when interval expires");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't you need a verification here that ensures that the heartbeat timer was reset after the poll?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure! I missed it. Added.

NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0);
assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS,
heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()),
"Heartbeat timer was not reset to the interval when the heartbeat request was sent.");

long partOfInterval = DEFAULT_HEARTBEAT_INTERVAL_MS / 3;
time.sleep(partOfInterval);
result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(0, result.unsentRequests.size(),
"No heartbeat should be sent while only part of the interval has passed");
assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - partOfInterval,
heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()),
"Time to next interval was not properly updated.");

inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE));
assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS - partOfInterval);
}

@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
public void testFirstHeartbeatIncludesRequiredInfoToJoinGroupAndGetAssignments(short version) {
Expand Down Expand Up @@ -231,6 +261,35 @@ public void testTimerNotDue() {
assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs);
}

@Test
public void testHeartbeatNotSentIfAnotherOneInFlight() {
mockStableMember();
time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);

// Heartbeat sent (no response received)
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(1, result.unsentRequests.size());
NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0);

result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent while a " +
"previous one is in-flight");

time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent when the " +
"interval expires if there is a previous HB request in-flight");

// Receive response for the inflight after the interval expired. The next HB should be sent
// on the next poll waiting only for the minimal backoff.
inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE));
time.sleep(DEFAULT_RETRY_BACKOFF_MS);
result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(1, result.unsentRequests.size(), "A next heartbeat should be sent on " +
"the first poll after receiving a response that took longer than the interval, " +
"waiting only for the minimal backoff.");
}

@Test
public void testHeartbeatOutsideInterval() {
when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
Expand Down Expand Up @@ -402,38 +461,32 @@ public void testHeartbeatResponseOnErrorHandling(final Errors error, final boole
switch (error) {
case NONE:
verify(membershipManager).onHeartbeatSuccess(mockResponse.data());
assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, heartbeatRequestState.nextHeartbeatMs(time.milliseconds()));
assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
break;

case COORDINATOR_LOAD_IN_PROGRESS:
verify(backgroundEventHandler, never()).add(any());
assertEquals(DEFAULT_RETRY_BACKOFF_MS,
heartbeatRequestState.nextHeartbeatMs(time.milliseconds()), "Request should " +
"backoff after receiving a coordinator load in progress error. ");
assertNextHeartbeatTiming(DEFAULT_RETRY_BACKOFF_MS);
break;

case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
verify(backgroundEventHandler, never()).add(any());
verify(coordinatorRequestManager).markCoordinatorUnknown(any(), anyLong());
assertEquals(0, heartbeatRequestState.nextHeartbeatMs(time.milliseconds()),
"Request should not apply backoff so that the next heartbeat is sent " +
"as soon as the new coordinator is discovered.");
assertNextHeartbeatTiming(0);
break;
case UNKNOWN_MEMBER_ID:
case FENCED_MEMBER_EPOCH:
verify(backgroundEventHandler, never()).add(any());
assertEquals(0, heartbeatRequestState.nextHeartbeatMs(time.milliseconds()),
"Request should not apply backoff so that the next heartbeat to rejoin is " +
"sent as soon as the fenced member releases its assignment.");
assertNextHeartbeatTiming(0);
break;
default:
if (isFatal) {
// The memberStateManager should have stopped heartbeat at this point
ensureFatalError();
} else {
verify(backgroundEventHandler, never()).add(any());
assertEquals(0, heartbeatRequestState.nextHeartbeatMs(time.milliseconds()));
assertNextHeartbeatTiming(0);
}
break;
}
Expand All @@ -446,6 +499,16 @@ public void testHeartbeatResponseOnErrorHandling(final Errors error, final boole
}
}

private void assertNextHeartbeatTiming(long expectedTimeToNextHeartbeatMs) {
long currentTimeMs = time.milliseconds();
assertEquals(expectedTimeToNextHeartbeatMs, heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
if (expectedTimeToNextHeartbeatMs != 0) {
assertFalse(heartbeatRequestState.canSendRequest(currentTimeMs));
time.sleep(expectedTimeToNextHeartbeatMs);
}
assertTrue(heartbeatRequestState.canSendRequest(time.milliseconds()));
}

@Test
public void testHeartbeatState() {
// The initial ConsumerGroupHeartbeatRequest sets most fields to their initial empty values
Expand Down