AVRO-2162 Adds Zstandard compression to the Avro File Format (Java) #303
Conversation
scottcarey commented on Mar 22, 2018
- Adds TestCodecs to cover all file compression Codecs.
- Consolidates common code in Codecs into OutputStreamCodec and OutputInputStreamCodec abstractions.
- Fixes DataFileStream so that Codecs can return direct ByteBuffers or heap ByteBuffers with a non-zero offset.
While writing this and adding test coverage, I ended up making a few other clean-ups. Do we need to add anything in the spec about this? I also targeted the 1.8 branch because I assume that many people on older branches might be interested. I'm still stuck on a 1.7.x branch in (Hadoop) production myself, though that should change in a few months.
```xml
<dependency>
  <groupId>com.github.luben</groupId>
  <artifactId>zstd-jni</artifactId>
  <optional>true</optional>
```
I think that most of the compression dependencies should be `<optional>`. It's extra baggage that is not useful in any case where we aren't writing or reading files. It would be more consistent with the other codecs to remove this.
```java
@Override
public ByteBuffer decompress(ByteBuffer compressedData) throws IOException {
  ByteArrayInputStream bais = new ByteArrayInputStream(compressedData.array());
  BZip2CompressorInputStream inputStream = new BZip2CompressorInputStream(bais);
```
Most of the Codecs are internally based on InputStreams and OutputStreams. I refactored the commonalities out into two abstract classes (these would be better as 'mix-in' interfaces in Java 8+).
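As a rough sketch of what such a stream-based abstraction might look like (only the name OutputStreamCodec comes from the PR description; the method bodies here are my own guess, using the JDK's deflate streams as a stand-in codec rather than the actual Avro code):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

// Hypothetical shape of the shared abstraction: subclasses only say how to
// wrap a raw sink/source in a compressing/decompressing stream.
abstract class StreamCodecSketch {
    protected abstract OutputStream wrapOutput(OutputStream raw) throws IOException;
    protected abstract InputStream wrapInput(InputStream raw) throws IOException;

    ByteBuffer compress(ByteBuffer in) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (OutputStream os = wrapOutput(baos)) {
            // Honor position/limit/arrayOffset instead of assuming a full backing array.
            os.write(in.array(), in.arrayOffset() + in.position(), in.remaining());
        }
        return ByteBuffer.wrap(baos.toByteArray());
    }

    ByteBuffer decompress(ByteBuffer in) throws IOException {
        // Assumes a heap buffer for brevity; a direct buffer would need a copy first.
        ByteArrayInputStream bais = new ByteArrayInputStream(
                in.array(), in.arrayOffset() + in.position(), in.remaining());
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (InputStream is = wrapInput(bais)) {
            byte[] buf = new byte[4096];
            for (int n; (n = is.read(buf)) != -1; ) baos.write(buf, 0, n);
        }
        return ByteBuffer.wrap(baos.toByteArray());
    }
}

// Example subclass, standing in for the BZip2/XZ/Zstandard stream wrappers.
class DeflateCodecSketch extends StreamCodecSketch {
    protected OutputStream wrapOutput(OutputStream raw) { return new DeflaterOutputStream(raw); }
    protected InputStream wrapInput(InputStream raw) { return new InflaterInputStream(raw); }
}
```

With this shape, each concrete codec shrinks to the two one-line wrap methods.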
```java
/** Null codec, for no compression. */
public static CodecFactory nullCodec() {
  return NullCodec.OPTION;
  // we can not reference NullCodec.OPTION because the static
```
The unit test uncovered the fact that accessing this field here results in 'null' since we have a circular dependency in static initialization.
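This is the classic Java circular-static-initialization trap. A minimal standalone reproduction (my construction, not the actual NullCodec/CodecFactory classes):

```java
// Two classes whose static initializers reference each other. The initializer
// entered second reads the other class's field before it has been assigned,
// and observes null instead of the eventual value.
class Second {
    static final String TRIGGER = First.NAME;        // runs First's <clinit> mid-way through our own
    static final String LABEL = new String("ready"); // non-constant, so not inlined at compile time
}

class First {
    static final String NAME = Second.LABEL; // if Second is mid-initialization, this reads null
}
```

If Second is touched first, First's initializer runs while Second's is still in progress (the JVM permits same-thread re-entry), so both NAME and TRIGGER end up null even though LABEL is eventually assigned.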
```java
return ByteBuffer.wrap(data, offset, blockSize);
}

void setBytes(ByteBuffer block) {
```
This now supports Codecs that return direct ByteBuffers. Earlier versions of the ZstandardCodec used APIs that returned direct buffers, which exploded here.
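A sketch of what offset- and direct-buffer-safe extraction can look like (hypothetical helper, not the actual DataFileStream code):

```java
import java.nio.ByteBuffer;

class BufferBytes {
    // Extract the readable bytes of a ByteBuffer whether it is a direct
    // buffer (no backing array) or a heap buffer with a non-zero arrayOffset.
    static byte[] toArray(ByteBuffer buf) {
        if (buf.hasArray() && buf.arrayOffset() == 0 && buf.position() == 0
                && buf.limit() == buf.array().length) {
            return buf.array(); // zero-copy fast path: buffer spans the whole array
        }
        byte[] out = new byte[buf.remaining()];
        buf.duplicate().get(out); // duplicate() so the caller's position is untouched
        return out;
    }
}
```

Calling `buf.array()` unconditionally is the bug: it throws UnsupportedOperationException on direct buffers, and silently reads the wrong bytes when arrayOffset is non-zero.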
```java
@Override public String getName() { return DataFileConstants.SNAPPY_CODEC; }

@Override
public ByteBuffer compress(ByteBuffer in) throws IOException {
```
Tests were failing only on Snappy when I made some overly strict assumptions about the returned buffer.
In the process of debugging I fixed at least one bug (not setting ByteOrder.LITTLE_ENDIAN, which let the file format depend on the CPU of the writer).
The code also was not properly accounting for arrayOffset in many cases.
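The byte-order half of this can be illustrated with a small sketch (my own example, not the Avro Snappy codec): any code that relies on ByteOrder.nativeOrder() writes bytes that vary with the writer's CPU, whereas pinning the order makes the on-disk bytes deterministic.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

class LengthPrefix {
    // Hypothetical illustration: encode a 4-byte integer field with an
    // explicitly pinned byte order, so the serialized bytes never depend
    // on the writer's CPU (as ByteOrder.nativeOrder() would).
    static byte[] encodeLength(int len) {
        ByteBuffer b = ByteBuffer.allocate(4);
        b.order(ByteOrder.LITTLE_ENDIAN); // pin explicitly; never rely on native order
        b.putInt(len);
        return b.array();
    }
}
```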
```java
return true;
if (getClass() != obj.getClass())
  return false;
XZCodec other = (XZCodec)obj;
```
The equals method for the XZ codec was wrong: according to the specification of Codec, two codecs should be equal if they are mutually decompressible. The compression level is used by the compressor but does not affect decompressibility.
I made the implementations of hashCode and equals consistent across the codecs where appropriate.
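A sketch of that equals/hashCode shape (hypothetical class following the "mutually decompressible" contract described above; the real XZCodec differs in its other members):

```java
// Equality reflects mutual decompressibility, so tuning parameters such as
// the compression level are deliberately excluded from equals/hashCode:
// data written at any level can be read back by any instance of the codec.
class XzLikeCodec {
    private final int compressionLevel; // affects compression only, not decompression

    XzLikeCodec(int level) { this.compressionLevel = level; }

    @Override public boolean equals(Object obj) {
        if (this == obj) return true;
        if (obj == null || getClass() != obj.getClass()) return false;
        return true; // same codec class implies mutually decompressible
    }

    @Override public int hashCode() { return getClass().hashCode(); }
}
```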
```java
}

private final Codec codec;
private final byte[] zeroes = new byte[1024*1024];
```
All zeroes tends to compress massively, which can uncover bugs in buffer sizing when decompressing.
Pure random data tends to be larger compressed than uncompressed, and may find bugs in buffer sizing when compressing.
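A sketch of those two test inputs (the zeroes field appears in the diff above; the seeded random generator is my addition):

```java
import java.util.Random;

class TestData {
    // Two extremes for exercising codec buffer sizing: all-zero data compresses
    // massively (stresses decompression buffers), while random data is
    // incompressible and usually grows (stresses compression buffers).
    static byte[] zeroes(int n) {
        return new byte[n]; // Java arrays are zero-initialized
    }

    static byte[] random(int n) {
        byte[] data = new byte[n];
        new Random(42).nextBytes(data); // fixed seed keeps test failures reproducible
        return data;
    }
}
```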
Thinking about this a bit more... I think I'll make three pull requests: one to the 1.7.x branch, one to the 1.8.x branch, and one to master. The two for the older branches will be as minimal as possible, only adding the new compression type. The one for master will include refactoring to reduce code duplication.
Is there anything substantial still missing that keeps this from being merged? I think it would be great to have it for the 1.9.0 release. Backporting this looks like a lot of extra work, and can be done if someone really needs it; otherwise we should encourage moving upwards.
I believe a Zstandard codec was already merged: cf2f303. I think this is a duplicate.
Zstandard codec already merged. Duplicate.
@dkulp @nandorKollar Not quite a duplicate: the version that was merged did not include Zstandard's compression level. Zstandard's compression level runs from snappy- or lz4-ish (extremely fast but not high compression) to nearly xz levels of compression ratio (but 20x faster than xz at decompression). Not having the compression level configurable is a fairly big issue for me.
```java
private final int compressionLevel;

public ZstandardCodec(int compressionLevel) {
  this.compressionLevel = Math.max(Math.min(compressionLevel, 22), 1);
```
Note: Zstandard now has negative compression levels, for use cases that require even less CPU: https://github.com/facebook/zstd/releases/tag/v1.3.4
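For illustration, here is the clamp from the diff generalized to take the supported range as parameters instead of hard-coding 1..22 (my sketch; a real implementation would likely query the library's minimum and maximum level rather than hard-code any bounds, since hard-coding a floor of 1 silently discards the fast negative levels):

```java
class ZstdLevel {
    // Clamp a requested compression level into a supplied [min, max] range.
    // Same shape as Math.max(Math.min(level, 22), 1) in the diff above, but
    // with the bounds passed in so negative levels can be permitted.
    static int clamp(int requested, int min, int max) {
        return Math.max(Math.min(requested, max), min);
    }
}
```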