KAFKA-3160: Fix LZ4 Framing #1212
Conversation
val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream))
val compressor = CompressionFactory(compressionCodec, outputStream)
if (compressionCodec == LZ4CompressionCodec && magicAndTimestamp.magic == Message.MagicValue_V0)
  compressor.asInstanceOf[KafkaLZ4BlockOutputStream].useBrokenHC = true // Maintain compatibility with 0.8 / 0.9 clients
Would it not be better to keep this logic in CompressionFactory? We would then be able to reuse it in ByteBufferMessageSet as well.
I thought about that, but it feels weird to add Message Magic Value to the CompressionFactory constructor. I don't think message version and compression internals should be linked generally.
Given that you are mutating a variable of a compression codec in this class after an unsafe cast, things are already very much linked. I think it's much cleaner for CompressionFactory to take a message format version and then instantiate the compressor appropriately (that seems like perfectly sensible behaviour for a factory).
I disagree -- you're inviting other compression codecs to couple themselves with message format, and that is not the intent of this change.
Nonetheless, I'll change it since ultimately I am not the one that has to maintain this codebase :)
Thanks for changing it.
We are not inviting that at all. :) We are simply making the coupling clear and centralised so the compiler can help us instead of keeping it hidden and duplicated. The previous approach was prone to bugs if we used CompressionFactory in new classes and we didn't remember to cast and mutate the variable, for example.
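The centralised approach described here could look roughly like the following. This is a minimal Java sketch with illustrative names (Kafka's actual CompressionFactory is Scala and its signature differs): the factory receives the message magic version and configures the stream itself, so callers never need the unsafe cast and field mutation.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.OutputStream;

// Stand-in for KafkaLZ4BlockOutputStream; the flag mirrors the PR's useBrokenHC.
class Lz4Out extends FilterOutputStream {
    final boolean useBrokenHC;

    Lz4Out(OutputStream out, boolean useBrokenHC) {
        super(out);
        this.useBrokenHC = useBrokenHC;
    }
}

public class CompressionFactorySketch {
    // The factory owns the magic-version/compression coupling in one place.
    static OutputStream wrap(String codec, byte messageMagic, OutputStream out) {
        if ("lz4".equals(codec)) {
            // magic v0: keep the broken frame-descriptor checksum for 0.8/0.9 clients
            return new Lz4Out(out, messageMagic == 0);
        }
        return out; // other codecs ignore the message format version
    }

    public static void main(String[] args) {
        OutputStream v0 = wrap("lz4", (byte) 0, new ByteArrayOutputStream());
        OutputStream v1 = wrap("lz4", (byte) 1, new ByteArrayOutputStream());
        System.out.println(((Lz4Out) v0).useBrokenHC);  // true
        System.out.println(((Lz4Out) v1).useBrokenHC);  // false
    }
}
```

With this shape, a new caller such as ByteBufferMessageSet gets the compatibility behaviour automatically instead of having to remember the cast-and-mutate step.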
Thanks for the PR @dpkp. I haven't looked in detail, but the idea is appealing. Let's see what committers think. In order to ensure that this works as expected, we would want to expand our system tests to cover LZ4 as well: https://github.com/apache/kafka/blob/trunk/tests/kafkatest/tests/core/upgrade_test.py#L65
By the way, there's another wrinkle. This change probably means that a new ApiVersion (see the class documentation for details) called KAFKA_0_10_0_IV1 needs to be introduced for people who have adopted trunk in production after KAFKA_0_10_0_IV0 was introduced (and before this PR). However, it would make things a lot simpler (for clients and testing) if we didn't have to bump the version again (which would benefit people using trunk and LZ4). Let's see what @junrao thinks.
(force-pushed from 6c6882c to b85cb95)
bump for review. Re: ApiVersion -- I noted in KIP-57 that I think this would only apply to people who (1) run trunk, (2) upgraded to v1 message storage, (3) use LZ4 compression. I'm skeptical that there are any users that fall into all 3 categories. Thoughts?
super(in);
decompressor = LZ4Factory.fastestInstance().safeDecompressor();
checksum = XXHashFactory.fastestInstance().hash32();
readHeader();
Can you please elaborate why we no longer read the header during construction? It seems to me that checkHC could be a constructor parameter and then we could keep it as a private and final variable, and fewer changes would be required. But maybe I am missing something. Note that public and mutable variables are generally avoided in Java.
Both GZipInputStream and SnappyInputStream read the header in the constructor, so it would make sense to me to remain consistent in that respect.
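For reference, the pattern being described — validating the stream header in the constructor so that a corrupt stream fails fast — can be sketched like this. The class name is illustrative (not Kafka's actual class); only the LZ4 frame magic constant is taken from the spec.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

public class HeaderCheckedInputStream extends FilterInputStream {
    // LZ4 frame magic number (appears on the wire as bytes 04 22 4D 18)
    static final int MAGIC = 0x184D2204;

    public HeaderCheckedInputStream(InputStream in) throws IOException {
        super(in);
        readHeader();  // fail fast on a corrupt stream, like GZIPInputStream does
    }

    private void readHeader() throws IOException {
        int magic = 0;
        for (int i = 0; i < 4; i++) {
            int b = in.read();
            if (b < 0)
                throw new EOFException("stream ended before the header was complete");
            magic |= b << (8 * i);  // assemble the little-endian magic word
        }
        if (magic != MAGIC)
            throw new IOException("stream does not begin with the LZ4 frame magic");
    }

    public static void main(String[] args) throws IOException {
        byte[] good = {0x04, 0x22, 0x4D, 0x18};
        new HeaderCheckedInputStream(new ByteArrayInputStream(good));  // accepted
        try {
            new HeaderCheckedInputStream(new ByteArrayInputStream(new byte[4]));
        } catch (IOException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The trade-off the review is weighing: constructor-time validation keeps fields final and immutable, whereas deferring readHeader() requires a mutable headerRead flag checked on every read.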
private FLG flg;
private BD bd;
private int bufferOffset;
private int bufferSize;
private boolean finished;
private boolean headerRead;

public boolean checkHC;
Could we make this private and add a method to change checkHC or just pass it in through the constructor?
Thanks for the patch. LGTM. Just a few minor comments.
@@ -142,22 +168,19 @@ private void readBlock() throws IOException {

 // verify checksum
 if (flg.isBlockChecksumSet() && Utils.readUnsignedIntLE(in) != checksum.hash(bufferToRead, 0, blockSize, 0)) {
-    throw new IOException(BLOCK_HASH_MISMATCH);
+    throw new LZ4Exception(BLOCK_HASH_MISMATCH);
Why are we now throwing LZ4Exception instead of IOException?
OK, I see that you are treating these exceptions differently.
I think the approach used by Snappy is a bit better in that it throws a subclass of IOException so that existing IOException catch blocks can catch the exception while allowing more specific catch blocks to exist.
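The Snappy-style approach being suggested looks like the sketch below (the exception class name is hypothetical): the specific exception extends IOException, so existing generic catch blocks keep working while callers that care can catch the narrower type.

```java
import java.io.IOException;

// Hypothetical name for illustration; the idea mirrors Snappy, which throws
// an IOException subclass for corrupt input rather than a RuntimeException.
class Lz4FrameIOException extends IOException {
    Lz4FrameIOException(String message) {
        super(message);
    }
}

public class ExceptionDemo {
    static void verifyBlockChecksum(long stored, long computed) throws IOException {
        if (stored != computed)
            throw new Lz4FrameIOException("block checksum mismatch");
    }

    public static void main(String[] args) {
        try {
            verifyBlockChecksum(1L, 2L);
        } catch (IOException e) {  // a generic handler still catches the subclass
            System.out.println("caught: " + e.getMessage());  // prints "caught: block checksum mismatch"
        }
    }
}
```

A more specific `catch (Lz4FrameIOException e)` block can be added before the generic one wherever finer-grained handling is wanted.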
The ultimate goal should be to merge all of this code into the upstream lz4 library (lz4-java). Once that happens you'll need to catch the LZ4Exception errors that it raises. Much of this code is written such that it could be merged upstream and we would not have to maintain a separate LZ4 implementation in the kafka codebase.
Before I added the comment above, I checked what the existing net.jpountz.lz4.LZ4BlockInputStream does, and it has code like:
try {
    final int compressedLen2 = decompressor.decompress(compressedBuffer, 0, buffer, 0, originalLen);
    if (compressedLen != compressedLen2) {
        throw new IOException("Stream is corrupted");
    }
} catch (LZ4Exception e) {
    throw new IOException("Stream is corrupted", e);
}
So, it doesn't seem clear to me that the upstream InputStream and OutputStream classes would be throwing LZ4Exception (which inherits from RuntimeException) instead of IOException or a subclass (that would be unusual). Why do you think it would do that?
@dpkp, we are on quite a tight deadline to get this into 0.10.0.0. Please let us know if you need any help getting this PR into shape. I've sketched a few of the changes suggested in the review comments locally to test them out (including the changes necessary to checkHC in the client) and I can clean them up and submit as a PR to your branch (you would then review and integrate into your branch if you were happy with them). The merged commit would still credit you as the author.
I'll make the changes, but generally I disagree w/ the style choices you're making. I think it should be remembered that underlying this is a broken lz4 implementation. I wrote the code intentionally to look nasty when breaking lz4. I think it is a mistake to make breaking lz4 seem easy and clean. It should be gross and broken, just like the lz4 encoding it's doing... But anyways, I'll make the changes :)
We're going to have to live with the broken implementation for a long time. There's no reason to make the code ugly on purpose. No one will intentionally use the broken implementation; it's just there for compatibility reasons. Thanks for making the changes.
We also need to add a note to the
(force-pushed from 6fe465e to a657cd4)
I've made the requested changes to move mutable attributes to the constructor, use IOException not LZ4Exception, drop explicit 'this' references, and update upgrade docs. I have not updated client code because I don't have a good vision for what that would look like. I mentioned in the KIP discuss email thread that the client plumbing around compression handling is a bit too much for me to grok. I am happy to take some pointers from others who understand Java code better than I do. But I am also OK with enforcing strict semantics in the broker and letting clients be more forgiving, since clients have access to other bit-flip protection, like the message-level CRC.
Thank you very much for the update @dpkp. I'll try to help with the client plumbing around compression.
ok, thanks!
Here's a PR that should do it: dpkp#1
- update spec to 1.5.1; remove dictID
- fix frame descriptor HC check (dont include magic bytes)
- dont require HC validation on input by default
- add useBrokenHC to constructor for output compatibility
- nominal support for contentChecksum / contentSize flags
- throw LZ4Exception on validation errors when decompressing
@ijuma I merged/squashed your commits and removed the assert statement. Thanks for helping w/ client bits!
 * every block of data
 * @throws IOException
 */
public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum) throws IOException {
Do we still need this? It seems unused. The constructor at line 119 also seems unused.
Correct -- kafka doesn't currently use them, but we don't know whether users have written code against these interfaces. I prefer to think of these classes as general implementations of the LZ4 spec, somewhat independent of kafka's specific use.
Started a system tests build: https://jenkins.confluent.io/job/system-test-kafka-branch-builder-2/64/
- implement v1.5.1 LZ4 framing specification
- fix frame descriptor checksum
- use Magic MessageVersion to maintain backwards compatibility
  - message version v0: return old broken fd checksum
  - message version v1: return correct fd checksum
- return CorruptMessage error on ProduceRequest for v1 message with incorrect fd checksum (ProduceRequest Version >= 2)
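The frame-descriptor checksum fix above can be illustrated with a short sketch. Per the LZ4 frame spec, HC is the second byte of the XXH32 of the descriptor, i.e. `(xxh32(descriptor) >> 8) & 0xFF`, computed over the descriptor fields only; the broken variant hashed from the start of the buffer, which pulled in the 4 magic bytes. Assumptions in this sketch: the XXH32 here is a minimal re-derivation of the published algorithm covering only inputs shorter than 16 bytes (enough for a Kafka frame header, which sets no content-size or dictionary fields), and the example FLG/BD byte values are illustrative rather than Kafka's exact defaults. Kafka's real code uses lz4-java's XXHashFactory.

```java
public class FrameDescriptorHC {
    // xxHash32 primes
    private static final int P1 = 0x9E3779B1, P2 = 0x85EBCA77, P3 = 0xC2B2AE3D,
                             P4 = 0x27D4EB2F, P5 = 0x165667B1;

    // Minimal XXH32 (seed 0), valid only for inputs shorter than 16 bytes.
    static int xxh32(byte[] buf, int off, int len) {
        if (len >= 16)
            throw new IllegalArgumentException("short-input path only");
        int h = P5 + len;
        int end = off + len;
        while (off + 4 <= end) {           // consume 4-byte words, little-endian
            int w = (buf[off] & 0xFF) | (buf[off + 1] & 0xFF) << 8
                  | (buf[off + 2] & 0xFF) << 16 | (buf[off + 3] & 0xFF) << 24;
            h += w * P3;
            h = Integer.rotateLeft(h, 17) * P4;
            off += 4;
        }
        while (off < end) {                // consume remaining single bytes
            h += (buf[off++] & 0xFF) * P5;
            h = Integer.rotateLeft(h, 11) * P1;
        }
        h ^= h >>> 15; h *= P2;            // final avalanche
        h ^= h >>> 13; h *= P3;
        h ^= h >>> 16;
        return h;
    }

    public static void main(String[] args) {
        // magic (4 bytes, little-endian 0x184D2204) + example FLG + BD bytes
        byte[] header = {0x04, 0x22, 0x4D, 0x18, 0x40, 0x70};

        // broken (pre-fix): checksum covers the magic bytes too
        int brokenHC = (xxh32(header, 0, 6) >>> 8) & 0xFF;
        // correct (spec v1.5.1): checksum covers only the descriptor (FLG + BD)
        int correctHC = (xxh32(header, 4, 2) >>> 8) & 0xFF;

        System.out.println("broken HC:  " + brokenHC);
        System.out.println("correct HC: " + correctHC);
    }
}
```

This is also why the magic-version plumbing is needed on the write path: v0 messages must keep emitting the broken HC so that 0.8/0.9 consumers, which validate against it, still accept the frames.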
re: system-tests build -- should I hold off on further changes or is it ok to push the
You can update the PR.
@dpkp, it would also be great if you could retest kafka-python against the updated PR (if you haven't already).
I started a new system tests build that includes the latest update from @dpkp: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/435/
Thanks for the latest patch. LGTM. @ijuma: You can merge it once the system test passes.
Thanks Jun, will do.
I had to start another system tests run as 435 hung: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/437/ There were two failures:
Both are known to be transient. All the other replication test variants passed and there's a PR for the connect transient failures: #1340. So, merging to trunk and 0.10.
This contribution is my original work and I license the work under Apache 2.0.

Author: Dana Powers <dana.powers@gmail.com>
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #1212 from dpkp/KAFKA-3160

(cherry picked from commit 8fe2552)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>
Thanks for your contribution!