KAFKA-15649: Handle directory failure timeout (apache#15697)
A broker that is unable to communicate a failed log directory to the controller
will shut down after the configurable log.dir.failure.timeout.ms.

The implementation adds a new event to the Kafka EventQueue. This event
is deferred by the configured timeout and executes the shutdown if the
heartbeat that reports the failed log dir is still unacknowledged by the
controller.
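
For illustration only, the deferred-shutdown pattern can be sketched in plain Scala,
using a ScheduledExecutorService in place of Kafka's internal EventQueue and
DeadlineFunction machinery; every name below is hypothetical and not part of this change:

import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

object DeferredDirFailureSketch {
  // A single-threaded scheduler stands in for the broker's event queue.
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  // Directory id -> true once a heartbeat carrying it has been acknowledged by the controller.
  private val offlineDirs = new ConcurrentHashMap[String, Boolean]()

  def propagateDirectoryFailure(dir: String, timeoutMs: Long, shutdownHook: () => Unit): Unit = {
    offlineDirs.putIfAbsent(dir, false)
    // Deferred check: if the failure was never acknowledged within the timeout, shut down.
    scheduler.schedule(new Runnable {
      override def run(): Unit =
        if (!offlineDirs.getOrDefault(dir, false)) {
          println(s"Shutting down: offline log dir $dir was not acknowledged by the controller in time")
          shutdownHook()
        }
    }, timeoutMs, TimeUnit.MILLISECONDS)
  }

  // Called when a heartbeat response with no error comes back for the dirs it carried.
  def heartbeatAcknowledged(dirs: Iterable[String]): Unit =
    dirs.foreach(offlineDirs.put(_, true))
}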

Reviewers: Igor Soarez <soarez@apple.com>
viktorsomogyi committed May 23, 2024
1 parent 8d117a1 commit 5a48984
Showing 7 changed files with 77 additions and 22 deletions.
42 changes: 30 additions & 12 deletions core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}

import java.util.{Comparator, OptionalLong}
import scala.collection.mutable
import scala.jdk.CollectionConverters._

/**
@@ -58,7 +59,8 @@ class BrokerLifecycleManager(
val time: Time,
val threadNamePrefix: String,
val isZkBroker: Boolean,
val logDirs: Set[Uuid]
val logDirs: Set[Uuid],
val shutdownHook: () => Unit = () => {}
) extends Logging {

private def logPrefix(): String = {
@@ -149,10 +151,11 @@
private var readyToUnfence = false

/**
* List of accumulated offline directories.
* Map of accumulated offline directories. The value is true once the directory has been
* successfully communicated to the controller.
* This variable can only be read or written from the event queue thread.
*/
private var offlineDirs = Set[Uuid]()
private var offlineDirs = mutable.Map[Uuid, Boolean]()

/**
* True if we sent an event to the active controller requesting controlled
@@ -253,8 +256,12 @@
* Propagate directory failures to the controller.
* @param directory The ID for the directory that failed.
*/
def propagateDirectoryFailure(directory: Uuid): Unit = {
def propagateDirectoryFailure(directory: Uuid, timeout: Long): Unit = {
eventQueue.append(new OfflineDirEvent(directory))
// If we can't communicate the offline directory to the controller, we should shut down.
eventQueue.scheduleDeferred("offlineDirFailure",
new DeadlineFunction(time.nanoseconds() + MILLISECONDS.toNanos(timeout)),
new OfflineDirBrokerFailureEvent(directory))
}

def handleKraftJBODMetadataVersionUpdate(): Unit = {
@@ -327,16 +334,25 @@
private class OfflineDirEvent(val dir: Uuid) extends EventQueue.Event {
override def run(): Unit = {
if (offlineDirs.isEmpty) {
offlineDirs = Set(dir)
offlineDirs = mutable.Map(dir -> false)
} else {
offlineDirs = offlineDirs + dir
offlineDirs += (dir -> false)
}
if (registered) {
scheduleNextCommunicationImmediately()
}
}
}

private class OfflineDirBrokerFailureEvent(offlineDir: Uuid) extends EventQueue.Event {
override def run(): Unit = {
if (!offlineDirs.getOrElse(offlineDir, false)) {
error(s"Shutting down because couldn't communicate offline log dir $offlineDir with controllers")
shutdownHook()
}
}
}

private class StartupEvent(highestMetadataOffsetProvider: () => Long,
channelManager: NodeToControllerChannelManager,
clusterId: String,
@@ -456,30 +472,31 @@
setCurrentMetadataOffset(metadataOffset).
setWantFence(!readyToUnfence).
setWantShutDown(_state == BrokerState.PENDING_CONTROLLED_SHUTDOWN).
setOfflineLogDirs(offlineDirs.toSeq.asJava)
setOfflineLogDirs(offlineDirs.keys.toSeq.asJava)
if (isTraceEnabled) {
trace(s"Sending broker heartbeat $data")
}
val handler = new BrokerHeartbeatResponseHandler()
val handler = new BrokerHeartbeatResponseHandler(offlineDirs.keys)
_channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), handler)
communicationInFlight = true
}

// the response handler is not invoked from the event handler thread,
// so it is not safe to update state here, instead, schedule an event
// to continue handling the response on the event handler thread
private class BrokerHeartbeatResponseHandler extends ControllerRequestCompletionHandler {
private class BrokerHeartbeatResponseHandler(currentOfflineDirs: Iterable[Uuid]) extends ControllerRequestCompletionHandler {
override def onComplete(response: ClientResponse): Unit = {
eventQueue.prepend(new BrokerHeartbeatResponseEvent(response, false))
eventQueue.prepend(new BrokerHeartbeatResponseEvent(response, false, currentOfflineDirs))
}

override def onTimeout(): Unit = {
info("Unable to send a heartbeat because the RPC got timed out before it could be sent.")
eventQueue.prepend(new BrokerHeartbeatResponseEvent(null, true))
eventQueue.prepend(new BrokerHeartbeatResponseEvent(null, true, currentOfflineDirs))
}
}

private class BrokerHeartbeatResponseEvent(response: ClientResponse, timedOut: Boolean) extends EventQueue.Event {
private class BrokerHeartbeatResponseEvent(response: ClientResponse, timedOut: Boolean,
currentOfflineDirs: Iterable[Uuid]) extends EventQueue.Event {
override def run(): Unit = {
communicationInFlight = false
if (timedOut) {
Expand Down Expand Up @@ -507,6 +524,7 @@ class BrokerLifecycleManager(
if (errorCode == Errors.NONE) {
val responseData = message.data()
failedAttempts = 0
currentOfflineDirs.foreach(cur => offlineDirs.put(cur, true))
_state match {
case BrokerState.STARTING =>
if (responseData.isCaughtUp) {
7 changes: 4 additions & 3 deletions core/src/main/scala/kafka/server/BrokerServer.scala
@@ -57,7 +57,7 @@ import java.util
import java.util.Optional
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.{Condition, ReentrantLock}
import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit, TimeoutException}
import java.util.concurrent.{CompletableFuture, ExecutionException, TimeoutException, TimeUnit}
import scala.collection.Map
import scala.compat.java8.OptionConverters.RichOptionForJava8
import scala.jdk.CollectionConverters._
@@ -210,7 +210,8 @@ class BrokerServer(
time,
s"broker-${config.nodeId}-",
isZkBroker = false,
logDirs = logManager.directoryIdsSet)
logDirs = logManager.directoryIdsSet,
() => new Thread(() => shutdown(), "kafka-shutdown-thread").start())

// Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
// This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
@@ -304,7 +305,7 @@
assignmentsManager.onAssignment(partition, directoryId, reason, callback)

override def handleFailure(directoryId: Uuid): Unit =
lifecycleManager.propagateDirectoryFailure(directoryId)
lifecycleManager.propagateDirectoryFailure(directoryId, config.logDirFailureTimeoutMs)
}

this._replicaManager = new ReplicaManager(
3 changes: 3 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -344,6 +344,7 @@ object KafkaConfig {
.define(ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CLASS, null, LOW, ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_DOC)
.define(ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, BOOLEAN, ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_DEFAULT, LOW, ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_DOC)
.defineInternal(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, LONG, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT, atLeast(0), LOW, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DOC)
.define(ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_CONFIG, LONG, ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_DEFAULT, atLeast(1), LOW, ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_DOC)

/** ********* Replication configuration ***********/
.define(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG, INT, ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_DEFAULT, MEDIUM, ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_DOC)
@@ -974,6 +975,8 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami

def logMessageDownConversionEnable: Boolean = getBoolean(ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_CONFIG)

def logDirFailureTimeoutMs: Long = getLong(ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_CONFIG)

/** ********* Replication configuration ***********/
val controllerSocketTimeoutMs: Int = getInt(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG)
val defaultReplicationFactor: Int = getInt(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG)
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -2440,7 +2440,9 @@ class ReplicaManager(val config: KafkaConfig,
def handleLogDirFailure(dir: String, notifyController: Boolean = true): Unit = {
if (!logManager.isLogDirOnline(dir))
return
warn(s"Stopping serving replicas in dir $dir")
// retrieve the UUID here because logManager.handleLogDirFailure handler removes it
val uuid = logManager.directoryId(dir)
warn(s"Stopping serving replicas in dir $dir with uuid $uuid because the log directory has failed.")
replicaStateChangeLock synchronized {
val newOfflinePartitions = onlinePartitionsIterator.filter { partition =>
partition.log.exists { _.parentDir == dir }
@@ -2465,8 +2467,6 @@
warn(s"Broker $localBrokerId stopped fetcher for partitions ${newOfflinePartitions.mkString(",")} and stopped moving logs " +
s"for partitions ${partitionsWithOfflineFutureReplica.mkString(",")} because they are in the failed log directory $dir.")
}
// retrieve the UUID here because logManager.handleLogDirFailure handler removes it
val uuid = logManager.directoryId(dir)
logManager.handleLogDirFailure(dir)
if (dir == new File(config.metadataLogDir).getAbsolutePath && (config.processRoles.nonEmpty || config.migrationEnabled)) {
fatal(s"Shutdown broker because the metadata log dir $dir has failed")
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -232,11 +232,11 @@ class BrokerLifecycleManagerTest {
poll(ctx, manager, prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())))
.data().offlineLogDirs().asScala.map(_.toString).toSet
assertEquals(Set.empty, nextHeartbeatDirs())
manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"))
manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"), Integer.MAX_VALUE)
assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA"), nextHeartbeatDirs())
manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))
manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"), Integer.MAX_VALUE)
assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow"), nextHeartbeatDirs())
manager.propagateDirectoryFailure(Uuid.fromString("1iF76HVNRPqC7Y4r6647eg"))
manager.propagateDirectoryFailure(Uuid.fromString("1iF76HVNRPqC7Y4r6647eg"), Integer.MAX_VALUE)
assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow", "1iF76HVNRPqC7Y4r6647eg"), nextHeartbeatDirs())
manager.close()
}
30 changes: 29 additions & 1 deletion core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -21,13 +21,14 @@ import java.util.Collections
import java.util.concurrent.{ExecutionException, TimeUnit}
import kafka.api.IntegrationTestHarness
import kafka.controller.{OfflineReplica, PartitionAndReplica}
import kafka.utils.TestUtils.{Checkpoint, LogDirFailureType, Roll, waitUntilTrue}
import kafka.utils.TestUtils.{waitUntilTrue, Checkpoint, LogDirFailureType, Roll}
import kafka.utils.{CoreUtils, Exit, TestUtils}
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{KafkaStorageException, NotLeaderOrFollowerException}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
Expand All @@ -52,6 +53,8 @@ class LogDirFailureTest extends IntegrationTestHarness {

this.serverConfig.setProperty(ReplicationConfigs.REPLICA_HIGH_WATERMARK_CHECKPOINT_INTERVAL_MS_CONFIG, "60000")
this.serverConfig.setProperty(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "1")
this.serverConfig.setProperty(ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_CONFIG, "5000")
this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false")

@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
Expand All @@ -66,6 +69,31 @@ class LogDirFailureTest extends IntegrationTestHarness {
testProduceErrorsFromLogDirFailureOnLeader(Roll)
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testLogDirNotificationTimeout(quorum: String): Unit = {
// Disable retries to allow exception to bubble up for validation
this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0")
this.producerConfig.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false")
val producer = createProducer()

val partition = new TopicPartition(topic, 0)

val leaderServerId = producer.partitionsFor(topic).asScala.find(_.partition() == 0).get.leader().id()
val leaderServer = brokers.find(_.config.brokerId == leaderServerId).get

// shut down the controller to simulate the case where the broker is not able to send the log dir notification
controllerServer.shutdown()
controllerServer.awaitShutdown()

TestUtils.causeLogDirFailure(Checkpoint, leaderServer, partition)

TestUtils.waitUntilTrue(() => leaderServer.brokerState == BrokerState.SHUTTING_DOWN,
s"Expected broker to be in NOT_RUNNING state but was ${leaderServer.brokerState}", 15000)
// wait for actual shutdown (by default max 5 minutes for graceful shutdown)
leaderServer.awaitShutdown()
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testIOExceptionDuringLogRoll(quorum: String): Unit = {
@@ -190,4 +190,9 @@ public class ServerLogConfigs {
public static final long LOG_INITIAL_TASK_DELAY_MS_DEFAULT = 30 * 1000L;
public static final String LOG_INITIAL_TASK_DELAY_MS_DOC = "The initial task delay in millisecond when initializing " +
"tasks in LogManager. This should be used for testing only.";

public static final String LOG_DIR_FAILURE_TIMEOUT_MS_CONFIG = LOG_PREFIX + "dir.failure.timeout.ms";
public static final Long LOG_DIR_FAILURE_TIMEOUT_MS_DEFAULT = 30000L;
public static final String LOG_DIR_FAILURE_TIMEOUT_MS_DOC = "If the broker is unable to successfully communicate to the controller that some log " +
"directory has failed for longer than this time, the broker will fail and shut down.";
}
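
As a usage illustration (not part of this commit), the new key can be set like any other
broker property; the 10-second value below is an arbitrary example, while the default
defined above is 30000 ms:

import java.util.Properties

// Hypothetical snippet: override log.dir.failure.timeout.ms when assembling a broker config.
val brokerProps = new Properties()
brokerProps.setProperty("log.dir.failure.timeout.ms", "10000") // default: 30000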
