From 47c4c3ddd22ae96bac318bc735084379f3abef8e Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Wed, 7 Oct 2020 12:04:35 -0400 Subject: [PATCH 01/14] ARROW-8296: [C++][Dataset] Add IpcFileWriteOptions --- cpp/src/arrow/dataset/file_ipc.cc | 14 ++++++--- cpp/src/arrow/dataset/file_ipc.h | 6 +++- cpp/src/arrow/dataset/file_ipc_test.cc | 27 ++++++++++++++++++ cpp/src/arrow/util/key_value_metadata.h | 5 +++- r/R/arrowExports.R | 8 ++++++ r/R/dataset-format.R | 6 ++++ r/R/record-batch-reader.R | 1 + r/R/record-batch-writer.R | 10 ++++--- r/src/arrowExports.cpp | 36 +++++++++++++++++++++++ r/src/arrow_metadata.h | 38 +++++++++++++++++++++++++ r/src/dataset.cpp | 12 ++++++++ r/src/recordbatchreader.cpp | 8 ++++++ r/tests/testthat/test-dataset.R | 19 +++++++++++-- 13 files changed, 178 insertions(+), 12 deletions(-) create mode 100644 r/src/arrow_metadata.h diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc index e15de84b9bf2..8bd012183448 100644 --- a/cpp/src/arrow/dataset/file_ipc.cc +++ b/cpp/src/arrow/dataset/file_ipc.cc @@ -168,12 +168,12 @@ Result IpcFileFormat::ScanFile(std::shared_ptr op // std::shared_ptr IpcFileFormat::DefaultWriteOptions() { - std::shared_ptr options( + std::shared_ptr ipc_options( new IpcFileWriteOptions(shared_from_this())); - options->ipc_options = + ipc_options->options = std::make_shared(ipc::IpcWriteOptions::Defaults()); - return options; + return ipc_options; } Result> IpcFileFormat::MakeWriter( @@ -185,7 +185,13 @@ Result> IpcFileFormat::MakeWriter( auto ipc_options = checked_pointer_cast(options); - ARROW_ASSIGN_OR_RAISE(auto writer, ipc::MakeFileWriter(destination, schema)); + // override use_threads to avoid nested parallelism + ipc_options->options->use_threads = false; + + ARROW_ASSIGN_OR_RAISE(auto writer, + ipc::MakeFileWriter(destination, schema, *ipc_options->options, + ipc_options->metadata)); + return std::shared_ptr( new IpcFileWriter(std::move(writer), std::move(schema), std::move(ipc_options))); } diff --git a/cpp/src/arrow/dataset/file_ipc.h b/cpp/src/arrow/dataset/file_ipc.h index 50650bfedb0c..2cdd837430e4 100644 --- a/cpp/src/arrow/dataset/file_ipc.h +++ b/cpp/src/arrow/dataset/file_ipc.h @@ -66,7 +66,11 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat { class ARROW_DS_EXPORT IpcFileWriteOptions : public FileWriteOptions { public: - std::shared_ptr ipc_options; + /// Options passed to ipc::MakeFileWriter. use_threads is ignored + std::shared_ptr options; + + /// custom_metadata written to the file's footer + std::shared_ptr metadata; protected: using FileWriteOptions::FileWriteOptions; diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc index 808bae568b4f..218d3e1b50bd 100644 --- a/cpp/src/arrow/dataset/file_ipc_test.cc +++ b/cpp/src/arrow/dataset/file_ipc_test.cc @@ -28,11 +28,13 @@ #include "arrow/dataset/partition.h" #include "arrow/dataset/test_util.h" #include "arrow/io/memory.h" +#include "arrow/ipc/reader.h" #include "arrow/ipc/writer.h" #include "arrow/record_batch.h" #include "arrow/table.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/util.h" +#include "arrow/util/key_value_metadata.h" namespace arrow { namespace dataset { @@ -166,6 +168,31 @@ TEST_F(TestIpcFileFormat, WriteRecordBatchReader) { AssertBufferEqual(*written, *source->buffer()); } +TEST_F(TestIpcFileFormat, WriteRecordBatchReaderCustomOptions) { + std::shared_ptr reader = GetRecordBatchReader(); + auto source = GetFileSource(reader.get()); + reader = GetRecordBatchReader(); + + opts_ = ScanOptions::Make(reader->schema()); + + EXPECT_OK_AND_ASSIGN(auto sink, GetFileSink()); + + auto ipc_options = + checked_pointer_cast(format_->DefaultWriteOptions()); + ipc_options->metadata = key_value_metadata({{"hello", "world"}}); + EXPECT_OK_AND_ASSIGN(auto writer, + format_->MakeWriter(sink, reader->schema(), ipc_options)); + ASSERT_OK(writer->Write(reader.get())); + ASSERT_OK(writer->Finish()); + + EXPECT_OK_AND_ASSIGN(auto written, sink->Finish()); + EXPECT_OK_AND_ASSIGN(auto ipc_reader, ipc::RecordBatchFileReader::Open( + std::make_shared(written))); + + EXPECT_EQ(ipc_reader->metadata()->sorted_pairs(), + ipc_options->metadata->sorted_pairs()); +} + class TestIpcFileSystemDataset : public testing::Test, public WriteFileSystemDatasetMixin { public: diff --git a/cpp/src/arrow/util/key_value_metadata.h b/cpp/src/arrow/util/key_value_metadata.h index 2f6452256e3f..d4207a53dc43 100644 --- a/cpp/src/arrow/util/key_value_metadata.h +++ b/cpp/src/arrow/util/key_value_metadata.h @@ -51,10 +51,13 @@ class ARROW_EXPORT KeyValueMetadata { Status Set(const std::string& key, const std::string& value); void reserve(int64_t n); - int64_t size() const; + int64_t size() const; const std::string& key(int64_t i) const; const std::string& value(int64_t i) const; + const std::vector& keys() const { return keys_; } + const std::vector& values() const { return values_; } + std::vector> sorted_pairs() const; /// \brief Perform linear search for key, returning -1 if not found diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index a79cbe74fd8a..29f13e853ea6 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -432,6 +432,10 @@ dataset___ParquetFileWriteOptions__update <- function(options, writer_props, arr invisible(.Call(`_arrow_dataset___ParquetFileWriteOptions__update` , options, writer_props, arrow_writer_props)) } +dataset___IpcFileWriteOptions__update <- function(ipc_options, use_legacy_format, metadata_version, metadata){ + invisible(.Call(`_arrow_dataset___IpcFileWriteOptions__update` , ipc_options, use_legacy_format, metadata_version, metadata)) +} + dataset___IpcFileFormat__Make <- function(){ .Call(`_arrow_dataset___IpcFileFormat__Make` ) } @@ -1384,6 +1388,10 @@ ipc___RecordBatchFileReader__num_record_batches <- function(reader){ .Call(`_arrow_ipc___RecordBatchFileReader__num_record_batches` , reader) } +ipc___RecordBatchFileReader__metadata <- function(reader){ + .Call(`_arrow_ipc___RecordBatchFileReader__metadata` , reader) +} + ipc___RecordBatchFileReader__ReadRecordBatch <- function(reader, i){ .Call(`_arrow_ipc___RecordBatchFileReader__ReadRecordBatch` , reader, i) } diff --git a/r/R/dataset-format.R b/r/R/dataset-format.R index ec3ac36f9e57..5d6e99313c63 100644 --- a/r/R/dataset-format.R +++ b/r/R/dataset-format.R @@ -139,6 +139,12 @@ FileWriteOptions <- R6Class("FileWriteOptions", inherit = ArrowObject, dataset___ParquetFileWriteOptions__update(self, ParquetWriterProperties$create(...), ParquetArrowWriterProperties$create(...)) + } else if (self$type == "ipc") { + args <- list(...) + dataset___IpcFileWriteOptions__update(self, + get_ipc_use_legacy_format(args$use_legacy_format), + get_ipc_metadata_version(args$metadata_version), + prepare_key_value_metadata(args$metadata)) } invisible(self) } diff --git a/r/R/record-batch-reader.R b/r/R/record-batch-reader.R index 85ce839d0ce5..6cf53a654f83 100644 --- a/r/R/record-batch-reader.R +++ b/r/R/record-batch-reader.R @@ -140,6 +140,7 @@ RecordBatchFileReader <- R6Class("RecordBatchFileReader", inherit = ArrowObject, ), active = list( num_record_batches = function() ipc___RecordBatchFileReader__num_record_batches(self), + metadata = function() ipc___RecordBatchFileReader__metadata(self), schema = function() shared_ptr(Schema, ipc___RecordBatchFileReader__schema(self)) ) ) diff --git a/r/R/record-batch-writer.R b/r/R/record-batch-writer.R index 4c4d0bb8703f..1bc928b99bb6 100644 --- a/r/R/record-batch-writer.R +++ b/r/R/record-batch-writer.R @@ -130,7 +130,6 @@ RecordBatchStreamWriter$create <- function(sink, call. = FALSE ) } - use_legacy_format <- use_legacy_format %||% identical(Sys.getenv("ARROW_PRE_0_15_IPC_FORMAT"), "1") assert_is(sink, "OutputStream") assert_is(schema, "Schema") @@ -138,7 +137,7 @@ RecordBatchStreamWriter$create <- function(sink, ipc___RecordBatchStreamWriter__Open( sink, schema, - isTRUE(use_legacy_format), + get_ipc_use_legacy_format(use_legacy_format), get_ipc_metadata_version(metadata_version) ) ) @@ -160,7 +159,6 @@ RecordBatchFileWriter$create <- function(sink, call. = FALSE ) } - use_legacy_format <- use_legacy_format %||% identical(Sys.getenv("ARROW_PRE_0_15_IPC_FORMAT"), "1") assert_is(sink, "OutputStream") assert_is(schema, "Schema") @@ -168,7 +166,7 @@ RecordBatchFileWriter$create <- function(sink, ipc___RecordBatchFileWriter__Open( sink, schema, - isTRUE(use_legacy_format), + get_ipc_use_legacy_format(use_legacy_format), get_ipc_metadata_version(metadata_version) ) ) @@ -196,3 +194,7 @@ get_ipc_metadata_version <- function(x) { } out } + +get_ipc_use_legacy_format <- function(x) { + isTRUE(x %||% identical(Sys.getenv("ARROW_PRE_0_15_IPC_FORMAT"), "1")) +} diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index d2f44654c268..1547f1526a32 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1695,6 +1695,25 @@ extern "C" SEXP _arrow_dataset___ParquetFileWriteOptions__update(SEXP options_se } #endif +// dataset.cpp +#if defined(ARROW_R_WITH_ARROW) +void dataset___IpcFileWriteOptions__update(const std::shared_ptr& ipc_options, bool use_legacy_format, arrow::ipc::MetadataVersion metadata_version, cpp11::strings metadata); +extern "C" SEXP _arrow_dataset___IpcFileWriteOptions__update(SEXP ipc_options_sexp, SEXP use_legacy_format_sexp, SEXP metadata_version_sexp, SEXP metadata_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type ipc_options(ipc_options_sexp); + arrow::r::Input::type use_legacy_format(use_legacy_format_sexp); + arrow::r::Input::type metadata_version(metadata_version_sexp); + arrow::r::Input::type metadata(metadata_sexp); + dataset___IpcFileWriteOptions__update(ipc_options, use_legacy_format, metadata_version, metadata); + return R_NilValue; +END_CPP11 +} +#else +extern "C" SEXP _arrow_dataset___IpcFileWriteOptions__update(SEXP ipc_options_sexp, SEXP use_legacy_format_sexp, SEXP metadata_version_sexp, SEXP metadata_sexp){ + Rf_error("Cannot call dataset___IpcFileWriteOptions__update(). Please use arrow::install_arrow() to install required runtime libraries. "); +} +#endif + // dataset.cpp #if defined(ARROW_R_WITH_ARROW) std::shared_ptr dataset___IpcFileFormat__Make(); @@ -5424,6 +5443,21 @@ extern "C" SEXP _arrow_ipc___RecordBatchFileReader__num_record_batches(SEXP read } #endif +// recordbatchreader.cpp +#if defined(ARROW_R_WITH_ARROW) +cpp11::strings ipc___RecordBatchFileReader__metadata(const std::shared_ptr& reader); +extern "C" SEXP _arrow_ipc___RecordBatchFileReader__metadata(SEXP reader_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type reader(reader_sexp); + return cpp11::as_sexp(ipc___RecordBatchFileReader__metadata(reader)); +END_CPP11 +} +#else +extern "C" SEXP _arrow_ipc___RecordBatchFileReader__metadata(SEXP reader_sexp){ + Rf_error("Cannot call ipc___RecordBatchFileReader__metadata(). Please use arrow::install_arrow() to install required runtime libraries. "); +} +#endif + // recordbatchreader.cpp #if defined(ARROW_R_WITH_ARROW) std::shared_ptr ipc___RecordBatchFileReader__ReadRecordBatch(const std::shared_ptr& reader, int i); @@ -6354,6 +6388,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___ParquetFileFormat__Make", (DL_FUNC) &_arrow_dataset___ParquetFileFormat__Make, 3}, { "_arrow_dataset___FileWriteOptions__type_name", (DL_FUNC) &_arrow_dataset___FileWriteOptions__type_name, 1}, { "_arrow_dataset___ParquetFileWriteOptions__update", (DL_FUNC) &_arrow_dataset___ParquetFileWriteOptions__update, 3}, + { "_arrow_dataset___IpcFileWriteOptions__update", (DL_FUNC) &_arrow_dataset___IpcFileWriteOptions__update, 4}, { "_arrow_dataset___IpcFileFormat__Make", (DL_FUNC) &_arrow_dataset___IpcFileFormat__Make, 0}, { "_arrow_dataset___CsvFileFormat__Make", (DL_FUNC) &_arrow_dataset___CsvFileFormat__Make, 1}, { "_arrow_dataset___DirectoryPartitioning", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning, 1}, @@ -6592,6 +6627,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_ipc___RecordBatchStreamReader__batches", (DL_FUNC) &_arrow_ipc___RecordBatchStreamReader__batches, 1}, { "_arrow_ipc___RecordBatchFileReader__schema", (DL_FUNC) &_arrow_ipc___RecordBatchFileReader__schema, 1}, { "_arrow_ipc___RecordBatchFileReader__num_record_batches", (DL_FUNC) &_arrow_ipc___RecordBatchFileReader__num_record_batches, 1}, + { "_arrow_ipc___RecordBatchFileReader__metadata", (DL_FUNC) &_arrow_ipc___RecordBatchFileReader__metadata, 1}, { "_arrow_ipc___RecordBatchFileReader__ReadRecordBatch", (DL_FUNC) &_arrow_ipc___RecordBatchFileReader__ReadRecordBatch, 2}, { "_arrow_ipc___RecordBatchFileReader__Open", (DL_FUNC) &_arrow_ipc___RecordBatchFileReader__Open, 1}, { "_arrow_Table__from_RecordBatchFileReader", (DL_FUNC) &_arrow_Table__from_RecordBatchFileReader, 1}, diff --git a/r/src/arrow_metadata.h b/r/src/arrow_metadata.h new file mode 100644 index 000000000000..7bb2155ccaa2 --- /dev/null +++ b/r/src/arrow_metadata.h @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "./arrow_types.h" + +#if defined(ARROW_R_WITH_ARROW) +#include + +inline std::shared_ptr KeyValueMetadata__Make( + const cpp11::strings& metadata) { + // TODO(bkietz): Make this a custom conversion after + // https://github.com/r-lib/cpp11/pull/104 + return std::make_shared( + cpp11::as_cpp>(metadata.names()), + cpp11::as_cpp>(metadata)); +} + +inline cpp11::writable::strings KeyValueMetadata__as_vector( + const std::shared_ptr& metadata) { + cpp11::writable::strings metadata_vector(metadata->values()); + metadata_vector.names() = cpp11::writable::strings(metadata->keys()); + return metadata_vector; +} +#endif diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index f4c3ed1d994c..b766d11ebaf8 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -17,10 +17,13 @@ #include "./arrow_types.h" +#include "./arrow_metadata.h" + #if defined(ARROW_R_WITH_ARROW) #include #include +#include #include #include @@ -207,6 +210,15 @@ void dataset___ParquetFileWriteOptions__update( options->arrow_writer_properties = arrow_writer_props; } +// [[arrow::export]] +void dataset___IpcFileWriteOptions__update( + const std::shared_ptr& ipc_options, bool use_legacy_format, + arrow::ipc::MetadataVersion metadata_version, cpp11::strings metadata) { + ipc_options->options->write_legacy_ipc_format = use_legacy_format; + ipc_options->options->metadata_version = metadata_version; + ipc_options->metadata = KeyValueMetadata__Make(metadata); +} + // [[arrow::export]] std::shared_ptr dataset___IpcFileFormat__Make() { return std::make_shared(); diff --git a/r/src/recordbatchreader.cpp b/r/src/recordbatchreader.cpp index 7ecb42002a90..433b779883b7 100644 --- a/r/src/recordbatchreader.cpp +++ b/r/src/recordbatchreader.cpp @@ -21,6 +21,8 @@ #include #include +#include "./arrow_metadata.h" + // [[arrow::export]] std::shared_ptr RecordBatchReader__schema( const std::shared_ptr& reader) { @@ -74,6 +76,12 @@ int ipc___RecordBatchFileReader__num_record_batches( return reader->num_record_batches(); } +// [[arrow::export]] +cpp11::strings ipc___RecordBatchFileReader__metadata( + const std::shared_ptr& reader) { + return KeyValueMetadata__as_vector(reader->metadata()); +} + // [[arrow::export]] std::shared_ptr ipc___RecordBatchFileReader__ReadRecordBatch( const std::shared_ptr& reader, int i) { diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index f33f8ad1def1..1c0ff7a5054c 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -177,7 +177,7 @@ test_that("dataset from URI", { test_that("Simple interface for datasets (custom ParquetFileFormat)", { ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()), format = FileFormat$create("parquet", dict_columns = c("chr"))) - expect_equivalent(ds$schema$GetFieldByName("chr")$type, dictionary()) + expect_type_equal(ds$schema$GetFieldByName("chr")$type, dictionary()) }) test_that("Hive partitioning", { @@ -943,10 +943,25 @@ test_that("Dataset writing: from RecordBatch", { ) }) +test_that("Writing a dataset: Ipc format options", { + skip_on_os("windows") # https://issues.apache.org/jira/browse/ARROW-9651 + ds <- open_dataset(csv_dir, partitioning = "part", format = "csv") + dst_dir <- make_temp_dir() + + metadata <- c(hello = "world", eh = "!") + write_dataset(ds, dst_dir, format = "feather", + metadata = metadata) + expect_true(dir.exists(dst_dir)) + + file <- ds$filesystem$OpenInputStream(paste(dst_dir, dir(dst_dir)[[1]], sep = "/")) + expect_equivalent(metadata, RecordBatchFileReader$create(file)$metadata) +}) + test_that("Writing a dataset: Parquet format options", { skip_on_os("windows") # https://issues.apache.org/jira/browse/ARROW-9651 ds <- open_dataset(csv_dir, partitioning = "part", format = "csv") dst_dir <- make_temp_dir() + dst_dir_no_truncated_timestamps <- make_temp_dir() # Use trace() to confirm that options are passed in trace( @@ -956,7 +971,7 @@ test_that("Writing a dataset: Parquet format options", { where = write_dataset ) expect_warning( - write_dataset(ds, make_temp_dir(), format = "parquet", partitioning = "int"), + write_dataset(ds, dst_dir_no_truncated_timestamps, format = "parquet", partitioning = "int"), "allow_truncated_timestamps == FALSE" ) expect_warning( From 848fa94508bf0f72db1b89f72fb3c8c34bb51ab3 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Thu, 8 Oct 2020 11:19:37 -0400 Subject: [PATCH 02/14] refactor IPC compression options, make Codec properties more regular --- cpp/src/arrow/ipc/feather.cc | 5 +- cpp/src/arrow/ipc/metadata_internal.cc | 8 +- cpp/src/arrow/ipc/options.h | 3 +- cpp/src/arrow/ipc/read_write_test.cc | 8 +- cpp/src/arrow/ipc/writer.cc | 17 +-- cpp/src/arrow/testing/gtest_util.h | 12 ++ cpp/src/arrow/util/compression.cc | 141 +++++++++++------------ cpp/src/arrow/util/compression.h | 24 ++-- cpp/src/arrow/util/compression_brotli.cc | 19 ++- cpp/src/arrow/util/compression_bz2.cc | 4 +- cpp/src/arrow/util/compression_lz4.cc | 8 +- cpp/src/arrow/util/compression_snappy.cc | 4 +- cpp/src/arrow/util/compression_test.cc | 38 +++--- cpp/src/arrow/util/compression_zlib.cc | 4 +- cpp/src/arrow/util/compression_zstd.cc | 19 +-- cpp/src/arrow/util/string.cc | 8 ++ cpp/src/arrow/util/string.h | 3 + cpp/src/parquet/printer.cc | 8 +- python/pyarrow/includes/libarrow.pxd | 4 +- python/pyarrow/io.pxi | 19 --- python/pyarrow/ipc.pxi | 9 +- 21 files changed, 186 insertions(+), 179 deletions(-) diff --git a/cpp/src/arrow/ipc/feather.cc b/cpp/src/arrow/ipc/feather.cc index 5aa46012d6b8..5ce8885341c0 100644 --- a/cpp/src/arrow/ipc/feather.cc +++ b/cpp/src/arrow/ipc/feather.cc @@ -795,8 +795,9 @@ Status WriteTable(const Table& table, io::OutputStream* dst, return WriteFeatherV1(table, dst); } else { IpcWriteOptions ipc_options = IpcWriteOptions::Defaults(); - ipc_options.compression = properties.compression; - ipc_options.compression_level = properties.compression_level; + ARROW_ASSIGN_OR_RAISE( + ipc_options.codec, + util::Codec::Create(properties.compression, properties.compression_level)); std::shared_ptr writer; ARROW_ASSIGN_OR_RAISE(writer, MakeFileWriter(dst, table.schema(), ipc_options)); diff --git a/cpp/src/arrow/ipc/metadata_internal.cc b/cpp/src/arrow/ipc/metadata_internal.cc index 9c967a5423db..a82aef328d6f 100644 --- a/cpp/src/arrow/ipc/metadata_internal.cc +++ b/cpp/src/arrow/ipc/metadata_internal.cc @@ -903,15 +903,15 @@ static Status WriteBuffers(FBB& fbb, const std::vector& buffers, static Status GetBodyCompression(FBB& fbb, const IpcWriteOptions& options, BodyCompressionOffset* out) { - if (options.compression != Compression::UNCOMPRESSED) { + if (options.codec != nullptr) { flatbuf::CompressionType codec; - if (options.compression == Compression::LZ4_FRAME) { + if (options.codec->compression_type() == Compression::LZ4_FRAME) { codec = flatbuf::CompressionType::LZ4_FRAME; - } else if (options.compression == Compression::ZSTD) { + } else if (options.codec->compression_type() == Compression::ZSTD) { codec = flatbuf::CompressionType::ZSTD; } else { return Status::Invalid("Unsupported IPC compression codec: ", - util::Codec::GetCodecAsString(options.compression)); + options.codec->name()); } *out = flatbuf::CreateBodyCompression(fbb, codec, flatbuf::BodyCompressionMethod::BUFFER); diff --git a/cpp/src/arrow/ipc/options.h b/cpp/src/arrow/ipc/options.h index 6bbd7b87de23..bf535cdacf3a 100644 --- a/cpp/src/arrow/ipc/options.h +++ b/cpp/src/arrow/ipc/options.h @@ -59,8 +59,7 @@ struct ARROW_EXPORT IpcWriteOptions { /// \brief Compression codec to use for record batch body buffers /// /// May only be UNCOMPRESSED, LZ4_FRAME and ZSTD. - Compression::type compression = Compression::UNCOMPRESSED; - int compression_level = Compression::kUseDefaultCompressionLevel; + std::shared_ptr codec; /// \brief Use global CPU thread pool to parallelize any computational tasks /// like compression diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index 5d11c275082a..85f6e39d05ba 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -619,7 +619,7 @@ TEST_F(TestWriteRecordBatch, WriteWithCompression) { continue; } IpcWriteOptions write_options = IpcWriteOptions::Defaults(); - write_options.compression = codec; + ASSERT_OK_AND_ASSIGN(write_options.codec, util::Codec::Create(codec)); CheckRoundtrip(*batch, write_options); // Check non-parallel read and write @@ -636,9 +636,9 @@ TEST_F(TestWriteRecordBatch, WriteWithCompression) { if (!util::Codec::IsAvailable(codec)) { continue; } - IpcWriteOptions options = IpcWriteOptions::Defaults(); - options.compression = codec; - ASSERT_RAISES(Invalid, SerializeRecordBatch(*batch, options)); + IpcWriteOptions write_options = IpcWriteOptions::Defaults(); + ASSERT_OK_AND_ASSIGN(write_options.codec, util::Codec::Create(codec)); + ASSERT_RAISES(Invalid, SerializeRecordBatch(*batch, write_options)); } } diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 9f9be32e2468..adce911e1021 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -185,17 +185,13 @@ class RecordBatchSerializer { } Status CompressBodyBuffers() { - std::unique_ptr codec; - - RETURN_NOT_OK(internal::CheckCompressionSupported(options_.compression)); - - ARROW_ASSIGN_OR_RAISE( - codec, util::Codec::Create(options_.compression, options_.compression_level)); + RETURN_NOT_OK( + internal::CheckCompressionSupported(options_.codec->compression_type())); auto CompressOne = [&](size_t i) { if (out_->body_buffers[i]->size() > 0) { - RETURN_NOT_OK( - CompressBuffer(*out_->body_buffers[i], codec.get(), &out_->body_buffers[i])); + RETURN_NOT_OK(CompressBuffer(*out_->body_buffers[i], options_.codec.get(), + &out_->body_buffers[i])); } return Status::OK(); }; @@ -216,7 +212,7 @@ class RecordBatchSerializer { RETURN_NOT_OK(VisitArray(*batch.column(i))); } - if (options_.compression != Compression::UNCOMPRESSED) { + if (options_.codec != nullptr) { RETURN_NOT_OK(CompressBodyBuffers()); } @@ -227,8 +223,7 @@ class RecordBatchSerializer { buffer_meta_.reserve(out_->body_buffers.size()); // Construct the buffer metadata for the record batch header - for (size_t i = 0; i < out_->body_buffers.size(); ++i) { - const Buffer* buffer = out_->body_buffers[i].get(); + for (const auto& buffer : out_->body_buffers) { int64_t size = 0; int64_t padding = 0; diff --git a/cpp/src/arrow/testing/gtest_util.h b/cpp/src/arrow/testing/gtest_util.h index feba8ba83d9e..fe90c37c0555 100644 --- a/cpp/src/arrow/testing/gtest_util.h +++ b/cpp/src/arrow/testing/gtest_util.h @@ -464,3 +464,15 @@ class ARROW_TESTING_EXPORT EnvVarGuard { #endif } // namespace arrow + +namespace nonstd { +namespace sv_lite { + +// Without this hint, GTest will print string_views as a container of char +template > +void PrintTo(const basic_string_view& view, std::ostream* os) { + *os << view; +} + +} // namespace sv_lite +} // namespace nonstd diff --git a/cpp/src/arrow/util/compression.cc b/cpp/src/arrow/util/compression.cc index d05bd0f66c03..a796c955645f 100644 --- a/cpp/src/arrow/util/compression.cc +++ b/cpp/src/arrow/util/compression.cc @@ -24,160 +24,151 @@ #include "arrow/result.h" #include "arrow/status.h" #include "arrow/util/compression_internal.h" +#include "arrow/util/logging.h" namespace arrow { namespace util { -Compressor::~Compressor() {} - -Decompressor::~Decompressor() {} - -Codec::~Codec() {} - int Codec::UseDefaultCompressionLevel() { return kUseDefaultCompressionLevel; } Status Codec::Init() { return Status::OK(); } -std::string Codec::GetCodecAsString(Compression::type t) { +util::string_view Codec::GetCodecAsString(Compression::type t) { switch (t) { case Compression::UNCOMPRESSED: - return "UNCOMPRESSED"; + return "uncompressed"; case Compression::SNAPPY: - return "SNAPPY"; + return "snappy"; case Compression::GZIP: - return "GZIP"; + return "gzip"; case Compression::LZO: - return "LZO"; + return "lzo"; case Compression::BROTLI: - return "BROTLI"; + return "brotli"; case Compression::LZ4: - return "LZ4_RAW"; + return "lz4_raw"; case Compression::LZ4_FRAME: - return "LZ4"; + return "lz4"; case Compression::LZ4_HADOOP: - return "LZ4_HADOOP"; + return "lz4_hadoop"; case Compression::ZSTD: - return "ZSTD"; + return "zstd"; case Compression::BZ2: - return "BZ2"; + return "bz2"; default: - return "UNKNOWN"; + return "unknown"; } } -Result Codec::GetCompressionType(const std::string& name) { - if (name == "UNCOMPRESSED") { +Result Codec::GetCompressionType(util::string_view name) { + if (name == "uncompressed") { return Compression::UNCOMPRESSED; - } else if (name == "GZIP") { + } else if (name == "gzip") { return Compression::GZIP; - } else if (name == "SNAPPY") { + } else if (name == "snappy") { return Compression::SNAPPY; - } else if (name == "LZO") { + } else if (name == "lzo") { return Compression::LZO; - } else if (name == "BROTLI") { + } else if (name == "brotli") { return Compression::BROTLI; - } else if (name == "LZ4_RAW") { + } else if (name == "lz4_raw") { return Compression::LZ4; - } else if (name == "LZ4") { + } else if (name == "lz4") { return Compression::LZ4_FRAME; - } else if (name == "LZ4_HADOOP") { + } else if (name == "lz4_hadoop") { return Compression::LZ4_HADOOP; - } else if (name == "ZSTD") { + } else if (name == "zstd") { return Compression::ZSTD; - } else if (name == "BZ2") { + } else if (name == "bz2") { return Compression::BZ2; } else { return Status::Invalid("Unrecognized compression type: ", name); } } +bool Codec::SupportsCompressionLevel(Compression::type codec) { + switch (codec) { + case Compression::GZIP: + case Compression::BROTLI: + case Compression::ZSTD: + case Compression::BZ2: + return true; + default: + return false; + } +} + Result> Codec::Create(Compression::type codec_type, int compression_level) { + if (!IsAvailable(codec_type)) { + if (codec_type == Compression::LZO) { + return Status::NotImplemented("LZO codec not implemented"); + } + + auto name = GetCodecAsString(codec_type); + if (name == "unknown") { + return Status::Invalid("Unrecognized codec"); + } + + return Status::NotImplemented("Support for codec '", GetCodecAsString(codec_type), + "' not built"); + } + + if (compression_level != kUseDefaultCompressionLevel && + !SupportsCompressionLevel(codec_type)) { + return Status::Invalid("Codec '", GetCodecAsString(codec_type), + "' doesn't support setting a compression level."); + } + std::unique_ptr codec; - const bool compression_level_set{compression_level != kUseDefaultCompressionLevel}; switch (codec_type) { case Compression::UNCOMPRESSED: - if (compression_level_set) { - return Status::Invalid("Compression level cannot be specified for UNCOMPRESSED."); - } return nullptr; case Compression::SNAPPY: #ifdef ARROW_WITH_SNAPPY - if (compression_level_set) { - return Status::Invalid("Snappy doesn't support setting a compression level."); - } codec = internal::MakeSnappyCodec(); - break; -#else - return Status::NotImplemented("Snappy codec support not built"); #endif + break; case Compression::GZIP: #ifdef ARROW_WITH_ZLIB codec = internal::MakeGZipCodec(compression_level); - break; -#else - return Status::NotImplemented("Gzip codec support not built"); #endif - case Compression::LZO: - if (compression_level_set) { - return Status::Invalid("LZ0 doesn't support setting a compression level."); - } - return Status::NotImplemented("LZO codec not implemented"); + break; case Compression::BROTLI: #ifdef ARROW_WITH_BROTLI codec = internal::MakeBrotliCodec(compression_level); - break; -#else - return Status::NotImplemented("Brotli codec support not built"); #endif + break; case Compression::LZ4: #ifdef ARROW_WITH_LZ4 - if (compression_level_set) { - return Status::Invalid("LZ4 doesn't support setting a compression level."); - } codec = internal::MakeLz4RawCodec(); - break; -#else - return Status::NotImplemented("LZ4 codec support not built"); #endif + break; case Compression::LZ4_FRAME: #ifdef ARROW_WITH_LZ4 - if (compression_level_set) { - return Status::Invalid("LZ4 doesn't support setting a compression level."); - } codec = internal::MakeLz4FrameCodec(); - break; -#else - return Status::NotImplemented("LZ4 codec support not built"); #endif + break; case Compression::LZ4_HADOOP: #ifdef ARROW_WITH_LZ4 - if (compression_level_set) { - return Status::Invalid("LZ4 doesn't support setting a compression level."); - } codec = internal::MakeLz4HadoopRawCodec(); - break; -#else - return Status::NotImplemented("LZ4 codec support not built"); #endif + break; case Compression::ZSTD: #ifdef ARROW_WITH_ZSTD codec = internal::MakeZSTDCodec(compression_level); - break; -#else - return Status::NotImplemented("ZSTD codec support not built"); #endif + break; case Compression::BZ2: #ifdef ARROW_WITH_BZ2 codec = internal::MakeBZ2Codec(compression_level); - break; -#else - return Status::NotImplemented("BZ2 codec support not built"); #endif + break; default: - return Status::Invalid("Unrecognized codec"); + break; } + DCHECK_NE(codec, nullptr); RETURN_NOT_OK(codec->Init()); return std::move(codec); } diff --git a/cpp/src/arrow/util/compression.h b/cpp/src/arrow/util/compression.h index 551ecbe4e219..42cf9f0c7ec4 100644 --- a/cpp/src/arrow/util/compression.h +++ b/cpp/src/arrow/util/compression.h @@ -20,10 +20,10 @@ #include #include #include -#include #include "arrow/result.h" #include "arrow/status.h" +#include "arrow/util/string_view.h" #include "arrow/util/visibility.h" namespace arrow { @@ -54,7 +54,7 @@ constexpr int kUseDefaultCompressionLevel = Compression::kUseDefaultCompressionL /// class ARROW_EXPORT Compressor { public: - virtual ~Compressor(); + virtual ~Compressor() = default; struct CompressResult { int64_t bytes_read; @@ -96,7 +96,7 @@ class ARROW_EXPORT Compressor { /// class ARROW_EXPORT Decompressor { public: - virtual ~Decompressor(); + virtual ~Decompressor() = default; struct DecompressResult { // XXX is need_more_output necessary? (Brotli?) @@ -128,17 +128,17 @@ class ARROW_EXPORT Decompressor { /// \brief Compression codec class ARROW_EXPORT Codec { public: - virtual ~Codec(); + virtual ~Codec() = default; /// \brief Return special value to indicate that a codec implementation /// should use its default compression level static int UseDefaultCompressionLevel(); /// \brief Return a string name for compression type - static std::string GetCodecAsString(Compression::type t); + static util::string_view GetCodecAsString(Compression::type t); /// \brief Return compression type for name (all upper case) - static Result GetCompressionType(const std::string& name); + static Result GetCompressionType(util::string_view name); /// \brief Create a codec for the given compression algorithm static Result> Create( @@ -147,6 +147,9 @@ class ARROW_EXPORT Codec { /// \brief Return true if support for indicated codec has been enabled static bool IsAvailable(Compression::type codec); + /// \brief Return true if indicated codec supports setting a compression level + static bool SupportsCompressionLevel(Compression::type codec); + /// \brief One-shot decompression function /// /// output_buffer_len must be correct and therefore be obtained in advance. @@ -178,7 +181,14 @@ class ARROW_EXPORT Codec { /// \brief Create a streaming compressor instance virtual Result> MakeDecompressor() = 0; - virtual const char* name() const = 0; + /// \brief This Codec's compression type + virtual Compression::type compression_type() const = 0; + + /// \brief The name of this Codec's compression type + std::string name() const { return GetCodecAsString(compression_type()).to_string(); } + + /// \brief This Codec's compression level, if applicable + virtual int compression_level() const { return UseDefaultCompressionLevel(); } private: /// \brief Initializes the codec's resources. diff --git a/cpp/src/arrow/util/compression_brotli.cc b/cpp/src/arrow/util/compression_brotli.cc index ca4f523a6570..4feabe23345a 100644 --- a/cpp/src/arrow/util/compression_brotli.cc +++ b/cpp/src/arrow/util/compression_brotli.cc @@ -38,8 +38,6 @@ namespace { class BrotliDecompressor : public Decompressor { public: - BrotliDecompressor() {} - ~BrotliDecompressor() override { if (state_ != nullptr) { BrotliDecoderDestroyInstance(state_); @@ -167,7 +165,7 @@ class BrotliCompressor : public Compressor { BrotliEncoderState* state_ = nullptr; private: - int compression_level_; + const int compression_level_; }; // ---------------------------------------------------------------------- @@ -175,11 +173,10 @@ class BrotliCompressor : public Compressor { class BrotliCodec : public Codec { public: - explicit BrotliCodec(int compression_level) { - compression_level_ = compression_level == kUseDefaultCompressionLevel - ? kBrotliDefaultCompressionLevel - : compression_level; - } + explicit BrotliCodec(int compression_level) + : compression_level_(compression_level == kUseDefaultCompressionLevel + ? kBrotliDefaultCompressionLevel + : compression_level) {} Result Decompress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len, uint8_t* output_buffer) override { @@ -224,10 +221,12 @@ class BrotliCodec : public Codec { return ptr; } - const char* name() const override { return "brotli"; } + Compression::type compression_type() const override { return Compression::BROTLI; } + + int compression_level() const override { return compression_level_; } private: - int compression_level_; + const int compression_level_; }; } // namespace diff --git a/cpp/src/arrow/util/compression_bz2.cc b/cpp/src/arrow/util/compression_bz2.cc index dfb266ab8700..8a8c1cb7a45d 100644 --- a/cpp/src/arrow/util/compression_bz2.cc +++ b/cpp/src/arrow/util/compression_bz2.cc @@ -262,7 +262,9 @@ class BZ2Codec : public Codec { return ptr; } - const char* name() const override { return "bz2"; } + Compression::type compression_type() const override { return Compression::BZ2; } + + int compression_level() const override { return compression_level_; } private: int compression_level_; diff --git a/cpp/src/arrow/util/compression_lz4.cc b/cpp/src/arrow/util/compression_lz4.cc index 17a8514611e3..365cd0f523e7 100644 --- a/cpp/src/arrow/util/compression_lz4.cc +++ b/cpp/src/arrow/util/compression_lz4.cc @@ -298,10 +298,10 @@ class Lz4FrameCodec : public Codec { return ptr; } - const char* name() const override { return "lz4"; } + Compression::type compression_type() const override { return Compression::LZ4_FRAME; } protected: - LZ4F_preferences_t prefs_; + const LZ4F_preferences_t prefs_; }; // ---------------------------------------------------------------------- @@ -348,7 +348,7 @@ class Lz4Codec : public Codec { "Try using LZ4 frame format instead."); } - const char* name() const override { return "lz4_raw"; } + Compression::type compression_type() const override { return Compression::LZ4; } }; // ---------------------------------------------------------------------- @@ -407,7 +407,7 @@ class Lz4HadoopCodec : public Lz4Codec { "Try using LZ4 frame format instead."); } - const char* name() const override { return "lz4_hadoop_raw"; } + Compression::type compression_type() const override { return Compression::LZ4_HADOOP; } protected: // Offset starting at which page data can be read/written diff --git a/cpp/src/arrow/util/compression_snappy.cc b/cpp/src/arrow/util/compression_snappy.cc index 9cd06dc4f02f..9b016874b56b 100644 --- a/cpp/src/arrow/util/compression_snappy.cc +++ b/cpp/src/arrow/util/compression_snappy.cc @@ -41,8 +41,6 @@ namespace { class SnappyCodec : public Codec { public: - SnappyCodec() {} - Result Decompress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len, uint8_t* output_buffer) override { size_t decompressed_size; @@ -87,7 +85,7 @@ class SnappyCodec : public Codec { return Status::NotImplemented("Streaming decompression unsupported with Snappy"); } - const char* name() const override { return "snappy"; } + Compression::type compression_type() const override { return Compression::SNAPPY; } }; } // namespace diff --git a/cpp/src/arrow/util/compression_test.cc b/cpp/src/arrow/util/compression_test.cc index 1df52364a724..8b184f2dc41b 100644 --- a/cpp/src/arrow/util/compression_test.cc +++ b/cpp/src/arrow/util/compression_test.cc @@ -320,30 +320,30 @@ class CodecTest : public ::testing::TestWithParam { }; TEST(TestCodecMisc, GetCodecAsString) { - ASSERT_EQ("UNCOMPRESSED", Codec::GetCodecAsString(Compression::UNCOMPRESSED)); - ASSERT_EQ("SNAPPY", Codec::GetCodecAsString(Compression::SNAPPY)); - ASSERT_EQ("GZIP", Codec::GetCodecAsString(Compression::GZIP)); - ASSERT_EQ("LZO", Codec::GetCodecAsString(Compression::LZO)); - ASSERT_EQ("BROTLI", Codec::GetCodecAsString(Compression::BROTLI)); - ASSERT_EQ("LZ4_RAW", Codec::GetCodecAsString(Compression::LZ4)); - ASSERT_EQ("LZ4", Codec::GetCodecAsString(Compression::LZ4_FRAME)); - ASSERT_EQ("ZSTD", Codec::GetCodecAsString(Compression::ZSTD)); - ASSERT_EQ("BZ2", Codec::GetCodecAsString(Compression::BZ2)); + EXPECT_EQ(Codec::GetCodecAsString(Compression::UNCOMPRESSED), "uncompressed"); + EXPECT_EQ(Codec::GetCodecAsString(Compression::SNAPPY), "snappy"); + EXPECT_EQ(Codec::GetCodecAsString(Compression::GZIP), "gzip"); + EXPECT_EQ(Codec::GetCodecAsString(Compression::LZO), "lzo"); + EXPECT_EQ(Codec::GetCodecAsString(Compression::BROTLI), "brotli"); + EXPECT_EQ(Codec::GetCodecAsString(Compression::LZ4), "lz4_raw"); + EXPECT_EQ(Codec::GetCodecAsString(Compression::LZ4_FRAME), "lz4"); + EXPECT_EQ(Codec::GetCodecAsString(Compression::ZSTD), "zstd"); + EXPECT_EQ(Codec::GetCodecAsString(Compression::BZ2), "bz2"); } TEST(TestCodecMisc, GetCompressionType) { - ASSERT_OK_AND_EQ(Compression::UNCOMPRESSED, Codec::GetCompressionType("UNCOMPRESSED")); - ASSERT_OK_AND_EQ(Compression::SNAPPY, Codec::GetCompressionType("SNAPPY")); - ASSERT_OK_AND_EQ(Compression::GZIP, Codec::GetCompressionType("GZIP")); - ASSERT_OK_AND_EQ(Compression::LZO, Codec::GetCompressionType("LZO")); - ASSERT_OK_AND_EQ(Compression::BROTLI, Codec::GetCompressionType("BROTLI")); - ASSERT_OK_AND_EQ(Compression::LZ4, Codec::GetCompressionType("LZ4_RAW")); - ASSERT_OK_AND_EQ(Compression::LZ4_FRAME, Codec::GetCompressionType("LZ4")); - ASSERT_OK_AND_EQ(Compression::ZSTD, Codec::GetCompressionType("ZSTD")); - ASSERT_OK_AND_EQ(Compression::BZ2, Codec::GetCompressionType("BZ2")); + ASSERT_OK_AND_EQ(Compression::UNCOMPRESSED, Codec::GetCompressionType("uncompressed")); + ASSERT_OK_AND_EQ(Compression::SNAPPY, Codec::GetCompressionType("snappy")); + ASSERT_OK_AND_EQ(Compression::GZIP, Codec::GetCompressionType("gzip")); + ASSERT_OK_AND_EQ(Compression::LZO, Codec::GetCompressionType("lzo")); + ASSERT_OK_AND_EQ(Compression::BROTLI, Codec::GetCompressionType("brotli")); + ASSERT_OK_AND_EQ(Compression::LZ4, Codec::GetCompressionType("lz4_raw")); + ASSERT_OK_AND_EQ(Compression::LZ4_FRAME, Codec::GetCompressionType("lz4")); + ASSERT_OK_AND_EQ(Compression::ZSTD, Codec::GetCompressionType("zstd")); + ASSERT_OK_AND_EQ(Compression::BZ2, Codec::GetCompressionType("bz2")); ASSERT_RAISES(Invalid, Codec::GetCompressionType("unk")); - ASSERT_RAISES(Invalid, Codec::GetCompressionType("snappy")); + ASSERT_RAISES(Invalid, Codec::GetCompressionType("SNAPPY")); } TEST_P(CodecTest, CodecRoundtrip) { diff --git a/cpp/src/arrow/util/compression_zlib.cc b/cpp/src/arrow/util/compression_zlib.cc index 717bbc61d468..84e517e2632f 100644 --- a/cpp/src/arrow/util/compression_zlib.cc +++ b/cpp/src/arrow/util/compression_zlib.cc @@ -463,7 +463,9 @@ class GZipCodec : public Codec { return InitDecompressor(); } - const char* name() const override { return "gzip"; } + Compression::type compression_type() const override { return Compression::GZIP; } + + int compression_level() const override { return compression_level_; } private: // zlib is stateful and the z_stream state variable must be initialized diff --git a/cpp/src/arrow/util/compression_zstd.cc b/cpp/src/arrow/util/compression_zstd.cc index e7409604a4c1..382e0573b291 100644 --- a/cpp/src/arrow/util/compression_zstd.cc +++ b/cpp/src/arrow/util/compression_zstd.cc @@ -173,20 +173,19 @@ class ZSTDCompressor : public Compressor { class ZSTDCodec : public Codec { public: - explicit ZSTDCodec(int compression_level) { - compression_level_ = compression_level == kUseDefaultCompressionLevel - ? kZSTDDefaultCompressionLevel - : compression_level; - } + explicit ZSTDCodec(int compression_level) + : compression_level_(compression_level == kUseDefaultCompressionLevel + ? kZSTDDefaultCompressionLevel + : compression_level) {} Result Decompress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len, uint8_t* output_buffer) override { if (output_buffer == nullptr) { // We may pass a NULL 0-byte output buffer but some zstd versions demand // a valid pointer: https://github.com/facebook/zstd/issues/1385 - static uint8_t empty_buffer[1]; + static uint8_t empty_buffer; DCHECK_EQ(output_buffer_len, 0); - output_buffer = empty_buffer; + output_buffer = &empty_buffer; } size_t ret = ZSTD_decompress(output_buffer, static_cast(output_buffer_len), @@ -228,10 +227,12 @@ class ZSTDCodec : public Codec { return ptr; } - const char* name() const override { return "zstd"; } + Compression::type compression_type() const override { return Compression::ZSTD; } + + int compression_level() const override { return compression_level_; } private: - int compression_level_; + const int compression_level_; }; } // namespace diff --git a/cpp/src/arrow/util/string.cc b/cpp/src/arrow/util/string.cc index 625086c39b28..691f10e2793f 100644 --- a/cpp/src/arrow/util/string.cc +++ b/cpp/src/arrow/util/string.cc @@ -144,6 +144,14 @@ std::string AsciiToLower(util::string_view value) { return result; } +std::string AsciiToUpper(util::string_view value) { + // TODO: ASCII validation + std::string result = std::string(value); + std::transform(result.begin(), result.end(), result.begin(), + [](unsigned char c) { return std::toupper(c); }); + return result; +} + util::optional Replace(util::string_view s, util::string_view token, util::string_view replacement) { size_t token_start = s.find(token); diff --git a/cpp/src/arrow/util/string.h b/cpp/src/arrow/util/string.h index 56a02dfb20b6..feb75d45b354 100644 --- a/cpp/src/arrow/util/string.h +++ b/cpp/src/arrow/util/string.h @@ -57,6 +57,9 @@ bool AsciiEqualsCaseInsensitive(util::string_view left, util::string_view right) ARROW_EXPORT std::string AsciiToLower(util::string_view value); +ARROW_EXPORT +std::string AsciiToUpper(util::string_view value); + /// \brief Search for the first instance of a token and replace it or return nullopt if /// the token is not found. ARROW_EXPORT diff --git a/cpp/src/parquet/printer.cc b/cpp/src/parquet/printer.cc index a667a4857ec8..224a19dda6b6 100644 --- a/cpp/src/parquet/printer.cc +++ b/cpp/src/parquet/printer.cc @@ -25,6 +25,7 @@ #include #include "arrow/util/key_value_metadata.h" +#include "arrow/util/string.h" #include "parquet/column_scanner.h" #include "parquet/exception.h" @@ -121,7 +122,9 @@ void ParquetFilePrinter::DebugPrint(std::ostream& stream, std::list selecte stream << " Statistics Not Set"; } stream << std::endl - << " Compression: " << Codec::GetCodecAsString(column_chunk->compression()) + << " Compression: " + << arrow::internal::AsciiToUpper( + Codec::GetCodecAsString(column_chunk->compression())) << ", Encodings:"; for (auto encoding : column_chunk->encodings()) { stream << " " << EncodingToString(encoding); @@ -256,7 +259,8 @@ void ParquetFilePrinter::JSONPrint(std::ostream& stream, std::list selected stream << "\"False\","; } stream << "\n \"Compression\": \"" - << Codec::GetCodecAsString(column_chunk->compression()) + << arrow::internal::AsciiToUpper( + Codec::GetCodecAsString(column_chunk->compression())) << "\", \"Encodings\": \""; for (auto encoding : column_chunk->encodings()) { stream << EncodingToString(encoding) << " "; diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index dee022f5ca70..afcfacce29bc 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1337,7 +1337,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil: c_bool write_legacy_ipc_format CMemoryPool* memory_pool CMetadataVersion metadata_version - CCompressionType compression + shared_ptr[CCodec] codec c_bool use_threads @staticmethod @@ -2055,7 +2055,7 @@ cdef extern from 'arrow/util/compression.h' namespace 'arrow' nogil: CResult[int64_t] Compress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len, uint8_t* output_buffer) - const char* name() const + c_string name() const int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi index a12915ec6161..3fc098478d61 100644 --- a/python/pyarrow/io.pxi +++ b/python/pyarrow/io.pxi @@ -1557,25 +1557,6 @@ cdef CCompressionType _ensure_compression(str name) except *: raise ValueError('Invalid value for compression: {!r}'.format(name)) -cdef str _compression_name(CCompressionType ctype): - if ctype == CCompressionType_GZIP: - return 'gzip' - elif ctype == CCompressionType_BROTLI: - return 'brotli' - elif ctype == CCompressionType_BZ2: - return 'bz2' - elif ctype == CCompressionType_LZ4_FRAME: - return 'lz4' - elif ctype == CCompressionType_LZ4: - return 'lz4_raw' - elif ctype == CCompressionType_SNAPPY: - return 'snappy' - elif ctype == CCompressionType_ZSTD: - return 'zstd' - else: - raise RuntimeError('Unexpected CCompressionType value') - - cdef class Codec(_Weakrefable): """ Compression codec. diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi index 74a81c60ecff..5c8194d10c7c 100644 --- a/python/pyarrow/ipc.pxi +++ b/python/pyarrow/ipc.pxi @@ -94,17 +94,18 @@ cdef class IpcWriteOptions(_Weakrefable): @property def compression(self): - if self.c_options.compression == CCompressionType_UNCOMPRESSED: + if self.c_options.codec == nullptr: return None else: - return _compression_name(self.c_options.compression) + return frombytes(self.c_options.codec.get().name()) @compression.setter def compression(self, value): if value is None: - self.c_options.compression = CCompressionType_UNCOMPRESSED + self.c_options.codec.reset() else: - self.c_options.compression = _ensure_compression(value) + self.c_options.codec = shared_ptr[CCodec](GetResultValue( + CCodec.Create(_ensure_compression(value))).release()) @property def use_threads(self): From 4fa0574d931af78e66646bfff556a139a8727ae9 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Thu, 8 Oct 2020 16:05:58 -0400 Subject: [PATCH 03/14] add binding and test for compression in R --- cpp/src/arrow/dataset/file_ipc_test.cc | 4 +++ r/R/arrowExports.R | 4 +-- r/R/dataset-format.R | 1 + r/R/dataset-write.R | 3 ++- r/configure | 5 +++- r/src/Makevars.in | 1 + r/src/arrowExports.cpp | 11 ++++---- r/src/arrow_cpp11.h | 37 +++++++++++++++++++++----- r/src/dataset.cpp | 2 ++ r/tests/testthat/test-dataset.R | 23 +++++++++++++--- 10 files changed, 72 insertions(+), 19 deletions(-) diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc index 218d3e1b50bd..25bbe559cf40 100644 --- a/cpp/src/arrow/dataset/file_ipc_test.cc +++ b/cpp/src/arrow/dataset/file_ipc_test.cc @@ -179,6 +179,10 @@ TEST_F(TestIpcFileFormat, WriteRecordBatchReaderCustomOptions) { auto ipc_options = checked_pointer_cast(format_->DefaultWriteOptions()); + if (util::Codec::IsAvailable(Compression::ZSTD)) { + EXPECT_OK_AND_ASSIGN(ipc_options->options->codec, + util::Codec::Create(Compression::ZSTD)); + } ipc_options->metadata = key_value_metadata({{"hello", "world"}}); EXPECT_OK_AND_ASSIGN(auto writer, format_->MakeWriter(sink, reader->schema(), ipc_options)); diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 29f13e853ea6..eb0dfe6d2898 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -432,8 +432,8 @@ dataset___ParquetFileWriteOptions__update <- function(options, writer_props, arr invisible(.Call(`_arrow_dataset___ParquetFileWriteOptions__update` , options, writer_props, arrow_writer_props)) } -dataset___IpcFileWriteOptions__update <- function(ipc_options, use_legacy_format, metadata_version, metadata){ - invisible(.Call(`_arrow_dataset___IpcFileWriteOptions__update` , ipc_options, use_legacy_format, metadata_version, metadata)) +dataset___IpcFileWriteOptions__update <- function(ipc_options, use_legacy_format, codec, metadata_version, metadata){ + invisible(.Call(`_arrow_dataset___IpcFileWriteOptions__update` , ipc_options, use_legacy_format, codec, metadata_version, metadata)) } dataset___IpcFileFormat__Make <- function(){ diff --git a/r/R/dataset-format.R b/r/R/dataset-format.R index 5d6e99313c63..0883c9be840b 100644 --- a/r/R/dataset-format.R +++ b/r/R/dataset-format.R @@ -143,6 +143,7 @@ FileWriteOptions <- R6Class("FileWriteOptions", inherit = ArrowObject, args <- list(...) dataset___IpcFileWriteOptions__update(self, get_ipc_use_legacy_format(args$use_legacy_format), + args$codec, get_ipc_metadata_version(args$metadata_version), prepare_key_value_metadata(args$metadata)) } diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R index abeb0ce4393d..bf9083519f9a 100644 --- a/r/R/dataset-write.R +++ b/r/R/dataset-write.R @@ -44,7 +44,8 @@ #' @param filesystem A [FileSystem] where the dataset should be written if it is a #' string file path; default is the local file system #' @param ... additional format-specific arguments. For available Parquet -#' options, see [write_parquet()]. +#' options, see [write_parquet()]. For available Feather options, see +#' [RecordBatchFileWriter$create()]. #' @return The input `dataset`, invisibly #' @export write_dataset <- function(dataset, diff --git a/r/configure b/r/configure index 21bad6b1aa2f..b009e882c6a3 100755 --- a/r/configure +++ b/r/configure @@ -201,7 +201,10 @@ else fi # Write to Makevars -sed -e "s|@cflags@|$PKG_CFLAGS|" -e "s|@libs@|$PKG_LIBS|" src/Makevars.in > src/Makevars +sed -e "s|@cflags@|$PKG_CFLAGS|" \ + -e "s|@libs@|$PKG_LIBS|" \ + -e "s|@nproc@|$(nproc)|" \ + src/Makevars.in > src/Makevars # This is removed because a (bad?) CRAN check fails when arrow.so is stripped # # Add stripping diff --git a/r/src/Makevars.in b/r/src/Makevars.in index 7e20f9561ed7..49d1bec642dd 100644 --- a/r/src/Makevars.in +++ b/r/src/Makevars.in @@ -27,3 +27,4 @@ PKG_CPPFLAGS=@cflags@ # PKG_CXXFLAGS=$(CXX_VISIBILITY) CXX_STD=CXX11 PKG_LIBS=@libs@ +MAKEFLAGS=-j@nproc@ diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 1547f1526a32..07dde99a1eab 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1697,19 +1697,20 @@ extern "C" SEXP _arrow_dataset___ParquetFileWriteOptions__update(SEXP options_se // dataset.cpp #if defined(ARROW_R_WITH_ARROW) -void dataset___IpcFileWriteOptions__update(const std::shared_ptr& ipc_options, bool use_legacy_format, arrow::ipc::MetadataVersion metadata_version, cpp11::strings metadata); -extern "C" SEXP _arrow_dataset___IpcFileWriteOptions__update(SEXP ipc_options_sexp, SEXP use_legacy_format_sexp, SEXP metadata_version_sexp, SEXP metadata_sexp){ +void dataset___IpcFileWriteOptions__update(const std::shared_ptr& ipc_options, bool use_legacy_format, const std::shared_ptr& codec, arrow::ipc::MetadataVersion metadata_version, cpp11::strings metadata); +extern "C" SEXP _arrow_dataset___IpcFileWriteOptions__update(SEXP ipc_options_sexp, SEXP use_legacy_format_sexp, SEXP codec_sexp, SEXP metadata_version_sexp, SEXP metadata_sexp){ BEGIN_CPP11 arrow::r::Input&>::type ipc_options(ipc_options_sexp); arrow::r::Input::type use_legacy_format(use_legacy_format_sexp); + arrow::r::Input&>::type codec(codec_sexp); arrow::r::Input::type metadata_version(metadata_version_sexp); arrow::r::Input::type metadata(metadata_sexp); - dataset___IpcFileWriteOptions__update(ipc_options, use_legacy_format, metadata_version, metadata); + dataset___IpcFileWriteOptions__update(ipc_options, use_legacy_format, codec, metadata_version, metadata); return R_NilValue; END_CPP11 } #else -extern "C" SEXP _arrow_dataset___IpcFileWriteOptions__update(SEXP ipc_options_sexp, SEXP use_legacy_format_sexp, SEXP metadata_version_sexp, SEXP metadata_sexp){ +extern "C" SEXP _arrow_dataset___IpcFileWriteOptions__update(SEXP ipc_options_sexp, SEXP use_legacy_format_sexp, SEXP codec_sexp, SEXP metadata_version_sexp, SEXP metadata_sexp){ Rf_error("Cannot call dataset___IpcFileWriteOptions__update(). Please use arrow::install_arrow() to install required runtime libraries. "); } #endif @@ -6388,7 +6389,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___ParquetFileFormat__Make", (DL_FUNC) &_arrow_dataset___ParquetFileFormat__Make, 3}, { "_arrow_dataset___FileWriteOptions__type_name", (DL_FUNC) &_arrow_dataset___FileWriteOptions__type_name, 1}, { "_arrow_dataset___ParquetFileWriteOptions__update", (DL_FUNC) &_arrow_dataset___ParquetFileWriteOptions__update, 3}, - { "_arrow_dataset___IpcFileWriteOptions__update", (DL_FUNC) &_arrow_dataset___IpcFileWriteOptions__update, 4}, + { "_arrow_dataset___IpcFileWriteOptions__update", (DL_FUNC) &_arrow_dataset___IpcFileWriteOptions__update, 5}, { "_arrow_dataset___IpcFileFormat__Make", (DL_FUNC) &_arrow_dataset___IpcFileFormat__Make, 0}, { "_arrow_dataset___CsvFileFormat__Make", (DL_FUNC) &_arrow_dataset___CsvFileFormat__Make, 1}, { "_arrow_dataset___DirectoryPartitioning", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning, 1}, diff --git a/r/src/arrow_cpp11.h b/r/src/arrow_cpp11.h index 859b0491cd0e..001a34d72e21 100644 --- a/r/src/arrow_cpp11.h +++ b/r/src/arrow_cpp11.h @@ -157,19 +157,42 @@ struct ns { static SEXP arrow; }; +// Specialize this struct to define a default value to be used when NULL is given. +// NB: can't do this for unique_ptr because that requires that T be complete. +// We could make that happen but it'd probably slow compilation. +template +struct nil_value { + static const T* get() { return nullptr; } +}; + +template +struct nil_value> { + static const std::shared_ptr* get() { + static const std::shared_ptr null; + return &null; + } +}; + template Pointer r6_to_pointer(SEXP self) { + using pointed_type = cpp11::decay_t::type>; + if (self == R_NilValue) { + if (Pointer default_value = nil_value::get()) { + return default_value; + } + } + if (!Rf_inherits(self, "ArrowObject")) { - std::string type_name = arrow::util::nameof< - cpp11::decay_t::type>>(); + std::string type_name = arrow::util::nameof(); cpp11::stop("Invalid R object for %s, must be an ArrowObject", type_name.c_str()); } - void* p = R_ExternalPtrAddr(Rf_findVarInFrame(self, arrow::r::symbols::xp)); - if (p == nullptr) { - SEXP klass = Rf_getAttrib(self, R_ClassSymbol); - cpp11::stop("Invalid <%s>, external pointer to null", CHAR(STRING_ELT(klass, 0))); + + if (void* p = R_ExternalPtrAddr(Rf_findVarInFrame(self, arrow::r::symbols::xp))) { + return reinterpret_cast(p); } - return reinterpret_cast(p); + + SEXP klass = Rf_getAttrib(self, R_ClassSymbol); + cpp11::stop("Invalid <%s>, external pointer to null", CHAR(STRING_ELT(klass, 0))); } // T is either std::shared_ptr or std::unique_ptr diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index b766d11ebaf8..29445185801c 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -213,8 +213,10 @@ void dataset___ParquetFileWriteOptions__update( // [[arrow::export]] void dataset___IpcFileWriteOptions__update( const std::shared_ptr& ipc_options, bool use_legacy_format, + const std::shared_ptr& codec, arrow::ipc::MetadataVersion metadata_version, cpp11::strings metadata) { ipc_options->options->write_legacy_ipc_format = use_legacy_format; + ipc_options->options->codec = codec; ipc_options->options->metadata_version = metadata_version; ipc_options->metadata = KeyValueMetadata__Make(metadata); } diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index 1c0ff7a5054c..937778dfbb84 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -943,18 +943,35 @@ test_that("Dataset writing: from RecordBatch", { ) }) -test_that("Writing a dataset: Ipc format options", { +test_that("Writing a dataset: Ipc format options & compression", { skip_on_os("windows") # https://issues.apache.org/jira/browse/ARROW-9651 ds <- open_dataset(csv_dir, partitioning = "part", format = "csv") dst_dir <- make_temp_dir() metadata <- c(hello = "world", eh = "!") - write_dataset(ds, dst_dir, format = "feather", - metadata = metadata) + codec <- NULL + if (codec_is_available("zstd")) { + codec <- Codec$create("zstd") + } + + write_dataset(ds, dst_dir, format = "feather", metadata = metadata, codec = codec) expect_true(dir.exists(dst_dir)) file <- ds$filesystem$OpenInputStream(paste(dst_dir, dir(dst_dir)[[1]], sep = "/")) expect_equivalent(metadata, RecordBatchFileReader$create(file)$metadata) + + new_ds <- open_dataset(dst_dir, format = "feather") + expect_equivalent( + new_ds %>% + select(string = chr, integer = int) %>% + filter(integer > 6 & integer < 11) %>% + collect() %>% + summarize(mean = mean(integer)), + df1 %>% + select(string = chr, integer = int) %>% + filter(integer > 6) %>% + summarize(mean = mean(integer)) + ) }) test_that("Writing a dataset: Parquet format options", { From a277d4c58ecd7d6b0e532236a53a23409e4759d1 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Thu, 8 Oct 2020 17:16:28 -0400 Subject: [PATCH 04/14] try to fix glib --- c_glib/arrow-glib/codec.cpp | 2 +- c_glib/arrow-glib/ipc-options.cpp | 60 +++++++++++-------------------- cpp/src/arrow/util/compression.cc | 51 ++++++++++++++------------ cpp/src/arrow/util/compression.h | 8 ++--- 4 files changed, 53 insertions(+), 68 deletions(-) diff --git a/c_glib/arrow-glib/codec.cpp b/c_glib/arrow-glib/codec.cpp index fdd61e70a177..7975088217fa 100644 --- a/c_glib/arrow-glib/codec.cpp +++ b/c_glib/arrow-glib/codec.cpp @@ -151,7 +151,7 @@ const gchar * garrow_codec_get_name(GArrowCodec *codec) { auto arrow_codec = garrow_codec_get_raw(codec); - return arrow_codec->name(); + return arrow_codec->name().c_str(); } G_END_DECLS diff --git a/c_glib/arrow-glib/ipc-options.cpp b/c_glib/arrow-glib/ipc-options.cpp index 1cddd25bb6de..fbdf17a3de20 100644 --- a/c_glib/arrow-glib/ipc-options.cpp +++ b/c_glib/arrow-glib/ipc-options.cpp @@ -249,8 +249,7 @@ enum { PROP_WRITE_OPTIONS_MAX_RECURSION_DEPTH, PROP_WRITE_OPTIONS_ALIGNMENT, PROP_WRITE_OPTIONS_WRITE_LEGACY_IPC_FORMAT, - PROP_WRITE_OPTIONS_COMPRESSION, - PROP_WRITE_OPTIONS_COMPRESSION_LEVEL, + PROP_WRITE_OPTIONS_CODEC, PROP_WRITE_OPTIONS_USE_THREADS, }; @@ -294,13 +293,14 @@ garrow_write_options_set_property(GObject *object, case PROP_WRITE_OPTIONS_WRITE_LEGACY_IPC_FORMAT: priv->options.write_legacy_ipc_format = g_value_get_boolean(value); break; - case PROP_WRITE_OPTIONS_COMPRESSION: - priv->options.compression = - static_cast(g_value_get_enum(value)); - break; - case PROP_WRITE_OPTIONS_COMPRESSION_LEVEL: - priv->options.compression_level = g_value_get_int(value); + case PROP_WRITE_OPTIONS_CODEC: { + auto codec = g_value_dup_object(value); + priv->options.codec = std::shared_ptr{ + garrow_codec_get_raw(codec), + [codec](...) { g_object_unref(codec); } + }; break; + } case PROP_WRITE_OPTIONS_USE_THREADS: priv->options.use_threads = g_value_get_boolean(value); break; @@ -331,12 +331,12 @@ garrow_write_options_get_property(GObject *object, case PROP_WRITE_OPTIONS_WRITE_LEGACY_IPC_FORMAT: g_value_set_boolean(value, priv->options.write_legacy_ipc_format); break; - case PROP_WRITE_OPTIONS_COMPRESSION: - g_value_set_enum(value, priv->options.compression); - break; - case PROP_WRITE_OPTIONS_COMPRESSION_LEVEL: - g_value_set_int(value, priv->options.compression_level); + case PROP_WRITE_OPTIONS_CODEC: { + auto arrow_type = priv->options.codec->compression_type(); + auto type = garrow_compression_type_from_raw(arrow_type); + g_value_set_object(value, garrow_codec_new(type)); break; + } case PROP_WRITE_OPTIONS_USE_THREADS: g_value_set_boolean(value, priv->options.use_threads); break; @@ -441,7 +441,7 @@ garrow_write_options_class_init(GArrowWriteOptionsClass *klass) spec); /** - * GArrowWriteOptions:compression: + * GArrowWriteOptions:codec: * * Codec to use for compressing and decompressing record batch body * buffers. This is not part of the Arrow IPC protocol and only for @@ -450,33 +450,13 @@ garrow_write_options_class_init(GArrowWriteOptionsClass *klass) * * Since: 1.0.0 */ - spec = g_param_spec_enum("compression", - "Compression", - "Codec to use for " - "compressing record batch body buffers.", - GARROW_TYPE_COMPRESSION_TYPE, - options.compression, - static_cast(G_PARAM_READWRITE)); - g_object_class_install_property(gobject_class, - PROP_WRITE_OPTIONS_COMPRESSION, - spec); - - /** - * GArrowWriteOptions:compression-level: - * - * The level for compression. - * - * Since: 1.0.0 - */ - spec = g_param_spec_int("compression-level", - "Compression level", - "The level for compression", - G_MININT, - G_MAXINT, - options.compression_level, - static_cast(G_PARAM_READWRITE)); + spec = g_param_spec_pointer("codec", + "Codec", + "Codec to use for " + "compressing record batch body buffers.", + static_cast(G_PARAM_READWRITE)); g_object_class_install_property(gobject_class, - PROP_WRITE_OPTIONS_COMPRESSION_LEVEL, + PROP_WRITE_OPTIONS_CODEC, spec); /** diff --git a/cpp/src/arrow/util/compression.cc b/cpp/src/arrow/util/compression.cc index a796c955645f..442ac0763a21 100644 --- a/cpp/src/arrow/util/compression.cc +++ b/cpp/src/arrow/util/compression.cc @@ -33,53 +33,58 @@ int Codec::UseDefaultCompressionLevel() { return kUseDefaultCompressionLevel; } Status Codec::Init() { return Status::OK(); } -util::string_view Codec::GetCodecAsString(Compression::type t) { +static const std::string uncompressed = "uncompressed", snappy = "snappy", gzip = "gzip", + lzo = "lzo", brotli = "brotli", lz4_raw = "lz4_raw", lz4 = "lz4", + lz4_hadoop = "lz4_hadoop", zstd = "zstd", bz2 = "bz2", + unknown = "unknown"; + +const std::string& Codec::GetCodecAsString(Compression::type t) { switch (t) { case Compression::UNCOMPRESSED: - return "uncompressed"; + return uncompressed; case Compression::SNAPPY: - return "snappy"; + return snappy; case Compression::GZIP: - return "gzip"; + return gzip; case Compression::LZO: - return "lzo"; + return lzo; case Compression::BROTLI: - return "brotli"; + return brotli; case Compression::LZ4: - return "lz4_raw"; + return lz4_raw; case Compression::LZ4_FRAME: - return "lz4"; + return lz4; case Compression::LZ4_HADOOP: - return "lz4_hadoop"; + return lz4_hadoop; case Compression::ZSTD: - return "zstd"; + return zstd; case Compression::BZ2: - return "bz2"; + return bz2; default: - return "unknown"; + return unknown; } } -Result Codec::GetCompressionType(util::string_view name) { - if (name == "uncompressed") { +Result Codec::GetCompressionType(const std::string& name) { + if (name == uncompressed) { return Compression::UNCOMPRESSED; - } else if (name == "gzip") { + } else if (name == gzip) { return Compression::GZIP; - } else if (name == "snappy") { + } else if (name == snappy) { return Compression::SNAPPY; - } else if (name == "lzo") { + } else if (name == lzo) { return Compression::LZO; - } else if (name == "brotli") { + } else if (name == brotli) { return Compression::BROTLI; - } else if (name == "lz4_raw") { + } else if (name == lz4_raw) { return Compression::LZ4; - } else if (name == "lz4") { + } else if (name == lz4) { return Compression::LZ4_FRAME; - } else if (name == "lz4_hadoop") { + } else if (name == lz4_hadoop) { return Compression::LZ4_HADOOP; - } else if (name == "zstd") { + } else if (name == zstd) { return Compression::ZSTD; - } else if (name == "bz2") { + } else if (name == bz2) { return Compression::BZ2; } else { return Status::Invalid("Unrecognized compression type: ", name); diff --git a/cpp/src/arrow/util/compression.h b/cpp/src/arrow/util/compression.h index 42cf9f0c7ec4..d3e8e1e62f1e 100644 --- a/cpp/src/arrow/util/compression.h +++ b/cpp/src/arrow/util/compression.h @@ -20,10 +20,10 @@ #include #include #include +#include #include "arrow/result.h" #include "arrow/status.h" -#include "arrow/util/string_view.h" #include "arrow/util/visibility.h" namespace arrow { @@ -135,10 +135,10 @@ class ARROW_EXPORT Codec { static int UseDefaultCompressionLevel(); /// \brief Return a string name for compression type - static util::string_view GetCodecAsString(Compression::type t); + static const std::string& GetCodecAsString(Compression::type t); /// \brief Return compression type for name (all upper case) - static Result GetCompressionType(util::string_view name); + static Result GetCompressionType(const std::string& name); /// \brief Create a codec for the given compression algorithm static Result> Create( @@ -185,7 +185,7 @@ class ARROW_EXPORT Codec { virtual Compression::type compression_type() const = 0; /// \brief The name of this Codec's compression type - std::string name() const { return GetCodecAsString(compression_type()).to_string(); } + const std::string& name() const { return GetCodecAsString(compression_type()); } /// \brief This Codec's compression level, if applicable virtual int compression_level() const { return UseDefaultCompressionLevel(); } From a9733e465f1e545c1ccf64e678c33e9829c0c0f7 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Thu, 8 Oct 2020 20:42:02 -0400 Subject: [PATCH 05/14] lint fix --- cpp/src/arrow/util/compression.cc | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/cpp/src/arrow/util/compression.cc b/cpp/src/arrow/util/compression.cc index 442ac0763a21..174512b9e433 100644 --- a/cpp/src/arrow/util/compression.cc +++ b/cpp/src/arrow/util/compression.cc @@ -33,12 +33,12 @@ int Codec::UseDefaultCompressionLevel() { return kUseDefaultCompressionLevel; } Status Codec::Init() { return Status::OK(); } +const std::string& Codec::GetCodecAsString(Compression::type t) { static const std::string uncompressed = "uncompressed", snappy = "snappy", gzip = "gzip", lzo = "lzo", brotli = "brotli", lz4_raw = "lz4_raw", lz4 = "lz4", lz4_hadoop = "lz4_hadoop", zstd = "zstd", bz2 = "bz2", unknown = "unknown"; -const std::string& Codec::GetCodecAsString(Compression::type t) { switch (t) { case Compression::UNCOMPRESSED: return uncompressed; @@ -66,25 +66,25 @@ const std::string& Codec::GetCodecAsString(Compression::type t) { } Result Codec::GetCompressionType(const std::string& name) { - if (name == uncompressed) { + if (name == "uncompressed") { return Compression::UNCOMPRESSED; - } else if (name == gzip) { + } else if (name == "gzip") { return Compression::GZIP; - } else if (name == snappy) { + } else if (name == "snappy") { return Compression::SNAPPY; - } else if (name == lzo) { + } else if (name == "lzo") { return Compression::LZO; - } else if (name == brotli) { + } else if (name == "brotli") { return Compression::BROTLI; - } else if (name == lz4_raw) { + } else if (name == "lz4_raw") { return Compression::LZ4; - } else if (name == lz4) { + } else if (name == "lz4") { return Compression::LZ4_FRAME; - } else if (name == lz4_hadoop) { + } else if (name == "lz4_hadoop") { return Compression::LZ4_HADOOP; - } else if (name == zstd) { + } else if (name == "zstd") { return Compression::ZSTD; - } else if (name == bz2) { + } else if (name == "bz2") { return Compression::BZ2; } else { return Status::Invalid("Unrecognized compression type: ", name); From 618ba2af5ef1ecb3962c4a91fcaaee2c3832528e Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Fri, 9 Oct 2020 11:26:16 +0900 Subject: [PATCH 06/14] Fix format --- cpp/src/arrow/util/compression.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/util/compression.cc b/cpp/src/arrow/util/compression.cc index 174512b9e433..f9c084f6c266 100644 --- a/cpp/src/arrow/util/compression.cc +++ b/cpp/src/arrow/util/compression.cc @@ -34,10 +34,10 @@ int Codec::UseDefaultCompressionLevel() { return kUseDefaultCompressionLevel; } Status Codec::Init() { return Status::OK(); } const std::string& Codec::GetCodecAsString(Compression::type t) { -static const std::string uncompressed = "uncompressed", snappy = "snappy", gzip = "gzip", - lzo = "lzo", brotli = "brotli", lz4_raw = "lz4_raw", lz4 = "lz4", - lz4_hadoop = "lz4_hadoop", zstd = "zstd", bz2 = "bz2", - unknown = "unknown"; + static const std::string uncompressed = "uncompressed", snappy = "snappy", + gzip = "gzip", lzo = "lzo", brotli = "brotli", + lz4_raw = "lz4_raw", lz4 = "lz4", lz4_hadoop = "lz4_hadoop", + zstd = "zstd", bz2 = "bz2", unknown = "unknown"; switch (t) { case Compression::UNCOMPRESSED: From ebb41ce74a2f56ab659f390796c03140f5e2119d Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Fri, 9 Oct 2020 12:03:47 +0900 Subject: [PATCH 07/14] Adjust GLib implementation --- c_glib/arrow-glib/codec.cpp | 58 ++++++++++++++++++++++++---- c_glib/arrow-glib/codec.h | 7 ++++ c_glib/arrow-glib/codec.hpp | 4 +- c_glib/arrow-glib/input-stream.cpp | 2 +- c_glib/arrow-glib/ipc-options.cpp | 59 +++++++++++++++++++---------- c_glib/arrow-glib/output-stream.cpp | 2 +- c_glib/test/test-codec.rb | 10 +++++ c_glib/test/test-write-options.rb | 22 +++-------- 8 files changed, 116 insertions(+), 48 deletions(-) diff --git a/c_glib/arrow-glib/codec.cpp b/c_glib/arrow-glib/codec.cpp index 7975088217fa..33b3d1c9149e 100644 --- a/c_glib/arrow-glib/codec.cpp +++ b/c_glib/arrow-glib/codec.cpp @@ -38,7 +38,7 @@ G_BEGIN_DECLS */ typedef struct GArrowCodecPrivate_ { - arrow::util::Codec *codec; + std::shared_ptr codec; } GArrowCodecPrivate; enum { @@ -57,7 +57,7 @@ garrow_codec_finalize(GObject *object) { auto priv = GARROW_CODEC_GET_PRIVATE(object); - delete priv->codec; + priv->codec.~shared_ptr(); G_OBJECT_CLASS(garrow_codec_parent_class)->finalize(object); } @@ -72,7 +72,8 @@ garrow_codec_set_property(GObject *object, switch (prop_id) { case PROP_CODEC: - priv->codec = static_cast(g_value_get_pointer(value)); + priv->codec = + *static_cast *>(g_value_get_pointer(value)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); @@ -96,6 +97,8 @@ garrow_codec_get_property(GObject *object, static void garrow_codec_init(GArrowCodec *object) { + auto priv = GARROW_CODEC_GET_PRIVATE(object); + new(&priv->codec) std::shared_ptr; } static void @@ -111,7 +114,7 @@ garrow_codec_class_init(GArrowCodecClass *klass) spec = g_param_spec_pointer("codec", "Codec", - "The raw arrow::util::Codec *", + "The raw std::shared_ptr *", static_cast(G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY)); g_object_class_install_property(gobject_class, PROP_CODEC, spec); @@ -133,7 +136,9 @@ garrow_codec_new(GArrowCompressionType type, auto arrow_type = garrow_compression_type_to_raw(type); auto arrow_codec = arrow::util::Codec::Create(arrow_type); if (garrow::check(error, arrow_codec, "[codec][new]")) { - return garrow_codec_new_raw(arrow_codec.ValueOrDie().release()); + std::shared_ptr arrow_codec_shared = + std::move(*arrow_codec); + return garrow_codec_new_raw(&arrow_codec_shared); } else { return NULL; } @@ -151,9 +156,48 @@ const gchar * garrow_codec_get_name(GArrowCodec *codec) { auto arrow_codec = garrow_codec_get_raw(codec); + if (!arrow_codec) { + return NULL; + } return arrow_codec->name().c_str(); } +/** + * garrow_codec_get_compression_type: + * @codec: A #GArrowCodec. + * + * Returns: The compression type of the codec. + * + * Since: 2.0.0 + */ +GArrowCompressionType +garrow_codec_get_compression_type(GArrowCodec *codec) +{ + auto arrow_codec = garrow_codec_get_raw(codec); + if (!arrow_codec) { + return GARROW_COMPRESSION_TYPE_UNCOMPRESSED; + } + return garrow_compression_type_from_raw(arrow_codec->compression_type()); +} + +/** + * garrow_codec_get_compression_level: + * @codec: A #GArrowCodec. + * + * Returns: The compression level of the codec. + * + * Since: 2.0.0 + */ +gint +garrow_codec_get_compression_level(GArrowCodec *codec) +{ + auto arrow_codec = garrow_codec_get_raw(codec); + if (!arrow_codec) { + return arrow::util::Codec::UseDefaultCompressionLevel(); + } + return arrow_codec->compression_level(); +} + G_END_DECLS GArrowCompressionType @@ -207,7 +251,7 @@ garrow_compression_type_to_raw(GArrowCompressionType type) } GArrowCodec * -garrow_codec_new_raw(arrow::util::Codec *arrow_codec) +garrow_codec_new_raw(std::shared_ptr *arrow_codec) { auto codec = GARROW_CODEC(g_object_new(GARROW_TYPE_CODEC, "codec", arrow_codec, @@ -215,7 +259,7 @@ garrow_codec_new_raw(arrow::util::Codec *arrow_codec) return codec; } -arrow::util::Codec * +std::shared_ptr garrow_codec_get_raw(GArrowCodec *codec) { auto priv = GARROW_CODEC_GET_PRIVATE(codec); diff --git a/c_glib/arrow-glib/codec.h b/c_glib/arrow-glib/codec.h index 5feab2b7d4d1..6e177af9eede 100644 --- a/c_glib/arrow-glib/codec.h +++ b/c_glib/arrow-glib/codec.h @@ -20,6 +20,7 @@ #pragma once #include +#include G_BEGIN_DECLS @@ -63,5 +64,11 @@ GArrowCodec *garrow_codec_new(GArrowCompressionType type, GError **error); const gchar *garrow_codec_get_name(GArrowCodec *codec); +GARROW_AVAILABLE_IN_2_0 +GArrowCompressionType +garrow_codec_get_compression_type(GArrowCodec *codec); +GARROW_AVAILABLE_IN_2_0 +gint +garrow_codec_get_compression_level(GArrowCodec *codec); G_END_DECLS diff --git a/c_glib/arrow-glib/codec.hpp b/c_glib/arrow-glib/codec.hpp index 14c3ad77ccf1..f4cfaba18a00 100644 --- a/c_glib/arrow-glib/codec.hpp +++ b/c_glib/arrow-glib/codec.hpp @@ -29,6 +29,6 @@ arrow::Compression::type garrow_compression_type_to_raw(GArrowCompressionType type); GArrowCodec * -garrow_codec_new_raw(arrow::util::Codec *arrow_codec); -arrow::util::Codec * +garrow_codec_new_raw(std::shared_ptr *arrow_codec); +std::shared_ptr garrow_codec_get_raw(GArrowCodec *codec); diff --git a/c_glib/arrow-glib/input-stream.cpp b/c_glib/arrow-glib/input-stream.cpp index 3751d41ad3ab..84904b742658 100644 --- a/c_glib/arrow-glib/input-stream.cpp +++ b/c_glib/arrow-glib/input-stream.cpp @@ -1132,7 +1132,7 @@ garrow_compressed_input_stream_new(GArrowCodec *codec, GArrowInputStream *raw, GError **error) { - auto arrow_codec = garrow_codec_get_raw(codec); + auto arrow_codec = garrow_codec_get_raw(codec).get(); auto arrow_raw = garrow_input_stream_get_raw(raw); auto arrow_stream = arrow::io::CompressedInputStream::Make(arrow_codec, arrow_raw); diff --git a/c_glib/arrow-glib/ipc-options.cpp b/c_glib/arrow-glib/ipc-options.cpp index fbdf17a3de20..b9b2c4143486 100644 --- a/c_glib/arrow-glib/ipc-options.cpp +++ b/c_glib/arrow-glib/ipc-options.cpp @@ -21,6 +21,7 @@ # include #endif +#include #include #include @@ -242,6 +243,7 @@ garrow_read_options_set_included_fields(GArrowReadOptions *options, typedef struct GArrowWriteOptionsPrivate_ { arrow::ipc::IpcWriteOptions options; + GArrowCodec *codec; } GArrowWriteOptionsPrivate; enum { @@ -262,6 +264,19 @@ G_DEFINE_TYPE_WITH_PRIVATE(GArrowWriteOptions, garrow_write_options_get_instance_private( \ GARROW_WRITE_OPTIONS(obj))) +static void +garrow_write_options_dispose(GObject *object) +{ + auto priv = GARROW_WRITE_OPTIONS_GET_PRIVATE(object); + + if (priv->codec) { + g_object_unref(priv->codec); + priv->codec = NULL; + } + + G_OBJECT_CLASS(garrow_write_options_parent_class)->dispose(object); +} + static void garrow_write_options_finalize(GObject *object) { @@ -293,14 +308,13 @@ garrow_write_options_set_property(GObject *object, case PROP_WRITE_OPTIONS_WRITE_LEGACY_IPC_FORMAT: priv->options.write_legacy_ipc_format = g_value_get_boolean(value); break; - case PROP_WRITE_OPTIONS_CODEC: { - auto codec = g_value_dup_object(value); - priv->options.codec = std::shared_ptr{ - garrow_codec_get_raw(codec), - [codec](...) { g_object_unref(codec); } - }; + case PROP_WRITE_OPTIONS_CODEC: + if (priv->codec) { + g_object_unref(priv->codec); + } + priv->codec = GARROW_CODEC(g_value_dup_object(value)); + priv->options.codec = garrow_codec_get_raw(priv->codec); break; - } case PROP_WRITE_OPTIONS_USE_THREADS: priv->options.use_threads = g_value_get_boolean(value); break; @@ -331,12 +345,9 @@ garrow_write_options_get_property(GObject *object, case PROP_WRITE_OPTIONS_WRITE_LEGACY_IPC_FORMAT: g_value_set_boolean(value, priv->options.write_legacy_ipc_format); break; - case PROP_WRITE_OPTIONS_CODEC: { - auto arrow_type = priv->options.codec->compression_type(); - auto type = garrow_compression_type_from_raw(arrow_type); - g_value_set_object(value, garrow_codec_new(type)); + case PROP_WRITE_OPTIONS_CODEC: + g_value_set_object(value, priv->codec); break; - } case PROP_WRITE_OPTIONS_USE_THREADS: g_value_set_boolean(value, priv->options.use_threads); break; @@ -352,6 +363,11 @@ garrow_write_options_init(GArrowWriteOptions *object) auto priv = GARROW_WRITE_OPTIONS_GET_PRIVATE(object); new(&priv->options) arrow::ipc::IpcWriteOptions; priv->options = arrow::ipc::IpcWriteOptions::Defaults(); + if (priv->options.codec) { + priv->codec = garrow_codec_new_raw(&(priv->options.codec)); + } else { + priv->codec = NULL; + } } static void @@ -359,6 +375,7 @@ garrow_write_options_class_init(GArrowWriteOptionsClass *klass) { auto gobject_class = G_OBJECT_CLASS(klass); + gobject_class->dispose = garrow_write_options_dispose; gobject_class->finalize = garrow_write_options_finalize; gobject_class->set_property = garrow_write_options_set_property; gobject_class->get_property = garrow_write_options_get_property; @@ -445,16 +462,18 @@ garrow_write_options_class_init(GArrowWriteOptionsClass *klass) * * Codec to use for compressing and decompressing record batch body * buffers. This is not part of the Arrow IPC protocol and only for - * internal use (e.g. Feather files). May only be LZ4_FRAME and - * ZSTD. + * internal use (e.g. Feather files). * - * Since: 1.0.0 + * May only be UNCOMPRESSED, LZ4_FRAME and ZSTD. + * + * Since: 2.0.0 */ - spec = g_param_spec_pointer("codec", - "Codec", - "Codec to use for " - "compressing record batch body buffers.", - static_cast(G_PARAM_READWRITE)); + spec = g_param_spec_object("codec", + "Codec", + "Codec to use for " + "compressing record batch body buffers.", + GARROW_TYPE_CODEC, + static_cast(G_PARAM_READWRITE)); g_object_class_install_property(gobject_class, PROP_WRITE_OPTIONS_CODEC, spec); diff --git a/c_glib/arrow-glib/output-stream.cpp b/c_glib/arrow-glib/output-stream.cpp index 2c3ccafdb138..1619bac45d4d 100644 --- a/c_glib/arrow-glib/output-stream.cpp +++ b/c_glib/arrow-glib/output-stream.cpp @@ -688,7 +688,7 @@ garrow_compressed_output_stream_new(GArrowCodec *codec, GArrowOutputStream *raw, GError **error) { - auto arrow_codec = garrow_codec_get_raw(codec); + auto arrow_codec = garrow_codec_get_raw(codec).get(); auto arrow_raw = garrow_output_stream_get_raw(raw); auto arrow_stream = arrow::io::CompressedOutputStream::Make(arrow_codec, arrow_raw); diff --git a/c_glib/test/test-codec.rb b/c_glib/test/test-codec.rb index 6617815df9b8..a32ec4dc7579 100644 --- a/c_glib/test/test-codec.rb +++ b/c_glib/test/test-codec.rb @@ -20,4 +20,14 @@ def test_name codec = Arrow::Codec.new(:gzip) assert_equal("gzip", codec.name) end + + def test_compression_type + codec = Arrow::Codec.new(:gzip) + assert_equal(Arrow::CompressionType::GZIP, codec.compression_type) + end + + def test_compression_level + codec = Arrow::Codec.new(:gzip) + assert_equal(9, codec.compression_level) + end end diff --git a/c_glib/test/test-write-options.rb b/c_glib/test/test-write-options.rb index d30b78b9cdbf..c528ce673d45 100644 --- a/c_glib/test/test-write-options.rb +++ b/c_glib/test/test-write-options.rb @@ -73,27 +73,15 @@ def test_accessor end end - sub_test_case("compression") do + sub_test_case("codec") do def test_default - assert_equal(Arrow::CompressionType::UNCOMPRESSED, - @options.compression) + assert_nil(@options.codec) end def test_accessor - @options.compression = :zstd - assert_equal(Arrow::CompressionType::ZSTD, - @options.compression) - end - end - - sub_test_case("compression-level") do - def test_default - assert_equal(-(2 ** 31), @options.compression_level) - end - - def test_accessor - @options.compression_level = 8 - assert_equal(8, @options.compression_level) + @options.codec = Arrow::Codec.new(:zstd) + assert_equal("zstd", + @options.codec.name) end end From 6f50f07583393d573c632e3daa8a9c68c97c425e Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Fri, 9 Oct 2020 12:23:14 -0400 Subject: [PATCH 08/14] update R RecordBatchReader/Writer doc --- cpp/src/arrow/dataset/scanner.cc | 2 +- r/R/arrowExports.R | 4 ++-- r/R/record-batch-reader.R | 7 +++++-- r/R/record-batch-writer.R | 8 ++++++-- r/src/arrowExports.cpp | 12 +++++++----- r/src/recordbatchwriter.cpp | 9 +++++++-- r/tests/testthat/test-record-batch-reader.R | 2 +- 7 files changed, 29 insertions(+), 15 deletions(-) diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 017e545ff23d..019416041aa2 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -114,7 +114,7 @@ Status ScannerBuilder::Project(std::vector columns) { Status ScannerBuilder::Filter(std::shared_ptr filter) { RETURN_NOT_OK(schema()->CanReferenceFieldsByNames(FieldsInExpression(*filter))); - RETURN_NOT_OK(filter->Validate(*schema()).status()); + RETURN_NOT_OK(filter->Validate(*schema())); scan_options_->filter = std::move(filter); return Status::OK(); } diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index eb0dfe6d2898..8a6d8012e904 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -1424,8 +1424,8 @@ ipc___RecordBatchWriter__Close <- function(batch_writer){ invisible(.Call(`_arrow_ipc___RecordBatchWriter__Close` , batch_writer)) } -ipc___RecordBatchFileWriter__Open <- function(stream, schema, use_legacy_format, metadata_version){ - .Call(`_arrow_ipc___RecordBatchFileWriter__Open` , stream, schema, use_legacy_format, metadata_version) +ipc___RecordBatchFileWriter__Open <- function(stream, schema, use_legacy_format, codec, metadata_version, metadata){ + .Call(`_arrow_ipc___RecordBatchFileWriter__Open` , stream, schema, use_legacy_format, codec, metadata_version, metadata) } ipc___RecordBatchStreamWriter__Open <- function(stream, schema, use_legacy_format, metadata_version){ diff --git a/r/R/record-batch-reader.R b/r/R/record-batch-reader.R index 6cf53a654f83..15441e78abfa 100644 --- a/r/R/record-batch-reader.R +++ b/r/R/record-batch-reader.R @@ -79,9 +79,12 @@ #' # then pass it to a RecordBatchReader #' read_file_obj <- ReadableFile$create(tf) #' reader <- RecordBatchFileReader$create(read_file_obj) -#' # RecordBatchFileReader knows how many batches it has (StreamReader does not) +#' # RecordBatchFileReader knows how many batches it has (StreamReader does +#' # not), and has access to the custom_metadata serialized in the file's +#' footer (the stream format does not include a footer). #' reader$num_record_batches -#' # We could consume the Reader by calling $read_next_batch() until all are, +#' reader$metadata +#' # We could consume the Reader by calling $read_next_batch() until all are #' # consumed, or we can call $read_table() to pull them all into a Table #' tab <- reader$read_table() #' # Call as.data.frame to turn that Table into an R data.frame diff --git a/r/R/record-batch-writer.R b/r/R/record-batch-writer.R index 1bc928b99bb6..5b81369fab7d 100644 --- a/r/R/record-batch-writer.R +++ b/r/R/record-batch-writer.R @@ -151,7 +151,9 @@ RecordBatchFileWriter <- R6Class("RecordBatchFileWriter", inherit = RecordBatchS RecordBatchFileWriter$create <- function(sink, schema, use_legacy_format = NULL, - metadata_version = NULL) { + codec = NULL, + metadata_version = NULL, + metadata = NULL) { if (is.string(sink)) { stop( "RecordBatchFileWriter$create() requires an Arrow InputStream. ", @@ -167,7 +169,9 @@ RecordBatchFileWriter$create <- function(sink, sink, schema, get_ipc_use_legacy_format(use_legacy_format), - get_ipc_metadata_version(metadata_version) + codec, + get_ipc_metadata_version(metadata_version), + prepare_key_value_metadata(metadata) ) ) } diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 07dde99a1eab..eb6bf0ced655 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -5587,18 +5587,20 @@ extern "C" SEXP _arrow_ipc___RecordBatchWriter__Close(SEXP batch_writer_sexp){ // recordbatchwriter.cpp #if defined(ARROW_R_WITH_ARROW) -std::shared_ptr ipc___RecordBatchFileWriter__Open(const std::shared_ptr& stream, const std::shared_ptr& schema, bool use_legacy_format, arrow::ipc::MetadataVersion metadata_version); -extern "C" SEXP _arrow_ipc___RecordBatchFileWriter__Open(SEXP stream_sexp, SEXP schema_sexp, SEXP use_legacy_format_sexp, SEXP metadata_version_sexp){ +std::shared_ptr ipc___RecordBatchFileWriter__Open(const std::shared_ptr& stream, const std::shared_ptr& schema, bool use_legacy_format, const std::shared_ptr& codec, arrow::ipc::MetadataVersion metadata_version, cpp11::strings metadata); +extern "C" SEXP _arrow_ipc___RecordBatchFileWriter__Open(SEXP stream_sexp, SEXP schema_sexp, SEXP use_legacy_format_sexp, SEXP codec_sexp, SEXP metadata_version_sexp, SEXP metadata_sexp){ BEGIN_CPP11 arrow::r::Input&>::type stream(stream_sexp); arrow::r::Input&>::type schema(schema_sexp); arrow::r::Input::type use_legacy_format(use_legacy_format_sexp); + arrow::r::Input&>::type codec(codec_sexp); arrow::r::Input::type metadata_version(metadata_version_sexp); - return cpp11::as_sexp(ipc___RecordBatchFileWriter__Open(stream, schema, use_legacy_format, metadata_version)); + arrow::r::Input::type metadata(metadata_sexp); + return cpp11::as_sexp(ipc___RecordBatchFileWriter__Open(stream, schema, use_legacy_format, codec, metadata_version, metadata)); END_CPP11 } #else -extern "C" SEXP _arrow_ipc___RecordBatchFileWriter__Open(SEXP stream_sexp, SEXP schema_sexp, SEXP use_legacy_format_sexp, SEXP metadata_version_sexp){ +extern "C" SEXP _arrow_ipc___RecordBatchFileWriter__Open(SEXP stream_sexp, SEXP schema_sexp, SEXP use_legacy_format_sexp, SEXP codec_sexp, SEXP metadata_version_sexp, SEXP metadata_sexp){ Rf_error("Cannot call ipc___RecordBatchFileWriter__Open(). Please use arrow::install_arrow() to install required runtime libraries. "); } #endif @@ -6637,7 +6639,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_ipc___RecordBatchWriter__WriteRecordBatch", (DL_FUNC) &_arrow_ipc___RecordBatchWriter__WriteRecordBatch, 2}, { "_arrow_ipc___RecordBatchWriter__WriteTable", (DL_FUNC) &_arrow_ipc___RecordBatchWriter__WriteTable, 2}, { "_arrow_ipc___RecordBatchWriter__Close", (DL_FUNC) &_arrow_ipc___RecordBatchWriter__Close, 1}, - { "_arrow_ipc___RecordBatchFileWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchFileWriter__Open, 4}, + { "_arrow_ipc___RecordBatchFileWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchFileWriter__Open, 6}, { "_arrow_ipc___RecordBatchStreamWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchStreamWriter__Open, 4}, { "_arrow_Array__GetScalar", (DL_FUNC) &_arrow_Array__GetScalar, 2}, { "_arrow_Scalar__ToString", (DL_FUNC) &_arrow_Scalar__ToString, 1}, diff --git a/r/src/recordbatchwriter.cpp b/r/src/recordbatchwriter.cpp index 4714c1d104aa..92a9ad8bfeb7 100644 --- a/r/src/recordbatchwriter.cpp +++ b/r/src/recordbatchwriter.cpp @@ -17,6 +17,8 @@ #include "./arrow_types.h" +#include "./arrow_metadata.h" + #if defined(ARROW_R_WITH_ARROW) #include @@ -44,11 +46,14 @@ void ipc___RecordBatchWriter__Close( std::shared_ptr ipc___RecordBatchFileWriter__Open( const std::shared_ptr& stream, const std::shared_ptr& schema, bool use_legacy_format, - arrow::ipc::MetadataVersion metadata_version) { + const std::shared_ptr& codec, + arrow::ipc::MetadataVersion metadata_version, cpp11::strings metadata) { auto options = arrow::ipc::IpcWriteOptions::Defaults(); options.write_legacy_ipc_format = use_legacy_format; + options.codec = codec; options.metadata_version = metadata_version; - return ValueOrStop(arrow::ipc::MakeFileWriter(stream, schema, options)); + return ValueOrStop(arrow::ipc::MakeFileWriter(stream, schema, options, + KeyValueMetadata__Make(metadata))); } // [[arrow::export]] diff --git a/r/tests/testthat/test-record-batch-reader.R b/r/tests/testthat/test-record-batch-reader.R index e03664e82f76..d9c34068425d 100644 --- a/r/tests/testthat/test-record-batch-reader.R +++ b/r/tests/testthat/test-record-batch-reader.R @@ -82,7 +82,7 @@ test_that("MetadataFormat", { Sys.setenv(ARROW_PRE_0_15_IPC_FORMAT = 1) expect_identical(get_ipc_metadata_version(NULL), 3L) Sys.setenv(ARROW_PRE_0_15_IPC_FORMAT = "") - + expect_identical(get_ipc_metadata_version(NULL), 4L) Sys.setenv(ARROW_PRE_1_0_METADATA_VERSION = 1) expect_identical(get_ipc_metadata_version(NULL), 3L) From 1d2f3461fc4be1c140402aa1b4f41a20d9ff3b69 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Fri, 9 Oct 2020 16:44:43 -0400 Subject: [PATCH 09/14] revert exposure of footer metadata --- r/R/arrowExports.R | 12 ++++------ r/R/dataset-format.R | 3 +-- r/src/arrowExports.cpp | 39 +++++++++------------------------ r/src/arrow_metadata.h | 38 -------------------------------- r/src/dataset.cpp | 5 +---- r/src/recordbatchreader.cpp | 8 ------- r/src/recordbatchwriter.cpp | 9 ++------ r/tests/testthat/test-dataset.R | 6 +---- 8 files changed, 19 insertions(+), 101 deletions(-) delete mode 100644 r/src/arrow_metadata.h diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 8a6d8012e904..6a7f26aa57cf 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -432,8 +432,8 @@ dataset___ParquetFileWriteOptions__update <- function(options, writer_props, arr invisible(.Call(`_arrow_dataset___ParquetFileWriteOptions__update` , options, writer_props, arrow_writer_props)) } -dataset___IpcFileWriteOptions__update <- function(ipc_options, use_legacy_format, codec, metadata_version, metadata){ - invisible(.Call(`_arrow_dataset___IpcFileWriteOptions__update` , ipc_options, use_legacy_format, codec, metadata_version, metadata)) +dataset___IpcFileWriteOptions__update <- function(ipc_options, use_legacy_format, codec, metadata_version){ + invisible(.Call(`_arrow_dataset___IpcFileWriteOptions__update` , ipc_options, use_legacy_format, codec, metadata_version)) } dataset___IpcFileFormat__Make <- function(){ @@ -1388,10 +1388,6 @@ ipc___RecordBatchFileReader__num_record_batches <- function(reader){ .Call(`_arrow_ipc___RecordBatchFileReader__num_record_batches` , reader) } -ipc___RecordBatchFileReader__metadata <- function(reader){ - .Call(`_arrow_ipc___RecordBatchFileReader__metadata` , reader) -} - ipc___RecordBatchFileReader__ReadRecordBatch <- function(reader, i){ .Call(`_arrow_ipc___RecordBatchFileReader__ReadRecordBatch` , reader, i) } @@ -1424,8 +1420,8 @@ ipc___RecordBatchWriter__Close <- function(batch_writer){ invisible(.Call(`_arrow_ipc___RecordBatchWriter__Close` , batch_writer)) } -ipc___RecordBatchFileWriter__Open <- function(stream, schema, use_legacy_format, codec, metadata_version, metadata){ - .Call(`_arrow_ipc___RecordBatchFileWriter__Open` , stream, schema, use_legacy_format, codec, metadata_version, metadata) +ipc___RecordBatchFileWriter__Open <- function(stream, schema, use_legacy_format, metadata_version){ + .Call(`_arrow_ipc___RecordBatchFileWriter__Open` , stream, schema, use_legacy_format, metadata_version) } ipc___RecordBatchStreamWriter__Open <- function(stream, schema, use_legacy_format, metadata_version){ diff --git a/r/R/dataset-format.R b/r/R/dataset-format.R index 0883c9be840b..c53a6821dc75 100644 --- a/r/R/dataset-format.R +++ b/r/R/dataset-format.R @@ -144,8 +144,7 @@ FileWriteOptions <- R6Class("FileWriteOptions", inherit = ArrowObject, dataset___IpcFileWriteOptions__update(self, get_ipc_use_legacy_format(args$use_legacy_format), args$codec, - get_ipc_metadata_version(args$metadata_version), - prepare_key_value_metadata(args$metadata)) + get_ipc_metadata_version(args$metadata_version)) } invisible(self) } diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index eb6bf0ced655..d301b0dc191f 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1697,20 +1697,19 @@ extern "C" SEXP _arrow_dataset___ParquetFileWriteOptions__update(SEXP options_se // dataset.cpp #if defined(ARROW_R_WITH_ARROW) -void dataset___IpcFileWriteOptions__update(const std::shared_ptr& ipc_options, bool use_legacy_format, const std::shared_ptr& codec, arrow::ipc::MetadataVersion metadata_version, cpp11::strings metadata); -extern "C" SEXP _arrow_dataset___IpcFileWriteOptions__update(SEXP ipc_options_sexp, SEXP use_legacy_format_sexp, SEXP codec_sexp, SEXP metadata_version_sexp, SEXP metadata_sexp){ +void dataset___IpcFileWriteOptions__update(const std::shared_ptr& ipc_options, bool use_legacy_format, const std::shared_ptr& codec, arrow::ipc::MetadataVersion metadata_version); +extern "C" SEXP _arrow_dataset___IpcFileWriteOptions__update(SEXP ipc_options_sexp, SEXP use_legacy_format_sexp, SEXP codec_sexp, SEXP metadata_version_sexp){ BEGIN_CPP11 arrow::r::Input&>::type ipc_options(ipc_options_sexp); arrow::r::Input::type use_legacy_format(use_legacy_format_sexp); arrow::r::Input&>::type codec(codec_sexp); arrow::r::Input::type metadata_version(metadata_version_sexp); - arrow::r::Input::type metadata(metadata_sexp); - dataset___IpcFileWriteOptions__update(ipc_options, use_legacy_format, codec, metadata_version, metadata); + dataset___IpcFileWriteOptions__update(ipc_options, use_legacy_format, codec, metadata_version); return R_NilValue; END_CPP11 } #else -extern "C" SEXP _arrow_dataset___IpcFileWriteOptions__update(SEXP ipc_options_sexp, SEXP use_legacy_format_sexp, SEXP codec_sexp, SEXP metadata_version_sexp, SEXP metadata_sexp){ +extern "C" SEXP _arrow_dataset___IpcFileWriteOptions__update(SEXP ipc_options_sexp, SEXP use_legacy_format_sexp, SEXP codec_sexp, SEXP metadata_version_sexp){ Rf_error("Cannot call dataset___IpcFileWriteOptions__update(). Please use arrow::install_arrow() to install required runtime libraries. "); } #endif @@ -5444,21 +5443,6 @@ extern "C" SEXP _arrow_ipc___RecordBatchFileReader__num_record_batches(SEXP read } #endif -// recordbatchreader.cpp -#if defined(ARROW_R_WITH_ARROW) -cpp11::strings ipc___RecordBatchFileReader__metadata(const std::shared_ptr& reader); -extern "C" SEXP _arrow_ipc___RecordBatchFileReader__metadata(SEXP reader_sexp){ -BEGIN_CPP11 - arrow::r::Input&>::type reader(reader_sexp); - return cpp11::as_sexp(ipc___RecordBatchFileReader__metadata(reader)); -END_CPP11 -} -#else -extern "C" SEXP _arrow_ipc___RecordBatchFileReader__metadata(SEXP reader_sexp){ - Rf_error("Cannot call ipc___RecordBatchFileReader__metadata(). Please use arrow::install_arrow() to install required runtime libraries. "); -} -#endif - // recordbatchreader.cpp #if defined(ARROW_R_WITH_ARROW) std::shared_ptr ipc___RecordBatchFileReader__ReadRecordBatch(const std::shared_ptr& reader, int i); @@ -5587,20 +5571,18 @@ extern "C" SEXP _arrow_ipc___RecordBatchWriter__Close(SEXP batch_writer_sexp){ // recordbatchwriter.cpp #if defined(ARROW_R_WITH_ARROW) -std::shared_ptr ipc___RecordBatchFileWriter__Open(const std::shared_ptr& stream, const std::shared_ptr& schema, bool use_legacy_format, const std::shared_ptr& codec, arrow::ipc::MetadataVersion metadata_version, cpp11::strings metadata); -extern "C" SEXP _arrow_ipc___RecordBatchFileWriter__Open(SEXP stream_sexp, SEXP schema_sexp, SEXP use_legacy_format_sexp, SEXP codec_sexp, SEXP metadata_version_sexp, SEXP metadata_sexp){ +std::shared_ptr ipc___RecordBatchFileWriter__Open(const std::shared_ptr& stream, const std::shared_ptr& schema, bool use_legacy_format, arrow::ipc::MetadataVersion metadata_version); +extern "C" SEXP _arrow_ipc___RecordBatchFileWriter__Open(SEXP stream_sexp, SEXP schema_sexp, SEXP use_legacy_format_sexp, SEXP metadata_version_sexp){ BEGIN_CPP11 arrow::r::Input&>::type stream(stream_sexp); arrow::r::Input&>::type schema(schema_sexp); arrow::r::Input::type use_legacy_format(use_legacy_format_sexp); - arrow::r::Input&>::type codec(codec_sexp); arrow::r::Input::type metadata_version(metadata_version_sexp); - arrow::r::Input::type metadata(metadata_sexp); - return cpp11::as_sexp(ipc___RecordBatchFileWriter__Open(stream, schema, use_legacy_format, codec, metadata_version, metadata)); + return cpp11::as_sexp(ipc___RecordBatchFileWriter__Open(stream, schema, use_legacy_format, metadata_version)); END_CPP11 } #else -extern "C" SEXP _arrow_ipc___RecordBatchFileWriter__Open(SEXP stream_sexp, SEXP schema_sexp, SEXP use_legacy_format_sexp, SEXP codec_sexp, SEXP metadata_version_sexp, SEXP metadata_sexp){ +extern "C" SEXP _arrow_ipc___RecordBatchFileWriter__Open(SEXP stream_sexp, SEXP schema_sexp, SEXP use_legacy_format_sexp, SEXP metadata_version_sexp){ Rf_error("Cannot call ipc___RecordBatchFileWriter__Open(). Please use arrow::install_arrow() to install required runtime libraries. "); } #endif @@ -6391,7 +6373,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___ParquetFileFormat__Make", (DL_FUNC) &_arrow_dataset___ParquetFileFormat__Make, 3}, { "_arrow_dataset___FileWriteOptions__type_name", (DL_FUNC) &_arrow_dataset___FileWriteOptions__type_name, 1}, { "_arrow_dataset___ParquetFileWriteOptions__update", (DL_FUNC) &_arrow_dataset___ParquetFileWriteOptions__update, 3}, - { "_arrow_dataset___IpcFileWriteOptions__update", (DL_FUNC) &_arrow_dataset___IpcFileWriteOptions__update, 5}, + { "_arrow_dataset___IpcFileWriteOptions__update", (DL_FUNC) &_arrow_dataset___IpcFileWriteOptions__update, 4}, { "_arrow_dataset___IpcFileFormat__Make", (DL_FUNC) &_arrow_dataset___IpcFileFormat__Make, 0}, { "_arrow_dataset___CsvFileFormat__Make", (DL_FUNC) &_arrow_dataset___CsvFileFormat__Make, 1}, { "_arrow_dataset___DirectoryPartitioning", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning, 1}, @@ -6630,7 +6612,6 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_ipc___RecordBatchStreamReader__batches", (DL_FUNC) &_arrow_ipc___RecordBatchStreamReader__batches, 1}, { "_arrow_ipc___RecordBatchFileReader__schema", (DL_FUNC) &_arrow_ipc___RecordBatchFileReader__schema, 1}, { "_arrow_ipc___RecordBatchFileReader__num_record_batches", (DL_FUNC) &_arrow_ipc___RecordBatchFileReader__num_record_batches, 1}, - { "_arrow_ipc___RecordBatchFileReader__metadata", (DL_FUNC) &_arrow_ipc___RecordBatchFileReader__metadata, 1}, { "_arrow_ipc___RecordBatchFileReader__ReadRecordBatch", (DL_FUNC) &_arrow_ipc___RecordBatchFileReader__ReadRecordBatch, 2}, { "_arrow_ipc___RecordBatchFileReader__Open", (DL_FUNC) &_arrow_ipc___RecordBatchFileReader__Open, 1}, { "_arrow_Table__from_RecordBatchFileReader", (DL_FUNC) &_arrow_Table__from_RecordBatchFileReader, 1}, @@ -6639,7 +6620,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_ipc___RecordBatchWriter__WriteRecordBatch", (DL_FUNC) &_arrow_ipc___RecordBatchWriter__WriteRecordBatch, 2}, { "_arrow_ipc___RecordBatchWriter__WriteTable", (DL_FUNC) &_arrow_ipc___RecordBatchWriter__WriteTable, 2}, { "_arrow_ipc___RecordBatchWriter__Close", (DL_FUNC) &_arrow_ipc___RecordBatchWriter__Close, 1}, - { "_arrow_ipc___RecordBatchFileWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchFileWriter__Open, 6}, + { "_arrow_ipc___RecordBatchFileWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchFileWriter__Open, 4}, { "_arrow_ipc___RecordBatchStreamWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchStreamWriter__Open, 4}, { "_arrow_Array__GetScalar", (DL_FUNC) &_arrow_Array__GetScalar, 2}, { "_arrow_Scalar__ToString", (DL_FUNC) &_arrow_Scalar__ToString, 1}, diff --git a/r/src/arrow_metadata.h b/r/src/arrow_metadata.h deleted file mode 100644 index 7bb2155ccaa2..000000000000 --- a/r/src/arrow_metadata.h +++ /dev/null @@ -1,38 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "./arrow_types.h" - -#if defined(ARROW_R_WITH_ARROW) -#include - -inline std::shared_ptr KeyValueMetadata__Make( - const cpp11::strings& metadata) { - // TODO(bkietz): Make this a custom conversion after - // https://github.com/r-lib/cpp11/pull/104 - return std::make_shared( - cpp11::as_cpp>(metadata.names()), - cpp11::as_cpp>(metadata)); -} - -inline cpp11::writable::strings KeyValueMetadata__as_vector( - const std::shared_ptr& metadata) { - cpp11::writable::strings metadata_vector(metadata->values()); - metadata_vector.names() = cpp11::writable::strings(metadata->keys()); - return metadata_vector; -} -#endif diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index 29445185801c..5dca19a1107c 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -17,8 +17,6 @@ #include "./arrow_types.h" -#include "./arrow_metadata.h" - #if defined(ARROW_R_WITH_ARROW) #include @@ -214,11 +212,10 @@ void dataset___ParquetFileWriteOptions__update( void dataset___IpcFileWriteOptions__update( const std::shared_ptr& ipc_options, bool use_legacy_format, const std::shared_ptr& codec, - arrow::ipc::MetadataVersion metadata_version, cpp11::strings metadata) { + arrow::ipc::MetadataVersion metadata_version) { ipc_options->options->write_legacy_ipc_format = use_legacy_format; ipc_options->options->codec = codec; ipc_options->options->metadata_version = metadata_version; - ipc_options->metadata = KeyValueMetadata__Make(metadata); } // [[arrow::export]] diff --git a/r/src/recordbatchreader.cpp b/r/src/recordbatchreader.cpp index 433b779883b7..7ecb42002a90 100644 --- a/r/src/recordbatchreader.cpp +++ b/r/src/recordbatchreader.cpp @@ -21,8 +21,6 @@ #include #include -#include "./arrow_metadata.h" - // [[arrow::export]] std::shared_ptr RecordBatchReader__schema( const std::shared_ptr& reader) { @@ -76,12 +74,6 @@ int ipc___RecordBatchFileReader__num_record_batches( return reader->num_record_batches(); } -// [[arrow::export]] -cpp11::strings ipc___RecordBatchFileReader__metadata( - const std::shared_ptr& reader) { - return KeyValueMetadata__as_vector(reader->metadata()); -} - // [[arrow::export]] std::shared_ptr ipc___RecordBatchFileReader__ReadRecordBatch( const std::shared_ptr& reader, int i) { diff --git a/r/src/recordbatchwriter.cpp b/r/src/recordbatchwriter.cpp index 92a9ad8bfeb7..4714c1d104aa 100644 --- a/r/src/recordbatchwriter.cpp +++ b/r/src/recordbatchwriter.cpp @@ -17,8 +17,6 @@ #include "./arrow_types.h" -#include "./arrow_metadata.h" - #if defined(ARROW_R_WITH_ARROW) #include @@ -46,14 +44,11 @@ void ipc___RecordBatchWriter__Close( std::shared_ptr ipc___RecordBatchFileWriter__Open( const std::shared_ptr& stream, const std::shared_ptr& schema, bool use_legacy_format, - const std::shared_ptr& codec, - arrow::ipc::MetadataVersion metadata_version, cpp11::strings metadata) { + arrow::ipc::MetadataVersion metadata_version) { auto options = arrow::ipc::IpcWriteOptions::Defaults(); options.write_legacy_ipc_format = use_legacy_format; - options.codec = codec; options.metadata_version = metadata_version; - return ValueOrStop(arrow::ipc::MakeFileWriter(stream, schema, options, - KeyValueMetadata__Make(metadata))); + return ValueOrStop(arrow::ipc::MakeFileWriter(stream, schema, options)); } // [[arrow::export]] diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index 937778dfbb84..31e76981c614 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -948,18 +948,14 @@ test_that("Writing a dataset: Ipc format options & compression", { ds <- open_dataset(csv_dir, partitioning = "part", format = "csv") dst_dir <- make_temp_dir() - metadata <- c(hello = "world", eh = "!") codec <- NULL if (codec_is_available("zstd")) { codec <- Codec$create("zstd") } - write_dataset(ds, dst_dir, format = "feather", metadata = metadata, codec = codec) + write_dataset(ds, dst_dir, format = "feather", codec = codec) expect_true(dir.exists(dst_dir)) - file <- ds$filesystem$OpenInputStream(paste(dst_dir, dir(dst_dir)[[1]], sep = "/")) - expect_equivalent(metadata, RecordBatchFileReader$create(file)$metadata) - new_ds <- open_dataset(dst_dir, format = "feather") expect_equivalent( new_ds %>% From 1c69a62bf6ca22750ce81cb2a78c87636e962a6b Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Fri, 9 Oct 2020 16:46:36 -0400 Subject: [PATCH 10/14] revert configure/Makevars change --- r/configure | 5 +---- r/src/Makevars.in | 1 - 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/r/configure b/r/configure index b009e882c6a3..21bad6b1aa2f 100755 --- a/r/configure +++ b/r/configure @@ -201,10 +201,7 @@ else fi # Write to Makevars -sed -e "s|@cflags@|$PKG_CFLAGS|" \ - -e "s|@libs@|$PKG_LIBS|" \ - -e "s|@nproc@|$(nproc)|" \ - src/Makevars.in > src/Makevars +sed -e "s|@cflags@|$PKG_CFLAGS|" -e "s|@libs@|$PKG_LIBS|" src/Makevars.in > src/Makevars # This is removed because a (bad?) CRAN check fails when arrow.so is stripped # # Add stripping diff --git a/r/src/Makevars.in b/r/src/Makevars.in index 49d1bec642dd..7e20f9561ed7 100644 --- a/r/src/Makevars.in +++ b/r/src/Makevars.in @@ -27,4 +27,3 @@ PKG_CPPFLAGS=@cflags@ # PKG_CXXFLAGS=$(CXX_VISIBILITY) CXX_STD=CXX11 PKG_LIBS=@libs@ -MAKEFLAGS=-j@nproc@ From 5176fec3ff4046260f8659a52c2b30944194ccbb Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Fri, 9 Oct 2020 16:51:15 -0400 Subject: [PATCH 11/14] break dataset___IpcFileWriteOptions__update into two overloads --- r/R/arrowExports.R | 8 ++++++-- r/R/dataset-format.R | 14 ++++++++++---- r/src/arrowExports.cpp | 31 +++++++++++++++++++++++++------ r/src/dataset.cpp | 10 +++++++++- 4 files changed, 50 insertions(+), 13 deletions(-) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 6a7f26aa57cf..6210dd11943c 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -432,8 +432,12 @@ dataset___ParquetFileWriteOptions__update <- function(options, writer_props, arr invisible(.Call(`_arrow_dataset___ParquetFileWriteOptions__update` , options, writer_props, arrow_writer_props)) } -dataset___IpcFileWriteOptions__update <- function(ipc_options, use_legacy_format, codec, metadata_version){ - invisible(.Call(`_arrow_dataset___IpcFileWriteOptions__update` , ipc_options, use_legacy_format, codec, metadata_version)) +dataset___IpcFileWriteOptions__update2 <- function(ipc_options, use_legacy_format, codec, metadata_version){ + invisible(.Call(`_arrow_dataset___IpcFileWriteOptions__update2` , ipc_options, use_legacy_format, codec, metadata_version)) +} + +dataset___IpcFileWriteOptions__update1 <- function(ipc_options, use_legacy_format, metadata_version){ + invisible(.Call(`_arrow_dataset___IpcFileWriteOptions__update1` , ipc_options, use_legacy_format, metadata_version)) } dataset___IpcFileFormat__Make <- function(){ diff --git a/r/R/dataset-format.R b/r/R/dataset-format.R index c53a6821dc75..8300e415e2c9 100644 --- a/r/R/dataset-format.R +++ b/r/R/dataset-format.R @@ -141,10 +141,16 @@ FileWriteOptions <- R6Class("FileWriteOptions", inherit = ArrowObject, ParquetArrowWriterProperties$create(...)) } else if (self$type == "ipc") { args <- list(...) - dataset___IpcFileWriteOptions__update(self, - get_ipc_use_legacy_format(args$use_legacy_format), - args$codec, - get_ipc_metadata_version(args$metadata_version)) + if (is.null(args$codec)) { + dataset___IpcFileWriteOptions__update1(self, + get_ipc_use_legacy_format(args$use_legacy_format), + get_ipc_metadata_version(args$metadata_version)) + } else { + dataset___IpcFileWriteOptions__update2(self, + get_ipc_use_legacy_format(args$use_legacy_format), + args$codec, + get_ipc_metadata_version(args$metadata_version)) + } } invisible(self) } diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index d301b0dc191f..c5983128efab 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1697,20 +1697,38 @@ extern "C" SEXP _arrow_dataset___ParquetFileWriteOptions__update(SEXP options_se // dataset.cpp #if defined(ARROW_R_WITH_ARROW) -void dataset___IpcFileWriteOptions__update(const std::shared_ptr& ipc_options, bool use_legacy_format, const std::shared_ptr& codec, arrow::ipc::MetadataVersion metadata_version); -extern "C" SEXP _arrow_dataset___IpcFileWriteOptions__update(SEXP ipc_options_sexp, SEXP use_legacy_format_sexp, SEXP codec_sexp, SEXP metadata_version_sexp){ +void dataset___IpcFileWriteOptions__update2(const std::shared_ptr& ipc_options, bool use_legacy_format, const std::shared_ptr& codec, arrow::ipc::MetadataVersion metadata_version); +extern "C" SEXP _arrow_dataset___IpcFileWriteOptions__update2(SEXP ipc_options_sexp, SEXP use_legacy_format_sexp, SEXP codec_sexp, SEXP metadata_version_sexp){ BEGIN_CPP11 arrow::r::Input&>::type ipc_options(ipc_options_sexp); arrow::r::Input::type use_legacy_format(use_legacy_format_sexp); arrow::r::Input&>::type codec(codec_sexp); arrow::r::Input::type metadata_version(metadata_version_sexp); - dataset___IpcFileWriteOptions__update(ipc_options, use_legacy_format, codec, metadata_version); + dataset___IpcFileWriteOptions__update2(ipc_options, use_legacy_format, codec, metadata_version); return R_NilValue; END_CPP11 } #else -extern "C" SEXP _arrow_dataset___IpcFileWriteOptions__update(SEXP ipc_options_sexp, SEXP use_legacy_format_sexp, SEXP codec_sexp, SEXP metadata_version_sexp){ - Rf_error("Cannot call dataset___IpcFileWriteOptions__update(). Please use arrow::install_arrow() to install required runtime libraries. "); +extern "C" SEXP _arrow_dataset___IpcFileWriteOptions__update2(SEXP ipc_options_sexp, SEXP use_legacy_format_sexp, SEXP codec_sexp, SEXP metadata_version_sexp){ + Rf_error("Cannot call dataset___IpcFileWriteOptions__update2(). Please use arrow::install_arrow() to install required runtime libraries. "); +} +#endif + +// dataset.cpp +#if defined(ARROW_R_WITH_ARROW) +void dataset___IpcFileWriteOptions__update1(const std::shared_ptr& ipc_options, bool use_legacy_format, arrow::ipc::MetadataVersion metadata_version); +extern "C" SEXP _arrow_dataset___IpcFileWriteOptions__update1(SEXP ipc_options_sexp, SEXP use_legacy_format_sexp, SEXP metadata_version_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type ipc_options(ipc_options_sexp); + arrow::r::Input::type use_legacy_format(use_legacy_format_sexp); + arrow::r::Input::type metadata_version(metadata_version_sexp); + dataset___IpcFileWriteOptions__update1(ipc_options, use_legacy_format, metadata_version); + return R_NilValue; +END_CPP11 +} +#else +extern "C" SEXP _arrow_dataset___IpcFileWriteOptions__update1(SEXP ipc_options_sexp, SEXP use_legacy_format_sexp, SEXP metadata_version_sexp){ + Rf_error("Cannot call dataset___IpcFileWriteOptions__update1(). Please use arrow::install_arrow() to install required runtime libraries. "); } #endif @@ -6373,7 +6391,8 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___ParquetFileFormat__Make", (DL_FUNC) &_arrow_dataset___ParquetFileFormat__Make, 3}, { "_arrow_dataset___FileWriteOptions__type_name", (DL_FUNC) &_arrow_dataset___FileWriteOptions__type_name, 1}, { "_arrow_dataset___ParquetFileWriteOptions__update", (DL_FUNC) &_arrow_dataset___ParquetFileWriteOptions__update, 3}, - { "_arrow_dataset___IpcFileWriteOptions__update", (DL_FUNC) &_arrow_dataset___IpcFileWriteOptions__update, 4}, + { "_arrow_dataset___IpcFileWriteOptions__update2", (DL_FUNC) &_arrow_dataset___IpcFileWriteOptions__update2, 4}, + { "_arrow_dataset___IpcFileWriteOptions__update1", (DL_FUNC) &_arrow_dataset___IpcFileWriteOptions__update1, 3}, { "_arrow_dataset___IpcFileFormat__Make", (DL_FUNC) &_arrow_dataset___IpcFileFormat__Make, 0}, { "_arrow_dataset___CsvFileFormat__Make", (DL_FUNC) &_arrow_dataset___CsvFileFormat__Make, 1}, { "_arrow_dataset___DirectoryPartitioning", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning, 1}, diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index 5dca19a1107c..8a88ab02f87f 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -209,7 +209,7 @@ void dataset___ParquetFileWriteOptions__update( } // [[arrow::export]] -void dataset___IpcFileWriteOptions__update( +void dataset___IpcFileWriteOptions__update2( const std::shared_ptr& ipc_options, bool use_legacy_format, const std::shared_ptr& codec, arrow::ipc::MetadataVersion metadata_version) { @@ -218,6 +218,14 @@ void dataset___IpcFileWriteOptions__update( ipc_options->options->metadata_version = metadata_version; } +// [[arrow::export]] +void dataset___IpcFileWriteOptions__update1( + const std::shared_ptr& ipc_options, bool use_legacy_format, + arrow::ipc::MetadataVersion metadata_version) { + ipc_options->options->write_legacy_ipc_format = use_legacy_format; + ipc_options->options->metadata_version = metadata_version; +} + // [[arrow::export]] std::shared_ptr dataset___IpcFileFormat__Make() { return std::make_shared(); From 25adaf45b3b47a12b4384b7aa8b35883e071bc72 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Fri, 9 Oct 2020 17:05:36 -0400 Subject: [PATCH 12/14] revert nil_value --- r/man/RecordBatchReader.Rd | 7 +++++-- r/man/write_dataset.Rd | 3 ++- r/src/arrow_cpp11.h | 37 +++++++------------------------------ 3 files changed, 14 insertions(+), 33 deletions(-) diff --git a/r/man/RecordBatchReader.Rd b/r/man/RecordBatchReader.Rd index 6b204b0aae28..7adabf1594d1 100644 --- a/r/man/RecordBatchReader.Rd +++ b/r/man/RecordBatchReader.Rd @@ -66,9 +66,12 @@ file_obj$close() # then pass it to a RecordBatchReader read_file_obj <- ReadableFile$create(tf) reader <- RecordBatchFileReader$create(read_file_obj) -# RecordBatchFileReader knows how many batches it has (StreamReader does not) +# RecordBatchFileReader knows how many batches it has (StreamReader does +# not), and has access to the custom_metadata serialized in the file's +footer (the stream format does not include a footer). reader$num_record_batches -# We could consume the Reader by calling $read_next_batch() until all are, +reader$metadata +# We could consume the Reader by calling $read_next_batch() until all are # consumed, or we can call $read_table() to pull them all into a Table tab <- reader$read_table() # Call as.data.frame to turn that Table into an R data.frame diff --git a/r/man/write_dataset.Rd b/r/man/write_dataset.Rd index e12c4287266d..6412426c966a 100644 --- a/r/man/write_dataset.Rd +++ b/r/man/write_dataset.Rd @@ -46,7 +46,8 @@ will yield \verb{"part-0.feather", ...}.} string file path; default is the local file system} \item{...}{additional format-specific arguments. For available Parquet -options, see \code{\link[=write_parquet]{write_parquet()}}.} +options, see \code{\link[=write_parquet]{write_parquet()}}. For available Feather options, see +\code{\link[=RecordBatchFileWriter$create]{RecordBatchFileWriter$create()}}.} } \value{ The input \code{dataset}, invisibly diff --git a/r/src/arrow_cpp11.h b/r/src/arrow_cpp11.h index 001a34d72e21..859b0491cd0e 100644 --- a/r/src/arrow_cpp11.h +++ b/r/src/arrow_cpp11.h @@ -157,42 +157,19 @@ struct ns { static SEXP arrow; }; -// Specialize this struct to define a default value to be used when NULL is given. -// NB: can't do this for unique_ptr because that requires that T be complete. -// We could make that happen but it'd probably slow compilation. -template -struct nil_value { - static const T* get() { return nullptr; } -}; - -template -struct nil_value> { - static const std::shared_ptr* get() { - static const std::shared_ptr null; - return &null; - } -}; - template Pointer r6_to_pointer(SEXP self) { - using pointed_type = cpp11::decay_t::type>; - if (self == R_NilValue) { - if (Pointer default_value = nil_value::get()) { - return default_value; - } - } - if (!Rf_inherits(self, "ArrowObject")) { - std::string type_name = arrow::util::nameof(); + std::string type_name = arrow::util::nameof< + cpp11::decay_t::type>>(); cpp11::stop("Invalid R object for %s, must be an ArrowObject", type_name.c_str()); } - - if (void* p = R_ExternalPtrAddr(Rf_findVarInFrame(self, arrow::r::symbols::xp))) { - return reinterpret_cast(p); + void* p = R_ExternalPtrAddr(Rf_findVarInFrame(self, arrow::r::symbols::xp)); + if (p == nullptr) { + SEXP klass = Rf_getAttrib(self, R_ClassSymbol); + cpp11::stop("Invalid <%s>, external pointer to null", CHAR(STRING_ELT(klass, 0))); } - - SEXP klass = Rf_getAttrib(self, R_ClassSymbol); - cpp11::stop("Invalid <%s>, external pointer to null", CHAR(STRING_ELT(klass, 0))); + return reinterpret_cast(p); } // T is either std::shared_ptr or std::unique_ptr From b95b42caeb0378ebd6220b66ecd08206a367cdb2 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Fri, 9 Oct 2020 17:06:37 -0400 Subject: [PATCH 13/14] document available options for IPC writing --- r/R/dataset-write.R | 12 ++++++++++-- r/R/record-batch-reader.R | 8 ++------ r/R/record-batch-writer.R | 8 +++----- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R index bf9083519f9a..cd30c3d38d08 100644 --- a/r/R/dataset-write.R +++ b/r/R/dataset-write.R @@ -44,8 +44,16 @@ #' @param filesystem A [FileSystem] where the dataset should be written if it is a #' string file path; default is the local file system #' @param ... additional format-specific arguments. For available Parquet -#' options, see [write_parquet()]. For available Feather options, see -#' [RecordBatchFileWriter$create()]. +#' options, see [write_parquet()]. The available Feather options are +#' - `use_legacy_format` logical: write data formatted so that Arrow libraries +#' versions 0.14 and lower can read it. Default is `FALSE`. You can also +#' enable this by setting the environment variable `ARROW_PRE_0_15_IPC_FORMAT=1`. +#' - `metadata_version`: A string like "V5" or the equivalent integer indicating +#' the Arrow IPC MetadataVersion. Default (NULL) will use the latest version, +#' unless the environment variable `ARROW_PRE_1_0_METADATA_VERSION=1`, in +#' which case it will be V4. +#' - `codec`: A [Codec] which will be used to compress body buffers of written +#' files. Default (NULL) will not compress body buffers. #' @return The input `dataset`, invisibly #' @export write_dataset <- function(dataset, diff --git a/r/R/record-batch-reader.R b/r/R/record-batch-reader.R index 15441e78abfa..85ce839d0ce5 100644 --- a/r/R/record-batch-reader.R +++ b/r/R/record-batch-reader.R @@ -79,12 +79,9 @@ #' # then pass it to a RecordBatchReader #' read_file_obj <- ReadableFile$create(tf) #' reader <- RecordBatchFileReader$create(read_file_obj) -#' # RecordBatchFileReader knows how many batches it has (StreamReader does -#' # not), and has access to the custom_metadata serialized in the file's -#' footer (the stream format does not include a footer). +#' # RecordBatchFileReader knows how many batches it has (StreamReader does not) #' reader$num_record_batches -#' reader$metadata -#' # We could consume the Reader by calling $read_next_batch() until all are +#' # We could consume the Reader by calling $read_next_batch() until all are, #' # consumed, or we can call $read_table() to pull them all into a Table #' tab <- reader$read_table() #' # Call as.data.frame to turn that Table into an R data.frame @@ -143,7 +140,6 @@ RecordBatchFileReader <- R6Class("RecordBatchFileReader", inherit = ArrowObject, ), active = list( num_record_batches = function() ipc___RecordBatchFileReader__num_record_batches(self), - metadata = function() ipc___RecordBatchFileReader__metadata(self), schema = function() shared_ptr(Schema, ipc___RecordBatchFileReader__schema(self)) ) ) diff --git a/r/R/record-batch-writer.R b/r/R/record-batch-writer.R index 5b81369fab7d..1d969172d45a 100644 --- a/r/R/record-batch-writer.R +++ b/r/R/record-batch-writer.R @@ -39,7 +39,7 @@ #' - `sink` An `OutputStream` #' - `schema` A [Schema] for the data to be written #' - `use_legacy_format` logical: write data formatted so that Arrow libraries -#' versions 0.14 and lower can read it? Default is `FALSE`. You can also +#' versions 0.14 and lower can read it. Default is `FALSE`. You can also #' enable this by setting the environment variable `ARROW_PRE_0_15_IPC_FORMAT=1`. #' - `metadata_version`: A string like "V5" or the equivalent integer indicating #' the Arrow IPC MetadataVersion. Default (NULL) will use the latest version, @@ -152,8 +152,7 @@ RecordBatchFileWriter$create <- function(sink, schema, use_legacy_format = NULL, codec = NULL, - metadata_version = NULL, - metadata = NULL) { + metadata_version = NULL) { if (is.string(sink)) { stop( "RecordBatchFileWriter$create() requires an Arrow InputStream. ", @@ -170,8 +169,7 @@ RecordBatchFileWriter$create <- function(sink, schema, get_ipc_use_legacy_format(use_legacy_format), codec, - get_ipc_metadata_version(metadata_version), - prepare_key_value_metadata(metadata) + get_ipc_metadata_version(metadata_version) ) ) } From 6467dc194a1b6418f07321637c47f0ce3b13f453 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Fri, 9 Oct 2020 15:45:58 -0700 Subject: [PATCH 14/14] Fix tests and rerender docs --- r/R/record-batch-writer.R | 2 -- r/man/RecordBatchReader.Rd | 7 ++----- r/man/RecordBatchWriter.Rd | 2 +- r/man/write_dataset.Rd | 14 ++++++++++++-- 4 files changed, 15 insertions(+), 10 deletions(-) diff --git a/r/R/record-batch-writer.R b/r/R/record-batch-writer.R index 1d969172d45a..8b51603110d9 100644 --- a/r/R/record-batch-writer.R +++ b/r/R/record-batch-writer.R @@ -151,7 +151,6 @@ RecordBatchFileWriter <- R6Class("RecordBatchFileWriter", inherit = RecordBatchS RecordBatchFileWriter$create <- function(sink, schema, use_legacy_format = NULL, - codec = NULL, metadata_version = NULL) { if (is.string(sink)) { stop( @@ -168,7 +167,6 @@ RecordBatchFileWriter$create <- function(sink, sink, schema, get_ipc_use_legacy_format(use_legacy_format), - codec, get_ipc_metadata_version(metadata_version) ) ) diff --git a/r/man/RecordBatchReader.Rd b/r/man/RecordBatchReader.Rd index 7adabf1594d1..6b204b0aae28 100644 --- a/r/man/RecordBatchReader.Rd +++ b/r/man/RecordBatchReader.Rd @@ -66,12 +66,9 @@ file_obj$close() # then pass it to a RecordBatchReader read_file_obj <- ReadableFile$create(tf) reader <- RecordBatchFileReader$create(read_file_obj) -# RecordBatchFileReader knows how many batches it has (StreamReader does -# not), and has access to the custom_metadata serialized in the file's -footer (the stream format does not include a footer). +# RecordBatchFileReader knows how many batches it has (StreamReader does not) reader$num_record_batches -reader$metadata -# We could consume the Reader by calling $read_next_batch() until all are +# We could consume the Reader by calling $read_next_batch() until all are, # consumed, or we can call $read_table() to pull them all into a Table tab <- reader$read_table() # Call as.data.frame to turn that Table into an R data.frame diff --git a/r/man/RecordBatchWriter.Rd b/r/man/RecordBatchWriter.Rd index 0422da6caaee..038653b9e241 100644 --- a/r/man/RecordBatchWriter.Rd +++ b/r/man/RecordBatchWriter.Rd @@ -23,7 +23,7 @@ factory methods instantiate the object and take the following arguments: \item \code{sink} An \code{OutputStream} \item \code{schema} A \link{Schema} for the data to be written \item \code{use_legacy_format} logical: write data formatted so that Arrow libraries -versions 0.14 and lower can read it? Default is \code{FALSE}. You can also +versions 0.14 and lower can read it. Default is \code{FALSE}. You can also enable this by setting the environment variable \code{ARROW_PRE_0_15_IPC_FORMAT=1}. \item \code{metadata_version}: A string like "V5" or the equivalent integer indicating the Arrow IPC MetadataVersion. Default (NULL) will use the latest version, diff --git a/r/man/write_dataset.Rd b/r/man/write_dataset.Rd index 6412426c966a..4f1c89ffeccf 100644 --- a/r/man/write_dataset.Rd +++ b/r/man/write_dataset.Rd @@ -46,8 +46,18 @@ will yield \verb{"part-0.feather", ...}.} string file path; default is the local file system} \item{...}{additional format-specific arguments. For available Parquet -options, see \code{\link[=write_parquet]{write_parquet()}}. For available Feather options, see -\code{\link[=RecordBatchFileWriter$create]{RecordBatchFileWriter$create()}}.} +options, see \code{\link[=write_parquet]{write_parquet()}}. The available Feather options are +\itemize{ +\item \code{use_legacy_format} logical: write data formatted so that Arrow libraries +versions 0.14 and lower can read it. Default is \code{FALSE}. You can also +enable this by setting the environment variable \code{ARROW_PRE_0_15_IPC_FORMAT=1}. +\item \code{metadata_version}: A string like "V5" or the equivalent integer indicating +the Arrow IPC MetadataVersion. Default (NULL) will use the latest version, +unless the environment variable \code{ARROW_PRE_1_0_METADATA_VERSION=1}, in +which case it will be V4. +\item \code{codec}: A \link{Codec} which will be used to compress body buffers of written +files. Default (NULL) will not compress body buffers. +}} } \value{ The input \code{dataset}, invisibly