KAFKA-5456: Ensure producer handles old format large compressed messages #3356
Conversation
@@ -709,9 +709,8 @@ public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value) {
        }

        // Be conservative and not take compression of the new record into consideration.
        return numRecords == 0 ?
                bufferStream.remaining() >= recordSize :
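For readers without the file open: the ternary in the hunk above is truncated. A hedged reconstruction of the full removed check follows; the second branch is recalled from the surrounding method, not shown in this diff, so treat it as an assumption:

    // Hedged reconstruction; the else branch of the ternary is assumed from
    // the rest of hasRoomFor, since the hunk above cuts off mid-expression.
    // Be conservative and not take compression of the new record into consideration.
    return numRecords == 0 ?
            bufferStream.remaining() >= recordSize :
            this.writeLimit >= estimatedBytesWritten() + recordSize;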
Seems like the intent in RecordAccumulator is to ensure the producer can always write a message even if it exceeds the batch size, so I just removed this check.
Maybe it should be at the top after isFull(), though, since we don't need the other computations in that case.
I think it's reasonable to remove this check since we can't guarantee it's always correct. We should probably add a comment to the following code making it clear that it's not really an upper bound.
int size = Math.max(this.batchSize, AbstractRecords.sizeInBytesUpperBound(maxUsableMagic, key, value, headers));
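As an aside, here is a self-contained plain-Java sketch of what that sizing logic does. estimateRecordSize is a hypothetical stand-in for AbstractRecords.sizeInBytesUpperBound, and the overhead constant is an illustrative assumption, not Kafka source:

    // A minimal sketch of the sizing logic above; the helper name and the
    // constant are illustrative assumptions.
    static int chooseBufferSize(int batchSize, int keyLen, int valueLen) {
        int estimate = estimateRecordSize(keyLen, valueLen);
        // The buffer is at least batchSize, growing to fit one large record...
        return Math.max(batchSize, estimate);
    }

    static int estimateRecordSize(int keyLen, int valueLen) {
        final int RECORD_OVERHEAD = 22; // assumed per-record framing, for illustration
        // ...but for old-format compressed messages this estimate omits the outer
        // wrapper record and the codec header, so it is not a true upper bound.
        return RECORD_OVERHEAD + keyLen + valueLen;
    }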
We could perhaps fix part of it by taking into account the outer record header for a compressed message batch with message format < V2. It may be worth doing this to avoid confusion.
However, that still doesn't take into account the compression header added by Gzip (10 bytes), LZ4 (7 bytes), or Snappy (16 bytes). That's probably OK, though, since compression will hopefully shrink the key and value enough for the record to fit. If it doesn't (which may well happen), we will re-allocate the underlying buffer, which is not the end of the world.
Yes, it was because of the compression-specific headers that I didn't bother trying to account for the compressed message set overhead.
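To make the overheads in this thread concrete, a small sketch tallying what the estimate misses for an old-format compressed message set. The codec header sizes are the ones quoted above; LOG_OVERHEAD and the V1 wrapper size are assumptions based on the V0/V1 on-disk layout:

    // Sketch of the extra bytes an old-format (magic < 2) compressed message set
    // carries on top of the inner record. Codec header sizes come from the review
    // comments above; the wrapper constants are assumptions.
    enum Codec { GZIP, LZ4, SNAPPY }

    static int compressedWrapperOverheadV1(Codec codec) {
        final int LOG_OVERHEAD = 12;      // wrapper offset (8 bytes) + size (4 bytes)
        final int WRAPPER_RECORD_V1 = 22; // crc, magic, attributes, timestamp, key/value lengths
        final int codecHeader;
        switch (codec) {
            case GZIP:   codecHeader = 10; break;
            case LZ4:    codecHeader = 7;  break;
            case SNAPPY: codecHeader = 16; break;
            default:     codecHeader = 0;  break;
        }
        return LOG_OVERHEAD + WRAPPER_RECORD_V1 + codecHeader;
    }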
@@ -699,6 +699,10 @@ public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value, Head
        if (isFull())
            return false;

        // We always allow at least one record to be appended (the ByteBufferOutputStream will grow as needed)
        if (numRecords == 0)
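Putting the two hunks together, the patched check plausibly reads like this. A hedged reconstruction, with recordSizeUpperBound standing in for the version-specific size computation; not the verbatim merged method:

    // Hedged reconstruction assembled from the hunks in this PR; helper names
    // are assumptions from context, not verified against the merged source.
    public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
        if (isFull())
            return false;

        // We always allow at least one record to be appended
        // (the ByteBufferOutputStream will grow as needed)
        if (numRecords == 0)
            return true;

        // Conservative per-record estimate; compression of the new record is not considered.
        int recordSize = recordSizeUpperBound(timestamp, key, value, headers);
        return this.writeLimit >= estimatedBytesWritten() + recordSize;
    }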
I think you may need to update the method javadoc.
Thanks for the updates, LGTM if the tests pass.
Thanks for taking this over from me. I spent a bit of time understanding the semantics of the old format and how the regression happened. It was quite educational!
More specifically, fix the case where a compressed V0 or V1 message is larger than the producer batch size.

Author: Jason Gustafson <jason@confluent.io>
Reviewers: Apurva Mehta <apurva@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #3356 from hachikuji/KAFKA-5456

(cherry picked from commit f49697a)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>