Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor parquet PageReader to use PagedInputStream for decompression #5914

Conversation

nmahadevuni
Copy link
Collaborator

@nmahadevuni nmahadevuni commented Jul 28, 2023

Resolves #5714

@facebook-github-bot facebook-github-bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label Jul 28, 2023
@netlify
Copy link

netlify bot commented Jul 28, 2023

Deploy Preview for meta-velox canceled.

Name Link
🔨 Latest commit 6bfc6c4
🔍 Latest deploy log https://app.netlify.com/sites/meta-velox/deploys/64ef015e7aa1840008efc334

@nmahadevuni
Copy link
Collaborator Author

New PR for decoupling compression classes into common package is #5946

@nmahadevuni nmahadevuni changed the title Refact parquet PageReader to use PagedInputStream for decompression Refactor parquet PageReader to use PagedInputStream for decompression Aug 2, 2023
@nmahadevuni nmahadevuni force-pushed the parquet_refactor_uncompress branch 2 times, most recently from 1bc9754 to 371264e Compare August 8, 2023 12:47
@nmahadevuni
Copy link
Collaborator Author

@yingsu00 @Yuhta Updated this PR. Please review.

@Yuhta Yuhta self-requested a review August 8, 2023 23:11
velox/dwio/common/compression/Compression.h Outdated Show resolved Hide resolved
velox/dwio/common/compression/Compression.h Outdated Show resolved Hide resolved
velox/dwio/parquet/reader/PageReader.cpp Outdated Show resolved Hide resolved
velox/dwio/common/compression/PagedInputStream.h Outdated Show resolved Hide resolved
velox/dwio/parquet/reader/PageReader.cpp Outdated Show resolved Hide resolved
@nmahadevuni nmahadevuni force-pushed the parquet_refactor_uncompress branch 2 times, most recently from 6e831c9 to 83b4e7b Compare August 16, 2023 16:29
@nmahadevuni
Copy link
Collaborator Author

@yingsu00 @Yuhta Addressed the comments. Please review again.

@nmahadevuni
Copy link
Collaborator Author

@yingsu00 @Yuhta Addressed the latest comments. Please review!

Copy link
Contributor

@xiaoxmeng xiaoxmeng left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nmahadevuni thanks for the change. Can we add tests to verify the passed options have been handled properly. Thanks!

velox/dwio/common/compression/Compression.h Show resolved Hide resolved
uint32_t compressionThreshold,
int32_t zlibCompressionLevel,
int32_t zstdCompressionLevel,
CompressionOptions options,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make CompressionOptions the last input parameter?

const CompressionOptions& options;

const dwio::common::encryption::Decrypter* decryptr = nullptr,
bool useRawDecompression = false,
size_t compressedLength = 0,
CompressionOptions options = getDwrfOrcDecompressionOptions());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why default is dwrf options? Compressor is also used for presto serializer. We could have a default compression options which is not file format specific.

return options;
}

static CompressionOptions getParquetDecompressionOptions() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this to parquet, dwrf code repo when they create compression options? Thanks!

@@ -137,7 +139,9 @@ class ZlibDecompressor : public Decompressor {

ZlibDecompressor::ZlibDecompressor(
uint64_t blockSize,
const std::string& streamDebugInfo)
const std::string& streamDebugInfo,
int windowBits,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move windowBits and isGzip ahead of streamDebugInfo? Thanks!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved windowBits. isGzip has a default value. Cannot move it.

streamDebugInfo,
options.format.zlib.windowBits,
true,
useRawDecompression,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

useRawDecompression can be zlib compression option? thanks!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this is for PagedInputStream, not specific to any compression format.

@@ -28,7 +28,9 @@ class PagedInputStream : public dwio::common::SeekableInputStream {
memory::MemoryPool& memPool,
std::unique_ptr<Decompressor> decompressor,
const dwio::common::encryption::Decrypter* decrypter,
const std::string& streamDebugInfo)
const std::string& streamDebugInfo,
bool useRawDecompression = false,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move streamDebugInfo to the last input?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has default arguments, streamDebugInfo can't be moved to last.

}

static CompressionOptions getParquetDecompressionOptions() {
CompressionOptions options;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const CompressionOptions& options;

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't declare it const reference. needs initialization.

const dwio::common::encryption::Decrypter* decryptr = nullptr,
bool useRawDecompression = false,
size_t compressedLength = 0,
CompressionOptions options = getDwrfOrcDecompressionOptions());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const CompressionOptions& options;

const dwio::common::encryption::Decrypter* decryptr = nullptr);
const dwio::common::encryption::Decrypter* decryptr = nullptr,
bool useRawDecompression = false,
size_t compressedLength = 0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const CompressionOptions& options;

yaqi-zhao added a commit to yaqi-zhao/velox that referenced this pull request Aug 24, 2023
yaqi-zhao added a commit to yaqi-zhao/velox that referenced this pull request Aug 24, 2023
yaqi-zhao added a commit to yaqi-zhao/velox that referenced this pull request Aug 24, 2023
@nmahadevuni nmahadevuni force-pushed the parquet_refactor_uncompress branch 2 times, most recently from c139534 to b69418a Compare August 25, 2023 11:01
@nmahadevuni
Copy link
Collaborator Author

@xiaoxmeng Addressed the comments. Please review again. @Yuhta @yingsu00

uint32_t compressedSize,
uint32_t uncompressedSize,
uint32_t decompressedSize,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be uncompressedSize, which means the size before the data was compressed

"{} uncompress failed, remainingOutputSize is less then "
"uncompressedBlockLength, remainingOutputSize: {}, "
"uncompressedBlockLength: {}",
"{} decompress failed, remainingOutputSize is less then "
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

decompression failed

@@ -165,7 +165,7 @@ const char* FOLLY_NONNULL decompressLz4AndLzo(
// Check that input length should not be negative.
if (inputLength < sizeof(uint32_t)) {
VELOX_FAIL(
"{} uncompress failed, input len is to small: {}",
"{} decompress failed, input len is to small: {}",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

decompression failed

/**
* Create a decompressor for the given compression kind.
* @param kind the compression type to implement
* @param input the input stream that is the underlying source
* @param bufferSize the maximum size of the buffer
* @param pool the memory pool
* @param useRawDecompression specify whether to perform raw decompression
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please make the comments order the same as the real arguments?

@@ -87,10 +120,8 @@ std::unique_ptr<BufferedOutputStream> createCompressor(
facebook::velox::common::CompressionKind kind,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is for line 117: will you please update the comments accordingly?

constexpr uint8_t PAGE_HEADER_SIZE = 3;

FOLLY_ALWAYS_INLINE const CompressionOptions getDwrfOrcCompressionOptions(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use FOLLY_ALWAYS_INLINE? This function is just called once when creating the compressor.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

encrypter);
}

FOLLY_ALWAYS_INLINE const CompressionOptions getDwrfOrcDecompressionOptions() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why FOLLY_ALWAYS_INLINE?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

@@ -36,4 +36,5 @@ target_link_libraries(
arrow
Snappy::snappy
thrift
zstd::zstd)
zstd::zstd
${Protobuf_LIBRARIES})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this line related and necessary?

Copy link
Collaborator Author

@nmahadevuni nmahadevuni Aug 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is necessary since we are now having references to google::protobuf::io::ZeroCopyOutputStream via BufferedOutputStream from Parquet PageReader.h -> dwio/common/compression/Compression.h -> velox/dwio/common/OutputStream.h

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

undefined reference to `typeinfo for google::protobuf::io::ZeroCopyOutputStream' after this PR

I got the following link issue

/usr/bin/ld: velox/dwio/common/compression/libvelox_dwio_common_compression.a(Compression.cpp.o): in function `google::protobuf::io::ZeroCopyOutputStream::ZeroCopyOutputStream()':
/home/chang/SourceCode/velox/cmake-build-debug-system/_deps/protobuf-src/src/google/protobuf/io/zero_copy_stream.h:193: undefined reference to `vtable for google::protobuf::io::ZeroCopyOutputStream'
/usr/bin/ld: velox/dwio/common/compression/libvelox_dwio_common_compression.a(Compression.cpp.o): in function `google::protobuf::io::ZeroCopyOutputStream::~ZeroCopyOutputStream()':
/velox/cmake-build-debug-system/_deps/protobuf-src/src/google/protobuf/io/zero_copy_stream.h:194: undefined reference to `vtable for google::protobuf::io::ZeroCopyOutputStream'
/usr/bin/ld: velox/dwio/common/libvelox_dwio_common.a(OutputStream.cpp.o):(.[data.rel.ro](http://data.rel.ro/)._ZTIN8facebook5velox4dwio6common20BufferedOutputStreamE[_ZTIN8facebook5velox4dwio6common20BufferedOutputStreamE]+0x10): undefined reference to `typeinfo for google::protobuf::io::ZeroCopyOutputStream'

Here are my cmake option

-DVELOX_DEPENDENCY_SOURCE=BUNDLED
-DVELOX_ENABLE_EXAMPLES=ON
-DVELOX_ENABLE_PARQUET=ON
-DVELOX_BUILD_TESTING=ON
-DVELOX_ENABLE_BENCHMARKS=ON
-DVELOX_ENABLE_BENCHMARKS_BASIC=ON

I checked probobuf cmake file and make sure disbale RTTI is OFF

Not sure how to fix it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@baibaichen I was trying to do BUNDLED build using your cmake settings buy I'm getting a different error with Arrow build failing with Snappy.h file not found. I will sort that out. Can you move this protobuf link library to velox_dwio_common and check of it fixes the issue?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have fixed the protobuf header leak instead of propagating this dependency. I will open a PR to fix this.

@@ -28,6 +30,8 @@

namespace facebook::velox::parquet {

using namespace dwio::common::compression;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By convention, the header files do not use "using namespace". Can you please just inline these prefixes?

Copy link
Collaborator

@yingsu00 yingsu00 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nmahadevuni Mostly look good! Just a couple of nits...

/**
* Create a decompressor for the given compression kind.
* @param kind the compression type to implement
* @param input the input stream that is the underlying source
* @param bufferSize the maximum size of the buffer
* @param pool the memory pool
* @param options compression options to use
* @param useRawDecompression specify whether to perform raw decompression
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This parameter is only for GZIp and ZLIB, can you mention it in the comment?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is currently for Gzip and Zlib, but can be used for any format. Do we want to change the comment?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also compressionThreshold is not specific to these formats.


uint32_t compressionThreshold;
};

/**
* Create a decompressor for the given compression kind.
* @param kind the compression type to implement
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please use Upper case for the first characters? E.g.
@param kind The compression type to implement

ditto


/**
* Create a decompressor for the given compression kind.
* @param kind the compression type to implement
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upper case for the first chars

@nmahadevuni
Copy link
Collaborator Author

@ying, can you rerun the failed test? It is not related.

@facebook-github-bot
Copy link
Contributor

@Yuhta has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

yaqi-zhao added a commit to yaqi-zhao/velox that referenced this pull request Sep 7, 2023
@facebook-github-bot
Copy link
Contributor

@Yuhta merged this pull request in 95f608b.

@conbench-facebook
Copy link

Conbench analyzed the 1 benchmark run on commit 95f608b3.

There were no benchmark performance regressions. 🎉

The full Conbench report has more details.

gaoyangxiaozhu pushed a commit to gayangya/velox that referenced this pull request Sep 25, 2023
codyschierbeck pushed a commit to codyschierbeck/velox that referenced this pull request Sep 27, 2023
…facebookincubator#5914)

Summary:
Resolves facebookincubator#5714

Pull Request resolved: facebookincubator#5914

Reviewed By: mbasmanova

Differential Revision: D48521298

Pulled By: Yuhta

fbshipit-source-id: 1a5f4204f02b556dfa03da125d1a587f9aae70f6
codyschierbeck pushed a commit to codyschierbeck/velox that referenced this pull request Sep 27, 2023
…facebookincubator#5914)

Summary:
Resolves facebookincubator#5714

Pull Request resolved: facebookincubator#5914

Reviewed By: mbasmanova

Differential Revision: D48521298

Pulled By: Yuhta

fbshipit-source-id: 1a5f4204f02b556dfa03da125d1a587f9aae70f6
codyschierbeck pushed a commit to codyschierbeck/velox that referenced this pull request Sep 27, 2023
…facebookincubator#5914)

Summary:
Resolves facebookincubator#5714

Pull Request resolved: facebookincubator#5914

Reviewed By: mbasmanova

Differential Revision: D48521298

Pulled By: Yuhta

fbshipit-source-id: 1a5f4204f02b556dfa03da125d1a587f9aae70f6
gaoyangxiaozhu pushed a commit to gayangya/velox that referenced this pull request Sep 28, 2023
ericyuliu pushed a commit to ericyuliu/velox that referenced this pull request Oct 12, 2023
…facebookincubator#5914)

Summary:
Resolves facebookincubator#5714

Pull Request resolved: facebookincubator#5914

Reviewed By: mbasmanova

Differential Revision: D48521298

Pulled By: Yuhta

fbshipit-source-id: 1a5f4204f02b556dfa03da125d1a587f9aae70f6
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. Merged parquet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Refactor PageReader::uncompressData to use PagedInputStream
7 participants