From e9e4ed83149bf6c18b32d7ce5a2aebbff3522742 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Thu, 5 Oct 2017 14:26:04 +0100 Subject: [PATCH 01/10] Several clean ups in Log, LogManager, etc. --- .../consumer/ConsumerFetcherThread.scala | 3 +- core/src/main/scala/kafka/log/Log.scala | 53 ++++++------ .../src/main/scala/kafka/log/LogCleaner.scala | 2 +- .../scala/kafka/log/LogCleanerManager.scala | 9 +- .../src/main/scala/kafka/log/LogManager.scala | 83 ++++++++++--------- .../kafka/log/ProducerStateManager.scala | 33 ++++---- .../kafka/server/AbstractFetcherThread.scala | 4 +- .../main/scala/kafka/server/KafkaApis.scala | 2 +- .../kafka/server/ReplicaFetcherThread.scala | 1 + .../scala/kafka/server/ReplicaManager.scala | 2 +- .../checkpoints/OffsetCheckpointFile.scala | 6 +- .../scala/kafka/tools/DumpLogSegments.scala | 4 +- .../test/scala/unit/kafka/log/LogTest.scala | 34 ++++---- .../kafka/log/ProducerStateManagerTest.scala | 2 +- .../server/AbstractFetcherThreadTest.scala | 1 + 15 files changed, 122 insertions(+), 117 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala index d5e084e914116..705dc249bf302 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala @@ -20,7 +20,8 @@ package kafka.consumer import kafka.api.{FetchRequestBuilder, FetchResponsePartitionData, OffsetRequest, Request} import kafka.cluster.BrokerEndPoint import kafka.message.ByteBufferMessageSet -import kafka.server.{AbstractFetcherThread, PartitionFetchState, ResultWithPartitions} +import kafka.server.{AbstractFetcherThread, PartitionFetchState} +import AbstractFetcherThread.ResultWithPartitions import kafka.common.{ErrorMapping, TopicAndPartition} import scala.collection.Map diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index d397ca6e7dc23..a8640eb99cef0 
100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -256,10 +256,10 @@ class Log(@volatile var dir: File, var swapFiles = Set[File]() for (file <- dir.listFiles if file.isFile) { - if(!file.canRead) + if (!file.canRead) throw new IOException("Could not read file " + file) val filename = file.getName - if(filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) { + if (filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) { // if the file ends in .deleted or .cleaned, delete it Files.deleteIfExists(file.toPath) } else if(filename.endsWith(SwapFileSuffix)) { @@ -271,7 +271,7 @@ class Log(@volatile var dir: File, Files.deleteIfExists(file.toPath) } else if (isLogFile(baseFile)) { // delete the index files - val offset = offsetFromFilename(baseFile.getName) + val offset = offsetFromFile(baseFile) Files.deleteIfExists(Log.offsetIndexFile(dir, offset).toPath) Files.deleteIfExists(Log.timeIndexFile(dir, offset).toPath) Files.deleteIfExists(Log.transactionIndexFile(dir, offset).toPath) @@ -287,10 +287,9 @@ class Log(@volatile var dir: File, // load segments in ascending order because transactional data from one segment may depend on the // segments that come before it for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) { - val filename = file.getName if (isIndexFile(file)) { // if it is an index file, make sure it has a corresponding .log file - val offset = offsetFromFilename(filename) + val offset = offsetFromFile(file) val logFile = Log.logFile(dir, offset) if (!logFile.exists) { warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath)) @@ -298,7 +297,7 @@ class Log(@volatile var dir: File, } } else if (isLogFile(file)) { // if it's a log file, load the corresponding log segment - val startOffset = offsetFromFilename(filename) + val startOffset = offsetFromFile(file) val indexFile = Log.offsetIndexFile(dir, startOffset) val 
timeIndexFile = Log.timeIndexFile(dir, startOffset) val txnIndexFile = Log.transactionIndexFile(dir, startOffset) @@ -361,8 +360,7 @@ class Log(@volatile var dir: File, private def completeSwapOperations(swapFiles: Set[File]): Unit = { for (swapFile <- swapFiles) { val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, "")) - val filename = logFile.getName - val startOffset = offsetFromFilename(filename) + val startOffset = offsetFromFile(logFile) val indexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, IndexFileSuffix) + SwapFileSuffix) val index = new OffsetIndex(indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize) val timeIndexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, TimeIndexFileSuffix) + SwapFileSuffix) @@ -425,14 +423,14 @@ class Log(@volatile var dir: File, // This method does not need to convert IOException to KafkaStorageException because it is only called before all logs are loaded private def recoverLog() { // if we have the clean shutdown marker, skip recovery - if(hasCleanShutdownFile) { + if (hasCleanShutdownFile) { this.recoveryPoint = activeSegment.nextOffset() return } // okay we need to actually recovery this log val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator - while(unflushed.hasNext) { + while (unflushed.hasNext) { val segment = unflushed.next info("Recovering unflushed segment %d in log %s.".format(segment.baseOffset, name)) val truncatedBytes = @@ -445,7 +443,7 @@ class Log(@volatile var dir: File, "creating an empty one with starting offset " + startOffset) segment.truncateTo(startOffset) } - if(truncatedBytes > 0) { + if (truncatedBytes > 0) { // we had an invalid message, delete all remaining log warn("Corruption found in segment %d of log %s, truncating to offset %d.".format(segment.baseOffset, name, segment.nextOffset())) @@ -1281,15 +1279,12 @@ class Log(@volatile var dir: File, file.delete() } - 
segments.lastEntry() match { - case null => - case entry => { - val seg = entry.getValue - seg.onBecomeInactiveSegment() - seg.index.trimToValidSize() - seg.timeIndex.trimToValidSize() - seg.log.trim() - } + Option(segments.lastEntry).foreach { entry => + val seg = entry.getValue + seg.onBecomeInactiveSegment() + seg.index.trimToValidSize() + seg.timeIndex.trimToValidSize() + seg.log.trim() } // take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot @@ -1638,7 +1633,7 @@ object Log { /** a time index file */ val TimeIndexFileSuffix = ".timeindex" - val PidSnapshotFileSuffix = ".snapshot" + val ProducerIdSnapshotFileSuffix = ".snapshot" /** an (aborted) txn index */ val TxnIndexFileSuffix = ".txnindex" @@ -1704,7 +1699,7 @@ object Log { * @param dir The directory in which the log will reside * @param offset The base offset of the log file */ - def logFile(dir: File, offset: Long) = + def logFile(dir: File, offset: Long): File = new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix) /** @@ -1722,7 +1717,7 @@ object Log { * @param dir The directory in which the log will reside * @param offset The base offset of the log file */ - def offsetIndexFile(dir: File, offset: Long) = + def offsetIndexFile(dir: File, offset: Long): File = new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix) /** @@ -1731,7 +1726,7 @@ object Log { * @param dir The directory in which the log will reside * @param offset The base offset of the log file */ - def timeIndexFile(dir: File, offset: Long) = + def timeIndexFile(dir: File, offset: Long): File = new File(dir, filenamePrefixFromOffset(offset) + TimeIndexFileSuffix) /** @@ -1740,14 +1735,16 @@ object Log { * @param dir The directory in which the log will reside * @param offset The last offset (exclusive) included in the snapshot */ - def producerSnapshotFile(dir: File, offset: Long) = - new File(dir, filenamePrefixFromOffset(offset) + PidSnapshotFileSuffix) + def 
producerSnapshotFile(dir: File, offset: Long): File = + new File(dir, filenamePrefixFromOffset(offset) + ProducerIdSnapshotFileSuffix) - def transactionIndexFile(dir: File, offset: Long) = + def transactionIndexFile(dir: File, offset: Long): File = new File(dir, filenamePrefixFromOffset(offset) + TxnIndexFileSuffix) - def offsetFromFilename(filename: String): Long = + def offsetFromFile(file: File): Long = { + val filename = file.getName filename.substring(0, filename.indexOf('.')).toLong + } /** * Calculate a log's size (in bytes) based on its log segments diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index d45984bfc3fe1..217c49e70eb55 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -87,7 +87,7 @@ import scala.collection.JavaConverters._ * @param time A way to control the passage of time */ class LogCleaner(val config: CleanerConfig, - val logDirs: Array[File], + val logDirs: Seq[File], val logs: Pool[TopicPartition, Log], val logDirFailureChannel: LogDirFailureChannel, time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup { diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index af8707c60c8b4..e8fe0932ac5ab 100755 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -47,7 +47,7 @@ private[log] case object LogCleaningPaused extends LogCleaningState * While a partition is in the LogCleaningPaused state, it won't be scheduled for cleaning again, until cleaning is * requested to be resumed. 
*/ -private[log] class LogCleanerManager(val logDirs: Array[File], +private[log] class LogCleanerManager(val logDirs: Seq[File], val logs: Pool[TopicPartition, Log], val logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup { @@ -59,7 +59,8 @@ private[log] class LogCleanerManager(val logDirs: Array[File], private[log] val offsetCheckpointFile = "cleaner-offset-checkpoint" /* the offset checkpoints holding the last cleaned point for each log */ - @volatile private var checkpoints = logDirs.map(dir => (dir, new OffsetCheckpointFile(new File(dir, offsetCheckpointFile), logDirFailureChannel))).toMap + @volatile private var checkpoints = logDirs.map(dir => + (dir, new OffsetCheckpointFile(new File(dir, offsetCheckpointFile), logDirFailureChannel))).toMap /* the set of logs currently being cleaned */ private val inProgress = mutable.HashMap[TopicPartition, LogCleaningState]() @@ -88,7 +89,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], checkpoint.read() } catch { case e: KafkaStorageException => - error(s"Failed to access checkpoint file ${checkpoint.f.getName} in dir ${checkpoint.f.getParentFile.getAbsolutePath}", e) + error(s"Failed to access checkpoint file ${checkpoint.file.getName} in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e) Map.empty[TopicPartition, Long] } }).toMap @@ -239,7 +240,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], checkpoint.write(existing) } catch { case e: KafkaStorageException => - error(s"Failed to access checkpoint file ${checkpoint.f.getName} in dir ${checkpoint.f.getParentFile.getAbsolutePath}", e) + error(s"Failed to access checkpoint file ${checkpoint.file.getName} in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e) } } } diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 102b1e52e0dd0..3615565ee17f6 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ 
b/core/src/main/scala/kafka/log/LogManager.scala @@ -47,8 +47,8 @@ import scala.collection.mutable.ArrayBuffer * A background thread handles log retention by periodically truncating excess log segments. */ @threadsafe -class LogManager(logDirs: Array[File], - initialOfflineDirs: Array[File], +class LogManager(logDirs: Seq[File], + initialOfflineDirs: Seq[File], val topicConfigs: Map[String, LogConfig], // note that this doesn't get updated after creation val defaultConfig: LogConfig, val cleanerConfig: CleanerConfig, @@ -74,11 +74,11 @@ class LogManager(logDirs: Array[File], private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs) - def liveLogDirs: Array[File] = { + def liveLogDirs: Seq[File] = { if (_liveLogDirs.size == logDirs.size) logDirs else - _liveLogDirs.asScala.toArray + _liveLogDirs.asScala.toBuffer } private val dirLocks = lockLogDirs(liveLogDirs) @@ -87,9 +87,14 @@ class LogManager(logDirs: Array[File], @volatile private var logStartOffsetCheckpoints = liveLogDirs.map(dir => (dir, new OffsetCheckpointFile(new File(dir, LogStartOffsetCheckpointFile), logDirFailureChannel))).toMap - private def offlineLogDirs = logDirs.filterNot(_liveLogDirs.contains) private val preferredLogDirs = new ConcurrentHashMap[TopicPartition, String]() + private def offlineLogDirs: Iterable[File] = { + val logDirsSet = mutable.Set[File](logDirs: _*) + _liveLogDirs.asScala.foreach(logDirsSet -=) + logDirsSet + } + loadLogs() @@ -103,7 +108,7 @@ class LogManager(logDirs: Array[File], val offlineLogDirectoryCount = newGauge( "OfflineLogDirectoryCount", new Gauge[Int] { - def value = offlineLogDirs.length + def value = offlineLogDirs.size } ) @@ -168,20 +173,22 @@ class LogManager(logDirs: Array[File], Exit.halt(1) } - recoveryPointCheckpoints = recoveryPointCheckpoints.filterKeys(file => file.getAbsolutePath != dir) - logStartOffsetCheckpoints = logStartOffsetCheckpoints.filterKeys(file => file.getAbsolutePath != dir) + 
recoveryPointCheckpoints = recoveryPointCheckpoints.filter { case (file, _) => file.getAbsolutePath != dir } + logStartOffsetCheckpoints = logStartOffsetCheckpoints.filter { case (file, _) => file.getAbsolutePath != dir } if (cleaner != null) cleaner.handleLogDirFailure(dir) - val offlineTopicPartitions = logs.filter { case (tp, log) => log.dir.getParent == dir}.map { case (tp, log) => tp } + val offlineTopicPartitions = logs.collect { + case (tp, log) if log.dir.getParent == dir => tp + } - offlineTopicPartitions.foreach(topicPartition => { + offlineTopicPartitions.foreach { topicPartition => val removedLog = logs.remove(topicPartition) if (removedLog != null) { removedLog.closeHandlers() removedLog.removeLogMetrics() } - }) + } info(s"Partitions ${offlineTopicPartitions.mkString(",")} are offline due to failure on log directory $dir") dirLocks.filter(_.file.getParent == dir).foreach(dir => CoreUtils.swallow(dir.destroy())) } @@ -206,7 +213,7 @@ class LogManager(logDirs: Array[File], } } - private def loadLogs(logDir: File, recoveryPoints: Map[TopicPartition, Long], logStartOffsets: Map[TopicPartition, Long]): Unit = { + private def loadLog(logDir: File, recoveryPoints: Map[TopicPartition, Long], logStartOffsets: Map[TopicPartition, Long]): Unit = { debug("Loading log '" + logDir.getName + "'") val topicPartition = Log.parseTopicPartitionName(logDir) val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) @@ -255,10 +262,7 @@ class LogManager(logDirs: Array[File], val cleanShutdownFile = new File(dir, Log.CleanShutdownFile) if (cleanShutdownFile.exists) { - debug( - "Found clean shutdown file. " + - "Skipping recovery for all logs in data directory: " + - dir.getAbsolutePath) + debug(s"Found clean shutdown file. 
Skipping recovery for all logs in data directory: ${dir.getAbsolutePath}") } else { // log recovery itself is being performed by `Log` class during initialization brokerState.newState(RecoveringFromUncleanShutdown) @@ -287,7 +291,7 @@ class LogManager(logDirs: Array[File], } yield { CoreUtils.runnable { try { - loadLogs(logDir, recoveryPoints, logStartOffsets) + loadLog(logDir, recoveryPoints, logStartOffsets) } catch { case e: IOException => offlineDirs.append((dir.getAbsolutePath, e)) @@ -318,10 +322,9 @@ class LogManager(logDirs: Array[File], logDirFailureChannel.maybeAddOfflineLogDir(dir, s"Error while deleting the clean shutdown file in dir $dir", e) } } catch { - case e: ExecutionException => { + case e: ExecutionException => error("There was an error in one of the threads during logs loading: " + e.getCause) throw e.getCause - } } finally { threadPools.foreach(_.shutdown()) } @@ -334,7 +337,7 @@ class LogManager(logDirs: Array[File], */ def startup() { /* Schedule the cleanup task to delete old logs */ - if(scheduler != null) { + if (scheduler != null) { info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs)) scheduler.schedule("kafka-log-retention", cleanupLogs _, @@ -363,7 +366,7 @@ class LogManager(logDirs: Array[File], period = defaultConfig.fileDeleteDelayMs, TimeUnit.MILLISECONDS) } - if(cleanerConfig.enableCleaner) + if (cleanerConfig.enableCleaner) cleaner.startup() } @@ -506,13 +509,16 @@ class LogManager(logDirs: Array[File], * Make a checkpoint for all logs in provided directory. 
*/ private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = { - val recoveryPoints = this.logsByDir.get(dir.toString) - if (recoveryPoints.isDefined) { + for { + partitionToLog <- logsByDir.get(dir.getAbsolutePath) + checkpoint <- recoveryPointCheckpoints.get(dir) + } { try { - this.recoveryPointCheckpoints.get(dir).foreach(_.write(recoveryPoints.get.mapValues(_.recoveryPoint))) + checkpoint.write(partitionToLog.mapValues(_.recoveryPoint)) } catch { case e: IOException => - logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point file in directory $dir", e) + logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point " + + s"file in directory $dir", e) } } } @@ -521,12 +527,15 @@ class LogManager(logDirs: Array[File], * Checkpoint log start offset for all logs in provided directory. */ private def checkpointLogStartOffsetsInDir(dir: File): Unit = { - val logs = this.logsByDir.get(dir.getAbsolutePath) - if (logs.isDefined) { + for { + partitionToLog <- logsByDir.get(dir.getAbsolutePath) + checkpoint <- logStartOffsetCheckpoints.get(dir) + } { try { - this.logStartOffsetCheckpoints.get(dir).foreach(_.write( - logs.get.filter { case (tp, log) => log.logStartOffset > log.logSegments.head.baseOffset }.mapValues(_.logStartOffset) - )) + val logStartOffsets = partitionToLog.filter { case (_, log) => + log.logStartOffset > log.logSegments.head.baseOffset + }.mapValues(_.logStartOffset) + checkpoint.write(logStartOffsets) } catch { case e: IOException => logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to logStartOffset file in directory $dir", e) @@ -713,7 +722,7 @@ class LogManager(logDirs: Array[File], /** * Get all the partition logs */ - def allLogs(): Iterable[Log] = logs.values + def allLogs: Iterable[Log] = logs.values /** * Get a map of TopicPartition => Log @@ -723,10 +732,8 @@ class LogManager(logDirs: 
Array[File], /** * Map of log dir to logs by topic and partitions in that dir */ - private def logsByDir = { - this.logsByTopicPartition.groupBy { - case (_, log) => log.dir.getParent - } + private def logsByDir: Map[String, Map[TopicPartition, Log]] = { + this.logsByTopicPartition.groupBy { case (_, log) => log.dir.getParent } } // logDir should be an absolute path @@ -741,7 +748,7 @@ class LogManager(logDirs: Array[File], /** * Flush any log which has exceeded its flush interval and has unwritten messages. */ - private def flushDirtyLogs() = { + private def flushDirtyLogs(): Unit = { debug("Checking for dirty logs to flush...") for ((topicPartition, log) <- logs) { @@ -788,8 +795,8 @@ object LogManager { backOffMs = config.logCleanerBackoffMs, enableCleaner = config.logCleanerEnable) - new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile).toArray, - initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile).toArray, + new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile), + initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile), topicConfigs = topicConfigs, defaultConfig = defaultLogConfig, cleanerConfig = cleanerConfig, diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index 81726c14458d1..0ffb0d1b63efb 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -21,7 +21,7 @@ import java.nio.ByteBuffer import java.nio.file.Files import kafka.common.KafkaException -import kafka.log.Log.offsetFromFilename +import kafka.log.Log.offsetFromFile import kafka.server.LogOffsetMetadata import kafka.utils.{Logging, nonthreadsafe, threadsafe} import org.apache.kafka.common.TopicPartition @@ -403,7 +403,7 @@ object ProducerStateManager { } } - private def isSnapshotFile(name: String): Boolean = name.endsWith(Log.PidSnapshotFileSuffix) + private def 
isSnapshotFile(file: File): Boolean = file.getName.endsWith(Log.ProducerIdSnapshotFileSuffix) } @@ -430,7 +430,6 @@ class ProducerStateManager(val topicPartition: TopicPartition, import ProducerStateManager._ import java.util - private val validateSequenceNumbers = topicPartition.topic != Topic.GROUP_METADATA_TOPIC_NAME private val producers = mutable.Map.empty[Long, ProducerIdEntry] private var lastMapOffset = 0L private var lastSnapOffset = 0L @@ -495,7 +494,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, isProducerRetained(producerEntry, logStartOffset) && !isProducerExpired(currentTime, producerEntry) } loadedProducers.foreach(loadProducerEntry) - lastSnapOffset = offsetFromFilename(file.getName) + lastSnapOffset = offsetFromFile(file) lastMapOffset = lastSnapOffset return } catch { @@ -611,12 +610,12 @@ class ProducerStateManager(val topicPartition: TopicPartition, /** * Get the last offset (exclusive) of the latest snapshot file. */ - def latestSnapshotOffset: Option[Long] = latestSnapshotFile.map(file => offsetFromFilename(file.getName)) + def latestSnapshotOffset: Option[Long] = latestSnapshotFile.map(file => offsetFromFile(file)) /** * Get the last offset (exclusive) of the oldest snapshot file. 
*/ - def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => offsetFromFilename(file.getName)) + def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => offsetFromFile(file)) private def isProducerRetained(producerIdEntry: ProducerIdEntry, logStartOffset: Long): Boolean = { producerIdEntry.removeBatchesOlderThan(logStartOffset) @@ -700,21 +699,18 @@ class ProducerStateManager(val topicPartition: TopicPartition, deleteSnapshotFiles(_ < offset) } - private def listSnapshotFiles: List[File] = { + private def listSnapshotFiles: Seq[File] = { if (logDir.exists && logDir.isDirectory) { - val files = logDir.listFiles - if (files != null) - files.filter(f => f.isFile && isSnapshotFile(f.getName)).toList - else - List.empty[File] - } else - List.empty[File] + Option(logDir.listFiles).map { files => + files.filter(f => f.isFile && isSnapshotFile(f)).toSeq + }.getOrElse(Seq.empty) + } else Seq.empty } private def oldestSnapshotFile: Option[File] = { val files = listSnapshotFiles if (files.nonEmpty) - Some(files.minBy(file => offsetFromFilename(file.getName))) + Some(files.minBy(offsetFromFile)) else None } @@ -722,14 +718,15 @@ class ProducerStateManager(val topicPartition: TopicPartition, private def latestSnapshotFile: Option[File] = { val files = listSnapshotFiles if (files.nonEmpty) - Some(files.maxBy(file => offsetFromFilename(file.getName))) + Some(files.maxBy(offsetFromFile)) else None } private def deleteSnapshotFiles(predicate: Long => Boolean = _ => true) { - listSnapshotFiles.filter(file => predicate(offsetFromFilename(file.getName))) - .foreach(file => Files.deleteIfExists(file.toPath)) + listSnapshotFiles.filter(file => predicate(offsetFromFile(file))).foreach { file => + Files.deleteIfExists(file.toPath) + } } } diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 5df67325bf92c..d6b9f1b9dc393 100755 --- 
a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -301,6 +301,8 @@ abstract class AbstractFetcherThread(name: String, object AbstractFetcherThread { + case class ResultWithPartitions[R](result: R, partitionsWithError: Set[TopicPartition]) + trait FetchRequest { def isEmpty: Boolean def offset(topicPartition: TopicPartition): Long @@ -418,5 +420,3 @@ case class PartitionFetchState(fetchOffset: Long, delay: DelayedItem, truncating override def toString = "offset:%d-isReadyForFetch:%b-isTruncatingLog:%b".format(fetchOffset, isReadyForFetch, truncatingLog) } - -case class ResultWithPartitions[R](result: R, partitionsWithError: Set[TopicPartition]) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index c171aaa0f26d9..8799716db110b 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1992,7 +1992,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (authorize(request.session, Describe, Resource.ClusterResource)) { val partitions = if (describeLogDirsDirRequest.isAllTopicPartitions) - replicaManager.logManager.allLogs().map(_.topicPartition).toSet + replicaManager.logManager.allLogs.map(_.topicPartition).toSet else describeLogDirsDirRequest.topicPartitions().asScala diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index d4221121b6e26..52c6de31f9195 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -19,6 +19,7 @@ package kafka.server import java.util +import AbstractFetcherThread.ResultWithPartitions import kafka.admin.AdminUtils import kafka.api.{FetchRequest => _, _} import kafka.cluster.{BrokerEndPoint, Replica} diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala index f4f367293eed0..560fdd49eed17 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -611,7 +611,7 @@ class ReplicaManager(val config: KafkaConfig, * are included. There may be future logs (which will replace the current logs of the partition in the future) on the broker after KIP-113 is implemented. */ def describeLogDirs(partitions: Set[TopicPartition]): Map[String, LogDirInfo] = { - val logsByDir = logManager.allLogs().groupBy(log => log.dir.getParent) + val logsByDir = logManager.allLogs.groupBy(log => log.dir.getParent) config.logDirs.toSet.map { logDir: String => val absolutePath = new File(logDir).getAbsolutePath diff --git a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala index 9cd096369e2bc..2769cb4240a52 100644 --- a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala +++ b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala @@ -52,9 +52,9 @@ trait OffsetCheckpoint { /** * This class persists a map of (Partition => Offsets) to a file (for a certain replica) */ -class OffsetCheckpointFile(val f: File, logDirFailureChannel: LogDirFailureChannel = null) { - val checkpoint = new CheckpointFile[(TopicPartition, Long)](f, OffsetCheckpointFile.CurrentVersion, - OffsetCheckpointFile.Formatter, logDirFailureChannel, f.getParent) +class OffsetCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureChannel = null) { + val checkpoint = new CheckpointFile[(TopicPartition, Long)](file, OffsetCheckpointFile.CurrentVersion, + OffsetCheckpointFile.Formatter, logDirFailureChannel, file.getParent) def write(offsets: Map[TopicPartition, Long]): Unit = checkpoint.write(offsets.toSeq) diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala 
b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index a8a2ba71cf403..6f13f4f7d8fce 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -114,7 +114,7 @@ object DumpLogSegments { dumpIndex(file, indexSanityOnly, verifyOnly, misMatchesForIndexFilesMap, maxMessageSize) case Log.TimeIndexFileSuffix => dumpTimeIndex(file, indexSanityOnly, verifyOnly, timeIndexDumpErrors, maxMessageSize) - case Log.PidSnapshotFileSuffix => + case Log.ProducerIdSnapshotFileSuffix => dumpProducerIdSnapshot(file) case Log.TxnIndexFileSuffix => dumpTxnIndex(file) @@ -145,7 +145,7 @@ object DumpLogSegments { } private def dumpTxnIndex(file: File): Unit = { - val index = new TransactionIndex(Log.offsetFromFilename(file.getName), file) + val index = new TransactionIndex(Log.offsetFromFile(file), file) for (abortedTxn <- index.allAbortedTxns) { println(s"version: ${abortedTxn.version} producerId: ${abortedTxn.producerId} firstOffset: ${abortedTxn.firstOffset} " + s"lastOffset: ${abortedTxn.lastOffset} lastStableOffset: ${abortedTxn.lastStableOffset}") diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 6d40967fec34c..18d43d967f22e 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -69,20 +69,20 @@ class LogTest { } @Test - def testOffsetFromFilename() { + def testOffsetFromFile() { val offset = 23423423L val logFile = Log.logFile(tmpDir, offset) - assertEquals(offset, Log.offsetFromFilename(logFile.getName)) + assertEquals(offset, Log.offsetFromFile(logFile)) val offsetIndexFile = Log.offsetIndexFile(tmpDir, offset) - assertEquals(offset, Log.offsetFromFilename(offsetIndexFile.getName)) + assertEquals(offset, Log.offsetFromFile(offsetIndexFile)) val timeIndexFile = Log.timeIndexFile(tmpDir, offset) - assertEquals(offset, Log.offsetFromFilename(timeIndexFile.getName)) + 
assertEquals(offset, Log.offsetFromFile(timeIndexFile)) val snapshotFile = Log.producerSnapshotFile(tmpDir, offset) - assertEquals(offset, Log.offsetFromFilename(snapshotFile.getName)) + assertEquals(offset, Log.offsetFromFile(snapshotFile)) } /** @@ -167,7 +167,7 @@ class LogTest { assertTrue(log.logSegments.size >= 2) log.close() - logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.PidSnapshotFileSuffix)).foreach { file => + logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.ProducerIdSnapshotFileSuffix)).foreach { file => Files.delete(file.toPath) } @@ -192,7 +192,7 @@ class LogTest { } @Test - def testPidMapOffsetUpdatedForNonIdempotentData() { + def testProducerIdMapOffsetUpdatedForNonIdempotentData() { val logConfig = createLogConfig(segmentBytes = 2048 * 5) val log = createLog(logDir, logConfig) val records = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes))) @@ -383,7 +383,7 @@ class LogTest { } @Test - def testRebuildPidMapWithCompactedData() { + def testRebuildProducerIdMapWithCompactedData() { val logConfig = createLogConfig(segmentBytes = 2048 * 5) val log = createLog(logDir, logConfig) val pid = 1L @@ -467,7 +467,7 @@ class LogTest { } @Test - def testUpdatePidMapWithCompactedData() { + def testUpdateProducerIdMapWithCompactedData() { val logConfig = createLogConfig(segmentBytes = 2048 * 5) val log = createLog(logDir, logConfig) val pid = 1L @@ -500,7 +500,7 @@ class LogTest { } @Test - def testPidMapTruncateTo() { + def testProducerIdMapTruncateTo() { val logConfig = createLogConfig(segmentBytes = 2048 * 5) val log = createLog(logDir, logConfig) log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes))), leaderEpoch = 0) @@ -520,7 +520,7 @@ class LogTest { } @Test - def testPidMapTruncateToWithNoSnapshots() { + def testProducerIdMapTruncateToWithNoSnapshots() { // This ensures that the upgrade optimization path cannot be hit after initial loading val logConfig = 
createLogConfig(segmentBytes = 2048 * 5) val log = createLog(logDir, logConfig) @@ -533,7 +533,7 @@ class LogTest { producerEpoch = epoch, sequence = 1), leaderEpoch = 0) // Delete all snapshots prior to truncating - logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.PidSnapshotFileSuffix)).foreach { file => + logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.ProducerIdSnapshotFileSuffix)).foreach { file => Files.delete(file.toPath) } @@ -612,7 +612,7 @@ class LogTest { } @Test - def testPidMapTruncateFullyAndStartAt() { + def testProducerIdMapTruncateFullyAndStartAt() { val records = TestUtils.singletonRecords("foo".getBytes) val logConfig = createLogConfig(segmentBytes = records.sizeInBytes, retentionBytes = records.sizeInBytes * 2) val log = createLog(logDir, logConfig) @@ -634,7 +634,7 @@ class LogTest { } @Test - def testPidExpirationOnSegmentDeletion() { + def testProducerIdExpirationOnSegmentDeletion() { val pid1 = 1L val records = TestUtils.records(Seq(new SimpleRecord("foo".getBytes)), producerId = pid1, producerEpoch = 0, sequence = 0) val logConfig = createLogConfig(segmentBytes = records.sizeInBytes, retentionBytes = records.sizeInBytes * 2) @@ -729,7 +729,7 @@ class LogTest { } @Test - def testPeriodicPidExpiration() { + def testPeriodicProducerIdExpiration() { val maxProducerIdExpirationMs = 200 val producerIdExpirationCheckIntervalMs = 100 @@ -824,7 +824,7 @@ class LogTest { } @Test - def testMultiplePidsPerMemoryRecord() : Unit = { + def testMultipleProducerIdsPerMemoryRecord() : Unit = { // create a log val log = createLog(logDir, LogConfig()) @@ -2585,7 +2585,7 @@ class LogTest { appendEndTxnMarkerAsLeader(log, pid4, epoch, ControlRecordType.COMMIT) // 90 // delete all snapshot files - logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.PidSnapshotFileSuffix)).foreach { file => + logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.ProducerIdSnapshotFileSuffix)).foreach { file => 
Files.delete(file.toPath) } diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala index 865062455ce03..67b1b15f2dc31 100644 --- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala @@ -741,6 +741,6 @@ class ProducerStateManagerTest extends JUnitSuite { } private def currentSnapshotOffsets = - logDir.listFiles().map(file => Log.offsetFromFilename(file.getName)).toSet + logDir.listFiles.map(Log.offsetFromFile).toSet } diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala index fc49d8c40f228..b95f66cba55ca 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala @@ -17,6 +17,7 @@ package kafka.server +import AbstractFetcherThread._ import com.yammer.metrics.Metrics import kafka.cluster.BrokerEndPoint import kafka.server.AbstractFetcherThread.{FetchRequest, PartitionData} From a7bc0f2fe4888dccfec0a7620392ce46527944be Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Thu, 5 Oct 2017 14:57:07 +0100 Subject: [PATCH 02/10] KAFKA-5829: Only delete producer snapshots before the recovery point --- core/src/main/scala/kafka/log/Log.scala | 40 ++++++++++++------- .../src/main/scala/kafka/log/LogManager.scala | 1 + .../test/scala/unit/kafka/log/LogTest.scala | 8 ++-- 3 files changed, 31 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index a8640eb99cef0..bd941bad93497 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -474,7 +474,7 @@ class Log(@volatile var dir: File, // To avoid an expensive scan through all of the segments, we take empty snapshots from the start of the // 
last two segments and the last offset. This should avoid the full scan in the case that the log needs // truncation. - val nextLatestSegmentBaseOffset = Option(segments.lowerEntry(activeSegment.baseOffset)).map(_.getValue.baseOffset) + val nextLatestSegmentBaseOffset = lowerSegment(activeSegment.baseOffset).map(_.baseOffset) val offsetsToSnapshot = Seq(nextLatestSegmentBaseOffset, Some(activeSegment.baseOffset), Some(lastOffset)) offsetsToSnapshot.flatten.foreach { offset => producerStateManager.updateMapEndOffset(offset) @@ -1344,11 +1344,6 @@ class Log(@volatile var dir: File, for (segment <- logSegments(this.recoveryPoint, offset)) segment.flush() - // now that we have flushed, we can cleanup old producer snapshots. However, it is useful to retain - // the snapshots from the recent segments in case we need to truncate and rebuild the producer state. - // Otherwise, we would always need to rebuild from the earliest segment. - producerStateManager.deleteSnapshotsBefore(minSnapshotOffsetToRetain(offset)) - lock synchronized { if (offset > this.recoveryPoint) { this.recoveryPoint = offset @@ -1358,17 +1353,32 @@ class Log(@volatile var dir: File, } } - def minSnapshotOffsetToRetain(flushedOffset: Long) = { - // always retain the producer snapshot from the last two segments. This solves the common case - // of truncating to an offset within the active segment, and the rarer case of truncating to the - // previous segment just after rolling the new segment. - var minSnapshotOffset = activeSegment.baseOffset - val previousSegment = segments.lowerEntry(activeSegment.baseOffset) - if (previousSegment != null) - minSnapshotOffset = previousSegment.getValue.baseOffset - math.min(flushedOffset, minSnapshotOffset) + /** + * Cleanup old producer snapshots after the recovery point is checkpointed. It is useful to retain + * the snapshots from the recent segments in case we need to truncate and rebuild the producer state. 
+ * Otherwise, we would always need to rebuild from the earliest segment. + * + * More specifically: + * + * 1. We always retain the producer snapshot from the last two segments. This solves the common case + * of truncating to an offset within the active segment, and the rarer case of truncating to the previous segment. + * + * 2. We only delete snapshots for offsets less than the recovery point. The recovery point is checkpointed + * periodically and it can be behind after a hard shutdown. Since recovery starts from the recovery point, the logic + * of rebuilding the producer snapshots in one pass and without loading older segments is simpler if we always + * have a producer snapshot for all segments being recovered. + */ + def deleteSnapshotsAfterRecoveryPointCheckpoint(): Unit = { + val twoSegmentsMinOffset = lowerSegment(activeSegment.baseOffset).getOrElse(activeSegment).baseOffset + // Prefer segment base offset + val recoveryPointOffset = lowerSegment(recoveryPoint).map(_.baseOffset).getOrElse(recoveryPoint) + val minOffsetToRetain = math.min(recoveryPointOffset, twoSegmentsMinOffset) + producerStateManager.deleteSnapshotsBefore(minOffsetToRetain) } + private def lowerSegment(offset: Long): Option[LogSegment] = + Option(segments.lowerEntry(offset)).map(_.getValue) + /** * Completely delete this log directory and all contents from the file system with no delay */ diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 3615565ee17f6..aaa228d9cb2f4 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -515,6 +515,7 @@ class LogManager(logDirs: Seq[File], } { try { checkpoint.write(partitionToLog.mapValues(_.recoveryPoint)) + logs.values.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint()) } catch { case e: IOException => logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point " + diff 
--git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 18d43d967f22e..82d9d8ae7e347 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -677,9 +677,11 @@ class LogTest { log.roll(3L) assertEquals(Some(3L), log.latestProducerSnapshotOffset) - // roll triggers a flush at the starting offset of the new segment. we should - // retain the snapshots from the active segment and the previous segment, but - // the oldest one should be gone + // roll triggers a flush at the starting offset of the new segment, we should retain all snapshots + assertEquals(Some(1L), log.oldestProducerSnapshotOffset) + + // retain the snapshots from the active segment and the previous segment, delete the oldest one + log.deleteSnapshotsAfterRecoveryPointCheckpoint() assertEquals(Some(2L), log.oldestProducerSnapshotOffset) // even if we flush within the active segment, the snapshot should remain From 13bcf0095e37036a5d9a2daded65270fe66456a6 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Fri, 6 Oct 2017 00:13:56 +0100 Subject: [PATCH 03/10] Update recovery point after dirty recovery --- core/src/main/scala/kafka/log/Log.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index bd941bad93497..5c2663bc8b719 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -424,7 +424,7 @@ class Log(@volatile var dir: File, private def recoverLog() { // if we have the clean shutdown marker, skip recovery if (hasCleanShutdownFile) { - this.recoveryPoint = activeSegment.nextOffset() + recoveryPoint = activeSegment.nextOffset return } @@ -450,6 +450,8 @@ class Log(@volatile var dir: File, unflushed.foreach(deleteSegment) } } + + recoveryPoint = activeSegment.nextOffset } private def loadProducerState(lastOffset: Long, 
reloadFromCleanShutdown: Boolean): Unit = lock synchronized { From 4f6c165afa16dd8f867d5df2269efa554b057add Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Fri, 6 Oct 2017 00:17:51 +0100 Subject: [PATCH 04/10] ProducerIdSnapshotFileSuffix -> ProducerSnapshotFileSuffix --- core/src/main/scala/kafka/log/Log.scala | 4 ++-- core/src/main/scala/kafka/log/ProducerStateManager.scala | 2 +- core/src/main/scala/kafka/tools/DumpLogSegments.scala | 2 +- core/src/test/scala/unit/kafka/log/LogTest.scala | 6 +++--- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 5c2663bc8b719..8d9e6b9d66085 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1645,7 +1645,7 @@ object Log { /** a time index file */ val TimeIndexFileSuffix = ".timeindex" - val ProducerIdSnapshotFileSuffix = ".snapshot" + val ProducerSnapshotFileSuffix = ".snapshot" /** an (aborted) txn index */ val TxnIndexFileSuffix = ".txnindex" @@ -1748,7 +1748,7 @@ object Log { * @param offset The last offset (exclusive) included in the snapshot */ def producerSnapshotFile(dir: File, offset: Long): File = - new File(dir, filenamePrefixFromOffset(offset) + ProducerIdSnapshotFileSuffix) + new File(dir, filenamePrefixFromOffset(offset) + ProducerSnapshotFileSuffix) def transactionIndexFile(dir: File, offset: Long): File = new File(dir, filenamePrefixFromOffset(offset) + TxnIndexFileSuffix) diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index 0ffb0d1b63efb..0a95959c193ae 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -403,7 +403,7 @@ object ProducerStateManager { } } - private def isSnapshotFile(file: File): Boolean = file.getName.endsWith(Log.ProducerIdSnapshotFileSuffix) + private def isSnapshotFile(file: 
File): Boolean = file.getName.endsWith(Log.ProducerSnapshotFileSuffix) } diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index 6f13f4f7d8fce..f0ea50c06769f 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -114,7 +114,7 @@ object DumpLogSegments { dumpIndex(file, indexSanityOnly, verifyOnly, misMatchesForIndexFilesMap, maxMessageSize) case Log.TimeIndexFileSuffix => dumpTimeIndex(file, indexSanityOnly, verifyOnly, timeIndexDumpErrors, maxMessageSize) - case Log.ProducerIdSnapshotFileSuffix => + case Log.ProducerSnapshotFileSuffix => dumpProducerIdSnapshot(file) case Log.TxnIndexFileSuffix => dumpTxnIndex(file) diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 82d9d8ae7e347..2971cdadb03cb 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -167,7 +167,7 @@ class LogTest { assertTrue(log.logSegments.size >= 2) log.close() - logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.ProducerIdSnapshotFileSuffix)).foreach { file => + logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.ProducerSnapshotFileSuffix)).foreach { file => Files.delete(file.toPath) } @@ -533,7 +533,7 @@ class LogTest { producerEpoch = epoch, sequence = 1), leaderEpoch = 0) // Delete all snapshots prior to truncating - logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.ProducerIdSnapshotFileSuffix)).foreach { file => + logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.ProducerSnapshotFileSuffix)).foreach { file => Files.delete(file.toPath) } @@ -2587,7 +2587,7 @@ class LogTest { appendEndTxnMarkerAsLeader(log, pid4, epoch, ControlRecordType.COMMIT) // 90 // delete all snapshot files - logDir.listFiles.filter(f => f.isFile && 
f.getName.endsWith(Log.ProducerIdSnapshotFileSuffix)).foreach { file => + logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.ProducerSnapshotFileSuffix)).foreach { file => Files.delete(file.toPath) } From e4dadaa1a30911ecaafbb8462b98fcb578c2ba52 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Fri, 6 Oct 2017 00:36:24 +0100 Subject: [PATCH 05/10] Test that recovery point is updated after recovery --- core/src/test/scala/unit/kafka/log/LogTest.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 2971cdadb03cb..e0810f4ac46b7 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -1377,7 +1377,8 @@ class LogTest { } log.close() - def verifyRecoveredLog(log: Log) { + def verifyRecoveredLog(log: Log, expectedRecoveryPoint: Long) { + assertEquals(s"Unexpected recovery point", expectedRecoveryPoint, log.recoveryPoint) assertEquals(s"Should have $numMessages messages when log is reopened w/o recovery", numMessages, log.logEndOffset) assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset) assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries) @@ -1387,12 +1388,12 @@ class LogTest { } log = createLog(logDir, logConfig, recoveryPoint = lastOffset) - verifyRecoveredLog(log) + verifyRecoveredLog(log, lastOffset) log.close() // test recovery case log = createLog(logDir, logConfig) - verifyRecoveredLog(log) + verifyRecoveredLog(log, lastOffset) log.close() } From 50f94b99d03ddf4f2cf1f733c165634dc1c12438 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Fri, 6 Oct 2017 09:52:58 +0100 Subject: [PATCH 06/10] Verify that we delete producer snapshots during recovery point checkpointing --- core/src/main/scala/kafka/log/Log.scala | 14 +++++-- 
.../scala/unit/kafka/log/LogManagerTest.scala | 39 ++++++++----------- .../server/HighwatermarkPersistenceTest.scala | 4 +- .../kafka/server/ReplicaManagerTest.scala | 10 ++--- .../scala/unit/kafka/utils/TestUtils.scala | 2 +- 5 files changed, 36 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 8d9e6b9d66085..637ef2057d59e 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1369,13 +1369,21 @@ class Log(@volatile var dir: File, * periodically and it can be behind after a hard shutdown. Since recovery starts from the recovery point, the logic * of rebuilding the producer snapshots in one pass and without loading older segments is simpler if we always * have a producer snapshot for all segments being recovered. + * + * Return the minimum snapshots offset that was retained. */ - def deleteSnapshotsAfterRecoveryPointCheckpoint(): Unit = { + def deleteSnapshotsAfterRecoveryPointCheckpoint(): Long = { + val minOffsetToRetain = minSnapshotsOffsetToRetain + producerStateManager.deleteSnapshotsBefore(minOffsetToRetain) + minOffsetToRetain + } + + // Visible for testing, see `deleteSnapshotsAfterRecoveryPointCheckpoint()` for details + private[log] def minSnapshotsOffsetToRetain: Long = { val twoSegmentsMinOffset = lowerSegment(activeSegment.baseOffset).getOrElse(activeSegment).baseOffset // Prefer segment base offset val recoveryPointOffset = lowerSegment(recoveryPoint).map(_.baseOffset).getOrElse(recoveryPoint) - val minOffsetToRetain = math.min(recoveryPointOffset, twoSegmentsMinOffset) - producerStateManager.deleteSnapshotsBefore(minOffsetToRetain) + math.min(recoveryPointOffset, twoSegmentsMinOffset) } private def lowerSegment(offset: Long): Option[LogSegment] = diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 9794b1a77d0d9..a3980f260bf50 100755 --- 
a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -31,9 +31,9 @@ import org.junit.{After, Before, Test} class LogManagerTest { - val time: MockTime = new MockTime() + val time = new MockTime() val maxRollInterval = 100 - val maxLogAgeMs = 10*60*1000 + val maxLogAgeMs = 10 * 60 * 1000 val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) logProps.put(LogConfig.SegmentIndexBytesProp, 4096: java.lang.Integer) @@ -50,7 +50,6 @@ class LogManagerTest { logDir = TestUtils.tempDir() logManager = createLogManager() logManager.startup() - logDir = logManager.liveLogDirs(0) } @After @@ -58,6 +57,7 @@ class LogManagerTest { if (logManager != null) logManager.shutdown() Utils.delete(logDir) + // Some tests assign a new LogManager logManager.liveLogDirs.foreach(Utils.delete) } @@ -154,7 +154,7 @@ class LogManagerTest { time.sleep(log.config.fileDeleteDelayMs + 1) // there should be a log file, two indexes (the txn index is created lazily), - // the leader epoch checkpoint and two pid mapping files (one for the active and previous segments) + // the leader epoch checkpoint and two producer snapshot files (one for the active and previous segments) assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 3, log.dir.list.length) assertEquals("Should get empty fetch off new log.", 0, log.readUncommitted(offset + 1, 1024).records.sizeInBytes) try { @@ -220,7 +220,7 @@ class LogManagerTest { @Test def testLeastLoadedAssignment() { // create a log manager with multiple data directories - val dirs = Array(TestUtils.tempDir(), + val dirs = Seq(TestUtils.tempDir(), TestUtils.tempDir(), TestUtils.tempDir()) logManager.shutdown() @@ -253,7 +253,7 @@ class LogManagerTest { */ @Test def testCheckpointRecoveryPoints() { - verifyCheckpointRecovery(Seq(new TopicPartition("test-a", 1), new TopicPartition("test-b", 1)), logManager) + verifyCheckpointRecovery(Seq(new 
TopicPartition("test-a", 1), new TopicPartition("test-b", 1)), logManager, logDir) } /** @@ -262,11 +262,9 @@ class LogManagerTest { @Test def testRecoveryDirectoryMappingWithTrailingSlash() { logManager.shutdown() - logDir = TestUtils.tempDir() - logManager = TestUtils.createLogManager( - logDirs = Array(new File(logDir.getAbsolutePath + File.separator))) + logManager = TestUtils.createLogManager(logDirs = Seq(new File(TestUtils.tempDir().getAbsolutePath + File.separator))) logManager.startup() - verifyCheckpointRecovery(Seq(new TopicPartition("test-a", 1)), logManager) + verifyCheckpointRecovery(Seq(new TopicPartition("test-a", 1)), logManager, logManager.liveLogDirs.head) } /** @@ -275,34 +273,31 @@ class LogManagerTest { @Test def testRecoveryDirectoryMappingWithRelativeDirectory() { logManager.shutdown() - logDir = new File("data" + File.separator + logDir.getName).getAbsoluteFile - logDir.mkdirs() - logDir.deleteOnExit() - logManager = createLogManager() + logManager = createLogManager(Seq(new File("data", logDir.getName).getAbsoluteFile)) logManager.startup() - verifyCheckpointRecovery(Seq(new TopicPartition("test-a", 1)), logManager) + verifyCheckpointRecovery(Seq(new TopicPartition("test-a", 1)), logManager, logManager.liveLogDirs.head) } - - private def verifyCheckpointRecovery(topicPartitions: Seq[TopicPartition], - logManager: LogManager) { - val logs = topicPartitions.map(this.logManager.getOrCreateLog(_, logConfig)) - logs.foreach(log => { + private def verifyCheckpointRecovery(topicPartitions: Seq[TopicPartition], logManager: LogManager, logDir: File) { + val logs = topicPartitions.map(logManager.getOrCreateLog(_, logConfig)) + logs.foreach { log => for (_ <- 0 until 50) log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0) log.flush() - }) + println("log segments " + log.logSegments.size) + } logManager.checkpointLogRecoveryOffsets() val checkpoints = new OffsetCheckpointFile(new File(logDir, 
logManager.RecoveryPointCheckpointFile)).read() topicPartitions.zip(logs).foreach { case (tp, log) => assertEquals("Recovery point should equal checkpoint", checkpoints(tp), log.recoveryPoint) + assertEquals(Some(log.minSnapshotsOffsetToRetain), log.oldestProducerSnapshotOffset) } } - private def createLogManager(logDirs: Array[File] = Array(this.logDir)): LogManager = { + private def createLogManager(logDirs: Seq[File] = Seq(this.logDir)): LogManager = { TestUtils.createLogManager( defaultConfig = logConfig, logDirs = logDirs, diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index 79acfa733cc89..1a75f94a9d08b 100755 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -37,7 +37,7 @@ class HighwatermarkPersistenceTest { val zkUtils = EasyMock.createMock(classOf[ZkUtils]) val logManagers = configs map { config => TestUtils.createLogManager( - logDirs = config.logDirs.map(new File(_)).toArray, + logDirs = config.logDirs.map(new File(_)), cleanerConfig = CleanerConfig()) } @@ -47,7 +47,7 @@ class HighwatermarkPersistenceTest { @After def teardown() { - for(manager <- logManagers; dir <- manager.liveLogDirs) + for (manager <- logManagers; dir <- manager.liveLogDirs) Utils.delete(dir) } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index b8c78ff718283..57fe5b5204ffc 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -67,7 +67,7 @@ class ReplicaManagerTest { def testHighWaterMarkDirectoryMapping() { val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect) val config = KafkaConfig.fromProps(props) - val mockLogMgr = 
TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) + val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats, new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size)) @@ -86,7 +86,7 @@ class ReplicaManagerTest { val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect) props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath) val config = KafkaConfig.fromProps(props) - val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) + val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats, new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size)) @@ -104,7 +104,7 @@ class ReplicaManagerTest { def testIllegalRequiredAcks() { val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect) val config = KafkaConfig.fromProps(props) - val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) + val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats, new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), Option(this.getClass.getName)) @@ -133,7 +133,7 @@ class ReplicaManagerTest { props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath) val config = KafkaConfig.fromProps(props) val logProps = new Properties() - val mockLogMgr = 
TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray, LogConfig(logProps)) + val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), LogConfig(logProps)) val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1)) val metadataCache = EasyMock.createMock(classOf[MetadataCache]) EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes() @@ -594,7 +594,7 @@ class ReplicaManagerTest { props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath) val config = KafkaConfig.fromProps(props) val logProps = new Properties() - val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray, LogConfig(logProps)) + val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), LogConfig(logProps)) val aliveBrokers = aliveBrokerIds.map(brokerId => createBroker(brokerId, s"host$brokerId", brokerId)) val metadataCache = EasyMock.createMock(classOf[MetadataCache]) EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes() diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 1149db474d403..48688cda20f97 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -1019,7 +1019,7 @@ object TestUtils extends Logging { /** * Create new LogManager instance with default configuration for testing */ - def createLogManager(logDirs: Array[File] = Array.empty[File], + def createLogManager(logDirs: Seq[File] = Seq.empty[File], defaultConfig: LogConfig = LogConfig(), cleanerConfig: CleanerConfig = CleanerConfig(enableCleaner = false), time: MockTime = new MockTime()): LogManager = { From 0ab57a480dd6e3e6ddf905d25636ed49ccd27b38 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Fri, 6 Oct 2017 17:14:31 +0100 Subject: [PATCH 07/10] Flesh out snapshots upgrade test and refactor code to make it easier to test --- 
core/src/main/scala/kafka/log/Log.scala | 43 +++++------ .../src/main/scala/kafka/log/LogManager.scala | 9 ++- .../kafka/log/ProducerStateManager.scala | 42 ++++++----- .../scala/unit/kafka/log/LogManagerTest.scala | 3 +- .../test/scala/unit/kafka/log/LogTest.scala | 72 ++++++++++++------- .../kafka/server/ServerShutdownTest.scala | 3 +- .../scala/unit/kafka/utils/TestUtils.scala | 4 +- 7 files changed, 103 insertions(+), 73 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 637ef2057d59e..fc62bb8a67859 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -333,7 +333,7 @@ class Log(@volatile var dir: File, error("Could not find offset index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath)) recoverSegment(segment) } - segments.put(startOffset, segment) + addSegment(segment) } } } @@ -397,17 +397,17 @@ class Log(@volatile var dir: File, // before the swap file is restored as the new segment file. 
completeSwapOperations(swapFiles) - if(logSegments.isEmpty) { + if (logSegments.isEmpty) { // no existing segments, create a new mutable segment beginning at offset 0 - segments.put(0L, new LogSegment(dir = dir, - startOffset = 0, - indexIntervalBytes = config.indexInterval, - maxIndexSize = config.maxIndexSize, - rollJitterMs = config.randomSegmentJitter, - time = time, - fileAlreadyExists = false, - initFileSize = this.initFileSize, - preallocate = config.preallocate)) + addSegment(new LogSegment(dir = dir, + startOffset = 0, + indexIntervalBytes = config.indexInterval, + maxIndexSize = config.maxIndexSize, + rollJitterMs = config.randomSegmentJitter, + time = time, + fileAlreadyExists = false, + initFileSize = this.initFileSize, + preallocate = config.preallocate)) } else if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) { recoverLog() // reset the index size of the currently active log segment to allow more entries @@ -483,7 +483,8 @@ class Log(@volatile var dir: File, producerStateManager.takeSnapshot() } } else { - val isEmptyBeforeTruncation = producerStateManager.isEmpty && producerStateManager.mapEndOffset >= lastOffset + val nonEmptyBeforeTruncation = !producerStateManager.isEmpty + val snapOffsetBeforeTruncation = producerStateManager.mapEndOffset producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds()) // Only do the potentially expensive reloading if the last snapshot offset is lower than the log end @@ -492,7 +493,7 @@ class Log(@volatile var dir: File, // shouldn't change that fact (although it could cause a producerId to expire earlier than expected), // and we can skip the loading. This is an optimization for users which are not yet using // idempotent/transactional features yet. 
- if (lastOffset > producerStateManager.mapEndOffset && !isEmptyBeforeTruncation) { + if (lastOffset > producerStateManager.mapEndOffset && (lastOffset > snapOffsetBeforeTruncation || nonEmptyBeforeTruncation)) { logSegments(producerStateManager.mapEndOffset, lastOffset).foreach { segment => val startOffset = Utils.max(segment.baseOffset, producerStateManager.mapEndOffset, logStartOffset) producerStateManager.updateMapEndOffset(startOffset) @@ -531,7 +532,7 @@ class Log(@volatile var dir: File, /** * Check if we have the "clean shutdown" file */ - private def hasCleanShutdownFile = new File(dir.getParentFile, CleanShutdownFile).exists() + private def hasCleanShutdownFile: Boolean = new File(dir.getParentFile, CleanShutdownFile).exists() /** * The number of segments in the log. @@ -1380,10 +1381,12 @@ class Log(@volatile var dir: File, // Visible for testing, see `deleteSnapshotsAfterRecoveryPointCheckpoint()` for details private[log] def minSnapshotsOffsetToRetain: Long = { - val twoSegmentsMinOffset = lowerSegment(activeSegment.baseOffset).getOrElse(activeSegment).baseOffset - // Prefer segment base offset - val recoveryPointOffset = lowerSegment(recoveryPoint).map(_.baseOffset).getOrElse(recoveryPoint) - math.min(recoveryPointOffset, twoSegmentsMinOffset) + lock synchronized { + val twoSegmentsMinOffset = lowerSegment(activeSegment.baseOffset).getOrElse(activeSegment).baseOffset + // Prefer segment base offset + val recoveryPointOffset = lowerSegment(recoveryPoint).map(_.baseOffset).getOrElse(recoveryPoint) + math.min(recoveryPointOffset, twoSegmentsMinOffset) + } } private def lowerSegment(offset: Long): Option[LogSegment] = @@ -1493,7 +1496,7 @@ class Log(@volatile var dir: File, /** * The time this log is last known to have been fully flushed to disk */ - def lastFlushTime(): Long = lastflushedTime.get + def lastFlushTime: Long = lastflushedTime.get /** * The active segment that is currently taking appends @@ -1512,7 +1515,7 @@ class Log(@volatile var dir: 
File, def logSegments(from: Long, to: Long): Iterable[LogSegment] = { lock synchronized { val floor = segments.floorKey(from) - if(floor eq null) + if (floor eq null) segments.headMap(to).values.asScala else segments.subMap(floor, true, to, false).values.asScala diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index aaa228d9cb2f4..6da494ed0288e 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -63,10 +63,11 @@ class LogManager(logDirs: Seq[File], brokerTopicStats: BrokerTopicStats, logDirFailureChannel: LogDirFailureChannel, time: Time) extends Logging with KafkaMetricsGroup { - val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint" - val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint" + + import LogManager._ + val LockFile = ".lock" - val InitialTaskDelayMs = 30*1000 + val InitialTaskDelayMs = 30 * 1000 private val logCreationOrDeletionLock = new Object private val logs = new Pool[TopicPartition, Log]() @@ -769,6 +770,8 @@ class LogManager(logDirs: Seq[File], object LogManager { + val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint" + val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint" val ProducerIdExpirationCheckIntervalMs = 10 * 60 * 1000 def apply(config: KafkaConfig, diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index 0a95959c193ae..f7cfb8e6c4d5a 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -405,6 +405,24 @@ object ProducerStateManager { private def isSnapshotFile(file: File): Boolean = file.getName.endsWith(Log.ProducerSnapshotFileSuffix) + // visible for testing + private[log] def listSnapshotFiles(dir: File): Seq[File] = { + if (dir.exists && dir.isDirectory) { + Option(dir.listFiles).map { files => + 
files.filter(f => f.isFile && isSnapshotFile(f)).toSeq + }.getOrElse(Seq.empty) + } else Seq.empty + } + + // visible for testing + private[log] def deleteSnapshotsBefore(dir: File, offset: Long): Unit = deleteSnapshotFiles(dir, _ < offset) + + private def deleteSnapshotFiles(dir: File, predicate: Long => Boolean = _ => true) { + listSnapshotFiles(dir).filter(file => predicate(offsetFromFile(file))).foreach { file => + Files.deleteIfExists(file.toPath) + } + } + } /** @@ -538,9 +556,9 @@ class ProducerStateManager(val topicPartition: TopicPartition, */ def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long) { // remove all out of range snapshots - deleteSnapshotFiles { snapOffset => + deleteSnapshotFiles(logDir, { snapOffset => snapOffset > logEndOffset || snapOffset <= logStartOffset - } + }) if (logEndOffset != mapEndOffset) { producers.clear() @@ -674,7 +692,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, producers.clear() ongoingTxns.clear() unreplicatedTxns.clear() - deleteSnapshotFiles() + deleteSnapshotFiles(logDir) lastSnapOffset = 0L lastMapOffset = 0L } @@ -695,17 +713,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, } @threadsafe - def deleteSnapshotsBefore(offset: Long): Unit = { - deleteSnapshotFiles(_ < offset) - } - - private def listSnapshotFiles: Seq[File] = { - if (logDir.exists && logDir.isDirectory) { - Option(logDir.listFiles).map { files => - files.filter(f => f.isFile && isSnapshotFile(f)).toSeq - }.getOrElse(Seq.empty) - } else Seq.empty - } + def deleteSnapshotsBefore(offset: Long): Unit = ProducerStateManager.deleteSnapshotsBefore(logDir, offset) private def oldestSnapshotFile: Option[File] = { val files = listSnapshotFiles @@ -723,10 +731,6 @@ class ProducerStateManager(val topicPartition: TopicPartition, None } - private def deleteSnapshotFiles(predicate: Long => Boolean = _ => true) { - listSnapshotFiles.filter(file => predicate(offsetFromFile(file))).foreach { file => 
- Files.deleteIfExists(file.toPath) - } - } + private def listSnapshotFiles: Seq[File] = ProducerStateManager.listSnapshotFiles(logDir) } diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index a3980f260bf50..6544d43f6f98e 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -285,11 +285,10 @@ class LogManagerTest { log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0) log.flush() - println("log segments " + log.logSegments.size) } logManager.checkpointLogRecoveryOffsets() - val checkpoints = new OffsetCheckpointFile(new File(logDir, logManager.RecoveryPointCheckpointFile)).read() + val checkpoints = new OffsetCheckpointFile(new File(logDir, LogManager.RecoveryPointCheckpointFile)).read() topicPartitions.zip(logs).foreach { case (tp, log) => assertEquals("Recovery point should equal checkpoint", checkpoints(tp), log.recoveryPoint) diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index e0810f4ac46b7..359d448d19a73 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -19,7 +19,6 @@ package kafka.log import java.io._ import java.nio.ByteBuffer -import java.nio.file.Files import java.util.Properties import org.apache.kafka.common.errors._ @@ -27,7 +26,7 @@ import kafka.common.KafkaException import org.junit.Assert._ import org.junit.{After, Before, Test} import kafka.utils._ -import kafka.server.{BrokerTopicStats, KafkaConfig, LogDirFailureChannel} +import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel} import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record.MemoryRecords.RecordFilter @@ -39,7 +38,7 @@ import 
org.apache.kafka.common.utils.{Time, Utils} import org.easymock.EasyMock import scala.collection.JavaConverters._ -import scala.collection.mutable.ListBuffer +import scala.collection.mutable.{ArrayBuffer, ListBuffer} class LogTest { @@ -156,26 +155,44 @@ class LogTest { // simulate the upgrade path by creating a new log with several segments, deleting the // snapshot files, and then reloading the log val logConfig = createLogConfig(segmentBytes = 64 * 10) - val log = createLog(logDir, logConfig) + var log = createLog(logDir, logConfig) assertEquals(None, log.oldestProducerSnapshotOffset) for (i <- 0 to 100) { val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes) log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0) } - assertTrue(log.logSegments.size >= 2) + val logEndOffset = log.logEndOffset log.close() - logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.ProducerSnapshotFileSuffix)).foreach { file => - Files.delete(file.toPath) - } + val cleanShutdownFile = createCleanShutdownFile() + deleteProducerSnapshotFiles() - val reloadedLog = createLog(logDir, logConfig) - val expectedSnapshotsOffsets = log.logSegments.toSeq.reverse.take(2).map(_.baseOffset) ++ Seq(reloadedLog.logEndOffset) - expectedSnapshotsOffsets.foreach { offset => - assertTrue(Log.producerSnapshotFile(logDir, offset).exists) - } + // Reload after clean shutdown + log = createLog(logDir, logConfig, recoveryPoint = logEndOffset) + var expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).takeRight(2).toVector :+ log.logEndOffset + assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets) + log.close() + + Utils.delete(cleanShutdownFile) + deleteProducerSnapshotFiles() + + // Reload after unclean shutdown with recoveryPoint set to log end offset + log = createLog(logDir, logConfig, recoveryPoint = logEndOffset) + // Is this working as intended? 
+ expectedSnapshotOffsets = Vector(log.logEndOffset) + assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets) + log.close() + + deleteProducerSnapshotFiles() + + // Reload after unclean shutdown with recoveryPoint set to 0 + log = createLog(logDir, logConfig, recoveryPoint = 0L) + // Is this working as intended? + expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).tail.toVector :+ log.logEndOffset + assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets) + log.close() } @Test @@ -343,7 +360,7 @@ class LogTest { logDirFailureChannel = null) EasyMock.verify(stateManager) - cleanShutdownFile.delete() + Utils.delete(cleanShutdownFile) } @Test @@ -379,7 +396,7 @@ class LogTest { logDirFailureChannel = null) EasyMock.verify(stateManager) - cleanShutdownFile.delete() + Utils.delete(cleanShutdownFile) } @Test @@ -532,10 +549,7 @@ class LogTest { log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), producerId = pid, producerEpoch = epoch, sequence = 1), leaderEpoch = 0) - // Delete all snapshots prior to truncating - logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.ProducerSnapshotFileSuffix)).foreach { file => - Files.delete(file.toPath) - } + deleteProducerSnapshotFiles() log.truncateTo(1L) assertEquals(1, log.activeProducersWithLastSequence.size) @@ -660,7 +674,7 @@ class LogTest { } @Test - def testTakeSnapshotOnRollAndDeleteSnapshotOnFlush() { + def testTakeSnapshotOnRollAndDeleteSnapshotOnRecoveryPointCheckpoint() { val logConfig = createLogConfig(segmentBytes = 2048 * 5) val log = createLog(logDir, logConfig) log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), leaderEpoch = 0) @@ -1201,7 +1215,7 @@ class LogTest { maxOffset = Some(numMessages + 1)).records assertEquals("Should be no more messages", 0, lastRead.records.asScala.size) - // check that rolling the log forced a flushed the log--the flush is asyn so retry in case of failure + // check that rolling the log forced a flush, the 
flush is async so retry in case of failure TestUtils.retry(1000L){ assertTrue("Log role should have forced flush", log.recoveryPoint >= log.activeSegment.baseOffset) } @@ -1830,7 +1844,7 @@ class LogTest { recoveryPoint = log.logEndOffset log = createLog(logDir, logConfig) assertEquals(recoveryPoint, log.logEndOffset) - cleanShutdownFile.delete() + Utils.delete(cleanShutdownFile) } @Test @@ -2587,10 +2601,7 @@ class LogTest { appendPid4(4) // 89 appendEndTxnMarkerAsLeader(log, pid4, epoch, ControlRecordType.COMMIT) // 90 - // delete all snapshot files - logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.ProducerSnapshotFileSuffix)).foreach { file => - Files.delete(file.toPath) - } + deleteProducerSnapshotFiles() // delete the last offset and transaction index files to force recovery. this should force us to rebuild // the producer state from the start of the log @@ -2933,4 +2944,13 @@ class LogTest { assertTrue(".kafka_cleanshutdown must exist", cleanShutdownFile.exists()) cleanShutdownFile } + + private def deleteProducerSnapshotFiles(): Unit = { + val files = logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.ProducerSnapshotFileSuffix)) + files.foreach(Utils.delete) + } + + private def listProducerSnapshotOffsets: Seq[Long] = + ProducerStateManager.listSnapshotFiles(logDir).map(Log.offsetFromFile).sorted + } diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 66845c1d84e5f..23c40fe0ddbac 100755 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -24,6 +24,7 @@ import kafka.api.FetchRequestBuilder import kafka.message.ByteBufferMessageSet import java.io.File +import kafka.log.LogManager import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer} import 
org.junit.{Before, Test} @@ -67,7 +68,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness { // do a clean shutdown and check that offset checkpoint file exists server.shutdown() for (logDir <- config.logDirs) { - val OffsetCheckpointFile = new File(logDir, server.logManager.RecoveryPointCheckpointFile) + val OffsetCheckpointFile = new File(logDir, LogManager.RecoveryPointCheckpointFile) assertTrue(OffsetCheckpointFile.exists) assertTrue(OffsetCheckpointFile.length() > 0) } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 48688cda20f97..135227fc70537 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -836,8 +836,8 @@ object TestUtils extends Logging { return } catch { case e: AssertionError => - val ellapsed = System.currentTimeMillis - startTime - if(ellapsed > maxWaitMs) { + val elapsed = System.currentTimeMillis - startTime + if (elapsed > maxWaitMs) { throw e } else { info("Attempt failed, sleeping for " + wait + ", and then retrying.") From 9c6b641c490c81fa57020ff6094696960413f1f7 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Fri, 6 Oct 2017 19:11:45 +0100 Subject: [PATCH 08/10] Remove duplicate LogSegment.read call during `Log` initialisation --- core/src/main/scala/kafka/log/Log.scala | 72 +++++++++++++------------ 1 file changed, 38 insertions(+), 34 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index fc62bb8a67859..2a8f1403a1c57 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -192,14 +192,14 @@ class Log(@volatile var dir: File, locally { val startMs = time.milliseconds - loadSegments() + val nextOffset = loadSegments() /* Calculate the offset of the next message */ - nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset, activeSegment.baseOffset, activeSegment.size) 
+ nextOffsetMetadata = new LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size) leaderEpochCache.clearAndFlushLatest(nextOffsetMetadata.messageOffset) - logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset) + logStartOffset = math.max(logStartOffset, segments.firstEntry.getValue.baseOffset) // The earliest leader epoch may not be flushed during a hard failure. Recover it here. leaderEpochCache.clearAndFlushEarliest(logStartOffset) @@ -382,9 +382,9 @@ class Log(@volatile var dir: File, } } - // Load the log segments from the log files on disk + // Load the log segments from the log files on disk and return the next offset // This method does not need to convert IOException to KafkaStorageException because it is only called before all logs are loaded - private def loadSegments() { + private def loadSegments(): Long = { // first do a pass through the files in the log directory and remove any temporary files // and find any interrupted swap operations val swapFiles = removeTempFilesAndCollectSwapFiles() @@ -408,50 +408,54 @@ class Log(@volatile var dir: File, fileAlreadyExists = false, initFileSize = this.initFileSize, preallocate = config.preallocate)) + 0 } else if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) { - recoverLog() + val nextOffset = recoverLog() // reset the index size of the currently active log segment to allow more entries activeSegment.index.resize(config.maxIndexSize) activeSegment.timeIndex.resize(config.maxIndexSize) - } + nextOffset + } else 0 } private def updateLogEndOffset(messageOffset: Long) { nextOffsetMetadata = new LogOffsetMetadata(messageOffset, activeSegment.baseOffset, activeSegment.size) } - // This method does not need to convert IOException to KafkaStorageException because it is only called before all logs are loaded - private def recoverLog() { + /** + * Recover the log segments and return the next offset after recovery. 
+ * + * This method does not need to convert IOException to KafkaStorageException because it is only called before all + * logs are loaded. + */ + private def recoverLog(): Long = { // if we have the clean shutdown marker, skip recovery - if (hasCleanShutdownFile) { - recoveryPoint = activeSegment.nextOffset - return - } - - // okay we need to actually recovery this log - val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator - while (unflushed.hasNext) { - val segment = unflushed.next - info("Recovering unflushed segment %d in log %s.".format(segment.baseOffset, name)) - val truncatedBytes = - try { - recoverSegment(segment, Some(leaderEpochCache)) - } catch { - case _: InvalidOffsetException => - val startOffset = segment.baseOffset - warn("Found invalid offset during recovery for log " + dir.getName +". Deleting the corrupt segment and " + - "creating an empty one with starting offset " + startOffset) - segment.truncateTo(startOffset) + if (!hasCleanShutdownFile) { + // okay we need to actually recover this log + val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator + while (unflushed.hasNext) { + val segment = unflushed.next + info("Recovering unflushed segment %d in log %s.".format(segment.baseOffset, name)) + val truncatedBytes = + try { + recoverSegment(segment, Some(leaderEpochCache)) + } catch { + case _: InvalidOffsetException => + val startOffset = segment.baseOffset + warn("Found invalid offset during recovery for log " + dir.getName + ". 
Deleting the corrupt segment and " + + "creating an empty one with starting offset " + startOffset) + segment.truncateTo(startOffset) + } + if (truncatedBytes > 0) { + // we had an invalid message, delete all remaining log + warn("Corruption found in segment %d of log %s, truncating to offset %d.".format(segment.baseOffset, name, + segment.nextOffset())) + unflushed.foreach(deleteSegment) } - if (truncatedBytes > 0) { - // we had an invalid message, delete all remaining log - warn("Corruption found in segment %d of log %s, truncating to offset %d.".format(segment.baseOffset, name, - segment.nextOffset())) - unflushed.foreach(deleteSegment) } } - recoveryPoint = activeSegment.nextOffset + recoveryPoint } private def loadProducerState(lastOffset: Long, reloadFromCleanShutdown: Boolean): Unit = lock synchronized { From be001bcf8edf54284cd52947606a20c72b9eb31d Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Fri, 6 Oct 2017 19:12:07 +0100 Subject: [PATCH 09/10] Push wip test for Jason to take a look --- .../test/scala/unit/kafka/log/LogTest.scala | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 359d448d19a73..5c52377b9212d 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -195,6 +195,82 @@ class LogTest { log.close() } + @Test + def testProducerSnapshotsRecoveryAfterUncleanShutdown(): Unit = { + val logConfig = createLogConfig(segmentBytes = 64 * 10) + var log = createLog(logDir, logConfig) + assertEquals(None, log.oldestProducerSnapshotOffset) + + for (i <- 0 to 100) { + val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes) + log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0) + } + + assertTrue(log.logSegments.size >= 5) + val segmentOffsets = log.logSegments.toVector.map(_.baseOffset) + + // We want the recovery point to be past the 
segment offset and before the last 2 segments including a gap of + // 1 segment. We collect the data before closing the log. + val offsetForSegmentAfterRecoveryPoint = segmentOffsets(segmentOffsets.size - 3) + val offsetForRecoveryPointSegment = segmentOffsets(segmentOffsets.size - 4) + val recoveryPoint = offsetForRecoveryPointSegment + 1 + println("recovery point " + recoveryPoint) + assertTrue(recoveryPoint < offsetForSegmentAfterRecoveryPoint) + log.close() + + val segmentsWithReads = ArrayBuffer[LogSegment]() + + def createLogWithInterceptedReads(recoveryPoint: Long) = { + val maxProducerIdExpirationMs = 60 * 60 * 1000 + val topicPartition = Log.parseTopicPartitionName(logDir) + val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs) + + // Intercept all segment read calls + new Log(logDir, logConfig, logStartOffset = 0, recoveryPoint = recoveryPoint, mockTime.scheduler, + brokerTopicStats, mockTime, maxProducerIdExpirationMs, LogManager.ProducerIdExpirationCheckIntervalMs, + topicPartition, producerStateManager, new LogDirFailureChannel(10)) { + + override def addSegment(segment: LogSegment): LogSegment = { + val wrapper = new LogSegment(segment.log, segment.index, segment.timeIndex, segment.txnIndex, segment.baseOffset, + segment.indexIntervalBytes, segment.rollJitterMs, mockTime) { + + override def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long, + minOneMessage: Boolean): FetchDataInfo = { + new Exception().printStackTrace() + segmentsWithReads += this + super.read(startOffset, maxOffset, maxSize, maxPosition, minOneMessage) + } + } + super.addSegment(wrapper) + } + } + } + + // Retain snapshots for the last 2 segments + ProducerStateManager.deleteSnapshotsBefore(logDir, segmentOffsets(segmentOffsets.size - 2)) + log = createLogWithInterceptedReads(recoveryPoint) + // We will reload all segments because the recovery point is behind the producer snapshot files (pre KAFKA-5829 
behaviour) + assertEquals(segmentOffsets, segmentsWithReads.map(_.baseOffset)) + var expectedSnapshotOffsets = segmentOffsets.takeRight(3) :+ log.logEndOffset + assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets) + log.close() + segmentsWithReads.clear() + + // Only delete snapshots before the base offset of the recovery point segment (post KAFKA-5829 behaviour) to + // avoid reading all segments + ProducerStateManager.deleteSnapshotsBefore(logDir, offsetForRecoveryPointSegment) + log = createLogWithInterceptedReads(recoveryPoint = recoveryPoint) + expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).toVector.takeRight(3) :+ log.logEndOffset + assertEquals(expectedSnapshotOffsets, segmentsWithReads.map(_.baseOffset)) + assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets) + + // Verify that we keep 2 snapshot files if we checkpoint the log end offset + log.deleteSnapshotsAfterRecoveryPointCheckpoint() + expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).toVector.takeRight(2) :+ log.logEndOffset + assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets) + log.close() + } + @Test def testSizeForLargeLogs(): Unit = { val largeSize = Int.MaxValue.toLong * 2 From 6518a8c17c1f6612fcb5ce4b0777fd0b2f4b77d3 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Fri, 6 Oct 2017 22:55:59 +0100 Subject: [PATCH 10/10] Complete test and update code for it to pass --- core/src/main/scala/kafka/log/Log.scala | 10 +++++-- .../test/scala/unit/kafka/log/LogTest.scala | 29 +++++++++++++------ 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 2a8f1403a1c57..a6f76ab00eb8b 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -348,6 +348,11 @@ class Log(@volatile var dir: File, loadProducersFromLog(stateManager, fetchDataInfo.records) } stateManager.updateMapEndOffset(segment.baseOffset) + + 
// take a snapshot for the first recovered segment to avoid reloading all the segments if we shut down before we + // checkpoint the recovery point + stateManager.takeSnapshot() + val bytesTruncated = segment.recover(stateManager, leaderEpochCache) // once we have recovered the segment's data, take a snapshot to ensure that we won't @@ -487,8 +492,7 @@ class Log(@volatile var dir: File, producerStateManager.takeSnapshot() } } else { - val nonEmptyBeforeTruncation = !producerStateManager.isEmpty - val snapOffsetBeforeTruncation = producerStateManager.mapEndOffset + val isEmptyBeforeTruncation = producerStateManager.isEmpty && producerStateManager.mapEndOffset >= lastOffset producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds()) // Only do the potentially expensive reloading if the last snapshot offset is lower than the log end @@ -497,7 +501,7 @@ class Log(@volatile var dir: File, // shouldn't change that fact (although it could cause a producerId to expire earlier than expected), // and we can skip the loading. This is an optimization for users which are not yet using // idempotent/transactional features yet. 
- if (lastOffset > producerStateManager.mapEndOffset && (lastOffset > snapOffsetBeforeTruncation || nonEmptyBeforeTruncation)) { + if (lastOffset > producerStateManager.mapEndOffset && !isEmptyBeforeTruncation) { logSegments(producerStateManager.mapEndOffset, lastOffset).foreach { segment => val startOffset = Utils.max(segment.baseOffset, producerStateManager.mapEndOffset, logStartOffset) producerStateManager.updateMapEndOffset(startOffset) diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 5c52377b9212d..1d0fd15818c13 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -27,7 +27,7 @@ import org.junit.Assert._ import org.junit.{After, Before, Test} import kafka.utils._ import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel} -import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} +import kafka.server.epoch.{EpochEntry, LeaderEpochCache, LeaderEpochFileCache} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record.MemoryRecords.RecordFilter import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention @@ -180,8 +180,8 @@ class LogTest { // Reload after unclean shutdown with recoveryPoint set to log end offset log = createLog(logDir, logConfig, recoveryPoint = logEndOffset) - // Is this working as intended? 
- expectedSnapshotOffsets = Vector(log.logEndOffset) + // Note that we don't maintain the guarantee of having a snapshot for the 2 most recent segments in this case + expectedSnapshotOffsets = Vector(log.logSegments.last.baseOffset, log.logEndOffset) assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets) log.close() @@ -208,17 +208,19 @@ class LogTest { assertTrue(log.logSegments.size >= 5) val segmentOffsets = log.logSegments.toVector.map(_.baseOffset) + val activeSegmentOffset = segmentOffsets.last // We want the recovery point to be past the segment offset and before the last 2 segments including a gap of // 1 segment. We collect the data before closing the log. val offsetForSegmentAfterRecoveryPoint = segmentOffsets(segmentOffsets.size - 3) val offsetForRecoveryPointSegment = segmentOffsets(segmentOffsets.size - 4) + val (segOffsetsBeforeRecovery, segOffsetsAfterRecovery) = segmentOffsets.partition(_ < offsetForRecoveryPointSegment) val recoveryPoint = offsetForRecoveryPointSegment + 1 - println("recovery point " + recoveryPoint) assertTrue(recoveryPoint < offsetForSegmentAfterRecoveryPoint) log.close() val segmentsWithReads = ArrayBuffer[LogSegment]() + val recoveredSegments = ArrayBuffer[LogSegment]() def createLogWithInterceptedReads(recoveryPoint: Long) = { val maxProducerIdExpirationMs = 60 * 60 * 1000 @@ -240,6 +242,12 @@ class LogTest { segmentsWithReads += this super.read(startOffset, maxOffset, maxSize, maxPosition, minOneMessage) } + + override def recover(producerStateManager: ProducerStateManager, + leaderEpochCache: Option[LeaderEpochCache]): Int = { + recoveredSegments += this + super.recover(producerStateManager, leaderEpochCache) + } } super.addSegment(wrapper) } @@ -248,20 +256,23 @@ class LogTest { // Retain snapshots for the last 2 segments ProducerStateManager.deleteSnapshotsBefore(logDir, segmentOffsets(segmentOffsets.size - 2)) - log = createLogWithInterceptedReads(recoveryPoint) + log = 
createLogWithInterceptedReads(offsetForRecoveryPointSegment) // We will reload all segments because the recovery point is behind the producer snapshot files (pre KAFKA-5829 behaviour) - assertEquals(segmentOffsets, segmentsWithReads.map(_.baseOffset)) - var expectedSnapshotOffsets = segmentOffsets.takeRight(3) :+ log.logEndOffset + assertEquals(segOffsetsBeforeRecovery, segmentsWithReads.map(_.baseOffset) -- Seq(activeSegmentOffset)) + assertEquals(segOffsetsAfterRecovery, recoveredSegments.map(_.baseOffset)) + var expectedSnapshotOffsets = segmentOffsets.takeRight(4) :+ log.logEndOffset assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets) log.close() segmentsWithReads.clear() + recoveredSegments.clear() // Only delete snapshots before the base offset of the recovery point segment (post KAFKA-5829 behaviour) to // avoid reading all segments ProducerStateManager.deleteSnapshotsBefore(logDir, offsetForRecoveryPointSegment) log = createLogWithInterceptedReads(recoveryPoint = recoveryPoint) - expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).toVector.takeRight(3) :+ log.logEndOffset - assertEquals(expectedSnapshotOffsets, segmentsWithReads.map(_.baseOffset)) + assertEquals(Seq(activeSegmentOffset), segmentsWithReads.map(_.baseOffset)) + assertEquals(segOffsetsAfterRecovery, recoveredSegments.map(_.baseOffset)) + expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).toVector.takeRight(4) :+ log.logEndOffset assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets) // Verify that we keep 2 snapshot files if we checkpoint the log end offset