Skip to content

Commit

Permalink
ARROW-8274: [C++] Use LZ4 frame format for "LZ4" compression in IPC
Browse files Browse the repository at this point in the history
Closes #6766 from pitrou/ARROW-8274-ipc-lz4-frame-format

Authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Wes McKinney <wesm+git@apache.org>
  • Loading branch information
pitrou authored and wesm committed Mar 31, 2020
1 parent a81aacd commit 895f220
Show file tree
Hide file tree
Showing 15 changed files with 164 additions and 27 deletions.
10 changes: 8 additions & 2 deletions cpp/src/arrow/io/compressed_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,7 @@ TEST_P(CompressedOutputStreamTest, RandomData) {
// NOTES:
// - Snappy doesn't support streaming decompression
// - BZ2 doesn't support one-shot compression
// - LZ4 streaming decompression uses the LZ4 framing format, which must be tested
// against a streaming compressor
// - LZ4 raw format doesn't support streaming decompression

#ifdef ARROW_WITH_SNAPPY
TEST(TestSnappyInputStream, NotImplemented) {
Expand Down Expand Up @@ -276,6 +275,13 @@ INSTANTIATE_TEST_SUITE_P(TestBrotliOutputStream, CompressedOutputStreamTest,
::testing::Values(Compression::BROTLI));
#endif

#ifdef ARROW_WITH_LZ4
INSTANTIATE_TEST_SUITE_P(TestLZ4InputStream, CompressedInputStreamTest,
::testing::Values(Compression::LZ4_FRAME));
INSTANTIATE_TEST_SUITE_P(TestLZ4OutputStream, CompressedOutputStreamTest,
::testing::Values(Compression::LZ4_FRAME));
#endif

#ifdef ARROW_WITH_ZSTD
INSTANTIATE_TEST_SUITE_P(TestZSTDInputStream, CompressedInputStreamTest,
::testing::Values(Compression::ZSTD));
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/ipc/feather.cc
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,7 @@ Result<std::shared_ptr<Reader>> Reader::Open(
WriteProperties WriteProperties::Defaults() {
WriteProperties result;
#ifdef ARROW_WITH_LZ4
result.compression = Compression::LZ4;
result.compression = Compression::LZ4_FRAME;
#else
result.compression = Compression::UNCOMPRESSED;
#endif
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/ipc/feather.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ struct ARROW_EXPORT WriteProperties {
/// faster random row access
int64_t chunksize = 1LL << 16;

/// Compression type to use. Only UNCOMPRESSED, LZ4, and ZSTD are
/// Compression type to use. Only UNCOMPRESSED, LZ4_FRAME, and ZSTD are
/// supported. The default compression returned by Defaults() is LZ4 if the
/// project is built with support for it, otherwise
/// UNCOMPRESSED. UNCOMPRESSED is set as the object default here so that if
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/ipc/feather_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class TestFeather : public ::testing::TestWithParam<TestParam> {
auto props = WriteProperties::Defaults();
props.version = param.version;

// Don't fail if the build doesn't have LZ4 or ZSTD enabled
// Don't fail if the build doesn't have LZ4_FRAME or ZSTD enabled
if (util::Codec::IsAvailable(param.compression)) {
props.compression = param.compression;
} else {
Expand Down Expand Up @@ -129,7 +129,7 @@ TEST(TestFeatherWriteProperties, Defaults) {
auto props = WriteProperties::Defaults();

#ifdef ARROW_WITH_LZ4
ASSERT_EQ(Compression::LZ4, props.compression);
ASSERT_EQ(Compression::LZ4_FRAME, props.compression);
#else
ASSERT_EQ(Compression::UNCOMPRESSED, props.compression);
#endif
Expand Down Expand Up @@ -296,7 +296,7 @@ TEST_P(TestFeather, SliceBooleanRoundTrip) {
INSTANTIATE_TEST_SUITE_P(
FeatherTests, TestFeather,
::testing::Values(TestParam(kFeatherV1Version), TestParam(kFeatherV2Version),
TestParam(kFeatherV2Version, Compression::LZ4),
TestParam(kFeatherV2Version, Compression::LZ4_FRAME),
TestParam(kFeatherV2Version, Compression::ZSTD)));

} // namespace feather
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/ipc/read_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ TEST_F(TestWriteRecordBatch, WriteWithCompression) {
auto batch =
RecordBatch::Make(schema, length, {rg.String(500, 0, 10, 0.1), dict_array});

std::vector<Compression::type> codecs = {Compression::GZIP, Compression::LZ4,
std::vector<Compression::type> codecs = {Compression::GZIP, Compression::LZ4_FRAME,
Compression::ZSTD, Compression::SNAPPY,
Compression::BROTLI};
for (auto codec : codecs) {
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/ipc/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ class RecordBatchSerializer {
Status CompressBodyBuffers() {
std::unique_ptr<util::Codec> codec;

// TODO check allowed values for compression?
AppendCustomMetadata("ARROW:body_compression",
util::Codec::GetCodecAsString(options_.compression));

Expand Down
17 changes: 16 additions & 1 deletion cpp/src/arrow/util/compression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ std::string Codec::GetCodecAsString(Compression::type t) {
case Compression::BROTLI:
return "BROTLI";
case Compression::LZ4:
return "LZ4_RAW";
case Compression::LZ4_FRAME:
return "LZ4";
case Compression::ZSTD:
return "ZSTD";
Expand All @@ -96,8 +98,10 @@ Result<Compression::type> Codec::GetCompressionType(const std::string& name) {
return Compression::LZO;
} else if (name == "BROTLI") {
return Compression::BROTLI;
} else if (name == "LZ4") {
} else if (name == "LZ4_RAW") {
return Compression::LZ4;
} else if (name == "LZ4") {
return Compression::LZ4_FRAME;
} else if (name == "ZSTD") {
return Compression::ZSTD;
} else if (name == "BZ2") {
Expand Down Expand Up @@ -155,6 +159,16 @@ Result<std::unique_ptr<Codec>> Codec::Create(Compression::type codec_type,
break;
#else
return Status::NotImplemented("LZ4 codec support not built");
#endif
case Compression::LZ4_FRAME:
#ifdef ARROW_WITH_LZ4
if (compression_level_set) {
return Status::Invalid("LZ4 doesn't support setting a compression level.");
}
codec.reset(new Lz4FrameCodec());
break;
#else
return Status::NotImplemented("LZ4 codec support not built");
#endif
case Compression::ZSTD:
#ifdef ARROW_WITH_ZSTD
Expand Down Expand Up @@ -203,6 +217,7 @@ bool Codec::IsAvailable(Compression::type codec_type) {
return false;
#endif
case Compression::LZ4:
case Compression::LZ4_FRAME:
#ifdef ARROW_WITH_LZ4
return true;
#else
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/util/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace arrow {

struct Compression {
/// \brief Compression algorithm
enum type { UNCOMPRESSED, SNAPPY, GZIP, BROTLI, ZSTD, LZ4, LZO, BZ2 };
enum type { UNCOMPRESSED, SNAPPY, GZIP, BROTLI, ZSTD, LZ4, LZ4_FRAME, LZO, BZ2 };

static constexpr int kUseDefaultCompressionLevel = std::numeric_limits<int>::min();
};
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/util/compression_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ BENCHMARK_TEMPLATE(ReferenceStreamingDecompression, Compression::ZSTD);
#endif

#ifdef ARROW_WITH_LZ4
BENCHMARK_TEMPLATE(ReferenceStreamingCompression, Compression::LZ4);
BENCHMARK_TEMPLATE(ReferenceStreamingDecompression, Compression::LZ4);
BENCHMARK_TEMPLATE(ReferenceStreamingCompression, Compression::LZ4_FRAME);
BENCHMARK_TEMPLATE(ReferenceStreamingDecompression, Compression::LZ4_FRAME);
#endif

#endif
Expand Down
88 changes: 82 additions & 6 deletions cpp/src/arrow/util/compression_lz4.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,14 @@ static Status LZ4Error(LZ4F_errorCode_t ret, const char* prefix_msg) {
return Status::IOError(prefix_msg, LZ4F_getErrorName(ret));
}

static LZ4F_preferences_t DefaultPreferences() {
LZ4F_preferences_t prefs;
memset(&prefs, 0, sizeof(prefs));
return prefs;
}

// ----------------------------------------------------------------------
// Lz4 decompressor implementation
// Lz4 frame decompressor implementation

class LZ4Decompressor : public Decompressor {
public:
Expand Down Expand Up @@ -106,7 +112,7 @@ class LZ4Decompressor : public Decompressor {
};

// ----------------------------------------------------------------------
// Lz4 compressor implementation
// Lz4 frame compressor implementation

class LZ4Compressor : public Compressor {
public:
Expand All @@ -120,7 +126,7 @@ class LZ4Compressor : public Compressor {

Status Init() {
LZ4F_errorCode_t ret;
memset(&prefs_, 0, sizeof(prefs_));
prefs_ = DefaultPreferences();
first_time_ = true;

ret = LZ4F_createCompressionContext(&ctx_, LZ4F_VERSION);
Expand Down Expand Up @@ -225,20 +231,90 @@ class LZ4Compressor : public Compressor {
};

// ----------------------------------------------------------------------
// Lz4 codec implementation
// Lz4 frame codec implementation

Result<std::shared_ptr<Compressor>> Lz4Codec::MakeCompressor() {
struct Lz4FrameCodec::Impl {
Impl() : prefs_(DefaultPreferences()) {}

LZ4F_preferences_t prefs_;
};

Lz4FrameCodec::Lz4FrameCodec() : impl_(new Impl()) {}

Lz4FrameCodec::~Lz4FrameCodec() {}

Result<std::shared_ptr<Compressor>> Lz4FrameCodec::MakeCompressor() {
auto ptr = std::make_shared<LZ4Compressor>();
RETURN_NOT_OK(ptr->Init());
return ptr;
}

Result<std::shared_ptr<Decompressor>> Lz4Codec::MakeDecompressor() {
Result<std::shared_ptr<Decompressor>> Lz4FrameCodec::MakeDecompressor() {
auto ptr = std::make_shared<LZ4Decompressor>();
RETURN_NOT_OK(ptr->Init());
return ptr;
}

int64_t Lz4FrameCodec::MaxCompressedLen(int64_t input_len,
const uint8_t* ARROW_ARG_UNUSED(input)) {
return static_cast<int64_t>(
LZ4F_compressFrameBound(static_cast<size_t>(input_len), &impl_->prefs_));
}

Result<int64_t> Lz4FrameCodec::Compress(int64_t input_len, const uint8_t* input,
int64_t output_buffer_len,
uint8_t* output_buffer) {
auto output_len =
LZ4F_compressFrame(output_buffer, static_cast<size_t>(output_buffer_len), input,
static_cast<size_t>(input_len), &impl_->prefs_);
if (LZ4F_isError(output_len)) {
return LZ4Error(output_len, "Lz4 compression failure: ");
}
return static_cast<int64_t>(output_len);
}

Result<int64_t> Lz4FrameCodec::Decompress(int64_t input_len, const uint8_t* input,
int64_t output_buffer_len,
uint8_t* output_buffer) {
ARROW_ASSIGN_OR_RAISE(auto decomp, MakeDecompressor());

int64_t total_bytes_written = 0;
while (!decomp->IsFinished() && input_len != 0) {
ARROW_ASSIGN_OR_RAISE(
auto res, decomp->Decompress(input_len, input, output_buffer_len, output_buffer));
input += res.bytes_read;
input_len -= res.bytes_read;
output_buffer += res.bytes_written;
output_buffer_len -= res.bytes_written;
total_bytes_written += res.bytes_written;
if (res.need_more_output) {
return Status::IOError("Lz4 decompression buffer too small");
}
}
if (!decomp->IsFinished()) {
return Status::IOError("Lz4 compressed input contains less than one frame");
}
if (input_len != 0) {
return Status::IOError("Lz4 compressed input contains more than one frame");
}
return total_bytes_written;
}

// ----------------------------------------------------------------------
// Lz4 "raw" codec implementation

Result<std::shared_ptr<Compressor>> Lz4Codec::MakeCompressor() {
return Status::NotImplemented(
"Streaming compression unsupported with LZ4 raw format. "
"Try using LZ4 frame format instead.");
}

Result<std::shared_ptr<Decompressor>> Lz4Codec::MakeDecompressor() {
return Status::NotImplemented(
"Streaming decompression unsupported with LZ4 raw format. "
"Try using LZ4 frame format instead.");
}

Result<int64_t> Lz4Codec::Decompress(int64_t input_len, const uint8_t* input,
int64_t output_buffer_len, uint8_t* output_buffer) {
int64_t decompressed_size = LZ4_decompress_safe(
Expand Down
27 changes: 26 additions & 1 deletion cpp/src/arrow/util/compression_lz4.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
namespace arrow {
namespace util {

// Lz4 codec.
// Lz4 "raw" format codec.
class ARROW_EXPORT Lz4Codec : public Codec {
public:
Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
Expand All @@ -42,7 +42,32 @@ class ARROW_EXPORT Lz4Codec : public Codec {

Result<std::shared_ptr<Decompressor>> MakeDecompressor() override;

const char* name() const override { return "lz4_raw"; }
};

// Lz4 frame format codec.
class ARROW_EXPORT Lz4FrameCodec : public Codec {
public:
Lz4FrameCodec();
~Lz4FrameCodec();

Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
int64_t output_buffer_len, uint8_t* output_buffer) override;

Result<int64_t> Compress(int64_t input_len, const uint8_t* input,
int64_t output_buffer_len, uint8_t* output_buffer) override;

int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) override;

Result<std::shared_ptr<Compressor>> MakeCompressor() override;

Result<std::shared_ptr<Decompressor>> MakeDecompressor() override;

const char* name() const override { return "lz4"; }

protected:
struct Impl;
std::unique_ptr<Impl> impl_;
};

} // namespace util
Expand Down
25 changes: 19 additions & 6 deletions cpp/src/arrow/util/compression_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,8 @@ TEST(TestCodecMisc, GetCodecAsString) {
ASSERT_EQ("GZIP", Codec::GetCodecAsString(Compression::GZIP));
ASSERT_EQ("LZO", Codec::GetCodecAsString(Compression::LZO));
ASSERT_EQ("BROTLI", Codec::GetCodecAsString(Compression::BROTLI));
ASSERT_EQ("LZ4", Codec::GetCodecAsString(Compression::LZ4));
ASSERT_EQ("LZ4_RAW", Codec::GetCodecAsString(Compression::LZ4));
ASSERT_EQ("LZ4", Codec::GetCodecAsString(Compression::LZ4_FRAME));
ASSERT_EQ("ZSTD", Codec::GetCodecAsString(Compression::ZSTD));
ASSERT_EQ("BZ2", Codec::GetCodecAsString(Compression::BZ2));
}
Expand All @@ -333,7 +334,8 @@ TEST(TestCodecMisc, GetCompressionType) {
ASSERT_OK_AND_EQ(Compression::GZIP, Codec::GetCompressionType("GZIP"));
ASSERT_OK_AND_EQ(Compression::LZO, Codec::GetCompressionType("LZO"));
ASSERT_OK_AND_EQ(Compression::BROTLI, Codec::GetCompressionType("BROTLI"));
ASSERT_OK_AND_EQ(Compression::LZ4, Codec::GetCompressionType("LZ4"));
ASSERT_OK_AND_EQ(Compression::LZ4, Codec::GetCompressionType("LZ4_RAW"));
ASSERT_OK_AND_EQ(Compression::LZ4_FRAME, Codec::GetCompressionType("LZ4"));
ASSERT_OK_AND_EQ(Compression::ZSTD, Codec::GetCompressionType("ZSTD"));
ASSERT_OK_AND_EQ(Compression::BZ2, Codec::GetCompressionType("BZ2"));

Expand Down Expand Up @@ -432,8 +434,7 @@ TEST_P(CodecTest, StreamingCompressor) {
return;
}
if (GetCompression() == Compression::LZ4) {
// SKIP: LZ4 streaming compression uses the LZ4 framing format,
// which must be tested against a streaming decompressor
// SKIP: LZ4 raw format doesn't support streaming compression.
return;
}

Expand All @@ -459,8 +460,7 @@ TEST_P(CodecTest, StreamingDecompressor) {
return;
}
if (GetCompression() == Compression::LZ4) {
// SKIP: LZ4 streaming decompression uses the LZ4 framing format,
// which must be tested against a streaming compressor
// SKIP: LZ4 raw format doesn't support streaming decompression.
return;
}

Expand All @@ -481,6 +481,10 @@ TEST_P(CodecTest, StreamingRoundtrip) {
// SKIP: snappy doesn't support streaming decompression
return;
}
if (GetCompression() == Compression::LZ4) {
// SKIP: LZ4 raw format doesn't support streaming compression.
return;
}

int sizes[] = {0, 10, 100000};
for (int data_size : sizes) {
Expand All @@ -499,6 +503,10 @@ TEST_P(CodecTest, StreamingDecompressorReuse) {
// SKIP: snappy doesn't support streaming decompression
return;
}
if (GetCompression() == Compression::LZ4) {
// SKIP: LZ4 raw format doesn't support streaming decompression.
return;
}

auto codec = MakeCodec();
std::shared_ptr<Compressor> compressor;
Expand Down Expand Up @@ -527,6 +535,11 @@ INSTANTIATE_TEST_SUITE_P(TestSnappy, CodecTest, ::testing::Values(Compression::S
INSTANTIATE_TEST_SUITE_P(TestLZ4, CodecTest, ::testing::Values(Compression::LZ4));
#endif

#ifdef ARROW_WITH_LZ4
INSTANTIATE_TEST_SUITE_P(TestLZ4Frame, CodecTest,
::testing::Values(Compression::LZ4_FRAME));
#endif

#ifdef ARROW_WITH_BROTLI
INSTANTIATE_TEST_SUITE_P(TestBrotli, CodecTest, ::testing::Values(Compression::BROTLI));
#endif
Expand Down
Loading

0 comments on commit 895f220

Please sign in to comment.