Skip to content

Commit

Permalink
KAFKA-15375: fix broken clean shutdown detection logic in LogManager
Browse files Browse the repository at this point in the history
When running in kraft mode, LogManager.startup is called in a different thread than the main broker (#14239)
startup thread (by BrokerMetadataPublisher when the first metadata update is received.) If a fatal
error happens during broker startup, before LogManager.startup is completed, LogManager.shutdown may
 mark log dirs as clean shutdown improperly.

This PR includes following change:
1. During LogManager startup time:
  - track hadCleanShutdwon info for each log dir
  - track loadLogsCompleted status for each log dir
2. During LogManager shutdown time:
  - do not write clean shutdown marker file for log dirs which have hadCleanShutdown==false and loadLogsCompleted==false

Reviewers: Colin P. McCabe <cmccabe@apache.org>
  • Loading branch information
vincent81jiang authored and cmccabe committed Aug 30, 2023
1 parent 67c3d96 commit daea0fd
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 5 deletions.
29 changes: 24 additions & 5 deletions core/src/main/scala/kafka/log/LogManager.scala
Expand Up @@ -130,6 +130,12 @@ class LogManager(logDirs: Seq[File],
logDirsSet
}

// A map that stores hadCleanShutdown flag for each log dir.
private val hadCleanShutdownFlags = new ConcurrentHashMap[String, Boolean]()

// A map that tells whether all logs in a log dir had been loaded or not at startup time.
private val loadLogsCompletedFlags = new ConcurrentHashMap[String, Boolean]()

@volatile private var _cleaner: LogCleaner = _
private[kafka] def cleaner: LogCleaner = _cleaner

Expand Down Expand Up @@ -372,6 +378,7 @@ class LogManager(logDirs: Seq[File],
Files.deleteIfExists(cleanShutdownFile.toPath)
hadCleanShutdown = true
}
hadCleanShutdownFlags.put(logDirAbsolutePath, hadCleanShutdown)

var recoveryPoints = Map[TopicPartition, Long]()
try {
Expand All @@ -398,7 +405,8 @@ class LogManager(logDirs: Seq[File],
!logDir.getName.equals(RemoteIndexCache.DIR_NAME) &&
UnifiedLog.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
numTotalLogs += logsToLoad.length
numRemainingLogs.put(dir.getAbsolutePath, logsToLoad.length)
numRemainingLogs.put(logDirAbsolutePath, logsToLoad.length)
loadLogsCompletedFlags.put(logDirAbsolutePath, logsToLoad.isEmpty)

if (logsToLoad.isEmpty) {
info(s"No logs found to be loaded in $logDirAbsolutePath")
Expand Down Expand Up @@ -427,14 +435,19 @@ class LogManager(logDirs: Seq[File],
// And while converting IOException to KafkaStorageException, we've already handled the exception. So we can ignore it here.
} finally {
val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
val remainingLogs = decNumRemainingLogs(numRemainingLogs, dir.getAbsolutePath)
val remainingLogs = decNumRemainingLogs(numRemainingLogs, logDirAbsolutePath)
val currentNumLoaded = logsToLoad.length - remainingLogs
log match {
case Some(loadedLog) => info(s"Completed load of $loadedLog with ${loadedLog.numberOfSegments} segments, " +
s"local-log-start-offset ${loadedLog.localLogStartOffset()} and log-end-offset ${loadedLog.logEndOffset} in ${logLoadDurationMs}ms " +
s"($currentNumLoaded/${logsToLoad.length} completed in $logDirAbsolutePath)")
case None => info(s"Error while loading logs in $logDir in ${logLoadDurationMs}ms ($currentNumLoaded/${logsToLoad.length} completed in $logDirAbsolutePath)")
}

if (remainingLogs == 0) {
// loadLog is completed for all logs under the logDdir, mark it.
loadLogsCompletedFlags.put(logDirAbsolutePath, true)
}
}
}
runnable
Expand Down Expand Up @@ -628,9 +641,15 @@ class LogManager(logDirs: Seq[File],
debug(s"Updating log start offsets at $dir")
checkpointLogStartOffsetsInDir(dir, logs)

// mark that the shutdown was clean by creating marker file
debug(s"Writing clean shutdown marker at $dir")
CoreUtils.swallow(Files.createFile(new File(dir, LogLoader.CleanShutdownFile).toPath), this)
// mark that the shutdown was clean by creating marker file for log dirs that:
// 1. had clean shutdown marker file; or
// 2. had no clean shutdown marker file, but all logs under it have been recovered at startup time
val logDirAbsolutePath = dir.getAbsolutePath
if (hadCleanShutdownFlags.getOrDefault(logDirAbsolutePath, false) ||
loadLogsCompletedFlags.getOrDefault(logDirAbsolutePath, false)) {
debug(s"Writing clean shutdown marker at $dir")
CoreUtils.swallow(Files.createFile(new File(dir, LogLoader.CleanShutdownFile).toPath), this)
}
}
}
} finally {
Expand Down
53 changes: 53 additions & 0 deletions core/src/test/scala/unit/kafka/log/LogManagerTest.scala
Expand Up @@ -136,6 +136,59 @@ class LogManagerTest {
}
}

/*
* Test that LogManager.shutdown() doesn't create clean shutdown file for a log directory that has not completed
* recovery.
*/
@Test
def testCleanShutdownFileWhenShutdownCalledBeforeStartupComplete(): Unit = {
// 1. create two logs under logDir
val topicPartition0 = new TopicPartition(name, 0)
val topicPartition1 = new TopicPartition(name, 1)
val log0 = logManager.getOrCreateLog(topicPartition0, topicId = None)
val log1 = logManager.getOrCreateLog(topicPartition1, topicId = None)
val logFile0 = new File(logDir, name + "-0")
val logFile1 = new File(logDir, name + "-1")
assertTrue(logFile0.exists)
assertTrue(logFile1.exists)

log0.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
log0.takeProducerSnapshot()

log1.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
log1.takeProducerSnapshot()

// 2. simulate unclean shutdown by deleting clean shutdown marker file
logManager.shutdown()
assertTrue(Files.deleteIfExists(new File(logDir, LogLoader.CleanShutdownFile).toPath))

// 3. create a new LogManager and start it in a different thread
@volatile var loadLogCalled = 0
logManager = spy(createLogManager())
doAnswer { invocation =>
// intercept LogManager.loadLog to sleep 5 seconds so that there is enough time to call LogManager.shutdown
// before LogManager.startup completes.
Thread.sleep(5000)
invocation.callRealMethod().asInstanceOf[UnifiedLog]
loadLogCalled = loadLogCalled + 1
}.when(logManager).loadLog(any[File], any[Boolean], any[Map[TopicPartition, Long]], any[Map[TopicPartition, Long]],
any[LogConfig], any[Map[String, LogConfig]], any[ConcurrentMap[String, Int]])

val t = new Thread() {
override def run(): Unit = { logManager.startup(Set.empty) }
}
t.start()

// 4. shutdown LogManager after the first log is loaded but before the second log is loaded
TestUtils.waitUntilTrue(() => loadLogCalled == 1,
"Timed out waiting for only the first log to be loaded")
logManager.shutdown()
logManager = null

// 5. verify that CleanShutdownFile is not created under logDir
assertFalse(Files.exists(new File(logDir, LogLoader.CleanShutdownFile).toPath))
}

/**
* Test that getOrCreateLog on a non-existent log creates a new log and that we can append to the new log.
* The LogManager is configured with one invalid log directory which should be marked as offline.
Expand Down

0 comments on commit daea0fd

Please sign in to comment.