diff --git a/c_glib/arrow-glib/codec.cpp b/c_glib/arrow-glib/codec.cpp index fdd61e70a177..33b3d1c9149e 100644 --- a/c_glib/arrow-glib/codec.cpp +++ b/c_glib/arrow-glib/codec.cpp @@ -38,7 +38,7 @@ G_BEGIN_DECLS */ typedef struct GArrowCodecPrivate_ { - arrow::util::Codec *codec; + std::shared_ptr codec; } GArrowCodecPrivate; enum { @@ -57,7 +57,7 @@ garrow_codec_finalize(GObject *object) { auto priv = GARROW_CODEC_GET_PRIVATE(object); - delete priv->codec; + priv->codec.~shared_ptr(); G_OBJECT_CLASS(garrow_codec_parent_class)->finalize(object); } @@ -72,7 +72,8 @@ garrow_codec_set_property(GObject *object, switch (prop_id) { case PROP_CODEC: - priv->codec = static_cast(g_value_get_pointer(value)); + priv->codec = + *static_cast *>(g_value_get_pointer(value)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); @@ -96,6 +97,8 @@ garrow_codec_get_property(GObject *object, static void garrow_codec_init(GArrowCodec *object) { + auto priv = GARROW_CODEC_GET_PRIVATE(object); + new(&priv->codec) std::shared_ptr; } static void @@ -111,7 +114,7 @@ garrow_codec_class_init(GArrowCodecClass *klass) spec = g_param_spec_pointer("codec", "Codec", - "The raw arrow::util::Codec *", + "The raw std::shared_ptr *", static_cast(G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY)); g_object_class_install_property(gobject_class, PROP_CODEC, spec); @@ -133,7 +136,9 @@ garrow_codec_new(GArrowCompressionType type, auto arrow_type = garrow_compression_type_to_raw(type); auto arrow_codec = arrow::util::Codec::Create(arrow_type); if (garrow::check(error, arrow_codec, "[codec][new]")) { - return garrow_codec_new_raw(arrow_codec.ValueOrDie().release()); + std::shared_ptr arrow_codec_shared = + std::move(*arrow_codec); + return garrow_codec_new_raw(&arrow_codec_shared); } else { return NULL; } @@ -151,7 +156,46 @@ const gchar * garrow_codec_get_name(GArrowCodec *codec) { auto arrow_codec = garrow_codec_get_raw(codec); - return arrow_codec->name(); + if (!arrow_codec) { + return NULL; + } + return arrow_codec->name().c_str(); +} + +/** + * garrow_codec_get_compression_type: + * @codec: A #GArrowCodec. + * + * Returns: The compression type of the codec. + * + * Since: 2.0.0 + */ +GArrowCompressionType +garrow_codec_get_compression_type(GArrowCodec *codec) +{ + auto arrow_codec = garrow_codec_get_raw(codec); + if (!arrow_codec) { + return GARROW_COMPRESSION_TYPE_UNCOMPRESSED; + } + return garrow_compression_type_from_raw(arrow_codec->compression_type()); +} + +/** + * garrow_codec_get_compression_level: + * @codec: A #GArrowCodec. + * + * Returns: The compression level of the codec. + * + * Since: 2.0.0 + */ +gint +garrow_codec_get_compression_level(GArrowCodec *codec) +{ + auto arrow_codec = garrow_codec_get_raw(codec); + if (!arrow_codec) { + return arrow::util::Codec::UseDefaultCompressionLevel(); + } + return arrow_codec->compression_level(); } G_END_DECLS @@ -207,7 +251,7 @@ garrow_compression_type_to_raw(GArrowCompressionType type) } GArrowCodec * -garrow_codec_new_raw(arrow::util::Codec *arrow_codec) +garrow_codec_new_raw(std::shared_ptr *arrow_codec) { auto codec = GARROW_CODEC(g_object_new(GARROW_TYPE_CODEC, "codec", arrow_codec, @@ -215,7 +259,7 @@ garrow_codec_new_raw(arrow::util::Codec *arrow_codec) return codec; } -arrow::util::Codec * +std::shared_ptr garrow_codec_get_raw(GArrowCodec *codec) { auto priv = GARROW_CODEC_GET_PRIVATE(codec); diff --git a/c_glib/arrow-glib/codec.h b/c_glib/arrow-glib/codec.h index 5feab2b7d4d1..6e177af9eede 100644 --- a/c_glib/arrow-glib/codec.h +++ b/c_glib/arrow-glib/codec.h @@ -20,6 +20,7 @@ #pragma once #include +#include G_BEGIN_DECLS @@ -63,5 +64,11 @@ GArrowCodec *garrow_codec_new(GArrowCompressionType type, GError **error); const gchar *garrow_codec_get_name(GArrowCodec *codec); +GARROW_AVAILABLE_IN_2_0 +GArrowCompressionType +garrow_codec_get_compression_type(GArrowCodec *codec); +GARROW_AVAILABLE_IN_2_0 +gint +garrow_codec_get_compression_level(GArrowCodec *codec); G_END_DECLS diff --git a/c_glib/arrow-glib/codec.hpp b/c_glib/arrow-glib/codec.hpp index 14c3ad77ccf1..f4cfaba18a00 100644 --- a/c_glib/arrow-glib/codec.hpp +++ b/c_glib/arrow-glib/codec.hpp @@ -29,6 +29,6 @@ arrow::Compression::type garrow_compression_type_to_raw(GArrowCompressionType type); GArrowCodec * -garrow_codec_new_raw(arrow::util::Codec *arrow_codec); -arrow::util::Codec * +garrow_codec_new_raw(std::shared_ptr *arrow_codec); +std::shared_ptr garrow_codec_get_raw(GArrowCodec *codec); diff --git a/c_glib/arrow-glib/input-stream.cpp b/c_glib/arrow-glib/input-stream.cpp index 3751d41ad3ab..84904b742658 100644 --- a/c_glib/arrow-glib/input-stream.cpp +++ b/c_glib/arrow-glib/input-stream.cpp @@ -1132,7 +1132,7 @@ garrow_compressed_input_stream_new(GArrowCodec *codec, GArrowInputStream *raw, GError **error) { - auto arrow_codec = garrow_codec_get_raw(codec); + auto arrow_codec = garrow_codec_get_raw(codec).get(); auto arrow_raw = garrow_input_stream_get_raw(raw); auto arrow_stream = arrow::io::CompressedInputStream::Make(arrow_codec, arrow_raw); diff --git a/c_glib/arrow-glib/ipc-options.cpp b/c_glib/arrow-glib/ipc-options.cpp index 1cddd25bb6de..b9b2c4143486 100644 --- a/c_glib/arrow-glib/ipc-options.cpp +++ b/c_glib/arrow-glib/ipc-options.cpp @@ -21,6 +21,7 @@ # include #endif +#include #include #include @@ -242,6 +243,7 @@ garrow_read_options_set_included_fields(GArrowReadOptions *options, typedef struct GArrowWriteOptionsPrivate_ { arrow::ipc::IpcWriteOptions options; + GArrowCodec *codec; } GArrowWriteOptionsPrivate; enum { @@ -249,8 +251,7 @@ enum { PROP_WRITE_OPTIONS_MAX_RECURSION_DEPTH, PROP_WRITE_OPTIONS_ALIGNMENT, PROP_WRITE_OPTIONS_WRITE_LEGACY_IPC_FORMAT, - PROP_WRITE_OPTIONS_COMPRESSION, - PROP_WRITE_OPTIONS_COMPRESSION_LEVEL, + PROP_WRITE_OPTIONS_CODEC, PROP_WRITE_OPTIONS_USE_THREADS, }; @@ -263,6 +264,19 @@ G_DEFINE_TYPE_WITH_PRIVATE(GArrowWriteOptions, garrow_write_options_get_instance_private( \ GARROW_WRITE_OPTIONS(obj))) +static void +garrow_write_options_dispose(GObject *object) +{ + auto priv = GARROW_WRITE_OPTIONS_GET_PRIVATE(object); + + if (priv->codec) { + g_object_unref(priv->codec); + priv->codec = NULL; + } + + G_OBJECT_CLASS(garrow_write_options_parent_class)->dispose(object); +} + static void garrow_write_options_finalize(GObject *object) { @@ -294,12 +308,12 @@ garrow_write_options_set_property(GObject *object, case PROP_WRITE_OPTIONS_WRITE_LEGACY_IPC_FORMAT: priv->options.write_legacy_ipc_format = g_value_get_boolean(value); break; - case PROP_WRITE_OPTIONS_COMPRESSION: - priv->options.compression = - static_cast(g_value_get_enum(value)); - break; - case PROP_WRITE_OPTIONS_COMPRESSION_LEVEL: - priv->options.compression_level = g_value_get_int(value); + case PROP_WRITE_OPTIONS_CODEC: + if (priv->codec) { + g_object_unref(priv->codec); + } + priv->codec = GARROW_CODEC(g_value_dup_object(value)); + priv->options.codec = garrow_codec_get_raw(priv->codec); break; case PROP_WRITE_OPTIONS_USE_THREADS: priv->options.use_threads = g_value_get_boolean(value); @@ -331,11 +345,8 @@ garrow_write_options_get_property(GObject *object, case PROP_WRITE_OPTIONS_WRITE_LEGACY_IPC_FORMAT: g_value_set_boolean(value, priv->options.write_legacy_ipc_format); break; - case PROP_WRITE_OPTIONS_COMPRESSION: - g_value_set_enum(value, priv->options.compression); - break; - case PROP_WRITE_OPTIONS_COMPRESSION_LEVEL: - g_value_set_int(value, priv->options.compression_level); + case PROP_WRITE_OPTIONS_CODEC: + g_value_set_object(value, priv->codec); break; case PROP_WRITE_OPTIONS_USE_THREADS: g_value_set_boolean(value, priv->options.use_threads); @@ -352,6 +363,11 @@ garrow_write_options_init(GArrowWriteOptions *object) auto priv = GARROW_WRITE_OPTIONS_GET_PRIVATE(object); new(&priv->options) arrow::ipc::IpcWriteOptions; priv->options = arrow::ipc::IpcWriteOptions::Defaults(); + if (priv->options.codec) { + priv->codec = garrow_codec_new_raw(&(priv->options.codec)); + } else { + priv->codec = NULL; + } } static void @@ -359,6 +375,7 @@ garrow_write_options_class_init(GArrowWriteOptionsClass *klass) { auto gobject_class = G_OBJECT_CLASS(klass); + gobject_class->dispose = garrow_write_options_dispose; gobject_class->finalize = garrow_write_options_finalize; gobject_class->set_property = garrow_write_options_set_property; gobject_class->get_property = garrow_write_options_get_property; @@ -441,42 +458,24 @@ garrow_write_options_class_init(GArrowWriteOptionsClass *klass) spec); /** - * GArrowWriteOptions:compression: + * GArrowWriteOptions:codec: * * Codec to use for compressing and decompressing record batch body * buffers. This is not part of the Arrow IPC protocol and only for - * internal use (e.g. Feather files). May only be LZ4_FRAME and - * ZSTD. + * internal use (e.g. Feather files). * - * Since: 1.0.0 - */ - spec = g_param_spec_enum("compression", - "Compression", - "Codec to use for " - "compressing record batch body buffers.", - GARROW_TYPE_COMPRESSION_TYPE, - options.compression, - static_cast(G_PARAM_READWRITE)); - g_object_class_install_property(gobject_class, - PROP_WRITE_OPTIONS_COMPRESSION, - spec); - - /** - * GArrowWriteOptions:compression-level: - * - * The level for compression. + * May only be UNCOMPRESSED, LZ4_FRAME and ZSTD. * - * Since: 1.0.0 + * Since: 2.0.0 */ - spec = g_param_spec_int("compression-level", - "Compression level", - "The level for compression", - G_MININT, - G_MAXINT, - options.compression_level, - static_cast(G_PARAM_READWRITE)); + spec = g_param_spec_object("codec", + "Codec", + "Codec to use for " + "compressing record batch body buffers.", + GARROW_TYPE_CODEC, + static_cast(G_PARAM_READWRITE)); g_object_class_install_property(gobject_class, - PROP_WRITE_OPTIONS_COMPRESSION_LEVEL, + PROP_WRITE_OPTIONS_CODEC, spec); /** diff --git a/c_glib/arrow-glib/output-stream.cpp b/c_glib/arrow-glib/output-stream.cpp index 2c3ccafdb138..1619bac45d4d 100644 --- a/c_glib/arrow-glib/output-stream.cpp +++ b/c_glib/arrow-glib/output-stream.cpp @@ -688,7 +688,7 @@ garrow_compressed_output_stream_new(GArrowCodec *codec, GArrowOutputStream *raw, GError **error) { - auto arrow_codec = garrow_codec_get_raw(codec); + auto arrow_codec = garrow_codec_get_raw(codec).get(); auto arrow_raw = garrow_output_stream_get_raw(raw); auto arrow_stream = arrow::io::CompressedOutputStream::Make(arrow_codec, arrow_raw); diff --git a/c_glib/test/test-codec.rb b/c_glib/test/test-codec.rb index 6617815df9b8..a32ec4dc7579 100644 --- a/c_glib/test/test-codec.rb +++ b/c_glib/test/test-codec.rb @@ -20,4 +20,14 @@ def test_name codec = Arrow::Codec.new(:gzip) assert_equal("gzip", codec.name) end + + def test_compression_type + codec = Arrow::Codec.new(:gzip) + assert_equal(Arrow::CompressionType::GZIP, codec.compression_type) + end + + def test_compression_level + codec = Arrow::Codec.new(:gzip) + assert_equal(9, codec.compression_level) + end end diff --git a/c_glib/test/test-write-options.rb b/c_glib/test/test-write-options.rb index d30b78b9cdbf..c528ce673d45 100644 --- a/c_glib/test/test-write-options.rb +++ b/c_glib/test/test-write-options.rb @@ -73,27 +73,15 @@ def test_accessor end end - sub_test_case("compression") do + sub_test_case("codec") do def test_default - assert_equal(Arrow::CompressionType::UNCOMPRESSED, - @options.compression) + assert_nil(@options.codec) end def test_accessor - @options.compression = :zstd - assert_equal(Arrow::CompressionType::ZSTD, - @options.compression) - end - end - - sub_test_case("compression-level") do - def test_default - assert_equal(-(2 ** 31), @options.compression_level) - end - - def test_accessor - @options.compression_level = 8 - assert_equal(8, @options.compression_level) + @options.codec = Arrow::Codec.new(:zstd) + assert_equal("zstd", + @options.codec.name) end end diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc index e15de84b9bf2..8bd012183448 100644 --- a/cpp/src/arrow/dataset/file_ipc.cc +++ b/cpp/src/arrow/dataset/file_ipc.cc @@ -168,12 +168,12 @@ Result IpcFileFormat::ScanFile(std::shared_ptr op // std::shared_ptr IpcFileFormat::DefaultWriteOptions() { - std::shared_ptr options( + std::shared_ptr ipc_options( new IpcFileWriteOptions(shared_from_this())); - options->ipc_options = + ipc_options->options = std::make_shared(ipc::IpcWriteOptions::Defaults()); - return options; + return ipc_options; } Result> IpcFileFormat::MakeWriter( @@ -185,7 +185,13 @@ Result> IpcFileFormat::MakeWriter( auto ipc_options = checked_pointer_cast(options); - ARROW_ASSIGN_OR_RAISE(auto writer, ipc::MakeFileWriter(destination, schema)); + // override use_threads to avoid nested parallelism + ipc_options->options->use_threads = false; + + ARROW_ASSIGN_OR_RAISE(auto writer, + ipc::MakeFileWriter(destination, schema, *ipc_options->options, + ipc_options->metadata)); + return std::shared_ptr( new IpcFileWriter(std::move(writer), std::move(schema), std::move(ipc_options))); } diff --git a/cpp/src/arrow/dataset/file_ipc.h b/cpp/src/arrow/dataset/file_ipc.h index 50650bfedb0c..2cdd837430e4 100644 --- a/cpp/src/arrow/dataset/file_ipc.h +++ b/cpp/src/arrow/dataset/file_ipc.h @@ -66,7 +66,11 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat { class ARROW_DS_EXPORT IpcFileWriteOptions : public FileWriteOptions { public: - std::shared_ptr ipc_options; + /// Options passed to ipc::MakeFileWriter. use_threads is ignored + std::shared_ptr options; + + /// custom_metadata written to the file's footer + std::shared_ptr metadata; protected: using FileWriteOptions::FileWriteOptions; diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc index 808bae568b4f..25bbe559cf40 100644 --- a/cpp/src/arrow/dataset/file_ipc_test.cc +++ b/cpp/src/arrow/dataset/file_ipc_test.cc @@ -28,11 +28,13 @@ #include "arrow/dataset/partition.h" #include "arrow/dataset/test_util.h" #include "arrow/io/memory.h" +#include "arrow/ipc/reader.h" #include "arrow/ipc/writer.h" #include "arrow/record_batch.h" #include "arrow/table.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/util.h" +#include "arrow/util/key_value_metadata.h" namespace arrow { namespace dataset { @@ -166,6 +168,35 @@ TEST_F(TestIpcFileFormat, WriteRecordBatchReader) { AssertBufferEqual(*written, *source->buffer()); } +TEST_F(TestIpcFileFormat, WriteRecordBatchReaderCustomOptions) { + std::shared_ptr reader = GetRecordBatchReader(); + auto source = GetFileSource(reader.get()); + reader = GetRecordBatchReader(); + + opts_ = ScanOptions::Make(reader->schema()); + + EXPECT_OK_AND_ASSIGN(auto sink, GetFileSink()); + + auto ipc_options = + checked_pointer_cast(format_->DefaultWriteOptions()); + if (util::Codec::IsAvailable(Compression::ZSTD)) { + EXPECT_OK_AND_ASSIGN(ipc_options->options->codec, + util::Codec::Create(Compression::ZSTD)); + } + ipc_options->metadata = key_value_metadata({{"hello", "world"}}); + EXPECT_OK_AND_ASSIGN(auto writer, + format_->MakeWriter(sink, reader->schema(), ipc_options)); + ASSERT_OK(writer->Write(reader.get())); + ASSERT_OK(writer->Finish()); + + EXPECT_OK_AND_ASSIGN(auto written, sink->Finish()); + EXPECT_OK_AND_ASSIGN(auto ipc_reader, ipc::RecordBatchFileReader::Open( + std::make_shared(written))); + + EXPECT_EQ(ipc_reader->metadata()->sorted_pairs(), + ipc_options->metadata->sorted_pairs()); +} + class TestIpcFileSystemDataset : public testing::Test, public WriteFileSystemDatasetMixin { public: diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 017e545ff23d..019416041aa2 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -114,7 +114,7 @@ Status ScannerBuilder::Project(std::vector columns) { Status ScannerBuilder::Filter(std::shared_ptr filter) { RETURN_NOT_OK(schema()->CanReferenceFieldsByNames(FieldsInExpression(*filter))); - RETURN_NOT_OK(filter->Validate(*schema()).status()); + RETURN_NOT_OK(filter->Validate(*schema())); scan_options_->filter = std::move(filter); return Status::OK(); } diff --git a/cpp/src/arrow/ipc/feather.cc b/cpp/src/arrow/ipc/feather.cc index 5aa46012d6b8..5ce8885341c0 100644 --- a/cpp/src/arrow/ipc/feather.cc +++ b/cpp/src/arrow/ipc/feather.cc @@ -795,8 +795,9 @@ Status WriteTable(const Table& table, io::OutputStream* dst, return WriteFeatherV1(table, dst); } else { IpcWriteOptions ipc_options = IpcWriteOptions::Defaults(); - ipc_options.compression = properties.compression; - ipc_options.compression_level = properties.compression_level; + ARROW_ASSIGN_OR_RAISE( + ipc_options.codec, + util::Codec::Create(properties.compression, properties.compression_level)); std::shared_ptr writer; ARROW_ASSIGN_OR_RAISE(writer, MakeFileWriter(dst, table.schema(), ipc_options)); diff --git a/cpp/src/arrow/ipc/metadata_internal.cc b/cpp/src/arrow/ipc/metadata_internal.cc index 9c967a5423db..a82aef328d6f 100644 --- a/cpp/src/arrow/ipc/metadata_internal.cc +++ b/cpp/src/arrow/ipc/metadata_internal.cc @@ -903,15 +903,15 @@ static Status WriteBuffers(FBB& fbb, const std::vector& buffers, static Status GetBodyCompression(FBB& fbb, const IpcWriteOptions& options, BodyCompressionOffset* out) { - if (options.compression != Compression::UNCOMPRESSED) { + if (options.codec != nullptr) { flatbuf::CompressionType codec; - if (options.compression == Compression::LZ4_FRAME) { + if (options.codec->compression_type() == Compression::LZ4_FRAME) { codec = flatbuf::CompressionType::LZ4_FRAME; - } else if (options.compression == Compression::ZSTD) { + } else if (options.codec->compression_type() == Compression::ZSTD) { codec = flatbuf::CompressionType::ZSTD; } else { return Status::Invalid("Unsupported IPC compression codec: ", - util::Codec::GetCodecAsString(options.compression)); + options.codec->name()); } *out = flatbuf::CreateBodyCompression(fbb, codec, flatbuf::BodyCompressionMethod::BUFFER); diff --git a/cpp/src/arrow/ipc/options.h b/cpp/src/arrow/ipc/options.h index 6bbd7b87de23..bf535cdacf3a 100644 --- a/cpp/src/arrow/ipc/options.h +++ b/cpp/src/arrow/ipc/options.h @@ -59,8 +59,7 @@ struct ARROW_EXPORT IpcWriteOptions { /// \brief Compression codec to use for record batch body buffers /// /// May only be UNCOMPRESSED, LZ4_FRAME and ZSTD. - Compression::type compression = Compression::UNCOMPRESSED; - int compression_level = Compression::kUseDefaultCompressionLevel; + std::shared_ptr codec; /// \brief Use global CPU thread pool to parallelize any computational tasks /// like compression diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index 5d11c275082a..85f6e39d05ba 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -619,7 +619,7 @@ TEST_F(TestWriteRecordBatch, WriteWithCompression) { continue; } IpcWriteOptions write_options = IpcWriteOptions::Defaults(); - write_options.compression = codec; + ASSERT_OK_AND_ASSIGN(write_options.codec, util::Codec::Create(codec)); CheckRoundtrip(*batch, write_options); // Check non-parallel read and write @@ -636,9 +636,9 @@ TEST_F(TestWriteRecordBatch, WriteWithCompression) { if (!util::Codec::IsAvailable(codec)) { continue; } - IpcWriteOptions options = IpcWriteOptions::Defaults(); - options.compression = codec; - ASSERT_RAISES(Invalid, SerializeRecordBatch(*batch, options)); + IpcWriteOptions write_options = IpcWriteOptions::Defaults(); + ASSERT_OK_AND_ASSIGN(write_options.codec, util::Codec::Create(codec)); + ASSERT_RAISES(Invalid, SerializeRecordBatch(*batch, write_options)); } } diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 9f9be32e2468..adce911e1021 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -185,17 +185,13 @@ class RecordBatchSerializer { } Status CompressBodyBuffers() { - std::unique_ptr codec; - - RETURN_NOT_OK(internal::CheckCompressionSupported(options_.compression)); - - ARROW_ASSIGN_OR_RAISE( - codec, util::Codec::Create(options_.compression, options_.compression_level)); + RETURN_NOT_OK( + internal::CheckCompressionSupported(options_.codec->compression_type())); auto CompressOne = [&](size_t i) { if (out_->body_buffers[i]->size() > 0) { - RETURN_NOT_OK( - CompressBuffer(*out_->body_buffers[i], codec.get(), &out_->body_buffers[i])); + RETURN_NOT_OK(CompressBuffer(*out_->body_buffers[i], options_.codec.get(), + &out_->body_buffers[i])); } return Status::OK(); }; @@ -216,7 +212,7 @@ class RecordBatchSerializer { RETURN_NOT_OK(VisitArray(*batch.column(i))); } - if (options_.compression != Compression::UNCOMPRESSED) { + if (options_.codec != nullptr) { RETURN_NOT_OK(CompressBodyBuffers()); } @@ -227,8 +223,7 @@ class RecordBatchSerializer { buffer_meta_.reserve(out_->body_buffers.size()); // Construct the buffer metadata for the record batch header - for (size_t i = 0; i < out_->body_buffers.size(); ++i) { - const Buffer* buffer = out_->body_buffers[i].get(); + for (const auto& buffer : out_->body_buffers) { int64_t size = 0; int64_t padding = 0; diff --git a/cpp/src/arrow/testing/gtest_util.h b/cpp/src/arrow/testing/gtest_util.h index feba8ba83d9e..fe90c37c0555 100644 --- a/cpp/src/arrow/testing/gtest_util.h +++ b/cpp/src/arrow/testing/gtest_util.h @@ -464,3 +464,15 @@ class ARROW_TESTING_EXPORT EnvVarGuard { #endif } // namespace arrow + +namespace nonstd { +namespace sv_lite { + +// Without this hint, GTest will print string_views as a container of char +template > +void PrintTo(const basic_string_view& view, std::ostream* os) { + *os << view; +} + +} // namespace sv_lite +} // namespace nonstd diff --git a/cpp/src/arrow/util/compression.cc b/cpp/src/arrow/util/compression.cc index d05bd0f66c03..f9c084f6c266 100644 --- a/cpp/src/arrow/util/compression.cc +++ b/cpp/src/arrow/util/compression.cc @@ -24,160 +24,156 @@ #include "arrow/result.h" #include "arrow/status.h" #include "arrow/util/compression_internal.h" +#include "arrow/util/logging.h" namespace arrow { namespace util { -Compressor::~Compressor() {} - -Decompressor::~Decompressor() {} - -Codec::~Codec() {} - int Codec::UseDefaultCompressionLevel() { return kUseDefaultCompressionLevel; } Status Codec::Init() { return Status::OK(); } -std::string Codec::GetCodecAsString(Compression::type t) { +const std::string& Codec::GetCodecAsString(Compression::type t) { + static const std::string uncompressed = "uncompressed", snappy = "snappy", + gzip = "gzip", lzo = "lzo", brotli = "brotli", + lz4_raw = "lz4_raw", lz4 = "lz4", lz4_hadoop = "lz4_hadoop", + zstd = "zstd", bz2 = "bz2", unknown = "unknown"; + switch (t) { case Compression::UNCOMPRESSED: - return "UNCOMPRESSED"; + return uncompressed; case Compression::SNAPPY: - return "SNAPPY"; + return snappy; case Compression::GZIP: - return "GZIP"; + return gzip; case Compression::LZO: - return "LZO"; + return lzo; case Compression::BROTLI: - return "BROTLI"; + return brotli; case Compression::LZ4: - return "LZ4_RAW"; + return lz4_raw; case Compression::LZ4_FRAME: - return "LZ4"; + return lz4; case Compression::LZ4_HADOOP: - return "LZ4_HADOOP"; + return lz4_hadoop; case Compression::ZSTD: - return "ZSTD"; + return zstd; case Compression::BZ2: - return "BZ2"; + return bz2; default: - return "UNKNOWN"; + return unknown; } } Result Codec::GetCompressionType(const std::string& name) { - if (name == "UNCOMPRESSED") { + if (name == "uncompressed") { return Compression::UNCOMPRESSED; - } else if (name == "GZIP") { + } else if (name == "gzip") { return Compression::GZIP; - } else if (name == "SNAPPY") { + } else if (name == "snappy") { return Compression::SNAPPY; - } else if (name == "LZO") { + } else if (name == "lzo") { return Compression::LZO; - } else if (name == "BROTLI") { + } else if (name == "brotli") { return Compression::BROTLI; - } else if (name == "LZ4_RAW") { + } else if (name == "lz4_raw") { return Compression::LZ4; - } else if (name == "LZ4") { + } else if (name == "lz4") { return Compression::LZ4_FRAME; - } else if (name == "LZ4_HADOOP") { + } else if (name == "lz4_hadoop") { return Compression::LZ4_HADOOP; - } else if (name == "ZSTD") { + } else if (name == "zstd") { return Compression::ZSTD; - } else if (name == "BZ2") { + } else if (name == "bz2") { return Compression::BZ2; } else { return Status::Invalid("Unrecognized compression type: ", name); } } +bool Codec::SupportsCompressionLevel(Compression::type codec) { + switch (codec) { + case Compression::GZIP: + case Compression::BROTLI: + case Compression::ZSTD: + case Compression::BZ2: + return true; + default: + return false; + } +} + Result> Codec::Create(Compression::type codec_type, int compression_level) { + if (!IsAvailable(codec_type)) { + if (codec_type == Compression::LZO) { + return Status::NotImplemented("LZO codec not implemented"); + } + + auto name = GetCodecAsString(codec_type); + if (name == "unknown") { + return Status::Invalid("Unrecognized codec"); + } + + return Status::NotImplemented("Support for codec '", GetCodecAsString(codec_type), + "' not built"); + } + + if (compression_level != kUseDefaultCompressionLevel && + !SupportsCompressionLevel(codec_type)) { + return Status::Invalid("Codec '", GetCodecAsString(codec_type), + "' doesn't support setting a compression level."); + } + std::unique_ptr codec; - const bool compression_level_set{compression_level != kUseDefaultCompressionLevel}; switch (codec_type) { case Compression::UNCOMPRESSED: - if (compression_level_set) { - return Status::Invalid("Compression level cannot be specified for UNCOMPRESSED."); - } return nullptr; case Compression::SNAPPY: #ifdef ARROW_WITH_SNAPPY - if (compression_level_set) { - return Status::Invalid("Snappy doesn't support setting a compression level."); - } codec = internal::MakeSnappyCodec(); - break; -#else - return Status::NotImplemented("Snappy codec support not built"); #endif + break; case Compression::GZIP: #ifdef ARROW_WITH_ZLIB codec = internal::MakeGZipCodec(compression_level); - break; -#else - return Status::NotImplemented("Gzip codec support not built"); #endif - case Compression::LZO: - if (compression_level_set) { - return Status::Invalid("LZ0 doesn't support setting a compression level."); - } - return Status::NotImplemented("LZO codec not implemented"); + break; case Compression::BROTLI: #ifdef ARROW_WITH_BROTLI codec = internal::MakeBrotliCodec(compression_level); - break; -#else - return Status::NotImplemented("Brotli codec support not built"); #endif + break; case Compression::LZ4: #ifdef ARROW_WITH_LZ4 - if (compression_level_set) { - return Status::Invalid("LZ4 doesn't support setting a compression level."); - } codec = internal::MakeLz4RawCodec(); - break; -#else - return Status::NotImplemented("LZ4 codec support not built"); #endif + break; case Compression::LZ4_FRAME: #ifdef ARROW_WITH_LZ4 - if (compression_level_set) { - return Status::Invalid("LZ4 doesn't support setting a compression level."); - } codec = internal::MakeLz4FrameCodec(); - break; -#else - return Status::NotImplemented("LZ4 codec support not built"); #endif + break; case Compression::LZ4_HADOOP: #ifdef ARROW_WITH_LZ4 - if (compression_level_set) { - return Status::Invalid("LZ4 doesn't support setting a compression level."); - } codec = internal::MakeLz4HadoopRawCodec(); - break; -#else - return Status::NotImplemented("LZ4 codec support not built"); #endif + break; case Compression::ZSTD: #ifdef ARROW_WITH_ZSTD codec = internal::MakeZSTDCodec(compression_level); - break; -#else - return Status::NotImplemented("ZSTD codec support not built"); #endif + break; case Compression::BZ2: #ifdef ARROW_WITH_BZ2 codec = internal::MakeBZ2Codec(compression_level); - break; -#else - return Status::NotImplemented("BZ2 codec support not built"); #endif + break; default: - return Status::Invalid("Unrecognized codec"); + break; } + DCHECK_NE(codec, nullptr); RETURN_NOT_OK(codec->Init()); return std::move(codec); } diff --git a/cpp/src/arrow/util/compression.h b/cpp/src/arrow/util/compression.h index 551ecbe4e219..d3e8e1e62f1e 100644 --- a/cpp/src/arrow/util/compression.h +++ b/cpp/src/arrow/util/compression.h @@ -54,7 +54,7 @@ constexpr int kUseDefaultCompressionLevel = Compression::kUseDefaultCompressionL /// class ARROW_EXPORT Compressor { public: - virtual ~Compressor(); + virtual ~Compressor() = default; struct CompressResult { int64_t bytes_read; @@ -96,7 +96,7 @@ class ARROW_EXPORT Compressor { /// class ARROW_EXPORT Decompressor { public: - virtual ~Decompressor(); + virtual ~Decompressor() = default; struct DecompressResult { // XXX is need_more_output necessary? (Brotli?) @@ -128,14 +128,14 @@ class ARROW_EXPORT Decompressor { /// \brief Compression codec class ARROW_EXPORT Codec { public: - virtual ~Codec(); + virtual ~Codec() = default; /// \brief Return special value to indicate that a codec implementation /// should use its default compression level static int UseDefaultCompressionLevel(); /// \brief Return a string name for compression type - static std::string GetCodecAsString(Compression::type t); + static const std::string& GetCodecAsString(Compression::type t); /// \brief Return compression type for name (all upper case) static Result GetCompressionType(const std::string& name); @@ -147,6 +147,9 @@ class ARROW_EXPORT Codec { /// \brief Return true if support for indicated codec has been enabled static bool IsAvailable(Compression::type codec); + /// \brief Return true if indicated codec supports setting a compression level + static bool SupportsCompressionLevel(Compression::type codec); + /// \brief One-shot decompression function /// /// output_buffer_len must be correct and therefore be obtained in advance. @@ -178,7 +181,14 @@ class ARROW_EXPORT Codec { /// \brief Create a streaming compressor instance virtual Result> MakeDecompressor() = 0; - virtual const char* name() const = 0; + /// \brief This Codec's compression type + virtual Compression::type compression_type() const = 0; + + /// \brief The name of this Codec's compression type + const std::string& name() const { return GetCodecAsString(compression_type()); } + + /// \brief This Codec's compression level, if applicable + virtual int compression_level() const { return UseDefaultCompressionLevel(); } private: /// \brief Initializes the codec's resources. diff --git a/cpp/src/arrow/util/compression_brotli.cc b/cpp/src/arrow/util/compression_brotli.cc index ca4f523a6570..4feabe23345a 100644 --- a/cpp/src/arrow/util/compression_brotli.cc +++ b/cpp/src/arrow/util/compression_brotli.cc @@ -38,8 +38,6 @@ namespace { class BrotliDecompressor : public Decompressor { public: - BrotliDecompressor() {} - ~BrotliDecompressor() override { if (state_ != nullptr) { BrotliDecoderDestroyInstance(state_); @@ -167,7 +165,7 @@ class BrotliCompressor : public Compressor { BrotliEncoderState* state_ = nullptr; private: - int compression_level_; + const int compression_level_; }; // ---------------------------------------------------------------------- @@ -175,11 +173,10 @@ class BrotliCompressor : public Compressor { class BrotliCodec : public Codec { public: - explicit BrotliCodec(int compression_level) { - compression_level_ = compression_level == kUseDefaultCompressionLevel - ? kBrotliDefaultCompressionLevel - : compression_level; - } + explicit BrotliCodec(int compression_level) + : compression_level_(compression_level == kUseDefaultCompressionLevel + ? kBrotliDefaultCompressionLevel + : compression_level) {} Result Decompress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len, uint8_t* output_buffer) override { @@ -224,10 +221,12 @@ class BrotliCodec : public Codec { return ptr; } - const char* name() const override { return "brotli"; } + Compression::type compression_type() const override { return Compression::BROTLI; } + + int compression_level() const override { return compression_level_; } private: - int compression_level_; + const int compression_level_; }; } // namespace diff --git a/cpp/src/arrow/util/compression_bz2.cc b/cpp/src/arrow/util/compression_bz2.cc index dfb266ab8700..8a8c1cb7a45d 100644 --- a/cpp/src/arrow/util/compression_bz2.cc +++ b/cpp/src/arrow/util/compression_bz2.cc @@ -262,7 +262,9 @@ class BZ2Codec : public Codec { return ptr; } - const char* name() const override { return "bz2"; } + Compression::type compression_type() const override { return Compression::BZ2; } + + int compression_level() const override { return compression_level_; } private: int compression_level_; diff --git a/cpp/src/arrow/util/compression_lz4.cc b/cpp/src/arrow/util/compression_lz4.cc index 17a8514611e3..365cd0f523e7 100644 --- a/cpp/src/arrow/util/compression_lz4.cc +++ b/cpp/src/arrow/util/compression_lz4.cc @@ -298,10 +298,10 @@ class Lz4FrameCodec : public Codec { return ptr; } - const char* name() const override { return "lz4"; } + Compression::type compression_type() const override { return Compression::LZ4_FRAME; } protected: - LZ4F_preferences_t prefs_; + const LZ4F_preferences_t prefs_; }; // ---------------------------------------------------------------------- @@ -348,7 +348,7 @@ class Lz4Codec : public Codec { "Try using LZ4 frame format instead."); } - const char* name() const override { return "lz4_raw"; } + Compression::type compression_type() const override { return Compression::LZ4; } }; // ---------------------------------------------------------------------- @@ -407,7 +407,7 @@ class Lz4HadoopCodec : public Lz4Codec { "Try using LZ4 frame format instead."); } - const char* name() const override { return "lz4_hadoop_raw"; } + Compression::type compression_type() const override { return Compression::LZ4_HADOOP; } protected: // Offset starting at which page data can be read/written diff --git a/cpp/src/arrow/util/compression_snappy.cc b/cpp/src/arrow/util/compression_snappy.cc index 9cd06dc4f02f..9b016874b56b 100644 --- a/cpp/src/arrow/util/compression_snappy.cc +++ b/cpp/src/arrow/util/compression_snappy.cc @@ -41,8 +41,6 @@ namespace { class SnappyCodec : public Codec { public: - SnappyCodec() {} - Result Decompress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len, uint8_t* output_buffer) override { size_t decompressed_size; @@ -87,7 +85,7 @@ class SnappyCodec : public Codec { return Status::NotImplemented("Streaming decompression unsupported with Snappy"); } - const char* name() const override { return "snappy"; } + Compression::type compression_type() const override { return Compression::SNAPPY; } }; } // namespace diff --git a/cpp/src/arrow/util/compression_test.cc b/cpp/src/arrow/util/compression_test.cc index 1df52364a724..8b184f2dc41b 100644 --- a/cpp/src/arrow/util/compression_test.cc +++ b/cpp/src/arrow/util/compression_test.cc @@ -320,30 +320,30 @@ class CodecTest : public ::testing::TestWithParam { }; TEST(TestCodecMisc, GetCodecAsString) { - ASSERT_EQ("UNCOMPRESSED", Codec::GetCodecAsString(Compression::UNCOMPRESSED)); - ASSERT_EQ("SNAPPY", Codec::GetCodecAsString(Compression::SNAPPY)); - ASSERT_EQ("GZIP", Codec::GetCodecAsString(Compression::GZIP)); - ASSERT_EQ("LZO", Codec::GetCodecAsString(Compression::LZO)); - ASSERT_EQ("BROTLI", Codec::GetCodecAsString(Compression::BROTLI)); - ASSERT_EQ("LZ4_RAW", Codec::GetCodecAsString(Compression::LZ4)); - ASSERT_EQ("LZ4", Codec::GetCodecAsString(Compression::LZ4_FRAME)); - ASSERT_EQ("ZSTD", Codec::GetCodecAsString(Compression::ZSTD)); - ASSERT_EQ("BZ2", Codec::GetCodecAsString(Compression::BZ2)); + EXPECT_EQ(Codec::GetCodecAsString(Compression::UNCOMPRESSED), "uncompressed"); + EXPECT_EQ(Codec::GetCodecAsString(Compression::SNAPPY), "snappy"); + EXPECT_EQ(Codec::GetCodecAsString(Compression::GZIP), "gzip"); + EXPECT_EQ(Codec::GetCodecAsString(Compression::LZO), "lzo"); + EXPECT_EQ(Codec::GetCodecAsString(Compression::BROTLI), "brotli"); + EXPECT_EQ(Codec::GetCodecAsString(Compression::LZ4), "lz4_raw"); + EXPECT_EQ(Codec::GetCodecAsString(Compression::LZ4_FRAME), "lz4"); + EXPECT_EQ(Codec::GetCodecAsString(Compression::ZSTD), "zstd"); + EXPECT_EQ(Codec::GetCodecAsString(Compression::BZ2), "bz2"); } TEST(TestCodecMisc, GetCompressionType) { - ASSERT_OK_AND_EQ(Compression::UNCOMPRESSED, Codec::GetCompressionType("UNCOMPRESSED")); - ASSERT_OK_AND_EQ(Compression::SNAPPY, Codec::GetCompressionType("SNAPPY")); - ASSERT_OK_AND_EQ(Compression::GZIP, Codec::GetCompressionType("GZIP")); - ASSERT_OK_AND_EQ(Compression::LZO, Codec::GetCompressionType("LZO")); - ASSERT_OK_AND_EQ(Compression::BROTLI, Codec::GetCompressionType("BROTLI")); - ASSERT_OK_AND_EQ(Compression::LZ4, Codec::GetCompressionType("LZ4_RAW")); - ASSERT_OK_AND_EQ(Compression::LZ4_FRAME, Codec::GetCompressionType("LZ4")); - ASSERT_OK_AND_EQ(Compression::ZSTD, Codec::GetCompressionType("ZSTD")); - ASSERT_OK_AND_EQ(Compression::BZ2, Codec::GetCompressionType("BZ2")); + ASSERT_OK_AND_EQ(Compression::UNCOMPRESSED, Codec::GetCompressionType("uncompressed")); + ASSERT_OK_AND_EQ(Compression::SNAPPY, Codec::GetCompressionType("snappy")); + ASSERT_OK_AND_EQ(Compression::GZIP, Codec::GetCompressionType("gzip")); + ASSERT_OK_AND_EQ(Compression::LZO, Codec::GetCompressionType("lzo")); + ASSERT_OK_AND_EQ(Compression::BROTLI, Codec::GetCompressionType("brotli")); + ASSERT_OK_AND_EQ(Compression::LZ4, Codec::GetCompressionType("lz4_raw")); + ASSERT_OK_AND_EQ(Compression::LZ4_FRAME, Codec::GetCompressionType("lz4")); + ASSERT_OK_AND_EQ(Compression::ZSTD, Codec::GetCompressionType("zstd")); + ASSERT_OK_AND_EQ(Compression::BZ2, Codec::GetCompressionType("bz2")); ASSERT_RAISES(Invalid, Codec::GetCompressionType("unk")); - ASSERT_RAISES(Invalid, Codec::GetCompressionType("snappy")); + ASSERT_RAISES(Invalid, Codec::GetCompressionType("SNAPPY")); } TEST_P(CodecTest, CodecRoundtrip) { diff --git a/cpp/src/arrow/util/compression_zlib.cc b/cpp/src/arrow/util/compression_zlib.cc index 717bbc61d468..84e517e2632f 100644 --- a/cpp/src/arrow/util/compression_zlib.cc +++ b/cpp/src/arrow/util/compression_zlib.cc @@ -463,7 +463,9 @@ class GZipCodec : public Codec { return InitDecompressor(); } - const char* name() const override { return "gzip"; } + Compression::type compression_type() const override { return Compression::GZIP; } + + int compression_level() const override { return compression_level_; } private: // zlib is stateful and the z_stream state variable must be initialized diff --git a/cpp/src/arrow/util/compression_zstd.cc b/cpp/src/arrow/util/compression_zstd.cc index e7409604a4c1..382e0573b291 100644 --- a/cpp/src/arrow/util/compression_zstd.cc +++ b/cpp/src/arrow/util/compression_zstd.cc @@ -173,20 +173,19 @@ class ZSTDCompressor : public Compressor { class ZSTDCodec : public Codec { public: - explicit ZSTDCodec(int compression_level) { - compression_level_ = compression_level == kUseDefaultCompressionLevel - ? kZSTDDefaultCompressionLevel - : compression_level; - } + explicit ZSTDCodec(int compression_level) + : compression_level_(compression_level == kUseDefaultCompressionLevel + ? kZSTDDefaultCompressionLevel + : compression_level) {} Result Decompress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len, uint8_t* output_buffer) override { if (output_buffer == nullptr) { // We may pass a NULL 0-byte output buffer but some zstd versions demand // a valid pointer: https://github.com/facebook/zstd/issues/1385 - static uint8_t empty_buffer[1]; + static uint8_t empty_buffer; DCHECK_EQ(output_buffer_len, 0); - output_buffer = empty_buffer; + output_buffer = &empty_buffer; } size_t ret = ZSTD_decompress(output_buffer, static_cast(output_buffer_len), @@ -228,10 +227,12 @@ class ZSTDCodec : public Codec { return ptr; } - const char* name() const override { return "zstd"; } + Compression::type compression_type() const override { return Compression::ZSTD; } + + int compression_level() const override { return compression_level_; } private: - int compression_level_; + const int compression_level_; }; } // namespace diff --git a/cpp/src/arrow/util/key_value_metadata.h b/cpp/src/arrow/util/key_value_metadata.h index 2f6452256e3f..d4207a53dc43 100644 --- a/cpp/src/arrow/util/key_value_metadata.h +++ b/cpp/src/arrow/util/key_value_metadata.h @@ -51,10 +51,13 @@ class ARROW_EXPORT KeyValueMetadata { Status Set(const std::string& key, const std::string& value); void reserve(int64_t n); - int64_t size() const; + int64_t size() const; const std::string& key(int64_t i) const; const std::string& value(int64_t i) const; + const std::vector& keys() const { return keys_; } + const std::vector& values() const { return values_; } + std::vector> sorted_pairs() const; /// \brief Perform linear search for key, returning -1 if not found diff --git a/cpp/src/arrow/util/string.cc b/cpp/src/arrow/util/string.cc index 625086c39b28..691f10e2793f 100644 --- a/cpp/src/arrow/util/string.cc +++ b/cpp/src/arrow/util/string.cc @@ -144,6 +144,14 @@ std::string AsciiToLower(util::string_view value) { return result; } +std::string AsciiToUpper(util::string_view value) { + // TODO: ASCII validation + std::string result = std::string(value); + std::transform(result.begin(), result.end(), result.begin(), + [](unsigned char c) { return std::toupper(c); }); + return result; +} + util::optional Replace(util::string_view s, util::string_view token, util::string_view replacement) { size_t token_start = s.find(token); diff --git a/cpp/src/arrow/util/string.h b/cpp/src/arrow/util/string.h index 56a02dfb20b6..feb75d45b354 100644 --- a/cpp/src/arrow/util/string.h +++ b/cpp/src/arrow/util/string.h @@ -57,6 +57,9 @@ bool AsciiEqualsCaseInsensitive(util::string_view left, util::string_view right) ARROW_EXPORT std::string AsciiToLower(util::string_view value); +ARROW_EXPORT +std::string AsciiToUpper(util::string_view value); + /// \brief Search for the first instance of a token and replace it or return nullopt if /// the token is not found. ARROW_EXPORT diff --git a/cpp/src/parquet/printer.cc b/cpp/src/parquet/printer.cc index a667a4857ec8..224a19dda6b6 100644 --- a/cpp/src/parquet/printer.cc +++ b/cpp/src/parquet/printer.cc @@ -25,6 +25,7 @@ #include #include "arrow/util/key_value_metadata.h" +#include "arrow/util/string.h" #include "parquet/column_scanner.h" #include "parquet/exception.h" @@ -121,7 +122,9 @@ void ParquetFilePrinter::DebugPrint(std::ostream& stream, std::list selecte stream << " Statistics Not Set"; } stream << std::endl - << " Compression: " << Codec::GetCodecAsString(column_chunk->compression()) + << " Compression: " + << arrow::internal::AsciiToUpper( + Codec::GetCodecAsString(column_chunk->compression())) << ", Encodings:"; for (auto encoding : column_chunk->encodings()) { stream << " " << EncodingToString(encoding); @@ -256,7 +259,8 @@ void ParquetFilePrinter::JSONPrint(std::ostream& stream, std::list selected stream << "\"False\","; } stream << "\n \"Compression\": \"" - << Codec::GetCodecAsString(column_chunk->compression()) + << arrow::internal::AsciiToUpper( + Codec::GetCodecAsString(column_chunk->compression())) << "\", \"Encodings\": \""; for (auto encoding : column_chunk->encodings()) { stream << EncodingToString(encoding) << " "; diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index dee022f5ca70..afcfacce29bc 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1337,7 +1337,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil: c_bool write_legacy_ipc_format CMemoryPool* memory_pool CMetadataVersion metadata_version - CCompressionType compression + shared_ptr[CCodec] codec c_bool use_threads @staticmethod @@ -2055,7 +2055,7 @@ cdef extern from 'arrow/util/compression.h' namespace 'arrow' nogil: CResult[int64_t] Compress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len, uint8_t* output_buffer) - const char* name() const + c_string name() const int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi index a12915ec6161..3fc098478d61 100644 --- a/python/pyarrow/io.pxi +++ b/python/pyarrow/io.pxi @@ -1557,25 +1557,6 @@ cdef CCompressionType _ensure_compression(str name) except *: raise ValueError('Invalid value for compression: {!r}'.format(name)) -cdef str _compression_name(CCompressionType ctype): - if ctype == CCompressionType_GZIP: - return 'gzip' - elif ctype == CCompressionType_BROTLI: - return 'brotli' - elif ctype == CCompressionType_BZ2: - return 'bz2' - elif ctype == CCompressionType_LZ4_FRAME: - return 'lz4' - elif ctype == CCompressionType_LZ4: - return 'lz4_raw' - elif ctype == CCompressionType_SNAPPY: - return 'snappy' - elif ctype == CCompressionType_ZSTD: - return 'zstd' - else: - raise RuntimeError('Unexpected CCompressionType value') - - cdef class Codec(_Weakrefable): """ Compression codec. diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi index 74a81c60ecff..5c8194d10c7c 100644 --- a/python/pyarrow/ipc.pxi +++ b/python/pyarrow/ipc.pxi @@ -94,17 +94,18 @@ cdef class IpcWriteOptions(_Weakrefable): @property def compression(self): - if self.c_options.compression == CCompressionType_UNCOMPRESSED: + if self.c_options.codec == nullptr: return None else: - return _compression_name(self.c_options.compression) + return frombytes(self.c_options.codec.get().name()) @compression.setter def compression(self, value): if value is None: - self.c_options.compression = CCompressionType_UNCOMPRESSED + self.c_options.codec.reset() else: - self.c_options.compression = _ensure_compression(value) + self.c_options.codec = shared_ptr[CCodec](GetResultValue( + CCodec.Create(_ensure_compression(value))).release()) @property def use_threads(self): diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index a79cbe74fd8a..6210dd11943c 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -432,6 +432,14 @@ dataset___ParquetFileWriteOptions__update <- function(options, writer_props, arr invisible(.Call(`_arrow_dataset___ParquetFileWriteOptions__update` , options, writer_props, arrow_writer_props)) } +dataset___IpcFileWriteOptions__update2 <- function(ipc_options, use_legacy_format, codec, metadata_version){ + invisible(.Call(`_arrow_dataset___IpcFileWriteOptions__update2` , ipc_options, use_legacy_format, codec, metadata_version)) +} + +dataset___IpcFileWriteOptions__update1 <- function(ipc_options, use_legacy_format, metadata_version){ + invisible(.Call(`_arrow_dataset___IpcFileWriteOptions__update1` , ipc_options, use_legacy_format, metadata_version)) +} + dataset___IpcFileFormat__Make <- function(){ .Call(`_arrow_dataset___IpcFileFormat__Make` ) } diff --git a/r/R/dataset-format.R b/r/R/dataset-format.R index ec3ac36f9e57..8300e415e2c9 100644 --- a/r/R/dataset-format.R +++ b/r/R/dataset-format.R @@ -139,6 +139,18 @@ FileWriteOptions <- R6Class("FileWriteOptions", inherit = ArrowObject, dataset___ParquetFileWriteOptions__update(self, ParquetWriterProperties$create(...), ParquetArrowWriterProperties$create(...)) + } else if (self$type == "ipc") { + args <- list(...) + if (is.null(args$codec)) { + dataset___IpcFileWriteOptions__update1(self, + get_ipc_use_legacy_format(args$use_legacy_format), + get_ipc_metadata_version(args$metadata_version)) + } else { + dataset___IpcFileWriteOptions__update2(self, + get_ipc_use_legacy_format(args$use_legacy_format), + args$codec, + get_ipc_metadata_version(args$metadata_version)) + } } invisible(self) } diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R index abeb0ce4393d..cd30c3d38d08 100644 --- a/r/R/dataset-write.R +++ b/r/R/dataset-write.R @@ -44,7 +44,16 @@ #' @param filesystem A [FileSystem] where the dataset should be written if it is a #' string file path; default is the local file system #' @param ... additional format-specific arguments. For available Parquet -#' options, see [write_parquet()]. +#' options, see [write_parquet()]. The available Feather options are +#' - `use_legacy_format` logical: write data formatted so that Arrow libraries +#' versions 0.14 and lower can read it. Default is `FALSE`. You can also +#' enable this by setting the environment variable `ARROW_PRE_0_15_IPC_FORMAT=1`. +#' - `metadata_version`: A string like "V5" or the equivalent integer indicating +#' the Arrow IPC MetadataVersion. Default (NULL) will use the latest version, +#' unless the environment variable `ARROW_PRE_1_0_METADATA_VERSION=1`, in +#' which case it will be V4. +#' - `codec`: A [Codec] which will be used to compress body buffers of written +#' files. Default (NULL) will not compress body buffers. #' @return The input `dataset`, invisibly #' @export write_dataset <- function(dataset, diff --git a/r/R/record-batch-writer.R b/r/R/record-batch-writer.R index 4c4d0bb8703f..8b51603110d9 100644 --- a/r/R/record-batch-writer.R +++ b/r/R/record-batch-writer.R @@ -39,7 +39,7 @@ #' - `sink` An `OutputStream` #' - `schema` A [Schema] for the data to be written #' - `use_legacy_format` logical: write data formatted so that Arrow libraries -#' versions 0.14 and lower can read it? Default is `FALSE`. You can also +#' versions 0.14 and lower can read it. Default is `FALSE`. You can also #' enable this by setting the environment variable `ARROW_PRE_0_15_IPC_FORMAT=1`. #' - `metadata_version`: A string like "V5" or the equivalent integer indicating #' the Arrow IPC MetadataVersion. Default (NULL) will use the latest version, @@ -130,7 +130,6 @@ RecordBatchStreamWriter$create <- function(sink, call. = FALSE ) } - use_legacy_format <- use_legacy_format %||% identical(Sys.getenv("ARROW_PRE_0_15_IPC_FORMAT"), "1") assert_is(sink, "OutputStream") assert_is(schema, "Schema") @@ -138,7 +137,7 @@ RecordBatchStreamWriter$create <- function(sink, ipc___RecordBatchStreamWriter__Open( sink, schema, - isTRUE(use_legacy_format), + get_ipc_use_legacy_format(use_legacy_format), get_ipc_metadata_version(metadata_version) ) ) @@ -160,7 +159,6 @@ RecordBatchFileWriter$create <- function(sink, call. = FALSE ) } - use_legacy_format <- use_legacy_format %||% identical(Sys.getenv("ARROW_PRE_0_15_IPC_FORMAT"), "1") assert_is(sink, "OutputStream") assert_is(schema, "Schema") @@ -168,7 +166,7 @@ RecordBatchFileWriter$create <- function(sink, ipc___RecordBatchFileWriter__Open( sink, schema, - isTRUE(use_legacy_format), + get_ipc_use_legacy_format(use_legacy_format), get_ipc_metadata_version(metadata_version) ) ) @@ -196,3 +194,7 @@ get_ipc_metadata_version <- function(x) { } out } + +get_ipc_use_legacy_format <- function(x) { + isTRUE(x %||% identical(Sys.getenv("ARROW_PRE_0_15_IPC_FORMAT"), "1")) +} diff --git a/r/man/RecordBatchWriter.Rd b/r/man/RecordBatchWriter.Rd index 0422da6caaee..038653b9e241 100644 --- a/r/man/RecordBatchWriter.Rd +++ b/r/man/RecordBatchWriter.Rd @@ -23,7 +23,7 @@ factory methods instantiate the object and take the following arguments: \item \code{sink} An \code{OutputStream} \item \code{schema} A \link{Schema} for the data to be written \item \code{use_legacy_format} logical: write data formatted so that Arrow libraries -versions 0.14 and lower can read it? Default is \code{FALSE}. You can also +versions 0.14 and lower can read it. Default is \code{FALSE}. You can also enable this by setting the environment variable \code{ARROW_PRE_0_15_IPC_FORMAT=1}. \item \code{metadata_version}: A string like "V5" or the equivalent integer indicating the Arrow IPC MetadataVersion. Default (NULL) will use the latest version, diff --git a/r/man/write_dataset.Rd b/r/man/write_dataset.Rd index e12c4287266d..4f1c89ffeccf 100644 --- a/r/man/write_dataset.Rd +++ b/r/man/write_dataset.Rd @@ -46,7 +46,18 @@ will yield \verb{"part-0.feather", ...}.} string file path; default is the local file system} \item{...}{additional format-specific arguments. For available Parquet -options, see \code{\link[=write_parquet]{write_parquet()}}.} +options, see \code{\link[=write_parquet]{write_parquet()}}. The available Feather options are +\itemize{ +\item \code{use_legacy_format} logical: write data formatted so that Arrow libraries +versions 0.14 and lower can read it. Default is \code{FALSE}. You can also +enable this by setting the environment variable \code{ARROW_PRE_0_15_IPC_FORMAT=1}. +\item \code{metadata_version}: A string like "V5" or the equivalent integer indicating +the Arrow IPC MetadataVersion. Default (NULL) will use the latest version, +unless the environment variable \code{ARROW_PRE_1_0_METADATA_VERSION=1}, in +which case it will be V4. +\item \code{codec}: A \link{Codec} which will be used to compress body buffers of written +files. Default (NULL) will not compress body buffers. +}} } \value{ The input \code{dataset}, invisibly diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index d2f44654c268..c5983128efab 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1695,6 +1695,43 @@ extern "C" SEXP _arrow_dataset___ParquetFileWriteOptions__update(SEXP options_se } #endif +// dataset.cpp +#if defined(ARROW_R_WITH_ARROW) +void dataset___IpcFileWriteOptions__update2(const std::shared_ptr& ipc_options, bool use_legacy_format, const std::shared_ptr& codec, arrow::ipc::MetadataVersion metadata_version); +extern "C" SEXP _arrow_dataset___IpcFileWriteOptions__update2(SEXP ipc_options_sexp, SEXP use_legacy_format_sexp, SEXP codec_sexp, SEXP metadata_version_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type ipc_options(ipc_options_sexp); + arrow::r::Input::type use_legacy_format(use_legacy_format_sexp); + arrow::r::Input&>::type codec(codec_sexp); + arrow::r::Input::type metadata_version(metadata_version_sexp); + dataset___IpcFileWriteOptions__update2(ipc_options, use_legacy_format, codec, metadata_version); + return R_NilValue; +END_CPP11 +} +#else +extern "C" SEXP _arrow_dataset___IpcFileWriteOptions__update2(SEXP ipc_options_sexp, SEXP use_legacy_format_sexp, SEXP codec_sexp, SEXP metadata_version_sexp){ + Rf_error("Cannot call dataset___IpcFileWriteOptions__update2(). Please use arrow::install_arrow() to install required runtime libraries. "); +} +#endif + +// dataset.cpp +#if defined(ARROW_R_WITH_ARROW) +void dataset___IpcFileWriteOptions__update1(const std::shared_ptr& ipc_options, bool use_legacy_format, arrow::ipc::MetadataVersion metadata_version); +extern "C" SEXP _arrow_dataset___IpcFileWriteOptions__update1(SEXP ipc_options_sexp, SEXP use_legacy_format_sexp, SEXP metadata_version_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type ipc_options(ipc_options_sexp); + arrow::r::Input::type use_legacy_format(use_legacy_format_sexp); + arrow::r::Input::type metadata_version(metadata_version_sexp); + dataset___IpcFileWriteOptions__update1(ipc_options, use_legacy_format, metadata_version); + return R_NilValue; +END_CPP11 +} +#else +extern "C" SEXP _arrow_dataset___IpcFileWriteOptions__update1(SEXP ipc_options_sexp, SEXP use_legacy_format_sexp, SEXP metadata_version_sexp){ + Rf_error("Cannot call dataset___IpcFileWriteOptions__update1(). Please use arrow::install_arrow() to install required runtime libraries. "); +} +#endif + // dataset.cpp #if defined(ARROW_R_WITH_ARROW) std::shared_ptr dataset___IpcFileFormat__Make(); @@ -6354,6 +6391,8 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___ParquetFileFormat__Make", (DL_FUNC) &_arrow_dataset___ParquetFileFormat__Make, 3}, { "_arrow_dataset___FileWriteOptions__type_name", (DL_FUNC) &_arrow_dataset___FileWriteOptions__type_name, 1}, { "_arrow_dataset___ParquetFileWriteOptions__update", (DL_FUNC) &_arrow_dataset___ParquetFileWriteOptions__update, 3}, + { "_arrow_dataset___IpcFileWriteOptions__update2", (DL_FUNC) &_arrow_dataset___IpcFileWriteOptions__update2, 4}, + { "_arrow_dataset___IpcFileWriteOptions__update1", (DL_FUNC) &_arrow_dataset___IpcFileWriteOptions__update1, 3}, { "_arrow_dataset___IpcFileFormat__Make", (DL_FUNC) &_arrow_dataset___IpcFileFormat__Make, 0}, { "_arrow_dataset___CsvFileFormat__Make", (DL_FUNC) &_arrow_dataset___CsvFileFormat__Make, 1}, { "_arrow_dataset___DirectoryPartitioning", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning, 1}, diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index f4c3ed1d994c..8a88ab02f87f 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -21,6 +21,7 @@ #include #include +#include #include #include @@ -207,6 +208,24 @@ void dataset___ParquetFileWriteOptions__update( options->arrow_writer_properties = arrow_writer_props; } +// [[arrow::export]] +void dataset___IpcFileWriteOptions__update2( + const std::shared_ptr& ipc_options, bool use_legacy_format, + const std::shared_ptr& codec, + arrow::ipc::MetadataVersion metadata_version) { + ipc_options->options->write_legacy_ipc_format = use_legacy_format; + ipc_options->options->codec = codec; + ipc_options->options->metadata_version = metadata_version; +} + +// [[arrow::export]] +void dataset___IpcFileWriteOptions__update1( + const std::shared_ptr& ipc_options, bool use_legacy_format, + arrow::ipc::MetadataVersion metadata_version) { + ipc_options->options->write_legacy_ipc_format = use_legacy_format; + ipc_options->options->metadata_version = metadata_version; +} + // [[arrow::export]] std::shared_ptr dataset___IpcFileFormat__Make() { return std::make_shared(); diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index f33f8ad1def1..31e76981c614 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -177,7 +177,7 @@ test_that("dataset from URI", { test_that("Simple interface for datasets (custom ParquetFileFormat)", { ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()), format = FileFormat$create("parquet", dict_columns = c("chr"))) - expect_equivalent(ds$schema$GetFieldByName("chr")$type, dictionary()) + expect_type_equal(ds$schema$GetFieldByName("chr")$type, dictionary()) }) test_that("Hive partitioning", { @@ -943,10 +943,38 @@ test_that("Dataset writing: from RecordBatch", { ) }) +test_that("Writing a dataset: Ipc format options & compression", { + skip_on_os("windows") # https://issues.apache.org/jira/browse/ARROW-9651 + ds <- open_dataset(csv_dir, partitioning = "part", format = "csv") + dst_dir <- make_temp_dir() + + codec <- NULL + if (codec_is_available("zstd")) { + codec <- Codec$create("zstd") + } + + write_dataset(ds, dst_dir, format = "feather", codec = codec) + expect_true(dir.exists(dst_dir)) + + new_ds <- open_dataset(dst_dir, format = "feather") + expect_equivalent( + new_ds %>% + select(string = chr, integer = int) %>% + filter(integer > 6 & integer < 11) %>% + collect() %>% + summarize(mean = mean(integer)), + df1 %>% + select(string = chr, integer = int) %>% + filter(integer > 6) %>% + summarize(mean = mean(integer)) + ) +}) + test_that("Writing a dataset: Parquet format options", { skip_on_os("windows") # https://issues.apache.org/jira/browse/ARROW-9651 ds <- open_dataset(csv_dir, partitioning = "part", format = "csv") dst_dir <- make_temp_dir() + dst_dir_no_truncated_timestamps <- make_temp_dir() # Use trace() to confirm that options are passed in trace( @@ -956,7 +984,7 @@ test_that("Writing a dataset: Parquet format options", { where = write_dataset ) expect_warning( - write_dataset(ds, make_temp_dir(), format = "parquet", partitioning = "int"), + write_dataset(ds, dst_dir_no_truncated_timestamps, format = "parquet", partitioning = "int"), "allow_truncated_timestamps == FALSE" ) expect_warning( diff --git a/r/tests/testthat/test-record-batch-reader.R b/r/tests/testthat/test-record-batch-reader.R index e03664e82f76..d9c34068425d 100644 --- a/r/tests/testthat/test-record-batch-reader.R +++ b/r/tests/testthat/test-record-batch-reader.R @@ -82,7 +82,7 @@ test_that("MetadataFormat", { Sys.setenv(ARROW_PRE_0_15_IPC_FORMAT = 1) expect_identical(get_ipc_metadata_version(NULL), 3L) Sys.setenv(ARROW_PRE_0_15_IPC_FORMAT = "") - + expect_identical(get_ipc_metadata_version(NULL), 4L) Sys.setenv(ARROW_PRE_1_0_METADATA_VERSION = 1) expect_identical(get_ipc_metadata_version(NULL), 3L)