Skip to content

Commit

Permalink
ZK consumer gets into infinite loop if a message is larger than fetch…
Browse files Browse the repository at this point in the history
… size; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-160

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1186570 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
Jun Rao committed Oct 19, 2011
1 parent a0b2aa8 commit 81f8d0d
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 3 deletions.
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
Expand Up @@ -76,7 +76,8 @@ private[consumer] class PartitionTopicInfo(val topic: String,
* add an empty message with the exception to the queue so that client can see the error
*/
def enqueueError(e: Throwable, fetchOffset: Long) = {
val messages = new ByteBufferMessageSet(ErrorMapping.EmptyByteBuffer, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
val messages = new ByteBufferMessageSet(buffer = ErrorMapping.EmptyByteBuffer, initialOffset = 0,
errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
}

Expand Down
Expand Up @@ -99,7 +99,7 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
logger.trace("size of data = " + size)
}
if(size < 0 || topIter.remaining < size) {
if (currValidBytes == 0 || size < 0)
if (currValidBytes == initialOffset || size < 0)
throw new InvalidMessageSizeException("invalid message size: " + size + " only received bytes: " +
topIter.remaining + " at " + currValidBytes + "( possible causes (1) a single message larger than " +
"the fetch size; (2) log corruption )")
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/tools/ConsumerShell.scala
Expand Up @@ -98,7 +98,7 @@ class ZKConsumerThread(stream: KafkaMessageStream[String]) extends Thread {
}
}catch {
case e:ConsumerTimeoutException => // this is ok
case oe: Exception => logger.error(oe)
case oe: Exception => logger.error("error in ZKConsumerThread", oe)
}
shutdownLatch.countDown
println("Received " + count + " messages")
Expand Down
Expand Up @@ -21,12 +21,31 @@ import java.nio._
import junit.framework.Assert._
import org.junit.Test
import kafka.utils.TestUtils
import kafka.common.InvalidMessageSizeException

class ByteBufferMessageSetTest extends BaseMessageSetTestCases {

override def createMessageSet(messages: Seq[Message]): ByteBufferMessageSet =
new ByteBufferMessageSet(NoCompressionCodec, messages: _*)

@Test
def testSmallFetchSize() {
// create a ByteBufferMessageSet that doesn't contain a full message
// iterating it should get an InvalidMessageSizeException
val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("01234567890123456789".getBytes()))
val buffer = messages.serialized.slice
buffer.limit(10)
val messageSetWithNoFullMessage = new ByteBufferMessageSet(buffer = buffer, initialOffset = 1000)
try {
for (message <- messageSetWithNoFullMessage)
fail("shouldn't see any message")
}
catch {
case e: InvalidMessageSizeException => //this is expected
case e2 => fail("shouldn't see any other exceptions")
}
}

@Test
def testValidBytes() {
{
Expand Down

0 comments on commit 81f8d0d

Please sign in to comment.