From daea0fdff09c3ef4327add531e71a56fbbe6ca8c Mon Sep 17 00:00:00 2001
From: Vincent Jiang <84371940+vincent81jiang@users.noreply.github.com>
Date: Wed, 30 Aug 2023 09:19:24 -0700
Subject: [PATCH] KAFKA-15375: fix broken clean shutdown detection logic in
 LogManager (#14239)

When running in KRaft mode, LogManager.startup is called in a different
thread than the main broker startup thread (by BrokerMetadataPublisher,
when the first metadata update is received).

If a fatal error happens during broker startup, before LogManager.startup
has completed, LogManager.shutdown may improperly mark log dirs as having
shut down cleanly.

This PR includes the following changes:
1. During LogManager startup:
   - track the hadCleanShutdown flag for each log dir
   - track the loadLogsCompleted status for each log dir
2. During LogManager shutdown:
   - do not write the clean shutdown marker file for log dirs that have
     hadCleanShutdown == false and loadLogsCompleted == false

Reviewers: Colin P. McCabe
---
 .../src/main/scala/kafka/log/LogManager.scala | 29 ++++++++--
 .../scala/unit/kafka/log/LogManagerTest.scala | 53 +++++++++++++++++++
 2 files changed, 77 insertions(+), 5 deletions(-)
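Reviewer note: the heart of this change is a per-directory guard evaluated at
shutdown. The standalone Scala sketch below restates that guard outside of
LogManager; the object name CleanShutdownSketch, the method
maybeWriteCleanShutdownMarker, and the markerName parameter are illustrative
only, and the real code additionally wraps the file creation in
CoreUtils.swallow.

import java.io.File
import java.nio.file.Files
import java.util.concurrent.ConcurrentHashMap

object CleanShutdownSketch {
  // Populated during startup, mirroring the two maps added by this patch:
  // whether the dir had a clean shutdown marker when loading began, and
  // whether every log under the dir finished loading.
  val hadCleanShutdownFlags = new ConcurrentHashMap[String, Boolean]()
  val loadLogsCompletedFlags = new ConcurrentHashMap[String, Boolean]()

  // Called at shutdown: write the marker only if the dir either started from
  // a clean shutdown or finished loading all of its logs before shutdown.
  def maybeWriteCleanShutdownMarker(dir: File, markerName: String): Unit = {
    val logDirAbsolutePath = dir.getAbsolutePath
    if (hadCleanShutdownFlags.getOrDefault(logDirAbsolutePath, false) ||
        loadLogsCompletedFlags.getOrDefault(logDirAbsolutePath, false)) {
      Files.createFile(new File(dir, markerName).toPath)
    }
  }
}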
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 216063ae6ead..579d3aba424d 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -130,6 +130,12 @@ class LogManager(logDirs: Seq[File],
     logDirsSet
   }
 
+  // A map that stores hadCleanShutdown flag for each log dir.
+  private val hadCleanShutdownFlags = new ConcurrentHashMap[String, Boolean]()
+
+  // A map that tells whether all logs in a log dir had been loaded or not at startup time.
+  private val loadLogsCompletedFlags = new ConcurrentHashMap[String, Boolean]()
+
   @volatile private var _cleaner: LogCleaner = _
   private[kafka] def cleaner: LogCleaner = _cleaner
 
@@ -372,6 +378,7 @@ class LogManager(logDirs: Seq[File],
         Files.deleteIfExists(cleanShutdownFile.toPath)
         hadCleanShutdown = true
       }
+      hadCleanShutdownFlags.put(logDirAbsolutePath, hadCleanShutdown)
 
       var recoveryPoints = Map[TopicPartition, Long]()
       try {
@@ -398,7 +405,8 @@ class LogManager(logDirs: Seq[File],
           !logDir.getName.equals(RemoteIndexCache.DIR_NAME) &&
           UnifiedLog.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
       numTotalLogs += logsToLoad.length
-      numRemainingLogs.put(dir.getAbsolutePath, logsToLoad.length)
+      numRemainingLogs.put(logDirAbsolutePath, logsToLoad.length)
+      loadLogsCompletedFlags.put(logDirAbsolutePath, logsToLoad.isEmpty)
 
       if (logsToLoad.isEmpty) {
         info(s"No logs found to be loaded in $logDirAbsolutePath")
@@ -427,7 +435,7 @@ class LogManager(logDirs: Seq[File],
             // And while converting IOException to KafkaStorageException, we've already handled the exception. So we can ignore it here.
           } finally {
             val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
-            val remainingLogs = decNumRemainingLogs(numRemainingLogs, dir.getAbsolutePath)
+            val remainingLogs = decNumRemainingLogs(numRemainingLogs, logDirAbsolutePath)
             val currentNumLoaded = logsToLoad.length - remainingLogs
             log match {
               case Some(loadedLog) => info(s"Completed load of $loadedLog with ${loadedLog.numberOfSegments} segments, " +
@@ -435,6 +443,11 @@ class LogManager(logDirs: Seq[File],
                 s"($currentNumLoaded/${logsToLoad.length} completed in $logDirAbsolutePath)")
               case None => info(s"Error while loading logs in $logDir in ${logLoadDurationMs}ms ($currentNumLoaded/${logsToLoad.length} completed in $logDirAbsolutePath)")
             }
+
+            if (remainingLogs == 0) {
+              // loadLog is completed for all logs under the logDir, mark it.
+              loadLogsCompletedFlags.put(logDirAbsolutePath, true)
+            }
           }
         }
         runnable
@@ -628,9 +641,15 @@ class LogManager(logDirs: Seq[File],
         debug(s"Updating log start offsets at $dir")
         checkpointLogStartOffsetsInDir(dir, logs)
 
-        // mark that the shutdown was clean by creating marker file
-        debug(s"Writing clean shutdown marker at $dir")
-        CoreUtils.swallow(Files.createFile(new File(dir, LogLoader.CleanShutdownFile).toPath), this)
+        // mark that the shutdown was clean by creating a marker file for log dirs that:
+        //  1. had a clean shutdown marker file; or
+        //  2. had no clean shutdown marker file, but all logs under them were recovered at startup time
+        val logDirAbsolutePath = dir.getAbsolutePath
+        if (hadCleanShutdownFlags.getOrDefault(logDirAbsolutePath, false) ||
+            loadLogsCompletedFlags.getOrDefault(logDirAbsolutePath, false)) {
+          debug(s"Writing clean shutdown marker at $dir")
+          CoreUtils.swallow(Files.createFile(new File(dir, LogLoader.CleanShutdownFile).toPath), this)
+        }
       }
     }
   } finally {
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 824ce7ea3278..ee70a5189105 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -136,6 +136,59 @@ class LogManagerTest {
     }
   }
 
+  /*
+   * Test that LogManager.shutdown() doesn't create a clean shutdown file for a log directory that has not completed
+   * recovery.
+   */
+  @Test
+  def testCleanShutdownFileWhenShutdownCalledBeforeStartupComplete(): Unit = {
+    // 1. create two logs under logDir
+    val topicPartition0 = new TopicPartition(name, 0)
+    val topicPartition1 = new TopicPartition(name, 1)
+    val log0 = logManager.getOrCreateLog(topicPartition0, topicId = None)
+    val log1 = logManager.getOrCreateLog(topicPartition1, topicId = None)
+    val logFile0 = new File(logDir, name + "-0")
+    val logFile1 = new File(logDir, name + "-1")
+    assertTrue(logFile0.exists)
+    assertTrue(logFile1.exists)
+
+    log0.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
+    log0.takeProducerSnapshot()
+
+    log1.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
+    log1.takeProducerSnapshot()
+
+    // 2. simulate an unclean shutdown by deleting the clean shutdown marker file
+    logManager.shutdown()
+    assertTrue(Files.deleteIfExists(new File(logDir, LogLoader.CleanShutdownFile).toPath))
+
+    // 3. create a new LogManager and start it in a different thread
+    @volatile var loadLogCalled = 0
+    logManager = spy(createLogManager())
+    doAnswer { invocation =>
+      // intercept LogManager.loadLog to sleep 5 seconds so that there is enough time to call LogManager.shutdown
+      // before LogManager.startup completes.
+      Thread.sleep(5000)
+      val log = invocation.callRealMethod().asInstanceOf[UnifiedLog]
+      loadLogCalled = loadLogCalled + 1
+      log
+    }.when(logManager).loadLog(any[File], any[Boolean], any[Map[TopicPartition, Long]], any[Map[TopicPartition, Long]],
+      any[LogConfig], any[Map[String, LogConfig]], any[ConcurrentMap[String, Int]])
+
+    val t = new Thread() {
+      override def run(): Unit = { logManager.startup(Set.empty) }
+    }
+    t.start()
+
+    // 4. shutdown LogManager after the first log is loaded but before the second log is loaded
+    TestUtils.waitUntilTrue(() => loadLogCalled == 1,
+      "Timed out waiting for only the first log to be loaded")
+    logManager.shutdown()
+    logManager = null
+
+    // 5. verify that CleanShutdownFile is not created under logDir
+    assertFalse(Files.exists(new File(logDir, LogLoader.CleanShutdownFile).toPath))
+  }
+
   /**
    * Test that getOrCreateLog on a non-existent log creates a new log and that we can append to the new log.
    * The LogManager is configured with one invalid log directory which should be marked as offline.
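Reviewer note: for readers who want the timing of the test above in isolation,
here is a self-contained sketch (not Kafka code) of the race it exercises: a
startup thread loads logs one by one while a shutdown path consults the
per-directory completion flag. The object name ShutdownRaceSketch, the /tmp
path, and the sleep durations are hypothetical.

import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}

object ShutdownRaceSketch extends App {
  val loadLogsCompletedFlags = new ConcurrentHashMap[String, Boolean]()
  val dir = "/tmp/log-dir" // hypothetical log dir path
  val firstLogLoaded = new CountDownLatch(1)

  // "Startup": loads two logs; the dir only counts as complete after both.
  val startup = new Thread(() => {
    loadLogsCompletedFlags.put(dir, false) // nothing loaded yet
    Thread.sleep(100)                      // first log loads quickly
    firstLogLoaded.countDown()
    Thread.sleep(5000)                     // second log is still loading...
    loadLogsCompletedFlags.put(dir, true)  // ...only now is the dir complete
  })
  startup.start()

  // "Shutdown": runs after the first log loads but before the second does
  // (mirrors TestUtils.waitUntilTrue followed by logManager.shutdown above).
  firstLogLoaded.await()
  val writeMarker = loadLogsCompletedFlags.getOrDefault(dir, false)
  assert(!writeMarker, "must not write a clean shutdown marker mid-recovery")
  startup.join() // let the loader finish before the JVM exits
}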