GH-38271: [C++][Parquet] Support reading parquet files with multiple gzip members #38272
Conversation
Thanks for the contribution! Could you please add a round trip test for the GZipCodec, which will fail decoding without your fix?
@amassalha How was the gzip streaming parquet file generated? This patch looks generally OK, but I want to know how other systems support files like this.
Yeah, I know it's allowed by the protocol. I will investigate whether arrow-rs and parquet-mr can write or read files like this. Also, do you know what systems could generate a parquet file with multiple gzip members?
I agree with @mapleFU. Do you have an example parquet file with multiple gzip members on hand? We need to make sure parquet-mr can decompress it successfully. If you don't mind, please upload one here and I can verify it on my end.
@wgtmac the parquet file is attached to the issue. I also tried pushing it to the parquet-testing repo (https://github.com/apache/parquet-testing) but somehow I couldn't create a pull request (that's what's causing the failures in this run; it can't find the file). Any suggestions on that? Is 'parquet-testing' also open for pull requests? @mapleFU I'm not familiar with other systems (than ours) currently.
The CI failure might be related to the lint problem; you can run the lint checks locally. How do you generate that parquet file?
As I mentioned, we use our internal high-performance gzip implementation, which fully adheres to the gzip RFC. (I can't give more details; I work at a startup.)
https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L496-L498 Parquet also has two versions of LZ4 in the standard. I have no issue with the gzip standard, but I need to make sure this kind of file is supported. From the standard (https://github.com/apache/parquet-format/blob/master/Compression.md#gzip), the link is authoritative. Let me go through the parquet-mr impl tonight to make it clear.
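For reference, concatenating two complete gzip streams is itself one valid gzip stream under RFC 1952. Here is a minimal sketch with zlib (the GzipCompress helper and its buffer sizing are illustrative, not Arrow code):

#include <zlib.h>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

// Compresses `data` into a single complete gzip member.
std::vector<uint8_t> GzipCompress(const std::string& data) {
  z_stream stream{};
  // windowBits = 15 + 16 selects gzip framing instead of raw zlib.
  if (deflateInit2(&stream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 15 + 16, 8,
                   Z_DEFAULT_STRATEGY) != Z_OK) {
    throw std::runtime_error("deflateInit2 failed");
  }
  std::vector<uint8_t> out(deflateBound(&stream, data.size()));
  stream.next_in = reinterpret_cast<Bytef*>(const_cast<char*>(data.data()));
  stream.avail_in = static_cast<uInt>(data.size());
  stream.next_out = out.data();
  stream.avail_out = static_cast<uInt>(out.size());
  if (deflate(&stream, Z_FINISH) != Z_STREAM_END) {
    deflateEnd(&stream);
    throw std::runtime_error("deflate failed");
  }
  out.resize(out.size() - stream.avail_out);
  deflateEnd(&stream);
  return out;
}

// Usage: two members back to back, as a streaming gzip writer might emit.
//   auto buf = GzipCompress("first member");
//   auto second = GzipCompress("second member");
//   buf.insert(buf.end(), second.begin(), second.end());  // still valid gzip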
@amassalha I've tried the current implementation, and it seems we're able to read the concatenated_gzip_members.zip, which contains 513 rows. Can you confirm that?
To read with the current values? Because when I tried using "ReadBatch" it returned "0" instead of the last number (513).
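For anyone reproducing this, here is a rough sketch of the kind of ReadBatch check being described, using the low-level parquet-cpp reader API; the file name and the assumption of a single required int64 column are hypothetical:

#include <parquet/api/reader.h>
#include <iostream>
#include <memory>
#include <vector>

int main() {
  // Open the (hypothetically named) file extracted from the attached zip.
  std::unique_ptr<parquet::ParquetFileReader> reader =
      parquet::ParquetFileReader::OpenFile("concatenated_gzip_members.parquet");
  auto row_group = reader->RowGroup(0);
  auto column =
      std::static_pointer_cast<parquet::Int64Reader>(row_group->Column(0));

  std::vector<int64_t> values(1024);
  int64_t last = 0;
  while (column->HasNext()) {
    int64_t values_read = 0;
    // def/rep level buffers may be null for a required, non-repeated column.
    column->ReadBatch(static_cast<int64_t>(values.size()), nullptr, nullptr,
                      values.data(), &values_read);
    if (values_read > 0) last = values[values_read - 1];
  }
  // Per the discussion, the last value should be 513 once the fix is in.
  std::cout << "last value: " << last << std::endl;
  return 0;
}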
Ah, you're right! I've checked the file and read it with arrow-rs:
@wgtmac Would you mind also checking it using parquet-mr?
parquet-mr seems to be happy with the file.
Yeah, it seems it's a bug we need to fix, thanks @amassalha! I'll check out why arrow-rs cannot open a file like that.
great, thank you
@@ -368,6 +368,41 @@ TEST_P(CodecTest, CodecRoundtrip) {
  }
}

TEST(CodecTest, CodecRoundtripGzipMembers) {
  int sizes[] = {0, 10000, 100000};
nit: use std::array or std::vector instead of raw array.
  std::unique_ptr<Codec> c1;
  ASSERT_OK_AND_ASSIGN(c1, Codec::Create(Compression::GZIP));

  for (int data_half_size : sizes) {
- for (int data_half_size : sizes) {
+ for (int data_half_size : {0, 10000, 100000}) {
Or simply do this.
  std::unique_ptr<Codec> c1;
  ASSERT_OK_AND_ASSIGN(c1, Codec::Create(Compression::GZIP));
- std::unique_ptr<Codec> c1;
- ASSERT_OK_AND_ASSIGN(c1, Codec::Create(Compression::GZIP));
+ ASSERT_OK_AND_ASSIGN(auto gzip_codec, Codec::Create(Compression::GZIP));
Let's name it more meaningfully.
  ASSERT_OK_AND_ASSIGN(actual_size_p1, c1->Compress(data_half.size(), data_half.data(),
                                                    max_compressed_len_half, compressed.data()));
  ASSERT_OK_AND_ASSIGN(actual_size_p2, c1->Compress(data_half.size(), data_half.data(),
                                                    max_compressed_len_half, compressed.data()+actual_size_p1));
- ASSERT_OK_AND_ASSIGN(actual_size_p1, c1->Compress(data_half.size(), data_half.data(),
-                                                   max_compressed_len_half, compressed.data()));
- ASSERT_OK_AND_ASSIGN(actual_size_p2, c1->Compress(data_half.size(), data_half.data(),
-                                                   max_compressed_len_half, compressed.data()+actual_size_p1));
+ ASSERT_OK_AND_ASSIGN(int64_t actual_size_p1, gzip_codec->Compress(data_half.size(), data_half.data(),
+                                                                   max_compressed_len_half, compressed.data()));
+ ASSERT_OK_AND_ASSIGN(int64_t actual_size_p2, gzip_codec->Compress(data_half.size(), data_half.data(),
+                                                                   max_compressed_len_half, compressed.data() + actual_size_p1));
We can simply do this.
  compressed.resize(actual_size_p1 + actual_size_p2);

  // Decompress the concatenated data
  std::vector<uint8_t> decompressed(data_half_size*2);
- std::vector<uint8_t> decompressed(data_half_size*2);
+ std::vector<uint8_t> decompressed(data_half_size * 2);
  c1->Decompress(compressed.size(), compressed.data(),
                 decompressed.size(), decompressed.data()));

  ASSERT_EQ(data_half.size()*2, actual_decompressed_size);
- ASSERT_EQ(data_half.size()*2, actual_decompressed_size);
+ ASSERT_EQ(data_half.size() * 2, actual_decompressed_size);
I found that in arrow-cpp we can read the file because we don't check the decompressed size:

  // Decompress the values
  PARQUET_ASSIGN_OR_THROW(auto decompress_len, decompressor_->Decompress(
      compressed_len - levels_byte_len, page_buffer->data() + levels_byte_len,
      uncompressed_len - levels_byte_len,
      decompression_buffer_->mutable_data() + levels_byte_len));
  if (decompress_len != uncompressed_len - levels_byte_len) {
    throw ParquetException("Expected " + std::to_string(uncompressed_len - levels_byte_len) +
                           " bytes but decompressed " + std::to_string(decompress_len));
  }

@wgtmac @pitrou Do you think we need to add the check above?
@mapleFU Yes, I think it would be good regardless of whether we decide to support this feature or not.
Thanks for the update! Here are two more comments.
Also, can you fix the C++ lint failures (see CI results)?
Force-pushed from c7dc64e to a22d6a5 (commits: linting review fixes, code review fixes2, code review fixes, Update testing, Update parquet-testing, flint fixes, Update compression_test.cc).
thanks for the comments all
I see a timeout failure in sf3; I don't see how this is related to gzip. Do you have a flaky / non-deterministic bug?
Don't worry, I'll rerun it.
great, thanks
@wgtmac any comments from your side?
I don't have any concerns. It seems that you need to rebase it to get it merged.
@wgtmac I rebased and got the same random failure of sf3. Should we just rerun, or what?
@mapleFU how can I rerun this?
@amassalha There is no need to re-run; these failures are unrelated.
@wgtmac Would you like to review this again? Otherwise, I will just merge this PR.
Thanks for notifying me! Feel free to merge this.
I'll merge it
thanks!
After merging your PR, Conbench analyzed the 5 benchmarking runs that have been run so far on merge-commit 1d904d6. There were 5 benchmark results indicating a performance regression:
The full Conbench report has more details. It also includes information about 6 possible false positives for unstable benchmarks that are known to sometimes produce them.
…tiple gzip members (apache#38272)

* Closes: apache#38271

Lead-authored-by: amassalha <amassalha@speedata.io>
Co-authored-by: Atheel Massalha <amassalha@speedata.io>
Signed-off-by: mwish <maplewish117@gmail.com>
What changes are included in this PR?
Adding support in GZipCodec to decompress concatenated gzip members
Are these changes tested?
A test is attached.
Are there any user-facing changes?
no
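For context, here is a minimal sketch (not the actual GZipCodec code; the helper name is hypothetical) of how a zlib-based decompressor can consume concatenated gzip members: when inflate() reports Z_STREAM_END while input remains, reset the stream state and continue with the next member.

#include <zlib.h>
#include <cstdint>
#include <cstring>
#include <stdexcept>

// Decompresses a buffer that may hold several concatenated gzip members into
// `output` (assumed large enough) and returns the total decompressed size.
int64_t DecompressConcatenated(const uint8_t* input, size_t input_len,
                               uint8_t* output, size_t output_len) {
  z_stream stream;
  std::memset(&stream, 0, sizeof(stream));
  // windowBits = 15 + 16 selects gzip framing.
  if (inflateInit2(&stream, 15 + 16) != Z_OK) {
    throw std::runtime_error("inflateInit2 failed");
  }
  stream.next_in = const_cast<Bytef*>(input);
  stream.avail_in = static_cast<uInt>(input_len);
  stream.next_out = output;
  stream.avail_out = static_cast<uInt>(output_len);

  while (stream.avail_in > 0) {
    int ret = inflate(&stream, Z_NO_FLUSH);
    if (ret == Z_STREAM_END) {
      // End of one gzip member; if input remains, reset and keep going.
      // inflateReset() leaves next_in/avail_in and next_out/avail_out alone.
      if (stream.avail_in > 0 && inflateReset(&stream) != Z_OK) {
        inflateEnd(&stream);
        throw std::runtime_error("inflateReset failed");
      }
    } else if (ret != Z_OK) {
      inflateEnd(&stream);
      throw std::runtime_error("inflate failed");
    }
  }
  int64_t total = static_cast<int64_t>(output_len) - stream.avail_out;
  inflateEnd(&stream);
  return total;
}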