Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15019: Improve handling of broker heartbeat timeouts #13759

Merged
merged 4 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity,
import org.apache.kafka.common.requests.{ApiError, DescribeClusterRequest, DescribeClusterResponse}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.{Cluster, Endpoint, Reconfigurable, TopicPartition, TopicPartitionInfo}
import org.apache.kafka.controller.QuorumController
import org.apache.kafka.image.ClusterImage
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.server.authorizer._
Expand Down Expand Up @@ -1106,6 +1107,34 @@ class KRaftClusterTest {
cluster.close()
}
}

@Test
def testTimedOutHeartbeats(): Unit = {
val cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setNumBrokerNodes(3).
setNumControllerNodes(1).build()).
setConfigProp(KafkaConfig.BrokerHeartbeatIntervalMsProp, 10.toString).
setConfigProp(KafkaConfig.BrokerSessionTimeoutMsProp, 1000.toString).
build()
try {
cluster.format()
cluster.startup()
val controller = cluster.controllers().values().iterator().next()
controller.controller.waitForReadyBrokers(3).get()
TestUtils.retry(60000) {
val latch = controller.controller.asInstanceOf[QuorumController].pause()
Thread.sleep(1001)
latch.countDown()
controller.controller.asInstanceOf[QuorumController].pause().countDown()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of this line? Seems it is an unnecessary extra pause/resume, and the test passes without it. Can we remove it?

Copy link
Contributor Author

@cmccabe cmccabe May 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

assertEquals(0, controller.sharedServer.controllerServerMetrics.fencedBrokerCount())
assertTrue(controller.quorumControllerMetrics.timedOutHeartbeats() > 0,
"Expected timedOutHeartbeats to be greater than 0.");
}
} finally {
cluster.close()
}
}
}

class BadAuthorizer() extends Authorizer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,14 +550,14 @@ BrokerControlStates calculateNextBrokerState(int brokerId,
} else if (!request.wantFence()) {
if (request.currentMetadataOffset() >= registerBrokerRecordOffset) {
log.info("The request from broker {} to unfence has been granted " +
"because it has caught up with the offset of it's register " +
"because it has caught up with the offset of its register " +
"broker record {}.", brokerId, registerBrokerRecordOffset);
return new BrokerControlStates(currentState, UNFENCED);
} else {
if (log.isDebugEnabled()) {
log.debug("The request from broker {} to unfence cannot yet " +
"be granted because it has not caught up with the offset of " +
"it's register broker record {}. It is still at offset {}.",
"its register broker record {}. It is still at offset {}.",
brokerId, registerBrokerRecordOffset, request.currentMetadataOffset());
}
return new BrokerControlStates(currentState, FENCED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
* brokers being fenced or unfenced, and broker feature versions.
*/
public class ClusterControlManager {
final static long DEFAULT_SESSION_TIMEOUT_NS = NANOSECONDS.convert(18, TimeUnit.SECONDS);
final static long DEFAULT_SESSION_TIMEOUT_NS = NANOSECONDS.convert(9, TimeUnit.SECONDS);

static class Builder {
private LogContext logContext = null;
Expand Down Expand Up @@ -236,6 +236,9 @@ boolean check() {
*/
private final FeatureControlManager featureControl;

/**
* True if migration from ZK is enabled.
*/
private final boolean zkMigrationEnabled;

private ClusterControlManager(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.errors.ControllerExceptions;
import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
Expand Down Expand Up @@ -2133,7 +2134,12 @@ public void processBatchEndOffset(long offset) {
}
}
},
EnumSet.of(RUNS_IN_PREMIGRATION));
EnumSet.of(RUNS_IN_PREMIGRATION)).whenComplete((__, t) -> {
if (ControllerExceptions.isNormalTimeoutException(t)) {
replicationControl.processExpiredBrokerHeartbeat(request);
controllerMetrics.incrementTimedOutHeartbeats();
}
});
}

@Override
Expand Down Expand Up @@ -2286,7 +2292,7 @@ public void close() throws InterruptedException {
}

// VisibleForTesting
CountDownLatch pause() {
public CountDownLatch pause() {
final CountDownLatch latch = new CountDownLatch(1);
appendControlEvent("pause", () -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1384,7 +1384,9 @@ ApiError electLeader(String topic, int partitionId, ElectionType electionType,
}

ControllerResult<BrokerHeartbeatReply> processBrokerHeartbeat(
BrokerHeartbeatRequestData request, long registerBrokerRecordOffset) {
BrokerHeartbeatRequestData request,
long registerBrokerRecordOffset
) {
int brokerId = request.brokerId();
long brokerEpoch = request.brokerEpoch();
clusterControl.checkBrokerEpoch(brokerId, brokerEpoch);
Expand Down Expand Up @@ -1419,6 +1421,15 @@ ControllerResult<BrokerHeartbeatReply> processBrokerHeartbeat(
return ControllerResult.of(records, reply);
}

void processExpiredBrokerHeartbeat(BrokerHeartbeatRequestData request) {
int brokerId = request.brokerId();
clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch());
clusterControl.heartbeatManager().touch(brokerId,
clusterControl.brokerRegistrations().get(brokerId).fenced(),
request.currentMetadataOffset());
log.error("processExpiredBrokerHeartbeat: timed out heartbeat from broker {}.", brokerId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the operator meant to do when they see this error message? Perhaps we can explain the implications in the log message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it to "controller event queue overloaded. timed out heartbeat from broker {}"

}

public ControllerResult<Void> unregisterBroker(int brokerId) {
BrokerRegistration registration = clusterControl.brokerRegistrations().get(brokerId);
if (registration == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.controller.errors;

import org.apache.kafka.common.errors.TimeoutException;

import java.util.concurrent.ExecutionException;


public class ControllerExceptions {
/**
* Check if an exception is a normal timeout exception.
*
* @param exception The exception to check.
* @return True if the exception is a normal timeout exception and NOT a timeout
* exception generated by the controller shutting down.
*/
public static boolean isNormalTimeoutException(Throwable exception) {
if (exception == null) return false;
if (exception instanceof ExecutionException) {
exception = exception.getCause();
if (exception == null) return false;
}
if (!(exception instanceof TimeoutException)) return false;
if (exception.getMessage() != null &&
exception.getMessage().equals("The controller is shutting down.")) {
return false;
}
return true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems brittle. Is there a way we can identify the specific thing we are interested in?

Copy link
Contributor Author

@cmccabe cmccabe May 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the check for the exception text. (It wasn't actually doing anything here in any case!) This is something I'll revisit in a follow-on PR (there are a few corner cases around error handling and queue shutdown to fix up)

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class QuorumControllerMetrics implements AutoCloseable {
private final AtomicLong lastAppliedRecordTimestamp = new AtomicLong(0);
private final Consumer<Long> eventQueueTimeUpdater;
private final Consumer<Long> eventQueueProcessingTimeUpdater;
private final AtomicLong timedOutHeartbeats = new AtomicLong(0);

private Consumer<Long> newHistogram(MetricName name, boolean biased) {
if (registry.isPresent()) {
Expand Down Expand Up @@ -150,6 +151,18 @@ public long lastAppliedRecordTimestamp() {
return lastAppliedRecordTimestamp.get();
}

public void incrementTimedOutHeartbeats() {
timedOutHeartbeats.addAndGet(1);
}

public void setTimedOutHeartbeats(long heartbeats) {
timedOutHeartbeats.set(heartbeats);
}

public long timedOutHeartbeats() {
return timedOutHeartbeats.get();
}

@Override
public void close() {
registry.ifPresent(r -> Arrays.asList(
Expand Down