HBASE-29135: ZStandard decompression can operate directly on ByteBuffs #6708
Conversation
Is the test failure related?

I spoke too soon. The test works on my machine on a 2.6-based version of this branch that I've been testing with. It doesn't work on this branch.
I'm now supporting the
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

@InterfaceAudience.Public
What's the motivation for making these new classes public? I wonder whether private or limited private (with config exposure) is more appropriate.
I thought that people might want to be able to write their own ByteBuffDecompressionCodecs outside the HBase source tree. But I'm not married to that. I'll change it to Private so it'll be easier to change in the future.
@Override
public void close() {
  ctx.close();
  dict.close();
I think it's possible for this to produce an NPE.
Good point
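For illustration, a null-safe variant might look like the sketch below. The field names follow the snippet above; whether dict can actually be null when no dictionary is configured is an assumption, and the actual fix in the PR may differ.

```java
@Override
public void close() {
  ctx.close();
  // Assumption: dict is only initialized when a dictionary is configured, so it may be null.
  if (dict != null) {
    dict.close();
  }
}
```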
while (decompressedBytesInBlock < decompressedBlockSize) {
  int compressedChunkSize = rawReadInt(input);
  compressedBytesConsumed += 4;
  int n = rawDecompressor.decompress(output, input, compressedChunkSize);
  compressedBytesConsumed += compressedChunkSize;
  decompressedBytesInBlock += n;
  totalDecompressedBytes += n;
}
should we have some sort of check to bail out of the loop if RawDecompressor#decompress returns zero for some reason? Otherwise I think this logic would be stuck
Good idea
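One possible shape for that guard, sketched against the loop above; the exception type and message are illustrative assumptions, not necessarily what the PR ended up using.

```java
while (decompressedBytesInBlock < decompressedBlockSize) {
  int compressedChunkSize = rawReadInt(input);
  compressedBytesConsumed += 4;
  int n = rawDecompressor.decompress(output, input, compressedChunkSize);
  if (n <= 0) {
    // Bail out instead of spinning forever if the decompressor makes no progress.
    throw new IOException("Decompression made no progress; block may be corrupt");
  }
  compressedBytesConsumed += compressedChunkSize;
  decompressedBytesInBlock += n;
  totalDecompressedBytes += n;
}
```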
 * Specification of a block-based decompressor, which can be more efficient than the stream-based
 * Decompressor.
 */
@InterfaceAudience.Public
Any thoughts on also making this private?
done
ndimiduk left a comment:
I have a couple of questions. Also, it would be good to have a basic unit test that verifies the happy path and the obvious unsupported paths.
  return totalDecompressedBytes;
}

private static int rawReadInt(ByteBuff input) {
I think that you don't need to implement this method. Instead, call ByteBuff#getInt(). It uses the Unsafe to read the full 4 bytes at once.
ByteBuff#getInt() assumes a system-dependent endian-ness, so its behavior is not totally deterministic. That's why I'm using my own method here.
(also, since all hardware I use is little-endian, it actually reads this format wrong)
Okay makes sense. Please add a comment to the method that makes note of this endian-specific implementation. Maybe in the future we'll update our ByteBuff utilities to account for specific endian-ness.
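For context, an endian-explicit read can be built from four single-byte gets, which makes the big-endian framing explicit regardless of platform byte order. This is a sketch of the approach being discussed, not necessarily the exact code in the PR.

```java
// Reads a 4-byte big-endian int from the ByteBuff, independent of the platform's native
// byte order. ByteBuff#getInt() can go through Unsafe and follow the native byte order,
// which is why an explicit implementation is used here.
private static int rawReadInt(ByteBuff input) {
  int b1 = input.get() & 0xff;
  int b2 = input.get() & 0xff;
  int b3 = input.get() & 0xff;
  int b4 = input.get() & 0xff;
  return (b1 << 24) | (b2 << 16) | (b3 << 8) | b4;
}
```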
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Specification of a block-based decompressor, which can be more efficient than the stream-based
nit: is it "block-based", or "ByteBuff-based"? Nothing in the interface name or methods tells me that it's only decompressing a single serialised HFileBlock. Does it operate on a single block at a time, or can I provide it an inputLen that represents several blocks in the same input buffer?
Maybe all this is sort of assumed by the existing conventions in this package.
That comment is my bad. It should say "ByteBuff-based." I was initially using "block" in a vague way, meaning the opposite of a stream.
ByteBuffDecompressor decompressor =
  CodecPool.getByteBuffDecompressor((ByteBuffDecompressionCodec) codec);
if (LOG.isTraceEnabled()) {
  LOG.trace("Retrieved decompressor " + decompressor + " from pool.");
nit: use Logger format string API instead.
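For example, the SLF4J parameterized form avoids both the string concatenation and the isTraceEnabled() guard (illustrative only):

```java
LOG.trace("Retrieved decompressor {} from pool.", decompressor);
```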
public void returnByteBuffDecompressor(ByteBuffDecompressor decompressor) {
  if (decompressor != null) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Returning decompressor " + decompressor + " to pool.");
And here.
  }
}

private boolean canFastDecompress(ByteBuff blockBufferWithoutHeader, ByteBuff onDiskBlock) {
"fast" is relative and will likely continue to change. Instead, can you use a more descriptive name for this alternative implementation. Maybe canDecompressViaByteBuffDecompressor?
@InterfaceAudience.Private
public class ZstdByteBuffDecompressor implements ByteBuffDecompressor, CanReinit {

  protected int dictId;
Why are these fields protected instead of private?
I'm following the convention established in ZstdDecompressor, but I don't need to be.
I've added some unit tests.
ndimiduk left a comment:
Thanks @charlesconnell, this looks pretty nice!
input.put(COMPRESSED_PAYLOAD);
input.rewind();
int decompressedSize = decompressor.decompress(output, input, COMPRESSED_PAYLOAD.length);
assertEquals("HBase is awesome", Bytes.toString(output.toBytes(0, decompressedSize)));
😆
🎊 +1 overall
Each time a block is decoded in HFileBlockDefaultDecodingContext, a new DecompressorStream is allocated and used. This is a lot of allocation, and the use of the streaming pattern requires copying every byte to be decompressed more times than necessary. Each byte is copied from a ByteBuff into a byte[], then decompressed, then copied back to a ByteBuff. For decompressors like org.apache.hadoop.hbase.io.compress.zstd.ZstdDecompressor that only operate on direct memory, two additional copies are introduced to move from a byte[] to a direct NIO ByteBuffer, then back to a byte[]. Aside from the copies inherent in the decompression algorithm, and the necessity of copying from a compressed buffer to an uncompressed buffer, all of these other copies can be avoided without sacrificing functionality. Along the way, we'll also avoid allocating objects.
In this PR:
- A new interface, ByteBuffDecompressor, which does exactly what it sounds like.
- A new ZstdByteBuffDecompressor that uses zstd-jni underneath, which can operate when the input and output are both direct SingleByteBuffs or both heap SingleByteBuffs.
- CodecPool now pools ByteBuffDecompressors the same way that it pools Decompressors.
- The block decoding path detects when decompression can operate directly on the given ByteBuffs, then takes the new fast path.

In a subsequent PR I plan to add glue so that any codec offering an org.apache.hadoop.io.compress.DirectDecompressor, which several in hadoop-common already do, can be used as a ByteBuffDecompressor. I've already been using this code successfully in production at my company.
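To summarize the shape of the new API, here is a rough sketch of the ByteBuffDecompressor concept as inferred from the snippets in this review. The decompress signature matches the test snippet above, but the Closeable supertype, the Javadoc, and the exact interface members are assumptions.

```java
import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.hbase.nio.ByteBuff;

// Sketch only: decompress directly between ByteBuffs, avoiding the byte[] round-trips
// that the stream-based Decompressor API requires.
public interface ByteBuffDecompressor extends Closeable {

  /**
   * Decompresses inputLen bytes read from input and writes the result into output.
   * @return the number of decompressed bytes written into output
   */
  int decompress(ByteBuff output, ByteBuff input, int inputLen) throws IOException;
}
```

Callers would obtain an instance from the pool, as in the CodecPool snippet above, and return it with returnByteBuffDecompressor when finished.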