KAFKA-3933: close deepIterator during log recovery #1614
Conversation
Avoids leaking native memory, and hence crashing brokers on bootup due to running out of memory. Introduces `kafka.common.ClosableIterator`, an iterator that can be closed, changes the signature of `ByteBufferMessageSet.deepIterator` to return it, and changes the callers to always close the iterator. This is a followup to #1598, with more native memory leaks in the broker code found and fixed.
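A minimal sketch of the shape this takes (illustrative only; the `withIterator` helper below is a hypothetical caller pattern, not code from the PR):

```scala
import java.io.Closeable

// Sketch: an Iterator that can also be closed, so callers can release
// native resources (e.g. decompression streams) deterministically.
trait ClosableIterator[T] extends Iterator[T] with Closeable

// Hypothetical caller pattern: always close the iterator, even if
// iteration throws, so native memory is reclaimed promptly.
def withIterator[T, R](iter: ClosableIterator[T])(f: Iterator[T] => R): R =
  try f(iter)
  finally iter.close()
```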
@ijuma want to take a look now? I found there's a leak where we uncompress messages during handling of …
@junrao mind reviewing this? Thanks a lot!
Thanks @tcrayford. I will take a deeper look tomorrow if Jun doesn't beat me to it. One question: have you given some thought to how we could test these changes?
@ijuma yeah, we have a system that aggravates these memory leaks when pointed at a Kafka cluster. I can release the new code to it and then watch for leaks like I did previously.
@ijuma or were you talking about adding tests inside the project itself? Unfortunately, it seems impossible to tell whether a GZIPInputStream is closed.
Yes, I meant within the project itself. One way to check if GZipInputStream is closed is to call …
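For illustration, one observable property a test could rely on (a sketch; the exact call suggested above is cut off): `GZIPInputStream` inherits `InflaterInputStream`'s behaviour of throwing an `IOException` ("Stream closed") when read after `close()`.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, IOException}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

// Build a small, valid gzip payload in memory.
val baos = new ByteArrayOutputStream()
val gzOut = new GZIPOutputStream(baos)
gzOut.write("hello".getBytes("UTF-8"))
gzOut.close()

val in = new GZIPInputStream(new ByteArrayInputStream(baos.toByteArray))
in.close()

// After close(), any further read() throws IOException, so a test could
// assert on that to verify the iterator closed its underlying stream.
try {
  in.read()
  println("stream unexpectedly still readable")
} catch {
  case _: IOException => println("stream is closed, as expected")
}
```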
@ijuma the compressor isn't exposed by the iterator. I can expose that, but it feels like the classic "should we/shouldn't we" thing about private members. Seeing as I'm new to the codebase, I'd love your take on that.
I spent some time looking at this in more detail and, after changing a number of additional places to use the new iterator, noticed the following in `deepIterator`:

```scala
val messageAndOffsets = if (wrapperMessageAndOffset.message.magic > MagicValue_V0) {
  val innerMessageAndOffsets = new ArrayDeque[MessageAndOffset]()
  try {
    while (true)
      innerMessageAndOffsets.add(readMessageFromStream())
  } catch {
    case eofe: EOFException =>
      compressed.close()
    case ioe: IOException =>
      throw new InvalidMessageException(s"Error while reading message from stream compressed with ${wrapperMessage.compressionCodec}", ioe)
  }
  Some(innerMessageAndOffsets)
} else None
```

So, we are reading the whole compressed message set during construction if the message format is > 0. I suggest we do the same for message format = 0, and then we don't need to change any other code. Thoughts @tcrayford?
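A rough sketch of that suggestion (reusing the contextual names `readMessageFromStream()`, `compressed`, and `wrapperMessage` from the snippet above; this illustrates the idea, not the eventual patch): if both formats drain the stream eagerly, the format check disappears and the stream is closed in one place.

```scala
import java.io.{EOFException, IOException}
import java.util.ArrayDeque

// Sketch: eagerly drain the compressed stream for both message formats,
// so the stream is closed before deepIterator returns and no caller has
// to manage its lifetime.
val innerMessageAndOffsets = new ArrayDeque[MessageAndOffset]()
try {
  while (true)
    innerMessageAndOffsets.add(readMessageFromStream())
} catch {
  case _: EOFException =>
    compressed.close() // whole set read: release native memory immediately
  case ioe: IOException =>
    throw new InvalidMessageException(
      s"Error while reading message from stream compressed with ${wrapperMessage.compressionCodec}", ioe)
}
val messageAndOffsets = Some(innerMessageAndOffsets)
```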
Nice spot! I was originally concerned about the resource utilization of something like this, but it seems much better than this PR in that no future logic needs to worry about it. Furthermore, seeing as most messages will be format > 0 in the future, we don't save much anyway. I can put another PR in for this today.
Yes, indeed. If we weren't doing it for message format > 0 (which will be the common case going forward, as you said), then it would make sense to try and avoid it.