From f2e99a874b30bb3b26f15348a9ac5f55dd7d700e Mon Sep 17 00:00:00 2001
From: Ivan Dyachkov
Date: Fri, 2 Sep 2016 11:43:15 +0200
Subject: [PATCH 1/2] Applying https://github.com/apache/kafka/pull/1332
 KAFKA-3587: LogCleaner fails due to incorrect offset map computation

---
 .../src/main/scala/kafka/log/LogCleaner.scala | 27 ++++++++++-----
 .../scala/unit/kafka/log/CleanerTest.scala    | 33 +++++++++++++++++++
 2 files changed, 51 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index d5c247cab95c3..1fd2e3f893105 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -575,17 +575,19 @@ private[log] class Cleaner(val id: Int,
     // but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
     var offset = dirty.head.baseOffset
     require(offset == start, "Last clean offset is %d but segment base offset is %d for log %s.".format(start, offset, log.name))
-    val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
     var full = false
     for (segment <- dirty if !full) {
       checkDone(log.topicAndPartition)
-      val segmentSize = segment.nextOffset() - segment.baseOffset
-      require(segmentSize <= maxDesiredMapSize, "%d messages in segment %s/%s but offset map can fit only %d. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads".format(segmentSize, log.name, segment.log.file.getName, maxDesiredMapSize))
-      if (map.size + segmentSize <= maxDesiredMapSize)
-        offset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
-      else
+      val newOffset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
+      if (newOffset > -1L)
+        offset = newOffset
+      else {
+        // If not even one segment can fit in the map, compaction cannot happen
+        require(offset > start, "Unable to build the offset map for segment %s/%s. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads".format(log.name, segment.log.file.getName))
+        debug("Offset map is full, %d segments fully mapped, segment with base offset %d is partially mapped".format(dirty.indexOf(segment), segment.baseOffset))
         full = true
+      }
     }
     info("Offset map for log %s complete.".format(log.name))
     offset
   }
@@ -597,11 +599,12 @@ private[log] class Cleaner(val id: Int,
    * @param segment The segment to index
    * @param map The map in which to store the key=>offset mapping
    *
-   * @return The final offset covered by the map
+   * @return The final offset covered by the map or -1 if the map is full
    */
   private def buildOffsetMapForSegment(topicAndPartition: TopicAndPartition, segment: LogSegment, map: OffsetMap): Long = {
     var position = 0
     var offset = segment.baseOffset
+    val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
     while (position < segment.log.sizeInBytes) {
       checkDone(topicAndPartition)
       readBuffer.clear()
@@ -610,8 +613,14 @@ private[log] class Cleaner(val id: Int,
       val startPosition = position
       for (entry <- messages) {
         val message = entry.message
-        if (message.hasKey)
-          map.put(message.key, entry.offset)
+        if (message.hasKey) {
+          if (map.size < maxDesiredMapSize)
+            map.put(message.key, entry.offset)
+          else {
+            // The map is full, stop looping and return
+            return -1L
+          }
+        }
         offset = entry.offset
         stats.indexMessagesRead(1)
       }
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 8ab9f91e82dba..9b1fab93c7462 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -422,6 +422,39 @@ class CleanerTest extends JUnitSuite {
     recoverAndCheck(config, cleanedKeys)
   }
+
+  @Test
+  def testBuildOffsetMapFakeLarge() {
+    val map = new FakeOffsetMap(1000)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
+    logProps.put(LogConfig.SegmentIndexBytesProp, 72: java.lang.Integer)
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+    val logConfig = LogConfig(logProps)
+    val log = makeLog(config = logConfig)
+    val cleaner = makeCleaner(Int.MaxValue)
+    val start = 0
+    val end = 2
+    val offsetSeq = Seq(0L, 7206178L)
+    val offsets = writeToLog(log, (start until end) zip (start until end), offsetSeq)
+    val endOffset = cleaner.buildOffsetMap(log, start, end, map)
+    assertEquals("Last offset should be the end offset.", 7206178L, endOffset)
+    assertEquals("Should have the expected number of messages in the map.", end - start, map.size)
+    assertEquals("Map should contain first value", 0L, map.get(key(0)))
+    assertEquals("Map should contain second value", 7206178L, map.get(key(1)))
+  }
+
+  private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
+    for(((key, value), offset) <- keysAndValues.zip(offsetSeq))
+      yield log.append(messageWithOffset(key, value, offset), assignOffsets = false).firstOffset
+  }
+
+  private def messageWithOffset(key: Int, value: Int, offset: Long) =
+    new ByteBufferMessageSet(NoCompressionCodec, Seq(offset),
+                             new Message(key = key.toString.getBytes,
+                                         bytes = value.toString.getBytes,
+                                         timestamp = Message.NoTimestamp,
+                                         magicValue = Message.MagicValue_V1))
 
   def makeLog(dir: File = dir, config: LogConfig = logConfig) =
     new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 

From c7453a0a4a782a5c2194fb6d9d18f76034d2490e Mon Sep 17 00:00:00 2001
From: Ivan Dyachkov
Date: Fri, 2 Sep 2016 13:47:33 +0200
Subject: [PATCH 2/2] fix test

---
 .../test/scala/unit/kafka/log/CleanerTest.scala | 16 ++++++----------
 1 file changed, 6 insertions(+), 10 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 9b1fab93c7462..6d8a7bad2277f 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -436,7 +436,7 @@ class CleanerTest extends JUnitSuite {
     val start = 0
     val end = 2
     val offsetSeq = Seq(0L, 7206178L)
-    val offsets = writeToLog(log, (start until end) zip (start until end), offsetSeq)
+    writeToLog(log, (start until end) zip (start until end), offsetSeq)
     val endOffset = cleaner.buildOffsetMap(log, start, end, map)
     assertEquals("Last offset should be the end offset.", 7206178L, endOffset)
     assertEquals("Should have the expected number of messages in the map.", end - start, map.size)
@@ -449,14 +449,6 @@ class CleanerTest extends JUnitSuite {
       yield log.append(messageWithOffset(key, value, offset), assignOffsets = false).firstOffset
   }
 
-  private def messageWithOffset(key: Int, value: Int, offset: Long) =
-    new ByteBufferMessageSet(NoCompressionCodec, Seq(offset),
-                             new Message(key = key.toString.getBytes,
-                                         bytes = value.toString.getBytes,
-                                         timestamp = Message.NoTimestamp,
-                                         magicValue = Message.MagicValue_V1))
-
-
   def makeLog(dir: File = dir, config: LogConfig = logConfig) =
     new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
@@ -482,6 +474,10 @@ class CleanerTest extends JUnitSuite {
   def message(key: Int, value: Int) =
     new ByteBufferMessageSet(new Message(key=key.toString.getBytes, bytes=value.toString.getBytes))
 
+  def messageWithOffset(key: Int, value: Int, offset: Long) =
+    new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(offset),
+                             new Message(key=key.toString.getBytes, bytes=value.toString.getBytes))
+
   def unkeyedMessage(value: Int) =
     new ByteBufferMessageSet(new Message(bytes=value.toString.getBytes))
 
@@ -511,4 +507,4 @@ class FakeOffsetMap(val slots: Int) extends OffsetMap {
 
   def size: Int = map.size
 
-}
\ No newline at end of file
+}
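
For readers who want to trace the new control flow without the full Kafka tree, the sketch below is a minimal, self-contained Scala rendering of what the two LogCleaner hunks above do. Record, Segment, mapSegment, buildMap, and maxSlots are illustrative stand-ins, not Kafka's real classes: they stand in for keyed message entries, LogSegment, buildOffsetMapForSegment, buildOffsetMap, and maxDesiredMapSize, and a plain mutable.Map stands in for the OffsetMap dedupe buffer.

import scala.collection.mutable

object OffsetMapSketch {
  // Illustrative stand-ins for Kafka's keyed message entries and LogSegment.
  final case class Record(key: String, offset: Long)
  final case class Segment(baseOffset: Long, records: Seq[Record])

  // Mirrors the patched buildOffsetMapForSegment: insert while the map has
  // room, and return -1L the moment it is full so the caller can cut off here.
  def mapSegment(segment: Segment, map: mutable.Map[String, Long], maxSlots: Int): Long = {
    var last = segment.baseOffset
    for (r <- segment.records) {
      if (map.size < maxSlots)
        map(r.key) = r.offset
      else
        return -1L // map is full; this segment counts as unmapped
      last = r.offset
    }
    last
  }

  // Mirrors the patched buildOffsetMap: walk the dirty segments until one no
  // longer fits, requiring that at least one whole segment was mapped.
  def buildMap(dirty: Seq[Segment], maxSlots: Int): Long = {
    val map = mutable.Map.empty[String, Long]
    val start = dirty.head.baseOffset
    var end = start
    var full = false
    for (segment <- dirty if !full) {
      val newEnd = mapSegment(segment, map, maxSlots)
      if (newEnd > -1L)
        end = newEnd
      else {
        // If not even one segment fits, compaction cannot make progress.
        require(end > start, "offset map cannot hold even one segment")
        full = true
      }
    }
    end // clean up to this offset; later segments stay dirty for the next run
  }

  def main(args: Array[String]): Unit = {
    // Sparse offsets, as in an already-compacted log: the old per-segment
    // check estimated ~7.2 million entries from nextOffset - baseOffset and
    // threw; counting real insertions maps both records with room to spare.
    val sparse = Seq(Segment(0L, Seq(Record("k0", 0L), Record("k1", 7206178L))))
    println(buildMap(sparse, maxSlots = 1000)) // prints 7206178

    // A genuinely full map: only the first segment fits, so cleaning ends at
    // that segment's last offset instead of killing the cleaner thread.
    val dirty = Seq(
      Segment(0L, Seq(Record("a", 0L), Record("b", 1L))),
      Segment(2L, Seq(Record("c", 2L), Record("d", 3L))))
    println(buildMap(dirty, maxSlots = 2)) // prints 1
  }
}

The design point the patch makes: the old code estimated a segment's entry count as segment.nextOffset() - segment.baseOffset, which wildly over-counts when offsets are sparse, as they are in an already-compacted log (hence the 7206178 offset in the new test), and the failed require killed the cleaner thread. The fix counts actual insertions into the map and, when the map fills, stops cleanly at the last fully mapped segment.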