From 3ca0e390aabd31dd178991da77373c44e05a4044 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Mon, 21 Mar 2016 21:44:45 -0700 Subject: [PATCH 1/3] KAFKA-3442: Fix FileMessageSet iterator. --- core/src/main/scala/kafka/log/FileMessageSet.scala | 11 ++++++----- .../scala/unit/kafka/log/FileMessageSetTest.scala | 6 ++++-- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala index 45b3df9970e13..f9c11c0cd3553 100755 --- a/core/src/main/scala/kafka/log/FileMessageSet.scala +++ b/core/src/main/scala/kafka/log/FileMessageSet.scala @@ -245,10 +245,11 @@ class FileMessageSet private[kafka](@volatile var file: File, def iterator(maxMessageSize: Int): Iterator[MessageAndOffset] = { new IteratorTemplate[MessageAndOffset] { var location = start - val sizeOffsetBuffer = ByteBuffer.allocate(12) + val SizeOffsetLength = 12 + val sizeOffsetBuffer = ByteBuffer.allocate(SizeOffsetLength) override def makeNext(): MessageAndOffset = { - if(location >= end) + if(location + SizeOffsetLength >= end) return allDone() // read the size of the item @@ -260,20 +261,20 @@ class FileMessageSet private[kafka](@volatile var file: File, sizeOffsetBuffer.rewind() val offset = sizeOffsetBuffer.getLong() val size = sizeOffsetBuffer.getInt() - if(size < Message.MinMessageOverhead) + if(size < Message.MinMessageOverhead || location + SizeOffsetLength + size > end) return allDone() if(size > maxMessageSize) throw new CorruptRecordException("Message size exceeds the largest allowable message size (%d).".format(maxMessageSize)) // read the item itself val buffer = ByteBuffer.allocate(size) - channel.read(buffer, location + 12) + channel.read(buffer, location + SizeOffsetLength) if(buffer.hasRemaining) return allDone() buffer.rewind() // increment the location and return the item - location += size + 12 + location += size + SizeOffsetLength new MessageAndOffset(new Message(buffer), offset) } } diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala index a3e5b2d4f01e0..1e6f57ae3b9a5 100644 --- a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala +++ b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala @@ -133,11 +133,13 @@ class FileMessageSetTest extends BaseMessageSetTestCases { def testIteratorWithLimits() { val message = messageSet.toList(1) val start = messageSet.searchFor(1, 0).position - val size = message.message.size + val size = message.message.size + 12 val slice = messageSet.read(start, size) assertEquals(List(message), slice.toList) + val slice2 = messageSet.read(start, size - 1) + assertEquals(List(), slice2.toList) } - + /** * Test the truncateTo method lops off messages and appropriately updates the size */ From 48fd9afb7ecd0ae16f55e7de68f33a7c8cdc62c1 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Tue, 22 Mar 2016 16:44:26 -0700 Subject: [PATCH 2/3] Addressed Jun's comments --- .../main/scala/kafka/log/FileMessageSet.scala | 30 ++++++++++++------- .../unit/kafka/log/FileMessageSetTest.scala | 11 +++++++ 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala index f9c11c0cd3553..bdbbf3d6c2260 100755 --- a/core/src/main/scala/kafka/log/FileMessageSet.scala +++ b/core/src/main/scala/kafka/log/FileMessageSet.scala @@ -224,11 +224,19 @@ class FileMessageSet private[kafka](@volatile var file: File, } } - // We use the offset seq to assign offsets so the offset of the messages does not change. - new ByteBufferMessageSet( - compressionCodec = this.headOption.map(_.message.compressionCodec).getOrElse(NoCompressionCodec), - offsetSeq = offsets, - newMessages: _*) + if (_size.get() > 0 && newMessages.size == 0) { + // This indicates that the message is too large. We just return all the bytes in the file message set. + val buffer = ByteBuffer.allocate(_size.get()) + channel.read(buffer, start) + buffer.rewind() + new ByteBufferMessageSet(buffer) + } else { + // We use the offset seq to assign offsets so the offset of the messages does not change. + new ByteBufferMessageSet( + compressionCodec = this.headOption.map(_.message.compressionCodec).getOrElse(NoCompressionCodec), + offsetSeq = offsets, + newMessages: _*) + } } /** @@ -245,11 +253,11 @@ class FileMessageSet private[kafka](@volatile var file: File, def iterator(maxMessageSize: Int): Iterator[MessageAndOffset] = { new IteratorTemplate[MessageAndOffset] { var location = start - val SizeOffsetLength = 12 - val sizeOffsetBuffer = ByteBuffer.allocate(SizeOffsetLength) + val sizeOffsetLength = 12 + val sizeOffsetBuffer = ByteBuffer.allocate(sizeOffsetLength) override def makeNext(): MessageAndOffset = { - if(location + SizeOffsetLength >= end) + if(location + sizeOffsetLength >= end) return allDone() // read the size of the item @@ -261,20 +269,20 @@ class FileMessageSet private[kafka](@volatile var file: File, sizeOffsetBuffer.rewind() val offset = sizeOffsetBuffer.getLong() val size = sizeOffsetBuffer.getInt() - if(size < Message.MinMessageOverhead || location + SizeOffsetLength + size > end) + if(size < Message.MinMessageOverhead || location + sizeOffsetLength + size > end) return allDone() if(size > maxMessageSize) throw new CorruptRecordException("Message size exceeds the largest allowable message size (%d).".format(maxMessageSize)) // read the item itself val buffer = ByteBuffer.allocate(size) - channel.read(buffer, location + SizeOffsetLength) + channel.read(buffer, location + sizeOffsetLength) if(buffer.hasRemaining) return allDone() buffer.rewind() // increment the location and return the item - location += size + SizeOffsetLength + location += size + sizeOffsetLength new MessageAndOffset(new Message(buffer), offset) } } diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala index 1e6f57ae3b9a5..534443ce3203e 100644 --- a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala +++ b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala @@ -204,6 +204,17 @@ class FileMessageSetTest extends BaseMessageSetTestCases { assertEquals(oldposition, tempReopen.length) } + @Test + def testFormatConversionWithPartialMessage() { + val message = messageSet.toList(1) + val start = messageSet.searchFor(1, 0).position + val size = message.message.size + 12 + val slice = messageSet.read(start, size - 1) + val messageV0 = slice.toMessageFormat(Message.MagicValue_V0) + assertEquals("No message should be there", 0, messageV0.size) + assertEquals(s"There should be ${size - 1} bytes", size - 1, messageV0.sizeInBytes) + } + @Test def testMessageFormatConversion() { From 2325112906f20016f76cba6565d7d0cb92c3fad2 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Tue, 22 Mar 2016 19:52:31 -0700 Subject: [PATCH 3/3] Addressed Jun's comments. --- core/src/main/scala/kafka/log/FileMessageSet.scala | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala index bdbbf3d6c2260..a164b4b96730c 100755 --- a/core/src/main/scala/kafka/log/FileMessageSet.scala +++ b/core/src/main/scala/kafka/log/FileMessageSet.scala @@ -206,7 +206,7 @@ class FileMessageSet private[kafka](@volatile var file: File, /** * Convert this message set to use the specified message format. */ - def toMessageFormat(toMagicValue: Byte): ByteBufferMessageSet = { + def toMessageFormat(toMagicValue: Byte): MessageSet = { val offsets = new ArrayBuffer[Long] val newMessages = new ArrayBuffer[Message] this.foreach { messageAndOffset => @@ -224,12 +224,9 @@ class FileMessageSet private[kafka](@volatile var file: File, } } - if (_size.get() > 0 && newMessages.size == 0) { + if (sizeInBytes > 0 && newMessages.size == 0) { // This indicates that the message is too large. We just return all the bytes in the file message set. - val buffer = ByteBuffer.allocate(_size.get()) - channel.read(buffer, start) - buffer.rewind() - new ByteBufferMessageSet(buffer) + this } else { // We use the offset seq to assign offsets so the offset of the messages does not change. new ByteBufferMessageSet(