-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support to compress data in PrestoVectorSerializer #5544
Add support to compress data in PrestoVectorSerializer #5544
Conversation
✅ Deploy Preview for meta-velox ready!
To edit notification comments on pull requests, go to your Netlify site configuration. |
5fa9ca8
to
9c0d9ec
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the change % minors!
velox/dwio/common/Common.cpp
Outdated
case CompressionKind_LZ4: | ||
return getCodec(folly::io::CodecType::LZ4); | ||
} | ||
VELOX_UNREACHABLE(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to log out the unsupported kind in default case to help debugging? Thanks!
auto children = &(*result)->children(); | ||
auto childTypes = type->as<TypeKind::ROW>().children(); | ||
readColumns(source, pool, childTypes, children, useLosslessTimestamp); | ||
if (codec->type() == folly::io::CodecType::NO_COMPRESSION) { | ||
VELOX_CHECK(!isCompressedBitSet(pageCodecMarker)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we move the check above at l1845?
VELOX_CHECK_EQ(codec->type() == folly::io::CodecType::NO_COMPRESSION, !isCompressedBitSet(pageCodecMarker), "error message");
for (auto& stream : streams_) { | ||
stream->flush(&out); | ||
} | ||
int32_t uncompressedSize = out.tellp(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const
crc = computeChecksum(listener, codec, numRows, compressedSize); | ||
} | ||
output->seekp(crcOffset); | ||
writeInt64(output, crc); // Write zero checksum |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment is not valid?
Fixed all the comments, can you help review again? @xiaoxmeng |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update. LGTM. Could you draw a diagram to show the serialized page layout with and without compression in header?
TEST_F(PrestoSerializerTest, ioBufRoundTrip) { | ||
serializer::presto::PrestoVectorSerde::registerVectorSerde(); | ||
TEST_P(PrestoSerializerTest, ioBufRoundTrip) { | ||
if (!isRegisteredVectorSerde()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you register once in SetUpTestCase? Thanks!
@@ -275,27 +297,31 @@ TEST_F(PrestoSerializerTest, multiPage) { | |||
auto byteStream = toByteStream(bytes); | |||
|
|||
RowVectorPtr deserialized; | |||
dwio::common::CompressionKind kind = GetParam(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can make a method in test class: getSerdeOptions() which construct an options based on the test parameter?
velox/serializers/PrestoSerializer.h
Outdated
#include "velox/vector/VectorStream.h" | ||
|
||
namespace facebook::velox::serializer::presto { | ||
class PrestoVectorSerde : public VectorSerde { | ||
public: | ||
// Input options that the serializer recognizes. | ||
struct PrestoOptions : VectorSerde::Options { | ||
explicit PrestoOptions(bool useLosslessTimestamp) | ||
: useLosslessTimestamp(useLosslessTimestamp) {} | ||
explicit PrestoOptions() {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PrestoOptions() = default;
Do we need this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, then we can simplify the function, otherwise, we get nullptr Options most of the case, we should construct the PrestoOptions with its default value.
const PrestoVectorSerde::PrestoOptions toPrestoOptions(
const VectorSerde::Options* options) {
if (options == nullptr) {
return PrestoVectorSerde::PrestoOptions();
}
return *(static_cast<const PrestoVectorSerde::PrestoOptions*>(options));
}
velox/serializers/PrestoSerializer.h
Outdated
explicit PrestoOptions() {} | ||
|
||
explicit PrestoOptions( | ||
bool useLosslessTimestamp, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/useLosslessTimestamp/_useLosslessTimestamp/
s/compressionKind/_compressionKind/
@@ -136,6 +136,18 @@ std::string typeToEncodingName(const TypePtr& type) { | |||
} | |||
} | |||
|
|||
const PrestoVectorSerde::PrestoOptions toPrestoOptions( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/const PrestoVectorSerde::PrestoOptions/PrestoVectorSerde::PrestoOptions/
Drop the const as return by value?
private: | ||
static const int32_t kSizeInBytesOffset{4 + 1}; | ||
static const int32_t kHeaderSize{kSizeInBytesOffset + 4 + 4 + 8}; | ||
|
||
const StreamArena* streamArena_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StreamArena* const streamArena_;
const std::unique_ptr<folly::io::Codec> codec_;
bool useLosslessTimestamp) { | ||
bool useLosslessTimestamp, | ||
dwio::common::CompressionKind compressionKind) { | ||
streamArena_ = streamArena; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The same for the streamArena_ init
@xiaoxmeng has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jinchengchenghh Looks great. Thanks for iterations!
readColumns(source, pool, childTypes, children, useLosslessTimestamp); | ||
if (!needCompression(*codec)) { | ||
// skip number of columns | ||
source->skip(4); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we add a check if the number of columns match? We currently see some serialized pages corruption in meta internal test. Thanks!
ByteStream uncompressedSource; | ||
uncompressedSource.resetInput({byteRange}); | ||
// skip number of columns | ||
uncompressedSource.skip(4); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
@xiaoxmeng has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
10a6fc1
to
3c374dc
Compare
ASSERT_EQ( | ||
folly::io::CodecType::LZ4, | ||
compressionKindToCodec(CompressionKind::CompressionKind_LZ4)->type()); | ||
EXPECT_THROW( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use VELOX_ASSERT_THROW
@@ -14,7 +14,7 @@ | |||
add_library(velox_presto_serializer PrestoSerializer.cpp | |||
UnsafeRowSerializer.cpp) | |||
|
|||
target_link_libraries(velox_presto_serializer velox_vector) | |||
target_link_libraries(velox_presto_serializer velox_dwio_common velox_vector) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if it is a good idea to add this dependency. What do we need from dwio::common? Can we refactor to extract what we need?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need this file https://github.com/facebookincubator/velox/blob/main/velox/dwio/common/Common.h, I would suggest to rename this file to Compression.h, and extract it to velox_compression.
d2cc78a
to
0370416
Compare
@xiaoxmeng has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
add_subdirectory(tests) | ||
endif() | ||
|
||
add_library(velox_dwio_common_compression Compression.cpp LzoDecompressor.cpp) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need to be under dwio? Looks like it will be used for non-DWIO code. Maybe move it somewhere else. It would be nice to do this refactoring in a separate PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where would you recommend to put it? Maybe velox/common/compression
or velox/compression
? I can help refactor it in a separate PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
velox/common/compression sounds good. Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jinchengchenghh I think you have moved to the wrong location: it should be velox/common/compression/Compression.h but not velox/dwio/common/compression/Compression.h? @mbasmanova
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will refactor in a separate PR as this discussion suggested. #5544 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jinchengchenghh Thank you. Just to clarify, we'll wait for a refactoring PR to proceed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jinchengchenghh could you rebase the PR to clear the ci test failure? Thanks!
0370416
to
a6ac219
Compare
@xiaoxmeng has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
case folly::io::CodecType::LZ4: | ||
return CompressionKind_LZ4; | ||
default: | ||
VELOX_UNSUPPORTED("Not support folly codec type {}", type); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
type.toString()?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a enum class folly::io::CodecType, it does not have toString function
case CompressionKind_LZ4: | ||
return "lz4"; | ||
} | ||
return folly::to<std::string>("unknown - ", kind); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not put inside switch default?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is old code, I just move it to another directory.
@@ -15,20 +15,29 @@ | |||
*/ | |||
#pragma once | |||
#include "velox/common/base/Crc.h" | |||
#include "velox/dwio/common/compression/Compression.h" | |||
#include "velox/vector/VectorStream.h" | |||
|
|||
namespace facebook::velox::serializer::presto { | |||
class PrestoVectorSerde : public VectorSerde { | |||
public: | |||
// Input options that the serializer recognizes. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand this is not within this PR but can we make the comments triple slash as it's public? THanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could open a new PR to do that since it is no relevant change.
dwio::common::CompressionKind _compressionKind) | ||
: useLosslessTimestamp(_useLosslessTimestamp), | ||
compressionKind(_compressionKind) {} | ||
|
||
// Currently presto only supports millisecond precision and the serializer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
PrestoVectorSerde::PrestoOptions toPrestoOptions( | ||
const VectorSerde::Options* options) { | ||
if (options == nullptr) { | ||
return PrestoVectorSerde::PrestoOptions(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does returning NONE mean no compression or something wrong? I see a needCompression() method down there already can tell if compression is needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
@jinchengchenghh I know this is due to current code behavior but could you help either fix all the code sites to pass non-null VectorSerde::Options or have a unit test to make sure PrestoVectorSerde::PrestoOptions() returns common::CompressionKind::CompressionKind_NONE.
@@ -1580,6 +1595,9 @@ class PrestoVectorSerializer : public VectorSerializer { | |||
} | |||
} | |||
|
|||
// The SerializedPage layout is: | |||
// numRows(4) | codec(1) | uncompressedSize(4) | compressedSize(4) | | |||
// checksum(8) | data |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice comments. Can we make them triple slash /// as it's public?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jinchengchenghh file move PR is now committed. Can you rebase? Thanks!
b436e64
to
c48af6b
Compare
@xiaoxmeng has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
@xiaoxmeng merged this pull request in e80df93. |
Compress the data by folly when flush to OutputStream. And decompress the data under deserialization.
Will use in spill compression.
Resolve:#5313