Skip to content

Commit

Permalink
KAFKA-15019: Improve handling of broker heartbeat timeouts (apache#13759)
Browse files Browse the repository at this point in the history

When the active KRaft controller is overloaded, it will not be able to process broker heartbeat
requests. Instead, they will be timed out. When using the default configuration, this will happen
if the time needed to process a broker heartbeat climbs above a second for a sustained period.
This, in turn, could lead to brokers being improperly fenced when they are still alive.

With this PR, timed out heartbeats will still update the lastContactNs and metadataOffset of the
broker in the BrokerHeartbeatManager. While we don't generate any records, this should still be
adequate to prevent spurious fencing. We also log a message at ERROR level so that this condition
will be more obvious.

Other small changes in this PR: fix grammar issue in log4j of BrokerHeartbeatManager. Add JavaDoc
for ClusterControlManager#zkMigrationEnabled field. Add builder for ReplicationControlTestContext
to avoid having tons of constructors. Update ClusterControlManager.DEFAULT_SESSION_TIMEOUT_NS to
match the default in KafkaConfig.

Reviewers: Ismael Juma <ijuma@apache.org>, Ron Dagostino <rdagostino@confluent.io>
  • Loading branch information
cmccabe committed Jun 1, 2023
1 parent fb3346c commit c2f6f29
Show file tree
Hide file tree
Showing 9 changed files with 280 additions and 56 deletions.
Expand Up @@ -34,6 +34,7 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity,
import org.apache.kafka.common.requests.{ApiError, DescribeClusterRequest, DescribeClusterResponse}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.{Cluster, Endpoint, Reconfigurable, TopicPartition, TopicPartitionInfo}
import org.apache.kafka.controller.QuorumController
import org.apache.kafka.image.ClusterImage
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.server.authorizer._
Expand Down Expand Up @@ -1106,6 +1107,33 @@ class KRaftClusterTest {
cluster.close()
}
}

@Test
def testTimedOutHeartbeats(): Unit = {
  // Use a very short heartbeat interval and a 1-second session timeout so that
  // pausing the controller event queue for just over a second is enough to make
  // queued heartbeat requests time out.
  val cluster = new KafkaClusterTestKit.Builder(
    new TestKitNodes.Builder().
      setNumBrokerNodes(3).
      setNumControllerNodes(1).build()).
    setConfigProp(KafkaConfig.BrokerHeartbeatIntervalMsProp, 10.toString).
    setConfigProp(KafkaConfig.BrokerSessionTimeoutMsProp, 1000.toString).
    build()
  try {
    cluster.format()
    cluster.startup()
    val controller = cluster.controllers().values().iterator().next()
    controller.controller.waitForReadyBrokers(3).get()
    TestUtils.retry(60000) {
      // Pause the controller so that in-flight heartbeats sit on the queue and expire.
      val latch = controller.controller.asInstanceOf[QuorumController].pause()
      Thread.sleep(1001)
      latch.countDown()
      // Expired heartbeats must still refresh broker liveness, so no broker
      // should have been fenced, but the timed-out-heartbeat metric must rise.
      assertEquals(0, controller.sharedServer.controllerServerMetrics.fencedBrokerCount())
      assertTrue(controller.quorumControllerMetrics.timedOutHeartbeats() > 0,
        "Expected timedOutHeartbeats to be greater than 0.")
    }
  } finally {
    cluster.close()
  }
}
}

class BadAuthorizer() extends Authorizer {
Expand Down
Expand Up @@ -550,14 +550,14 @@ BrokerControlStates calculateNextBrokerState(int brokerId,
} else if (!request.wantFence()) {
if (request.currentMetadataOffset() >= registerBrokerRecordOffset) {
log.info("The request from broker {} to unfence has been granted " +
"because it has caught up with the offset of it's register " +
"because it has caught up with the offset of its register " +
"broker record {}.", brokerId, registerBrokerRecordOffset);
return new BrokerControlStates(currentState, UNFENCED);
} else {
if (log.isDebugEnabled()) {
log.debug("The request from broker {} to unfence cannot yet " +
"be granted because it has not caught up with the offset of " +
"it's register broker record {}. It is still at offset {}.",
"its register broker record {}. It is still at offset {}.",
brokerId, registerBrokerRecordOffset, request.currentMetadataOffset());
}
return new BrokerControlStates(currentState, FENCED);
Expand Down
Expand Up @@ -74,7 +74,7 @@
* brokers being fenced or unfenced, and broker feature versions.
*/
public class ClusterControlManager {
final static long DEFAULT_SESSION_TIMEOUT_NS = NANOSECONDS.convert(18, TimeUnit.SECONDS);
final static long DEFAULT_SESSION_TIMEOUT_NS = NANOSECONDS.convert(9, TimeUnit.SECONDS);

static class Builder {
private LogContext logContext = null;
Expand Down Expand Up @@ -236,6 +236,9 @@ boolean check() {
*/
private final FeatureControlManager featureControl;

/**
* True if migration from ZK is enabled.
*/
private final boolean zkMigrationEnabled;

private ClusterControlManager(
Expand Down
Expand Up @@ -78,6 +78,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.errors.ControllerExceptions;
import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
Expand Down Expand Up @@ -2141,7 +2142,12 @@ public void processBatchEndOffset(long offset) {
}
}
},
EnumSet.of(RUNS_IN_PREMIGRATION));
EnumSet.of(RUNS_IN_PREMIGRATION)).whenComplete((__, t) -> {
if (ControllerExceptions.isTimeoutException(t)) {
replicationControl.processExpiredBrokerHeartbeat(request);
controllerMetrics.incrementTimedOutHeartbeats();
}
});
}

@Override
Expand Down Expand Up @@ -2294,7 +2300,7 @@ public void close() throws InterruptedException {
}

// VisibleForTesting
CountDownLatch pause() {
public CountDownLatch pause() {
final CountDownLatch latch = new CountDownLatch(1);
appendControlEvent("pause", () -> {
try {
Expand Down
Expand Up @@ -1384,7 +1384,9 @@ ApiError electLeader(String topic, int partitionId, ElectionType electionType,
}

ControllerResult<BrokerHeartbeatReply> processBrokerHeartbeat(
BrokerHeartbeatRequestData request, long registerBrokerRecordOffset) {
BrokerHeartbeatRequestData request,
long registerBrokerRecordOffset
) {
int brokerId = request.brokerId();
long brokerEpoch = request.brokerEpoch();
clusterControl.checkBrokerEpoch(brokerId, brokerEpoch);
Expand Down Expand Up @@ -1419,6 +1421,23 @@ ControllerResult<BrokerHeartbeatReply> processBrokerHeartbeat(
return ControllerResult.of(records, reply);
}

/**
 * Handle a broker heartbeat that sat on the controller event queue for so long that it
 * expired before it could be processed normally. With default settings this happens after
 * one second. The broker's session is still refreshed (via
 * {@link BrokerHeartbeatManager#touch}) so the broker is not fenced spuriously, but none
 * of the usual state transitions (such as unfencing) are performed, and no records are
 * generated.
 *
 * @param request The heartbeat request that timed out.
 */
void processExpiredBrokerHeartbeat(BrokerHeartbeatRequestData request) {
    final int brokerId = request.brokerId();
    // Validate the epoch first; an unknown or stale broker should not be touched.
    clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch());
    BrokerRegistration registration = clusterControl.brokerRegistrations().get(brokerId);
    clusterControl.heartbeatManager().touch(brokerId,
        registration.fenced(),
        request.currentMetadataOffset());
    // Logged at ERROR because a timed-out heartbeat means the event queue is overloaded.
    log.error("processExpiredBrokerHeartbeat: controller event queue overloaded. Timed out " +
        "heartbeat from broker {}.", brokerId);
}

public ControllerResult<Void> unregisterBroker(int brokerId) {
BrokerRegistration registration = clusterControl.brokerRegistrations().get(brokerId);
if (registration == null) {
Expand Down
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.controller.errors;

import org.apache.kafka.common.errors.TimeoutException;

import java.util.concurrent.ExecutionException;


public class ControllerExceptions {
    /**
     * Check if an exception is a normal timeout exception.
     *
     * @param exception The exception to check. May be null, and may be wrapped in an
     *                  {@link ExecutionException}.
     * @return True if the exception (after unwrapping one level of ExecutionException,
     *         if present) is a timeout exception.
     */
    public static boolean isTimeoutException(Throwable exception) {
        if (exception == null) return false;
        if (exception instanceof ExecutionException) {
            exception = exception.getCause();
            if (exception == null) return false;
        }
        return exception instanceof TimeoutException;
    }

    // Utility class: enforce noninstantiability.
    private ControllerExceptions() {
    }
}
Expand Up @@ -60,6 +60,7 @@ public class QuorumControllerMetrics implements AutoCloseable {
private final AtomicLong lastAppliedRecordTimestamp = new AtomicLong(0);
private final Consumer<Long> eventQueueTimeUpdater;
private final Consumer<Long> eventQueueProcessingTimeUpdater;
private final AtomicLong timedOutHeartbeats = new AtomicLong(0);

private Consumer<Long> newHistogram(MetricName name, boolean biased) {
if (registry.isPresent()) {
Expand Down Expand Up @@ -150,6 +151,18 @@ public long lastAppliedRecordTimestamp() {
return lastAppliedRecordTimestamp.get();
}

/**
 * Record that a broker heartbeat request timed out on the controller event queue
 * before it could be processed. Increments the counter by exactly one.
 */
public void incrementTimedOutHeartbeats() {
    // incrementAndGet() is the idiomatic single-step increment (vs. addAndGet(1)).
    timedOutHeartbeats.incrementAndGet();
}

/**
 * Set the timed-out heartbeat counter to the given value.
 *
 * @param heartbeats The new counter value.
 */
public void setTimedOutHeartbeats(long heartbeats) {
    this.timedOutHeartbeats.set(heartbeats);
}

/**
 * @return The number of broker heartbeat requests that timed out on the controller
 *         event queue before being processed.
 */
public long timedOutHeartbeats() {
return timedOutHeartbeats.get();
}

@Override
public void close() {
registry.ifPresent(r -> Arrays.asList(
Expand Down

0 comments on commit c2f6f29

Please sign in to comment.