Skip to content

Commit

Permalink
KAFKA-3933; Always fully read deepIterator
Browse files Browse the repository at this point in the history
Avoids leaking native memory and hence crashing brokers on bootup due to
running out of memory.

Seeeing as `messageFormat > 0` always reads the full compressed message
set and is the default going forwards, we can use that behaviour to
always close the compressor when calling `deepIterator`

Author: Tom Crayford <tcrayford@googlemail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #1660 from tcrayford/dont_leak_native_memory_round_2

(cherry picked from commit 8a417c8)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>
  • Loading branch information
tcrayford authored and ijuma committed Jul 26, 2016
1 parent 6b1a6d9 commit c47c3b0
Showing 1 changed file with 34 additions and 35 deletions.
69 changes: 34 additions & 35 deletions core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package kafka.message

import kafka.utils.{IteratorTemplate, Logging}
import kafka.utils.{CoreUtils, IteratorTemplate, Logging}
import kafka.common.{KafkaException, LongRef}
import java.nio.ByteBuffer
import java.nio.channels._
Expand Down Expand Up @@ -85,36 +85,45 @@ object ByteBufferMessageSet {
new IteratorTemplate[MessageAndOffset] {

val MessageAndOffset(wrapperMessage, wrapperMessageOffset) = wrapperMessageAndOffset

if (wrapperMessage.payload == null)
throw new KafkaException(s"Message payload is null: $wrapperMessage")

val wrapperMessageTimestampOpt: Option[Long] =
if (wrapperMessage.magic > MagicValue_V0) Some(wrapperMessage.timestamp) else None
val wrapperMessageTimestampTypeOpt: Option[TimestampType] =
if (wrapperMessage.magic > MagicValue_V0) Some(wrapperMessage.timestampType) else None
if (wrapperMessage.payload == null)
throw new KafkaException(s"Message payload is null: $wrapperMessage")
val inputStream = new ByteBufferBackedInputStream(wrapperMessage.payload)
val compressed = try {
new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, wrapperMessage.magic, inputStream))
} catch {
case ioe: IOException =>
throw new InvalidMessageException(s"Failed to instantiate input stream compressed with ${wrapperMessage.compressionCodec}", ioe)
}

var lastInnerOffset = -1L

val messageAndOffsets = if (wrapperMessageAndOffset.message.magic > MagicValue_V0) {
val messageAndOffsets = {
val inputStream = new ByteBufferBackedInputStream(wrapperMessage.payload)
val compressed = try {
new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, wrapperMessage.magic, inputStream))
} catch {
case ioe: IOException =>
throw new InvalidMessageException(s"Failed to instantiate input stream compressed with ${wrapperMessage.compressionCodec}", ioe)
}

val innerMessageAndOffsets = new ArrayDeque[MessageAndOffset]()
try {
while (true)
innerMessageAndOffsets.add(readMessageFromStream())
innerMessageAndOffsets.add(readMessageFromStream(compressed))
} catch {
case eofe: EOFException =>
compressed.close()
// we don't do anything at all here, because the finally
// will close the compressed input stream, and we simply
// want to return the innerMessageAndOffsets
case ioe: IOException =>
throw new InvalidMessageException(s"Error while reading message from stream compressed with ${wrapperMessage.compressionCodec}", ioe)
} finally {
CoreUtils.swallow(compressed.close())
}
Some(innerMessageAndOffsets)
} else None

private def readMessageFromStream(): MessageAndOffset = {
innerMessageAndOffsets
}

private def readMessageFromStream(compressed: DataInputStream): MessageAndOffset = {
val innerOffset = compressed.readLong()
val recordSize = compressed.readInt()

Expand All @@ -138,25 +147,15 @@ object ByteBufferMessageSet {
}

override def makeNext(): MessageAndOffset = {
messageAndOffsets match {
// Using inner offset and timestamps
case Some(innerMessageAndOffsets) =>
innerMessageAndOffsets.pollFirst() match {
case null => allDone()
case MessageAndOffset(message, offset) =>
val relativeOffset = offset - lastInnerOffset
val absoluteOffset = wrapperMessageOffset + relativeOffset
new MessageAndOffset(message, absoluteOffset)
}
// Not using inner offset and timestamps
case None =>
try readMessageFromStream()
catch {
case eofe: EOFException =>
compressed.close()
allDone()
case ioe: IOException =>
throw new KafkaException(ioe)
messageAndOffsets.pollFirst() match {
case null => allDone()
case nextMessage@ MessageAndOffset(message, offset) =>
if (wrapperMessage.magic > MagicValue_V0) {
val relativeOffset = offset - lastInnerOffset
val absoluteOffset = wrapperMessageOffset + relativeOffset
new MessageAndOffset(message, absoluteOffset)
} else {
nextMessage
}
}
}
Expand Down

0 comments on commit c47c3b0

Please sign in to comment.