KAFKA-19012 Fix rare producer message corruption, don't reuse buffers on the client in certain error cases (#21065) #21286
Client versions 2.8.0 and later are affected by a change that exposes a latent bug in how BufferPool is used. BufferPool is a client-side class that allocates memory as ByteBuffers; for performance it reuses them, with callers doing manual memory management by freeing each buffer once they are done with it.
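For orientation, here is a minimal sketch of that allocate/free contract (the pool wiring shown is illustrative; it is not how the producer constructs its BufferPool internally):

```java
import java.nio.ByteBuffer;
import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;

public class BufferPoolSketch {
    public static void main(String[] args) throws InterruptedException {
        BufferPool pool = new BufferPool(
                32 * 1024 * 1024L,  // total memory the pool may hand out
                16 * 1024,          // poolable size: buffers of this size are reused
                new Metrics(), Time.SYSTEM, "producer-metrics");

        ByteBuffer buffer = pool.allocate(16 * 1024, 60_000L); // may block waiting for memory
        try {
            // ... serialize record data into buffer ...
        } finally {
            // Manual memory management: once deallocated, the SAME ByteBuffer
            // may be handed to the next allocate() caller and overwritten.
            pool.deallocate(buffer);
        }
    }
}
```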
The bug is that a pooled ByteBuffer can be freed while it is still in use by the network sending thread; this early freeing can happen when batches expire or brokers disconnect from clients. The bug has existed for more than a decade (since Kafka 0.x, it seems) but never manifested: prior to 2.8.0, the pooled ByteBuffer (which contained record data, i.e. your publishes) was copied into a freshly allocated ByteBuffer before any potential reuse, and that fresh ByteBuffer was what got written over the network to the broker. With a change included in 2.8.0, the pooled ByteBuffer remains as-is inside a MemoryRecords instance, and this pooled ByteBuffer (which can meanwhile be reused and overwritten with other data) is written over the network.
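The following self-contained snippet (plain Java, not Kafka code) compresses the race into one thread to show the effect: a pooled buffer is freed and immediately reused while an earlier reference to it is still pending a network write:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;

public class PooledBufferRace {
    static final ArrayDeque<ByteBuffer> pool = new ArrayDeque<>();

    static ByteBuffer allocate() {
        ByteBuffer b = pool.poll();
        return b != null ? b : ByteBuffer.allocate(16);
    }

    static void deallocate(ByteBuffer b) {
        b.clear();
        pool.push(b); // buffer becomes available for reuse immediately
    }

    public static void main(String[] args) {
        ByteBuffer batch = allocate();
        batch.put("topic-A records".getBytes(StandardCharsets.UTF_8)).flip();

        // The batch expires: it is freed even though the network thread still
        // intends to write `batch` to the socket.
        deallocate(batch);

        // A new batch reuses the same buffer and overwrites its contents.
        ByteBuffer reused = allocate();
        reused.put("topic-B records".getBytes(StandardCharsets.UTF_8)).flip();

        // The pending write of `batch` now sends topic-B's data.
        System.out.println(new String(batch.array(), 0, batch.limit()));
    }
}
```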
Two contributing factors make the corruption worse. First, the checksum for Kafka records covers only the key/value/headers/etc. and not the topic, so there is no protection there. Second, also newly in the commit that exposed the bug, the produce request header (which carries the topic and partition for a group of message batches) is serialized in a buffer separate from the messages themselves, and only the latter go into the pooled ByteBuffer. This allows messages to be misrouted to a random recently used topic, as opposed to simply duplicated on their intended topic.
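To make the first factor concrete, a small sketch (illustrative only) of why the CRC still passes validation for misrouted data: the batch checksum is computed over the batch bytes alone, while the topic and partition live in the separately serialized request header, outside the checksummed range:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Crc32C;

public class CrcScopeSketch {
    public static void main(String[] args) {
        byte[] recordBytes = "key=k,value=v".getBytes(StandardCharsets.UTF_8);
        // The CRC covers only the batch contents; nothing in this computation
        // changes if the request header names a different topic or partition.
        long crc = Crc32C.compute(ByteBuffer.wrap(recordBytes), 0, recordBytes.length);
        System.out.printf("crc=%d (independent of topic)%n", crc);
    }
}
```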
The key change is in Sender.sendProducerData: we cannot allow the pooled ByteBuffer of an expired in-flight batch to be reused until the request completes. For these batches we skip deallocating the buffer in the normal failBatch call and defer it until we reach completeBatch (or a different failBatch path).
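A hypothetical sketch of that deferred-deallocation idea (the class, fields, and method names here are illustrative, not the actual patch):

```java
import java.nio.ByteBuffer;

final class ExpiredBatchSketch {
    interface PoolLike { void deallocate(ByteBuffer buffer); }

    private final ByteBuffer buffer; // pooled buffer backing the batch's records
    private boolean inFlight = true; // the network thread may still read `buffer`
    private boolean freed = false;

    ExpiredBatchSketch(ByteBuffer buffer) { this.buffer = buffer; }

    // failBatch path: report the expiry to the application now, but defer the
    // deallocate while the produce request is still in flight.
    synchronized void fail(PoolLike pool) {
        if (!inFlight)
            free(pool); // safe: no other thread references the buffer
    }

    // completeBatch path (or the late failBatch path): the response or
    // disconnect arrived, so the buffer can finally be returned to the pool.
    synchronized void complete(PoolLike pool) {
        inFlight = false;
        free(pool);
    }

    private void free(PoolLike pool) {
        if (!freed) {
            pool.deallocate(buffer);
            freed = true;
        }
    }
}
```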
Automated tests cover this, and manual testing was done to reproduce the issue from KAFKA-19012 and verify that the change is sufficient to stop it.
Reviewers: Justine Olshan <jolshan@confluent.io>, Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>