From 81f8d0dd83a25e075cc6a08c6105b01459980c5f Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Wed, 19 Oct 2011 23:52:58 +0000 Subject: [PATCH] ZK consumer gets into infinite loop if a message is larger than fetch size; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-160 git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1186570 13f79535-47bb-0310-9956-ffa450edef68 --- .../kafka/consumer/PartitionTopicInfo.scala | 3 ++- .../kafka/message/ByteBufferMessageSet.scala | 2 +- .../scala/kafka/tools/ConsumerShell.scala | 2 +- .../message/ByteBufferMessageSetTest.scala | 19 +++++++++++++++++++ 4 files changed, 23 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala index 8e1102b54742..c839a079030f 100644 --- a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala +++ b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala @@ -76,7 +76,8 @@ private[consumer] class PartitionTopicInfo(val topic: String, * add an empty message with the exception to the queue so that client can see the error */ def enqueueError(e: Throwable, fetchOffset: Long) = { - val messages = new ByteBufferMessageSet(ErrorMapping.EmptyByteBuffer, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) + val messages = new ByteBufferMessageSet(buffer = ErrorMapping.EmptyByteBuffer, initialOffset = 0, + errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset)) } diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala index 59b2e695d30c..3ff7b2e6e305 100644 --- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala @@ -99,7 +99,7 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer, logger.trace("size of data = " + size) } if(size < 0 || topIter.remaining < size) { - if (currValidBytes == 0 || size < 0) + if (currValidBytes == initialOffset || size < 0) throw new InvalidMessageSizeException("invalid message size: " + size + " only received bytes: " + topIter.remaining + " at " + currValidBytes + "( possible causes (1) a single message larger than " + "the fetch size; (2) log corruption )") diff --git a/core/src/main/scala/kafka/tools/ConsumerShell.scala b/core/src/main/scala/kafka/tools/ConsumerShell.scala index 17f93918221c..8f91ec97758b 100644 --- a/core/src/main/scala/kafka/tools/ConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/ConsumerShell.scala @@ -98,7 +98,7 @@ class ZKConsumerThread(stream: KafkaMessageStream[String]) extends Thread { } }catch { case e:ConsumerTimeoutException => // this is ok - case oe: Exception => logger.error(oe) + case oe: Exception => logger.error("error in ZKConsumerThread", oe) } shutdownLatch.countDown println("Received " + count + " messages") diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala index a888f37594ab..962a86dca47b 100644 --- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala +++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala @@ -21,12 +21,31 @@ import java.nio._ import junit.framework.Assert._ import org.junit.Test import kafka.utils.TestUtils +import kafka.common.InvalidMessageSizeException class ByteBufferMessageSetTest extends BaseMessageSetTestCases { override def createMessageSet(messages: Seq[Message]): ByteBufferMessageSet = new ByteBufferMessageSet(NoCompressionCodec, messages: _*) + @Test + def testSmallFetchSize() { + // create a ByteBufferMessageSet that doesn't contain a full message + // iterating it should get an InvalidMessageSizeException + val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("01234567890123456789".getBytes())) + val buffer = messages.serialized.slice + buffer.limit(10) + val messageSetWithNoFullMessage = new ByteBufferMessageSet(buffer = buffer, initialOffset = 1000) + try { + for (message <- messageSetWithNoFullMessage) + fail("shouldn't see any message") + } + catch { + case e: InvalidMessageSizeException => //this is expected + case e2 => fail("shouldn't see any other exceptions") + } + } + @Test def testValidBytes() { {