From 157b8523ac5de7f8c954889f0a719b956810fb76 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 28 Sep 2016 13:28:31 -0700 Subject: [PATCH 1/2] Get rid of CleanShutdownFile now that we are well past 0.8.2 :) --- core/src/main/scala/kafka/log/Log.scala | 22 +-- .../src/main/scala/kafka/log/LogManager.scala | 32 +---- .../scala/other/kafka/StressTestLog.scala | 5 +- .../other/kafka/TestLinearWriteSpeed.scala | 10 +- .../kafka/log/BrokerCompressionTest.scala | 7 +- .../kafka/log/LogCleanerIntegrationTest.scala | 8 +- .../kafka/log/LogCleanerManagerTest.scala | 4 +- .../scala/unit/kafka/log/LogCleanerTest.scala | 6 +- .../test/scala/unit/kafka/log/LogTest.scala | 130 +++++++++++------- 9 files changed, 123 insertions(+), 101 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 6b5769656eae5..c7909426727ef 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -22,7 +22,7 @@ import kafka.utils._ import kafka.message._ import kafka.common._ import kafka.metrics.KafkaMetricsGroup -import kafka.server.{BrokerTopicStats, FetchDataInfo, LogOffsetMetadata} +import kafka.server.{BrokerState, BrokerTopicStats, FetchDataInfo, LogOffsetMetadata, RecoveringFromUncleanShutdown} import java.io.{File, IOException} import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap} import java.util.concurrent.atomic._ @@ -86,7 +86,8 @@ class Log(val dir: File, @volatile var config: LogConfig, @volatile var recoveryPoint: Long = 0L, scheduler: Scheduler, - time: Time = SystemTime) extends Logging with KafkaMetricsGroup { + time: Time = SystemTime, + val brokerState: BrokerState) extends Logging with KafkaMetricsGroup { import kafka.log.Log._ @@ -275,13 +276,8 @@ class Log(val dir: File, } private def recoverLog() { - // if we have the clean shutdown marker, skip recovery - if(hasCleanShutdownFile) { - this.recoveryPoint = activeSegment.nextOffset - return - } - // okay we need to actually recovery this log + brokerState.newState(RecoveringFromUncleanShutdown) val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator while(unflushed.hasNext) { val curr = unflushed.next @@ -304,11 +300,6 @@ class Log(val dir: File, } } - /** - * Check if we have the "clean shutdown" file - */ - private def hasCleanShutdownFile() = new File(dir.getParentFile, CleanShutdownFile).exists() - /** * The number of segments in the log. * Take care! this is an O(n) operation. @@ -1044,11 +1035,6 @@ object Log { /** A temporary file used when swapping files into the log */ val SwapFileSuffix = ".swap" - /** Clean shutdown file that indicates the broker was cleanly shutdown in 0.8. This is required to maintain backwards compatibility - * with 0.8 and avoid unnecessary log recovery when upgrading from 0.8 to 0.8.1 */ - /** TODO: Get rid of CleanShutdownFile in 0.8.2 */ - val CleanShutdownFile = ".kafka_cleanshutdown" - /** * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros * so that ls sorts the files numerically. 
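The Log.scala hunks above make `brokerState` a required constructor parameter and move the unclean-shutdown signal into `recoverLog()`, so the broker is flagged as `RecoveringFromUncleanShutdown` whenever a reopened log has to replay segments past its recovery point. Below is a minimal sketch of what construction looks like for a caller after this change, following the test-style setup the patch itself uses; the directory, segment size and use of `MockTime` are illustrative choices, not values taken from the patch.

    import java.io.File
    import java.util.Properties

    import kafka.log.{Log, LogConfig}
    import kafka.server.BrokerState
    import kafka.utils.MockTime

    object LogConstructionSketch {
      def main(args: Array[String]): Unit = {
        val time = new MockTime()
        val logProps = new Properties()
        logProps.put(LogConfig.SegmentBytesProp, 1024 * 1024: java.lang.Integer)

        // Illustrative log directory; real callers get theirs from LogManager's data dirs.
        val dir = new File(System.getProperty("java.io.tmpdir"), "log-construction-sketch")
        dir.mkdirs()

        // The same BrokerState instance the rest of the broker sees; after this patch
        // LogManager hands its own brokerState down to every Log it creates.
        val brokerState = new BrokerState()

        val log = new Log(dir = dir,
                          config = LogConfig(logProps),
                          recoveryPoint = 0L,
                          scheduler = time.scheduler,
                          time = time,
                          brokerState = brokerState)

        // When a log with existing segments is reopened, initialization runs recoverLog(),
        // which now always calls brokerState.newState(RecoveringFromUncleanShutdown) before
        // replaying unflushed segments, since the clean shutdown marker file is gone.
        log.close()
      }
    }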
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 7806edad5ad6e..fd7a573adf17c 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -110,24 +110,12 @@ class LogManager(val logDirs: Array[File], info("Loading logs.") val startMs = time.milliseconds val threadPools = mutable.ArrayBuffer.empty[ExecutorService] - val jobs = mutable.Map.empty[File, Seq[Future[_]]] + val jobs = mutable.ArrayBuffer.empty[Future[_]] for (dir <- this.logDirs) { val pool = Executors.newFixedThreadPool(ioThreads) threadPools.append(pool) - val cleanShutdownFile = new File(dir, Log.CleanShutdownFile) - - if (cleanShutdownFile.exists) { - debug( - "Found clean shutdown file. " + - "Skipping recovery for all logs in data directory: " + - dir.getAbsolutePath) - } else { - // log recovery itself is being performed by `Log` class during initialization - brokerState.newState(RecoveringFromUncleanShutdown) - } - var recoveryPoints = Map[TopicAndPartition, Long]() try { recoveryPoints = this.recoveryPointCheckpoints(dir).read @@ -148,7 +136,7 @@ class LogManager(val logDirs: Array[File], val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L) - val current = new Log(logDir, config, logRecoveryPoint, scheduler, time) + val current = new Log(logDir, config, logRecoveryPoint, scheduler, time, brokerState) val previous = this.logs.put(topicPartition, current) if (previous != null) { @@ -158,16 +146,13 @@ class LogManager(val logDirs: Array[File], } } } - - jobs(cleanShutdownFile) = jobsForDir.map(pool.submit).toSeq + jobs ++= jobsForDir.map(pool.submit).toSeq } try { - for ((cleanShutdownFile, dirJobs) <- jobs) { - dirJobs.foreach(_.get) - cleanShutdownFile.delete() - } + // TODO proper block on set of futures + jobs.foreach(_.get) } catch { case e: ExecutionException => { error("There was an error in one of the threads during logs loading: " + e.getCause) @@ -250,10 +235,6 @@ class LogManager(val logDirs: Array[File], // update the last flush point debug("Updating recovery points at " + dir) checkpointLogsInDir(dir) - - // mark that the shutdown was clean by creating marker file - debug("Writing clean shutdown marker at " + dir) - CoreUtils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile()) } } catch { case e: ExecutionException => { @@ -363,7 +344,8 @@ class LogManager(val logDirs: Array[File], config, recoveryPoint = 0L, scheduler, - time) + time, + brokerState) logs.put(topicAndPartition, log) info("Created log for partition [%s,%d] in %s with properties {%s}." 
.format(topicAndPartition.topic, diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala index f5cee0c9edd13..57cbf90eb93a1 100755 --- a/core/src/test/scala/other/kafka/StressTestLog.scala +++ b/core/src/test/scala/other/kafka/StressTestLog.scala @@ -22,6 +22,7 @@ import java.util.concurrent.atomic._ import kafka.message._ import kafka.log._ +import kafka.server.BrokerState import kafka.utils._ import org.apache.kafka.clients.consumer.OffsetOutOfRangeException import org.apache.kafka.common.utils.Utils @@ -40,12 +41,14 @@ object StressTestLog { logProprties.put(LogConfig.SegmentBytesProp, 64*1024*1024: java.lang.Integer) logProprties.put(LogConfig.MaxMessageBytesProp, Int.MaxValue: java.lang.Integer) logProprties.put(LogConfig.SegmentIndexBytesProp, 1024*1024: java.lang.Integer) + val brokerState = new BrokerState() val log = new Log(dir = dir, config = LogConfig(logProprties), recoveryPoint = 0L, scheduler = time.scheduler, - time = time) + time = time, + brokerState = brokerState) val writer = new WriterThread(log) writer.start() val reader = new ReaderThread(log) diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala index 6bd8e4f5fc7a9..e0e19ea82471a 100755 --- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala +++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala @@ -23,8 +23,9 @@ import java.nio.channels._ import java.util.{Properties, Random} import kafka.log._ -import kafka.utils._ import kafka.message._ +import kafka.server.BrokerState +import kafka.utils._ import scala.math._ import joptsimple._ @@ -117,7 +118,8 @@ object TestLinearWriteSpeed { val logProperties = new Properties() logProperties.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer) logProperties.put(LogConfig.FlushMessagesProp, flushInterval: java.lang.Long) - writables(i) = new LogWritable(new File(dir, "kafka-test-" + i), new LogConfig(logProperties), scheduler, messageSet) + val brokerState = new BrokerState() + writables(i) = new LogWritable(new File(dir, "kafka-test-" + i), new LogConfig(logProperties), scheduler, messageSet, brokerState) } else { System.err.println("Must specify what to write to with one of --log, --channel, or --mmap") System.exit(1) @@ -199,9 +201,9 @@ object TestLinearWriteSpeed { } } - class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: ByteBufferMessageSet) extends Writable { + class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: ByteBufferMessageSet, brokerState: BrokerState) extends Writable { Utils.delete(dir) - val log = new Log(dir, config, 0L, scheduler, SystemTime) + val log = new Log(dir, config, 0L, scheduler, SystemTime, brokerState) def write(): Int = { log.append(messages, true) messages.sizeInBytes diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala index 7487bc57e8fc7..fc490ee71810a 100755 --- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala +++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala @@ -17,8 +17,9 @@ package kafka.log -import kafka.utils._ import kafka.message._ +import kafka.server.BrokerState +import kafka.utils._ import org.scalatest.junit.JUnitSuite import org.junit._ import org.junit.Assert._ @@ -37,6 +38,7 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin val logDir 
= TestUtils.randomPartitionLogDir(tmpDir) val time = new MockTime(0) val logConfig = LogConfig() + val brokerState = new BrokerState() @After def tearDown() { @@ -52,7 +54,8 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin val logProps = new Properties() logProps.put(LogConfig.CompressionTypeProp,brokerCompression) /*configure broker-side compression */ - val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time, + brokerState = brokerState) /* append two messages */ log.append(new ByteBufferMessageSet(messageCompressionCode, new Message("hello".getBytes), new Message("there".getBytes))) diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala index 250c8b8f25fcb..87b6d999178dc 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -23,7 +23,7 @@ import java.util.Properties import kafka.api.{KAFKA_0_10_0_IV1, KAFKA_0_9_0} import kafka.common.TopicAndPartition import kafka.message._ -import kafka.server.OffsetCheckpoint +import kafka.server.{BrokerState, OffsetCheckpoint} import kafka.utils._ import org.apache.kafka.common.record.CompressionType import org.apache.kafka.common.utils.Utils @@ -319,7 +319,8 @@ class LogCleanerIntegrationTest(compressionCodec: String) { maxMessageSize: Int = 128, logCleanerBackOffMillis: Long = 15000L, propertyOverrides: Properties = new Properties()): LogCleaner = { - + val brokerState = new BrokerState() + // create partitions and add them to the pool val logs = new Pool[TopicAndPartition, Log]() for(i <- 0 until parts) { @@ -330,7 +331,8 @@ class LogCleanerIntegrationTest(compressionCodec: String) { LogConfig(logConfigProperties(propertyOverrides, maxMessageSize, minCleanableDirtyRatio)), recoveryPoint = 0L, scheduler = time.scheduler, - time = time) + time = time, + brokerState = brokerState) logs.put(TopicAndPartition("log", i), log) } diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala index 6e5806f9610ed..8c6775d933c33 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala @@ -181,11 +181,13 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { val config = LogConfig(logProps) val partitionDir = new File(logDir, "log-0") + val brokerState = new BrokerState() val log = new Log(partitionDir, config, recoveryPoint = 0L, time.scheduler, - time) + time, + brokerState) log } diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index d80fba1905a7d..6eb0b5fdd3894 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -24,6 +24,7 @@ import java.util.Properties import kafka.common._ import kafka.message._ +import kafka.server.BrokerState import kafka.utils._ import org.apache.kafka.common.record.{MemoryRecords, TimestampType} import org.apache.kafka.common.utils.Utils @@ -47,6 +48,8 @@ class LogCleanerTest extends JUnitSuite { val logConfig = LogConfig(logProps) val time = new MockTime() val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = 
Long.MaxValue, time = time) + val brokerState = new BrokerState() + @After def teardown(): Unit = { @@ -767,7 +770,8 @@ class LogCleanerTest extends JUnitSuite { def makeLog(dir: File = dir, config: LogConfig = logConfig) = - new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time, + brokerState = brokerState) def noOpCheckDone(topicAndPartition: TopicAndPartition) { /* do nothing */ } diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index d18719a8570bf..6f55383b14031 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -23,6 +23,7 @@ import java.util.Properties import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException} import kafka.api.ApiVersion import kafka.common.LongRef +import kafka.server.BrokerState import org.junit.Assert._ import org.scalatest.junit.JUnitSuite import org.junit.{After, Before, Test} @@ -38,6 +39,7 @@ class LogTest extends JUnitSuite { val time = new MockTime() var config: KafkaConfig = null val logConfig = LogConfig() + val brokerState = new BrokerState() @Before def setUp() { @@ -73,7 +75,8 @@ class LogTest extends JUnitSuite { LogConfig(logProps), recoveryPoint = 0L, scheduler = time.scheduler, - time = time) + time = time, + brokerState = brokerState) assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments) // Test the segment rolling behavior when messages do not have a timestamp. time.sleep(log.config.segmentMs + 1) @@ -132,7 +135,8 @@ class LogTest extends JUnitSuite { LogConfig(logProps), recoveryPoint = 0L, scheduler = time.scheduler, - time = time) + time = time, + brokerState = brokerState) assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments) log.append(set) @@ -159,7 +163,8 @@ class LogTest extends JUnitSuite { // We use need to use magic value 1 here because the test is message size sensitive. logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString) // create a log - val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time, + brokerState = brokerState) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) // segments expire in size @@ -175,7 +180,8 @@ class LogTest extends JUnitSuite { @Test def testLoadEmptyLog() { createEmptyLogs(logDir, 0) - val log = new Log(logDir, logConfig, recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, logConfig, recoveryPoint = 0L, time.scheduler, time = time, + brokerState = brokerState) log.append(TestUtils.singleMessageSet("test".getBytes)) } @@ -188,7 +194,8 @@ class LogTest extends JUnitSuite { logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer) // We use need to use magic value 1 here because the test is message size sensitive. 
logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString) - val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time, + brokerState = brokerState) val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray for(i <- 0 until messages.length) @@ -209,7 +216,8 @@ class LogTest extends JUnitSuite { def testAppendAndReadWithNonSequentialOffsets() { val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer) - val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time, + brokerState = brokerState) val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray val messages = messageIds.map(id => new Message(id.toString.getBytes)) @@ -234,7 +242,8 @@ class LogTest extends JUnitSuite { def testReadAtLogGap() { val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer) - val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time, + brokerState = brokerState) // keep appending until we have two segments with only a single message in the second segment while(log.numberOfSegments == 1) @@ -317,7 +326,8 @@ class LogTest extends JUnitSuite { // set up replica log starting with offset 1024 and with one message (at offset 1024) logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) - val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time, + brokerState = brokerState) log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes))) assertEquals("Reading at the log end offset should produce 0 byte read.", 0, log.read(1025, 1000).messageSet.sizeInBytes) @@ -348,7 +358,8 @@ class LogTest extends JUnitSuite { /* create a multipart log with 100 messages */ val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer) - val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time, + brokerState = brokerState) val numMessages = 100 val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(i.toString.getBytes)) messageSets.foreach(log.append(_)) @@ -380,7 +391,8 @@ class LogTest extends JUnitSuite { /* this log should roll after every messageset */ val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer) - val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time, + brokerState = brokerState) /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */ log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes))) @@ -406,7 +418,8 @@ class LogTest extends JUnitSuite { val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer) 
logProps.put(LogConfig.RetentionMsProp, 0: java.lang.Integer) - val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time, + brokerState = brokerState) for(i <- 0 until messagesToAppend) log.append(TestUtils.singleMessageSet(payload = i.toString.getBytes, timestamp = time.milliseconds - 10)) @@ -442,7 +455,8 @@ class LogTest extends JUnitSuite { logProps.put(LogConfig.SegmentBytesProp, configSegmentSize: java.lang.Integer) // We use need to use magic value 1 here because the test is message size sensitive. logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString) - val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time, + brokerState = brokerState) try { log.append(messageSet) @@ -469,7 +483,8 @@ class LogTest extends JUnitSuite { val logProps = new Properties() logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) - val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time) + val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time, + brokerState = brokerState) try { log.append(messageSetWithUnkeyedMessage) @@ -509,7 +524,8 @@ class LogTest extends JUnitSuite { val maxMessageSize = second.sizeInBytes - 1 val logProps = new Properties() logProps.put(LogConfig.MaxMessageBytesProp, maxMessageSize: java.lang.Integer) - val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time, + brokerState = brokerState) // should be able to append the small message log.append(first) @@ -535,7 +551,8 @@ class LogTest extends JUnitSuite { logProps.put(LogConfig.IndexIntervalBytesProp, indexInterval: java.lang.Integer) logProps.put(LogConfig.SegmentIndexBytesProp, 4096: java.lang.Integer) val config = LogConfig(logProps) - var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) + var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time, + brokerState = brokerState) for(i <- 0 until numMessages) log.append(TestUtils.singleMessageSet(payload = TestUtils.randomBytes(messageSize), timestamp = time.milliseconds + i * 10)) @@ -561,12 +578,14 @@ class LogTest extends JUnitSuite { assertEquals("Should have same number of time index entries as before.", numTimeIndexEntries, log.activeSegment.timeIndex.entries) } - log = new Log(logDir, config, recoveryPoint = lastOffset, time.scheduler, time) + log = new Log(logDir, config, recoveryPoint = lastOffset, time.scheduler, time, + brokerState = brokerState) verifyRecoveredLog(log) log.close() // test recovery case - log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) + log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time, + brokerState = brokerState) verifyRecoveredLog(log) log.close() } @@ -582,7 +601,8 @@ class LogTest extends JUnitSuite { logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) val config = LogConfig(logProps) - val log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) + val log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time, + brokerState = brokerState) val messages = (0 until numMessages).map { i => new 
ByteBufferMessageSet(NoCompressionCodec, new LongRef(100 + i), new Message(i.toString.getBytes(), time.milliseconds + i, Message.MagicValue_V1)) @@ -606,7 +626,8 @@ class LogTest extends JUnitSuite { logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) val config = LogConfig(logProps) - var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) + var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time, + brokerState = brokerState) for(i <- 0 until numMessages) log.append(TestUtils.singleMessageSet(payload = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10)) val indexFiles = log.logSegments.map(_.index.file) @@ -618,7 +639,8 @@ class LogTest extends JUnitSuite { timeIndexFiles.foreach(_.delete()) // reopen the log - log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) + log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time, + brokerState = brokerState) assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset) assertTrue("The index should have been rebuilt", log.logSegments.head.index.entries > 0) assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0) @@ -645,7 +667,8 @@ class LogTest extends JUnitSuite { logProps.put(LogConfig.MessageFormatVersionProp, "0.9.0") val config = LogConfig(logProps) - var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) + var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time, + brokerState = brokerState) for(i <- 0 until numMessages) log.append(TestUtils.singleMessageSet(payload = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10)) val timeIndexFiles = log.logSegments.map(_.timeIndex.file) @@ -655,7 +678,8 @@ class LogTest extends JUnitSuite { timeIndexFiles.foreach(_.delete()) // The rebuilt time index should be empty - log = new Log(logDir, config, recoveryPoint = numMessages + 1, time.scheduler, time) + log = new Log(logDir, config, recoveryPoint = numMessages + 1, time.scheduler, time, + brokerState = brokerState) val segArray = log.logSegments.toArray for (i <- 0 until segArray.size - 1) assertEquals("The time index should be empty", 0, segArray(i).timeIndex.entries) @@ -674,7 +698,8 @@ class LogTest extends JUnitSuite { logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) val config = LogConfig(logProps) - var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) + var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time, + brokerState = brokerState) for(i <- 0 until numMessages) log.append(TestUtils.singleMessageSet(payload = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10)) val indexFiles = log.logSegments.map(_.index.file) @@ -696,7 +721,8 @@ class LogTest extends JUnitSuite { } // reopen the log - log = new Log(logDir, config, recoveryPoint = 200L, time.scheduler, time) + log = new Log(logDir, config, recoveryPoint = 200L, time.scheduler, time, + brokerState = brokerState) assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset) for(i <- 0 until numMessages) { assertEquals(i, log.read(i, 100, None).messageSet.head.offset) @@ -722,7 +748,8 @@ class LogTest extends JUnitSuite { logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer) // create a log - val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, scheduler = time.scheduler, time = time) + 
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, scheduler = time.scheduler, time = time, + brokerState = brokerState) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) for (_ <- 1 to msgPerSeg) @@ -777,7 +804,8 @@ class LogTest extends JUnitSuite { logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer) logProps.put(LogConfig.IndexIntervalBytesProp, (setSize - 1): java.lang.Integer) val config = LogConfig(logProps) - val log = new Log(logDir, config, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + val log = new Log(logDir, config, recoveryPoint = 0L, scheduler = time.scheduler, time = time, + brokerState = brokerState) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) for (i<- 1 to msgPerSeg) @@ -823,7 +851,8 @@ class LogTest extends JUnitSuite { LogConfig(logProps), recoveryPoint = 0L, time.scheduler, - time) + time, + brokerState = brokerState) assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0) assertTrue("The first time index file should have been replaced with a larger file", bogusTimeIndex1.length > 0) @@ -854,7 +883,8 @@ class LogTest extends JUnitSuite { config, recoveryPoint = 0L, time.scheduler, - time) + time, + brokerState = brokerState) // add enough messages to roll over several segments then close and re-open and attempt to truncate for (_ <- 0 until 100) @@ -864,7 +894,8 @@ class LogTest extends JUnitSuite { config, recoveryPoint = 0L, time.scheduler, - time) + time, + brokerState = brokerState) log.truncateTo(3) assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments) assertEquals("Log end offset should be 3.", 3, log.logEndOffset) @@ -889,7 +920,8 @@ class LogTest extends JUnitSuite { config, recoveryPoint = 0L, time.scheduler, - time) + time, + brokerState = brokerState) // append some messages to create some segments for (_ <- 0 until 100) @@ -931,7 +963,8 @@ class LogTest extends JUnitSuite { config, recoveryPoint = 0L, time.scheduler, - time) + time, + brokerState = brokerState) // append some messages to create some segments for (_ <- 0 until 100) @@ -946,7 +979,8 @@ class LogTest extends JUnitSuite { config, recoveryPoint = 0L, time.scheduler, - time) + time, + brokerState = brokerState) assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments) } @@ -956,7 +990,8 @@ class LogTest extends JUnitSuite { LogConfig(), recoveryPoint = 0L, time.scheduler, - time) + time, + brokerState = brokerState) log.append(new ByteBufferMessageSet(new Message(bytes = null))) val messageSet = log.read(0, 4096, None).messageSet assertEquals(0, messageSet.head.offset) @@ -969,7 +1004,8 @@ class LogTest extends JUnitSuite { LogConfig(), recoveryPoint = 0L, time.scheduler, - time) + time, + brokerState = brokerState) val messages = (0 until 2).map(id => new Message(id.toString.getBytes)).toArray messages.foreach(message => log.append(new ByteBufferMessageSet(message))) val invalidMessage = new ByteBufferMessageSet(new Message(1.toString.getBytes)) @@ -993,7 +1029,8 @@ class LogTest extends JUnitSuite { config, recoveryPoint = 0L, time.scheduler, - time) + time, + brokerState = brokerState) val numMessages = 50 + TestUtils.random.nextInt(50) for (_ <- 0 until numMessages) log.append(set) @@ -1005,7 +1042,7 @@ class LogTest extends JUnitSuite { TestUtils.appendNonsenseToFile(log.activeSegment.log.file, TestUtils.random.nextInt(1024) + 1) // attempt recovery - log = new Log(logDir, config, 
recoveryPoint, time.scheduler, time) + log = new Log(logDir, config, recoveryPoint, time.scheduler, time, brokerState = brokerState) assertEquals(numMessages, log.logEndOffset) assertEquals("Messages in the log after recovery should be the same.", messages, log.logSegments.flatMap(_.log.iterator.toList)) Utils.delete(logDir) @@ -1023,24 +1060,24 @@ class LogTest extends JUnitSuite { val set = TestUtils.singleMessageSet("test".getBytes) val parentLogDir = logDir.getParentFile assertTrue("Data directory %s must exist", parentLogDir.isDirectory) - val cleanShutdownFile = new File(parentLogDir, Log.CleanShutdownFile) + // Create a clean shutdown file + val cleanShutdownFile = new File(parentLogDir, ".kafka_cleanshutdown") cleanShutdownFile.createNewFile() - assertTrue(".kafka_cleanshutdown must exist", cleanShutdownFile.exists()) var recoveryPoint = 0L // create a log and write some messages to it var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, - time) - for (_ <- 0 until 100) + time, + brokerState = brokerState) + for(i <- 0 until 100) log.append(set) log.close() - // check if recovery was attempted. Even if the recovery point is 0L, recovery should not be attempted as the - // clean shutdown file exists. + // check if recovery was attempted. Recovery should be attempted regardless of clean shutdown file recoveryPoint = log.logEndOffset - log = new Log(logDir, config, 0L, time.scheduler, time) + log = new Log(logDir, config, 0L, time.scheduler, time, brokerState = brokerState) assertEquals(recoveryPoint, log.logEndOffset) cleanShutdownFile.delete() } @@ -1131,8 +1168,8 @@ class LogTest extends JUnitSuite { config, recoveryPoint = 0L, time.scheduler, - time) - + time, + brokerState = brokerState) // append some messages to create some segments for (_ <- 0 until 100) log.append(set) @@ -1250,7 +1287,8 @@ class LogTest extends JUnitSuite { config, recoveryPoint = 0L, time.scheduler, - time) + time, + brokerState = brokerState) log } } From 245814cfbf36b19fdeab7900de0e56386a62f6f6 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 1 Nov 2016 08:34:10 -0700 Subject: [PATCH 2/2] More fixes --- .../unit/kafka/log/LogCleanerLagIntegrationTest.scala | 5 ++++- .../test/scala/unit/kafka/log/LogCleanerManagerTest.scala | 8 ++++++-- core/src/test/scala/unit/kafka/log/LogTest.scala | 6 ++++-- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala index 36c61d6c3c207..e4e63c5231d56 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala @@ -22,6 +22,7 @@ import java.util.Properties import kafka.common.TopicAndPartition import kafka.message._ +import kafka.server.BrokerState import kafka.utils._ import org.apache.kafka.common.record.CompressionType import org.apache.kafka.common.utils.Utils @@ -155,11 +156,13 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) logProps.put(LogConfig.MinCleanableDirtyRatioProp, minCleanableDirtyRatio: java.lang.Float) + val brokerState = new BrokerState() val log = new Log(dir = dir, LogConfig(logProps), recoveryPoint = 0L, scheduler = time.scheduler, - time = time) + time = time, + brokerState = brokerState) logs.put(TopicAndPartition("log", i), log) } diff --git 
a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala index 8c6775d933c33..147c892ad8607 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala @@ -22,6 +22,7 @@ import java.util.Properties import kafka.common._ import kafka.message._ +import kafka.server.BrokerState import kafka.utils._ import org.apache.kafka.common.utils.Utils import org.junit.Assert._ @@ -191,8 +192,11 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { log } - private def makeLog(dir: File = logDir, config: LogConfig = logConfig) = - new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + private def makeLog(dir: File = logDir, config: LogConfig = logConfig) = { + val brokerState = new BrokerState() + new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time, + brokerState = brokerState) + } private def message(key: Int, value: Int, timestamp: Long) = new ByteBufferMessageSet(new Message(key = key.toString.getBytes, diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 6f55383b14031..5c955dbc10967 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -259,7 +259,8 @@ class LogTest extends JUnitSuite { def testReadWithMinMessage() { val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer) - val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time, + brokerState = brokerState) val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray val messages = messageIds.map(id => new Message(id.toString.getBytes)) @@ -289,7 +290,8 @@ class LogTest extends JUnitSuite { def testReadWithTooSmallMaxLength() { val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer) - val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time, + brokerState = brokerState) val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray val messages = messageIds.map(id => new Message(id.toString.getBytes))
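Patch 1 also flattens LogManager's per-directory job map into a single buffer of futures and leaves a TODO about blocking on that set properly. The shape of that loading path, pulled out into a self-contained sketch with placeholder work and an assumed pool size (neither taken from the patch), looks like this:

    import java.util.concurrent.{ExecutionException, Executors, Future, TimeUnit}
    import scala.collection.mutable

    object LoadJobsSketch {
      def main(args: Array[String]): Unit = {
        val pool = Executors.newFixedThreadPool(4) // stands in for ioThreads in LogManager
        val jobs = mutable.ArrayBuffer.empty[Future[_]]
        try {
          // Stand-ins for the per-log recovery jobs LogManager submits for each data directory.
          for (i <- 0 until 8)
            jobs += pool.submit(new Runnable {
              def run(): Unit = println(s"loaded log $i")
            })

          // Block on every submitted future; Future.get rethrows a worker failure
          // wrapped in ExecutionException, which is what the patched code catches.
          jobs.foreach(_.get())
        } catch {
          case e: ExecutionException =>
            System.err.println("There was an error in one of the threads during logs loading: " + e.getCause)
            throw e.getCause
        } finally {
          pool.shutdown()
          pool.awaitTermination(30, TimeUnit.SECONDS)
        }
      }
    }

Waiting on each future in submission order is enough to surface the first failure; the TODO left in the patch marks exactly this step for further hardening.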